From 1e4728a77f32c4c339417729ea14377aa9425e73 Mon Sep 17 00:00:00 2001 From: Mike Dirolf Date: Mon, 10 Aug 2009 16:10:52 -0400 Subject: [PATCH] fix a deadlock with threaded queries --- lib/mongo/cursor.rb | 14 +++++++++----- lib/mongo/db.rb | 18 ++++++++++-------- mongo-ruby-driver.gemspec | 1 + tests/test_threading.rb | 37 +++++++++++++++++++++++++++++++++++++ 4 files changed, 57 insertions(+), 13 deletions(-) create mode 100644 tests/test_threading.rb diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index e6dc1d3..be6e110 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -194,8 +194,10 @@ module XGen def refill_via_get_more send_query_if_needed return if @cursor_id == 0 - @db.send_to_db(GetMoreMessage.new(@admin ? 'admin' : @db.name, @collection.name, @cursor_id)) - read_all + @db._synchronize { + @db.send_to_db(GetMoreMessage.new(@admin ? 'admin' : @db.name, @collection.name, @cursor_id)) + read_all + } end def object_from_stream @@ -212,9 +214,11 @@ module XGen def send_query_if_needed # Run query first time we request an object from the wire unless @query_run - @db.send_query_message(QueryMessage.new(@admin ? 'admin' : @db.name, @collection.name, @query)) - @query_run = true - read_all + @db._synchronize { + @db.send_query_message(QueryMessage.new(@admin ? 'admin' : @db.name, @collection.name, @query)) + @query_run = true + read_all + } end end diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index 6931424..1021217 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -345,15 +345,13 @@ module XGen # Used by a Cursor to lazily send the query to the database. def send_query_message(query_message) - @semaphore.synchronize { - send_to_db(query_message) - } + send_to_db(query_message) end # Remove the records that match +selector+ from +collection_name+. # Normally called by Collection#remove or Collection#clear. def remove_from_db(collection_name, selector) - @semaphore.synchronize { + _synchronize { send_to_db(RemoveMessage.new(@name, collection_name, selector)) } end @@ -361,7 +359,7 @@ module XGen # Update records in +collection_name+ that match +selector+ by # applying +obj+ as an update. Normally called by Collection#replace. def replace_in_db(collection_name, selector, obj) - @semaphore.synchronize { + _synchronize { send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, false)) } end @@ -373,7 +371,7 @@ module XGen # applying +obj+ as an update. If no match, inserts (???). Normally # called by Collection#repsert. def repsert_in_db(collection_name, selector, obj) - @semaphore.synchronize { + _synchronize { obj = @pk_factory.create_pk(obj) if @pk_factory send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, true)) obj @@ -469,7 +467,7 @@ module XGen :key => field_h, :unique => unique } - @semaphore.synchronize { + _synchronize { send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, false, sel)) } name @@ -479,7 +477,7 @@ module XGen # Collection#insert. Returns a new array containing +objects+, # possibly modified by @pk_factory. def insert_into_db(collection_name, objects) - @semaphore.synchronize { + _synchronize { if @pk_factory objects.collect! { |o| @pk_factory.create_pk(o) @@ -532,6 +530,10 @@ module XGen query(Collection.new(self, SYSTEM_COMMAND_COLLECTION), q, use_admin_db).next_object end + def _synchronize &block + @semaphore.synchronize &block + end + private def hash_password(username, plaintext) diff --git a/mongo-ruby-driver.gemspec b/mongo-ruby-driver.gemspec index a1b07ff..9ceda49 100644 --- a/mongo-ruby-driver.gemspec +++ b/mongo-ruby-driver.gemspec @@ -75,6 +75,7 @@ TEST_FILES = ['tests/mongo-qa/_common.rb', 'tests/test_mongo.rb', 'tests/test_objectid.rb', 'tests/test_ordered_hash.rb', + 'tests/test_threading.rb', 'tests/test_round_trip.rb'] Gem::Specification.new do |s| diff --git a/tests/test_threading.rb b/tests/test_threading.rb new file mode 100644 index 0000000..7679a41 --- /dev/null +++ b/tests/test_threading.rb @@ -0,0 +1,37 @@ +$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') +require 'mongo' +require 'test/unit' + +class TestThreading < Test::Unit::TestCase + + include XGen::Mongo::Driver + + @@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost' + @@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT + @@db = Mongo.new(@@host, @@port).db('ruby-mongo-test') + @@coll = @@db.collection('thread-test-collection') + + def test_threading + @@coll.clear + + 1000.times do |i| + @@coll.insert("x" => i) + end + + threads = [] + + 10.times do |i| + threads[i] = Thread.new{ + sum = 0 + @@coll.find().each { |document| + sum += document["x"] + } + assert_equal 499500, sum + } + end + + 10.times do |i| + threads[i].join + end + end +end