Test request and response ids match

This commit is contained in:
Kyle Banker 2010-12-13 17:54:28 -05:00
parent a17455da27
commit 6db5bb2f51
2 changed files with 21 additions and 17 deletions

View File

@ -424,7 +424,8 @@ module Mongo
#
# @return [Hash] The document returned by the call to getlasterror.
def send_message_with_safe_check(operation, message, db_name, log_message=nil, last_error_params=false)
message_with_headers = add_message_headers(operation, message)
request_id = get_request_id
message_with_headers = add_message_headers(operation, message, request_id)
message_with_check = last_error_message(db_name, last_error_params)
begin
sock = checkout_writer
@ -432,7 +433,7 @@ module Mongo
docs = num_received = cursor_id = ''
@safe_mutexes[sock].synchronize do
send_message_on_socket(packed_message, sock)
docs, num_received, cursor_id = receive(sock)
docs, num_received, cursor_id = receive(sock, request_id + 1)
end
ensure
checkin_writer(sock)
@ -456,14 +457,15 @@ module Mongo
# An array whose indexes include [0] documents returned, [1] number of document received,
# and [3] a cursor_id.
def receive_message(operation, message, log_message=nil, socket=nil, command=false)
packed_message = add_message_headers(operation, message).to_s
request_id = get_request_id
packed_message = add_message_headers(operation, message, request_id).to_s
begin
sock = socket || (command ? checkout_writer : checkout_reader)
result = ''
@safe_mutexes[sock].synchronize do
send_message_on_socket(packed_message, sock)
result = receive(sock)
result = receive(sock, request_id)
end
ensure
command ? checkin_writer(sock) : checkin_reader(sock)
@ -747,26 +749,28 @@ module Mongo
@nodes_to_try = new_nodes - @nodes_tried
end
def receive(sock)
receive_and_discard_header(sock)
def receive(sock, expected_response)
receive_header(sock, expected_response)
number_received, cursor_id = receive_response_header(sock)
read_documents(number_received, cursor_id, sock)
end
def receive_header(sock)
header = BSON::ByteBuffer.new
header.put_binary(receive_message_on_socket(16, sock))
def receive_header(sock, expected_response)
header = receive_message_on_socket(16, sock)
size, request_id, response_to = header.unpack('VVV')
if expected_response != response_to
raise Mongo::ConnectionFailure, "Expected response #{expected_response} but got #{response_to}"
end
unless header.size == STANDARD_HEADER_SIZE
raise "Short read for DB response header: " +
"expected #{STANDARD_HEADER_SIZE} bytes, saw #{header.size}"
end
header.rewind
size = header.get_int
request_id = header.get_int
response_to = header.get_int
op = header.get_int
nil
end
# This is an optimized version of receive_header
# TODO: modify this to check response_id
def receive_and_discard_header(sock)
bytes_read = receive_and_discard_message_on_socket(16, sock)
unless bytes_read == STANDARD_HEADER_SIZE
@ -830,13 +834,13 @@ module Mongo
# Prepares a message for transmission to MongoDB by
# constructing a valid message header.
def add_message_headers(operation, message)
def add_message_headers(operation, message, request_id=get_request_id)
headers = [
# Message size.
16 + message.size,
# Unique request id.
get_request_id,
request_id,
# Response id.
0,

View File

@ -26,7 +26,7 @@ unless defined? MONGO_TEST_DB
end
unless defined? TEST_PORT
TEST_PORT = ENV['MONGO_RUBY_DRIVER_PORT'].to_i || Connection::DEFAULT_PORT
TEST_PORT = ENV['MONGO_RUBY_DRIVER_PORT'] ? ENV['MONGO_RUBY_DRIVER_PORT'].to_i : Mongo::Connection::DEFAULT_PORT
end
unless defined? TEST_HOST