From 39b9656fc4a802a7dc73d947dd4092caa5bb831c Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Wed, 15 Dec 2010 12:12:51 -0500 Subject: [PATCH] Fixed request id checking for threaded inserts. --- lib/mongo/connection.rb | 35 ++++++++++++--------- lib/mongo/util/pool.rb | 7 ----- test/threading/test_threading_large_pool.rb | 2 +- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 4cdd6bc..a31864c 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -357,7 +357,8 @@ module Mongo # @return [Integer] number of bytes sent def send_message(operation, message, log_message=nil) begin - packed_message = add_message_headers(operation, message).to_s + add_message_headers(message, operation) + packed_message = message.to_s socket = checkout_writer send_message_on_socket(packed_message, socket) ensure @@ -378,16 +379,19 @@ module Mongo # # @return [Hash] The document returned by the call to getlasterror. def send_message_with_safe_check(operation, message, db_name, log_message=nil, last_error_params=false) - request_id = get_request_id - message_with_headers = add_message_headers(operation, message, request_id) - message_with_check = last_error_message(db_name, last_error_params) + docs = num_received = cursor_id = '' + add_message_headers(message, operation) + + last_error_message = BSON::ByteBuffer.new + build_last_error_message(last_error_message, db_name, last_error_params) + last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY) + + packed_message = message.append!(last_error_message).to_s begin sock = checkout_writer - packed_message = message_with_headers.append!(message_with_check).to_s - docs = num_received = cursor_id = '' @safe_mutexes[sock].synchronize do send_message_on_socket(packed_message, sock) - docs, num_received, cursor_id = receive(sock, request_id + 1) + docs, num_received, cursor_id = receive(sock, last_error_id) end ensure checkin_writer(sock) @@ -411,8 +415,8 @@ module Mongo # An array whose indexes include [0] documents returned, [1] number of document received, # and [3] a cursor_id. def receive_message(operation, message, log_message=nil, socket=nil, command=false) - request_id = get_request_id - packed_message = add_message_headers(operation, message, request_id).to_s + request_id = add_message_headers(message, operation) + packed_message = message.to_s begin sock = socket || (command ? checkout_writer : checkout_reader) @@ -684,8 +688,7 @@ module Mongo # Constructs a getlasterror message. This method is used exclusively by # Connection#send_message_with_safe_check. - def last_error_message(db_name, opts) - message = BSON::ByteBuffer.new + def build_last_error_message(message, db_name, opts) message.put_int(0) BSON::BSON_RUBY.serialize_cstr(message, "#{db_name}.$cmd") message.put_int(0) @@ -697,18 +700,20 @@ module Mongo cmd.merge!(opts) end message.put_binary(BSON::BSON_CODER.serialize(cmd, false).to_s) - add_message_headers(Mongo::Constants::OP_QUERY, message) + nil end # Prepares a message for transmission to MongoDB by # constructing a valid message header. - def add_message_headers(operation, message, request_id=get_request_id) + # + # @returns [Integer] the request id used in the header + def add_message_headers(message, operation) headers = [ # Message size. 16 + message.size, # Unique request id. - request_id, + request_id = get_request_id, # Response id. 0, @@ -718,6 +723,8 @@ module Mongo ].pack('VVVV') message.prepend!(headers) + + request_id end # Low-level method for sending a message on a socket. diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index c4f98d3..2b567af 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -34,13 +34,6 @@ module Mongo # Mutex for synchronizing pool access @connection_mutex = Mutex.new - # Global safe option. This is false by default. - @safe = options[:safe] || false - - # Create a mutex when a new key, in this case a socket, - # is added to the hash. - @safe_mutexes = Hash.new { |h, k| h[k] = Mutex.new } - # Condition variable for signal and wait @queue = ConditionVariable.new diff --git a/test/threading/test_threading_large_pool.rb b/test/threading/test_threading_large_pool.rb index 5df349a..9c3b6b2 100644 --- a/test/threading/test_threading_large_pool.rb +++ b/test/threading/test_threading_large_pool.rb @@ -6,7 +6,7 @@ class TestThreadingLargePool < Test::Unit::TestCase include Mongo - @@db = standard_connection(:pool_size => 50, :timeout => 60).db(MONGO_TEST_DB) + @@db = standard_connection(:pool_size => 1, :timeout => 60).db(MONGO_TEST_DB) @@coll = @@db.collection('thread-test-collection') def set_up_safe_data