Fixed request id checking for threaded inserts.

This commit is contained in:
Kyle Banker 2010-12-15 12:12:51 -05:00
parent 81fff198fe
commit 39b9656fc4
3 changed files with 22 additions and 22 deletions

View File

@ -357,7 +357,8 @@ module Mongo
# @return [Integer] number of bytes sent # @return [Integer] number of bytes sent
def send_message(operation, message, log_message=nil) def send_message(operation, message, log_message=nil)
begin begin
packed_message = add_message_headers(operation, message).to_s add_message_headers(message, operation)
packed_message = message.to_s
socket = checkout_writer socket = checkout_writer
send_message_on_socket(packed_message, socket) send_message_on_socket(packed_message, socket)
ensure ensure
@ -378,16 +379,19 @@ module Mongo
# #
# @return [Hash] The document returned by the call to getlasterror. # @return [Hash] The document returned by the call to getlasterror.
def send_message_with_safe_check(operation, message, db_name, log_message=nil, last_error_params=false) def send_message_with_safe_check(operation, message, db_name, log_message=nil, last_error_params=false)
request_id = get_request_id docs = num_received = cursor_id = ''
message_with_headers = add_message_headers(operation, message, request_id) add_message_headers(message, operation)
message_with_check = last_error_message(db_name, last_error_params)
last_error_message = BSON::ByteBuffer.new
build_last_error_message(last_error_message, db_name, last_error_params)
last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY)
packed_message = message.append!(last_error_message).to_s
begin begin
sock = checkout_writer sock = checkout_writer
packed_message = message_with_headers.append!(message_with_check).to_s
docs = num_received = cursor_id = ''
@safe_mutexes[sock].synchronize do @safe_mutexes[sock].synchronize do
send_message_on_socket(packed_message, sock) send_message_on_socket(packed_message, sock)
docs, num_received, cursor_id = receive(sock, request_id + 1) docs, num_received, cursor_id = receive(sock, last_error_id)
end end
ensure ensure
checkin_writer(sock) checkin_writer(sock)
@ -411,8 +415,8 @@ module Mongo
# An array whose indexes include [0] documents returned, [1] number of document received, # An array whose indexes include [0] documents returned, [1] number of document received,
# and [3] a cursor_id. # and [3] a cursor_id.
def receive_message(operation, message, log_message=nil, socket=nil, command=false) def receive_message(operation, message, log_message=nil, socket=nil, command=false)
request_id = get_request_id request_id = add_message_headers(message, operation)
packed_message = add_message_headers(operation, message, request_id).to_s packed_message = message.to_s
begin begin
sock = socket || (command ? checkout_writer : checkout_reader) sock = socket || (command ? checkout_writer : checkout_reader)
@ -684,8 +688,7 @@ module Mongo
# Constructs a getlasterror message. This method is used exclusively by # Constructs a getlasterror message. This method is used exclusively by
# Connection#send_message_with_safe_check. # Connection#send_message_with_safe_check.
def last_error_message(db_name, opts) def build_last_error_message(message, db_name, opts)
message = BSON::ByteBuffer.new
message.put_int(0) message.put_int(0)
BSON::BSON_RUBY.serialize_cstr(message, "#{db_name}.$cmd") BSON::BSON_RUBY.serialize_cstr(message, "#{db_name}.$cmd")
message.put_int(0) message.put_int(0)
@ -697,18 +700,20 @@ module Mongo
cmd.merge!(opts) cmd.merge!(opts)
end end
message.put_binary(BSON::BSON_CODER.serialize(cmd, false).to_s) message.put_binary(BSON::BSON_CODER.serialize(cmd, false).to_s)
add_message_headers(Mongo::Constants::OP_QUERY, message) nil
end end
# Prepares a message for transmission to MongoDB by # Prepares a message for transmission to MongoDB by
# constructing a valid message header. # constructing a valid message header.
def add_message_headers(operation, message, request_id=get_request_id) #
# @returns [Integer] the request id used in the header
def add_message_headers(message, operation)
headers = [ headers = [
# Message size. # Message size.
16 + message.size, 16 + message.size,
# Unique request id. # Unique request id.
request_id, request_id = get_request_id,
# Response id. # Response id.
0, 0,
@ -718,6 +723,8 @@ module Mongo
].pack('VVVV') ].pack('VVVV')
message.prepend!(headers) message.prepend!(headers)
request_id
end end
# Low-level method for sending a message on a socket. # Low-level method for sending a message on a socket.

View File

@ -34,13 +34,6 @@ module Mongo
# Mutex for synchronizing pool access # Mutex for synchronizing pool access
@connection_mutex = Mutex.new @connection_mutex = Mutex.new
# Global safe option. This is false by default.
@safe = options[:safe] || false
# Create a mutex when a new key, in this case a socket,
# is added to the hash.
@safe_mutexes = Hash.new { |h, k| h[k] = Mutex.new }
# Condition variable for signal and wait # Condition variable for signal and wait
@queue = ConditionVariable.new @queue = ConditionVariable.new

View File

@ -6,7 +6,7 @@ class TestThreadingLargePool < Test::Unit::TestCase
include Mongo include Mongo
@@db = standard_connection(:pool_size => 50, :timeout => 60).db(MONGO_TEST_DB) @@db = standard_connection(:pool_size => 1, :timeout => 60).db(MONGO_TEST_DB)
@@coll = @@db.collection('thread-test-collection') @@coll = @@db.collection('thread-test-collection')
def set_up_safe_data def set_up_safe_data