fix a deadlock with threaded queries

This commit is contained in:
Mike Dirolf 2009-08-10 16:10:52 -04:00
parent 691e65f684
commit 1e4728a77f
4 changed files with 57 additions and 13 deletions

View File

@ -194,8 +194,10 @@ module XGen
def refill_via_get_more
send_query_if_needed
return if @cursor_id == 0
@db.send_to_db(GetMoreMessage.new(@admin ? 'admin' : @db.name, @collection.name, @cursor_id))
read_all
@db._synchronize {
@db.send_to_db(GetMoreMessage.new(@admin ? 'admin' : @db.name, @collection.name, @cursor_id))
read_all
}
end
def object_from_stream
@ -212,9 +214,11 @@ module XGen
def send_query_if_needed
# Run query first time we request an object from the wire
unless @query_run
@db.send_query_message(QueryMessage.new(@admin ? 'admin' : @db.name, @collection.name, @query))
@query_run = true
read_all
@db._synchronize {
@db.send_query_message(QueryMessage.new(@admin ? 'admin' : @db.name, @collection.name, @query))
@query_run = true
read_all
}
end
end

View File

@ -345,15 +345,13 @@ module XGen
# Used by a Cursor to lazily send the query to the database.
def send_query_message(query_message)
@semaphore.synchronize {
send_to_db(query_message)
}
send_to_db(query_message)
end
# Remove the records that match +selector+ from +collection_name+.
# Normally called by Collection#remove or Collection#clear.
def remove_from_db(collection_name, selector)
@semaphore.synchronize {
_synchronize {
send_to_db(RemoveMessage.new(@name, collection_name, selector))
}
end
@ -361,7 +359,7 @@ module XGen
# Update records in +collection_name+ that match +selector+ by
# applying +obj+ as an update. Normally called by Collection#replace.
def replace_in_db(collection_name, selector, obj)
@semaphore.synchronize {
_synchronize {
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, false))
}
end
@ -373,7 +371,7 @@ module XGen
# applying +obj+ as an update. If no match, inserts (???). Normally
# called by Collection#repsert.
def repsert_in_db(collection_name, selector, obj)
@semaphore.synchronize {
_synchronize {
obj = @pk_factory.create_pk(obj) if @pk_factory
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, true))
obj
@ -469,7 +467,7 @@ module XGen
:key => field_h,
:unique => unique
}
@semaphore.synchronize {
_synchronize {
send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, false, sel))
}
name
@ -479,7 +477,7 @@ module XGen
# Collection#insert. Returns a new array containing +objects+,
# possibly modified by @pk_factory.
def insert_into_db(collection_name, objects)
@semaphore.synchronize {
_synchronize {
if @pk_factory
objects.collect! { |o|
@pk_factory.create_pk(o)
@ -532,6 +530,10 @@ module XGen
query(Collection.new(self, SYSTEM_COMMAND_COLLECTION), q, use_admin_db).next_object
end
def _synchronize &block
@semaphore.synchronize &block
end
private
def hash_password(username, plaintext)

View File

@ -75,6 +75,7 @@ TEST_FILES = ['tests/mongo-qa/_common.rb',
'tests/test_mongo.rb',
'tests/test_objectid.rb',
'tests/test_ordered_hash.rb',
'tests/test_threading.rb',
'tests/test_round_trip.rb']
Gem::Specification.new do |s|

37
tests/test_threading.rb Normal file
View File

@ -0,0 +1,37 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
require 'test/unit'
class TestThreading < Test::Unit::TestCase
include XGen::Mongo::Driver
@@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
@@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT
@@db = Mongo.new(@@host, @@port).db('ruby-mongo-test')
@@coll = @@db.collection('thread-test-collection')
def test_threading
@@coll.clear
1000.times do |i|
@@coll.insert("x" => i)
end
threads = []
10.times do |i|
threads[i] = Thread.new{
sum = 0
@@coll.find().each { |document|
sum += document["x"]
}
assert_equal 499500, sum
}
end
10.times do |i|
threads[i].join
end
end
end