From 41c95a0392bcbbdbd177021b3be75da3c2d4bc60 Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Wed, 4 Nov 2009 17:46:15 -0500 Subject: [PATCH] refactoring: moved receive methods from cursor to db class. --- lib/mongo/cursor.rb | 113 +++++++++----------------------------------- lib/mongo/db.rb | 102 +++++++++++++++++++++++++++++++-------- 2 files changed, 104 insertions(+), 111 deletions(-) diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 508ccbc..7e49330 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -20,11 +20,8 @@ module Mongo # A cursor over query results. Returned objects are hashes. class Cursor include Mongo::Conversions - include Enumerable - RESPONSE_HEADER_SIZE = 20 - attr_reader :collection, :selector, :admin, :fields, :order, :hint, :snapshot, :timeout, :full_collection_name @@ -250,14 +247,14 @@ module Mongo # the selector will be used in a $where clause. # See http://www.mongodb.org/display/DOCS/Server-side+Code+Execution def convert_selector_for_query(selector) - case selector - when Hash - selector - when nil - {} - when String + case selector + when Hash + selector + when nil + {} + when String {"$where" => Code.new(selector)} - when Code + when Code {"$where" => selector} end end @@ -267,47 +264,6 @@ module Mongo @order || @explain || @hint || @snapshot end - def read_all - read_message_header - read_response_header - read_objects_off_wire - end - - def read_objects_off_wire - while doc = next_object_on_wire - @cache << doc - end - end - - def read_message_header - message = ByteBuffer.new - message.put_array(@db.receive_full(16).unpack("C*")) - unless message.size == 16 #HEADER_SIZE - raise "Short read for DB response header: expected #{16} bytes, saw #{message.size}" - end - message.rewind - size = message.get_int - request_id = message.get_int - response_to = message.get_int - op = message.get_int - end - - def read_response_header - header_buf = ByteBuffer.new - header_buf.put_array(@db.receive_full(RESPONSE_HEADER_SIZE).unpack("C*")) - raise "Short read for DB response header; expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" unless header_buf.length == RESPONSE_HEADER_SIZE - header_buf.rewind - @result_flags = header_buf.get_int - @cursor_id = header_buf.get_long - @starting_from = header_buf.get_int - @n_remaining = header_buf.get_int - if @n_received - @n_received += @n_remaining - else - @n_received = @n_remaining - end - end - def num_remaining refill_via_get_more if @cache.length == 0 @cache.length @@ -320,59 +276,36 @@ module Mongo num_remaining > 0 end - def next_object_on_wire - # if @n_remaining is 0 but we have a non-zero cursor, there are more - # to fetch, so do a GetMore operation, but don't do it here - do it - # when someone pulls an object out of the cache and it's empty - return nil if @n_remaining == 0 - object_from_stream - end - def refill_via_get_more return if send_query_if_needed || @cursor_id.zero? - @db._synchronize { - message = ByteBuffer.new - # Reserved. - message.put_int(0) + message = ByteBuffer.new + # Reserved. + message.put_int(0) - # DB name. - db_name = @admin ? 'admin' : @db.name - BSON.serialize_cstr(message, "#{db_name}.#{@collection.name}") + # DB name. + db_name = @admin ? 'admin' : @db.name + BSON.serialize_cstr(message, "#{db_name}.#{@collection.name}") - # Number of results to return; db decides for now. - message.put_int(0) + # Number of results to return; db decides for now. + message.put_int(0) - # Cursor id. - message.put_long(@cursor_id) - @db.send_message_with_operation_without_synchronize(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()") - read_all - } + # Cursor id. + message.put_long(@cursor_id) + results, @n_received, @cursor_id = @db.receive_message_with_operation(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()") + @cache += results close_cursor_if_query_complete end - def object_from_stream - buf = ByteBuffer.new - buf.put_array(@db.receive_full(4).unpack("C*")) - buf.rewind - size = buf.get_int - buf.put_array(@db.receive_full(size - 4).unpack("C*"), 4) - @n_remaining -= 1 - buf.rewind - BSON.new.deserialize(buf) - end - + # Run query first time we request an object from the wire def send_query_if_needed - # Run query first time we request an object from the wire if @query_run false else message = construct_query_message - @db._synchronize { - @db.send_message_with_operation_without_synchronize(Mongo::Constants::OP_QUERY, message, + results, @n_received, @cursor_id = @db.receive_message_with_operation(Mongo::Constants::OP_QUERY, message, (query_log_message if @db.logger)) - @query_run = true - read_all - } + @cache += results + @query_run = true close_cursor_if_query_complete true end diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index f8e4f2d..5e9e5a0 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -26,6 +26,8 @@ module Mongo # A Mongo database. class DB + STANDARD_HEADER_SIZE = 16 + RESPONSE_HEADER_SIZE = 20 SYSTEM_NAMESPACE_COLLECTION = "system.namespaces" SYSTEM_INDEX_COLLECTION = "system.indexes" SYSTEM_PROFILE_COLLECTION = "system.profile" @@ -49,6 +51,8 @@ module Mongo # The name of the database. attr_reader :name + attr_reader :connection + # Host to which we are currently connected. attr_reader :host # Port to which we are currently connected. @@ -133,6 +137,8 @@ module Mongo raise InvalidName, "database name cannot be the empty string" end + @connection = options[:connection] + @name, @nodes = db_name, nodes @strict = options[:strict] @pk_factory = options[:pk] @@ -437,28 +443,79 @@ module Mongo # +message+, and an optional formatted +log_message+. # Sends the message to the databse, adding the necessary headers. def send_message_with_operation(operation, message, log_message=nil) + message_with_headers = add_message_headers(operation, message).to_s + @logger.debug(" MONGODB #{log_message || message}") if @logger @semaphore.synchronize do - connect_to_master if !connected? && @auto_reconnect - begin - message_with_headers = add_message_headers(operation, message) - @logger.debug(" MONGODB #{log_message || message}") if @logger - @socket.print(message_with_headers.to_s) - @socket.flush - rescue => ex - close - raise ex - end + send_message_on_socket(message_with_headers) end end - # Note: this is a temporary method. Will be removed in an upcoming - # refactoring. - def send_message_with_operation_without_synchronize(operation, message, log_message=nil) + # Note: this method is a stub. Will be completed in an upcoming refactoring. + def receive_message_with_operation(operation, message, log_message=nil) + message_with_headers = add_message_headers(operation, message).to_s + @logger.debug(" MONGODB #{log_message || message}") if @logger + @semaphore.synchronize do + send_message_on_socket(message_with_headers) + receive + end + end + + def receive + receive_header + number_received, cursor_id = receive_response_header + read_documents(number_received, cursor_id) + end + + def receive_header + header = ByteBuffer.new + header.put_array(receive_data_on_socket(16).unpack("C*")) + unless header.size == STANDARD_HEADER_SIZE + raise "Short read for DB response header: " + + "expected #{STANDARD_HEADER_SIZE} bytes, saw #{header.size}" + end + header.rewind + size = header.get_int + request_id = header.get_int + response_to = header.get_int + op = header.get_int + end + + def receive_response_header + header_buf = ByteBuffer.new + header_buf.put_array(receive_data_on_socket(RESPONSE_HEADER_SIZE).unpack("C*")) + if header_buf.length != RESPONSE_HEADER_SIZE + raise "Short read for DB response header; " + + "expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" + end + header_buf.rewind + result_flags = header_buf.get_int + cursor_id = header_buf.get_long + starting_from = header_buf.get_int + number_remaining = header_buf.get_int + [number_remaining, cursor_id] + end + + def read_documents(number_received, cursor_id) + docs = [] + number_remaining = number_received + while number_remaining > 0 do + buf = ByteBuffer.new + buf.put_array(receive_data_on_socket(4).unpack("C*")) + buf.rewind + size = buf.get_int + buf.put_array(receive_data_on_socket(size - 4).unpack("C*"), 4) + number_remaining -= 1 + buf.rewind + docs << BSON.new.deserialize(buf) + end + [docs, number_received, cursor_id] + end + + # Sending a message on socket. + def send_message_on_socket(message_with_headers) connect_to_master if !connected? && @auto_reconnect begin - message_with_headers = add_message_headers(operation, message) - @logger.debug(" MONGODB #{log_message || message}") if @logger - @socket.print(message_with_headers.to_s) + @socket.print(message_with_headers) @socket.flush rescue => ex close @@ -466,12 +523,15 @@ module Mongo end end - # Note: this method is a stub. Will be completed in an upcoming refactoring. - def receive_message_with_operation(operation, message) - @semaphore.synchronize do - - + # Receive data of specified length on socket. + def receive_data_on_socket(length) + message = "" + while message.length < length do + chunk = @socket.recv(length - message.length) + raise "connection closed" unless chunk.length > 0 + message += chunk end + message end # Return +true+ if +doc+ contains an 'ok' field with the value 1.