diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index ba1279f..1287daa 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -15,6 +15,7 @@ # ++ require 'socket' +require 'mutex_m' require 'mongo/mongo' require 'mongo/collection' require 'mongo/message' @@ -61,6 +62,8 @@ module XGen @name, @host, @port = db_name, host, port @socket = TCPSocket.new(@host, @port) @strict = false + @semaphore = Object.new + @semaphore.extend Mutex_m end # Returns an array of collection names. Each name is of the form @@ -161,23 +164,26 @@ module XGen # Send a Query to +collection_name+ and return a Cursor over the # results. def query(collection_name, query) - # TODO synchronize - send_to_db(QueryMessage.new(@name, collection_name, query)) - return Cursor.new(self, collection_name, query.number_to_return) + @semaphore.synchronize { + send_to_db(QueryMessage.new(@name, collection_name, query)) + Cursor.new(self, collection_name, query.number_to_return) + } end # Remove the records that match +selector+ from +collection_name+. # Normally called by Collection#remove or Collection#clear. def remove_from_db(collection_name, selector) - # TODO synchronize - send_to_db(RemoveMessage.new(@name, collection_name, selector)) + @semaphore.synchronize { + send_to_db(RemoveMessage.new(@name, collection_name, selector)) + } end # Update records in +collection_name+ that match +selector+ by # applying +obj+ as an update. Normally called by Collection#replace. def replace_in_db(collection_name, selector, obj) - # TODO synchronize - send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, false)) + @semaphore.synchronize { + send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, false)) + } end # Alias for #replace_in_db. Normally called by Collection.modify. @@ -188,9 +194,10 @@ module XGen # called by Collection#repsert. def repsert_in_db(collection_name, selector, obj) # TODO if PKInjector, inject - # TODO synchronize - send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, true)) - obj + @semaphore.synchronize { + send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, true)) + obj + } end # Return the number of records in +collection_name+ that match @@ -226,7 +233,6 @@ module XGen # :ns :: Namespace; same as +collection_name+. def index_information(collection_name) sel = {:ns => full_coll_name(collection_name)} - # TODO synchronize query(SYSTEM_INDEX_COLLECTION, Query.new(sel)).collect { |row| h = {:name => row['name']} raise "Name of index on return from db was nil. Coll = #{full_coll_name(collection_name)}" unless h[:name] @@ -251,15 +257,17 @@ module XGen field_h = {} fields.each { |f| field_h[f] = 1 } sel[:key] = field_h - # TODO synchronize - send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, sel)) + @semaphore.synchronize { + send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, sel)) + } end # Insert +objects+ into +collection_name+. Normally called by # Collection#insert. def insert_into_db(collection_name, objects) - # TODO synchronize - objects.each { |o| send_to_db(InsertMessage.new(@name, collection_name, o)) } + @semaphore.synchronize { + objects.each { |o| send_to_db(InsertMessage.new(@name, collection_name, o)) } + } end def send_to_db(message) @@ -288,7 +296,6 @@ module XGen end end - # TODO synchronize q = Query.new(selector) q.number_to_return = 1 query(SYSTEM_COMMAND_COLLECTION, q).next_object