diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index 45917ea..b1c01bc 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -219,7 +219,7 @@ module Mongo BSON.serialize_cstr(message, "#{@db.name}.#{@name}") message.put_int(0) message.put_array(BSON_SERIALIZER.serialize(selector, false).unpack("C*")) - @connection.send_message_with_operation(Mongo::Constants::OP_DELETE, message, + @connection.send_message(Mongo::Constants::OP_DELETE, message, "db.#{@db.name}.remove(#{selector.inspect})") end @@ -254,7 +254,7 @@ module Mongo @connection.send_message_with_safe_check(Mongo::Constants::OP_UPDATE, message, @db.name, "db.#{@name}.update(#{selector.inspect}, #{document.inspect})") else - @connection.send_message_with_operation(Mongo::Constants::OP_UPDATE, message, + @connection.send_message(Mongo::Constants::OP_UPDATE, message, "db.#{@name}.update(#{selector.inspect}, #{document.inspect})") end end @@ -481,7 +481,7 @@ EOS @connection.send_message_with_safe_check(Mongo::Constants::OP_INSERT, message, @db.name, "db.#{collection_name}.insert(#{documents.inspect})") else - @connection.send_message_with_operation(Mongo::Constants::OP_INSERT, message, + @connection.send_message(Mongo::Constants::OP_INSERT, message, "db.#{collection_name}.insert(#{documents.inspect})") end documents.collect { |o| o[:_id] || o['_id'] } diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 3cfd02f..6b2d4b5 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -43,7 +43,7 @@ module Mongo # # In all cases, the default host is "localhost" and the default port, is 27017. # - # When specifying, pair_or_host, is a hash with two keys: :left and :right. Each key maps to either + # When specifying a pair, pair_or_host, is a hash with two keys: :left and :right. Each key maps to either # * a server name, in which case port is 27017, # * a port number, in which case the server is "localhost", or # * an array containing [server_name, port_number] @@ -86,9 +86,6 @@ module Mongo # Lock for request ids. @id_lock = Mutex.new - # Lock for checking master. - @master_lock = Mutex.new - # Pool size and timeout. @size = options[:pool_size] || 1 @timeout = options[:timeout] || 1.0 @@ -201,21 +198,6 @@ module Mongo ## Connections and pooling ## - # Sends a message to MongoDB. - # - # Takes a MongoDB opcode, +operation+, a message of class ByteBuffer, - # +message+, and an optional formatted +log_message+. - def send_message(operation, message, log_message=nil) - @logger.debug(" MONGODB #{log_message || message}") if @logger - - packed_message = pack_message(operation, message) - socket = checkout - send_message_on_socket(packed_message, socket) - checkin(socket) - end - - # Sends a message to MongoDB and returns the response. - # # Takes a MongoDB opcode, +operation+, a message of class ByteBuffer, # +message+, an optional formatted +log_message+, and an optional # socket. @@ -240,7 +222,7 @@ module Mongo # Takes a MongoDB opcode, +operation+, a message of class ByteBuffer, # +message+, and an optional formatted +log_message+. # Sends the message to the databse, adding the necessary headers. - def send_message_with_operation(operation, message, log_message=nil) + def send_message(operation, message, log_message=nil) @logger.debug(" MONGODB #{log_message || message}") if @logger packed_message = pack_message(operation, message) socket = checkout @@ -250,6 +232,10 @@ module Mongo # Sends a message to the database, waits for a response, and raises # and exception if the operation has failed. + # + # Takes a MongoDB opcode, +operation+, a message of class ByteBuffer, + # +message+, the +db_name+, and an optional formatted +log_message+. + # Sends the message to the databse, adding the necessary headers. def send_message_with_safe_check(operation, message, db_name, log_message=nil) message_with_headers = add_message_headers(operation, message) message_with_check = last_error_message(db_name) @@ -265,8 +251,12 @@ module Mongo [docs, num_received, cursor_id] end - # Send a message to the database and waits for the response. - def receive_message_with_operation(operation, message, log_message=nil, socket=nil) + # Sends a message to the database and waits for the response. + # + # Takes a MongoDB opcode, +operation+, a message of class ByteBuffer, + # +message+, and an optional formatted +log_message+. This method + # also takes an options socket for internal use with #connect_to_master. + def receive_message(operation, message, log_message=nil, socket=nil) message_with_headers = add_message_headers(operation, message).to_s @logger.debug(" MONGODB #{log_message || message}") if @logger sock = socket || checkout @@ -326,6 +316,8 @@ module Mongo @reserved_connections.clear end + private + # Get a socket from the pool, mapped to the current thread. def checkout #return @socket ||= checkout_new_socket if @size == 1 @@ -560,5 +552,6 @@ module Mongo a end end + end end diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 7140636..af75a34 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -201,7 +201,7 @@ module Mongo message.put_int(0) message.put_int(1) message.put_long(@cursor_id) - @connection.send_message_with_operation(Mongo::Constants::OP_KILL_CURSORS, message, "cursor.close()") + @connection.send_message(Mongo::Constants::OP_KILL_CURSORS, message, "cursor.close()") end @cursor_id = 0 @closed = true @@ -296,7 +296,7 @@ module Mongo # Cursor id. message.put_long(@cursor_id) - results, @n_received, @cursor_id = @connection.receive_message_with_operation(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()", @socket) + results, @n_received, @cursor_id = @connection.receive_message(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()", @socket) @cache += results close_cursor_if_query_complete end @@ -307,7 +307,7 @@ module Mongo false else message = construct_query_message - results, @n_received, @cursor_id = @connection.receive_message_with_operation(Mongo::Constants::OP_QUERY, message, + results, @n_received, @cursor_id = @connection.receive_message(Mongo::Constants::OP_QUERY, message, (query_log_message if @connection.logger), @socket) @cache += results @query_run = true diff --git a/test/unit/collection_test.rb b/test/unit/collection_test.rb index 0289d39..6ead170 100644 --- a/test/unit/collection_test.rb +++ b/test/unit/collection_test.rb @@ -11,7 +11,7 @@ class ConnectionTest < Test::Unit::TestCase @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false) @db = @conn['testing'] @coll = @db.collection('books') - @conn.expects(:send_message_with_operation).with do |op, msg, log| + @conn.expects(:send_message).with do |op, msg, log| op == 2001 && log.include?("db.books.update") end @coll.update({}, {:title => 'Moby Dick'}) @@ -21,7 +21,7 @@ class ConnectionTest < Test::Unit::TestCase @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false) @db = @conn['testing'] @coll = @db.collection('books') - @conn.expects(:send_message_with_operation).with do |op, msg, log| + @conn.expects(:send_message).with do |op, msg, log| op == 2002 && log.include?("db.books.insert") end @coll.insert({:title => 'Moby Dick'}) diff --git a/test/unit/connection_test.rb b/test/unit/connection_test.rb index 4177ba3..a4a7239 100644 --- a/test/unit/connection_test.rb +++ b/test/unit/connection_test.rb @@ -12,6 +12,11 @@ class ConnectionTest < Test::Unit::TestCase db = Object.new end + # Make a few methods public for these tests. + class Mongo::Connection + public :checkin, :checkout, :clear_stale_cached_connections! + end + context "Initialization: " do context "given a single node" do