refactoring: moved receive methods from cursor to db class.

This commit is contained in:
Kyle Banker 2009-11-04 17:46:15 -05:00
parent 20ed768460
commit 41c95a0392
2 changed files with 104 additions and 111 deletions

View File

@ -20,11 +20,8 @@ module Mongo
# A cursor over query results. Returned objects are hashes. # A cursor over query results. Returned objects are hashes.
class Cursor class Cursor
include Mongo::Conversions include Mongo::Conversions
include Enumerable include Enumerable
RESPONSE_HEADER_SIZE = 20
attr_reader :collection, :selector, :admin, :fields, attr_reader :collection, :selector, :admin, :fields,
:order, :hint, :snapshot, :timeout, :order, :hint, :snapshot, :timeout,
:full_collection_name :full_collection_name
@ -250,14 +247,14 @@ module Mongo
# the selector will be used in a $where clause. # the selector will be used in a $where clause.
# See http://www.mongodb.org/display/DOCS/Server-side+Code+Execution # See http://www.mongodb.org/display/DOCS/Server-side+Code+Execution
def convert_selector_for_query(selector) def convert_selector_for_query(selector)
case selector case selector
when Hash when Hash
selector selector
when nil when nil
{} {}
when String when String
{"$where" => Code.new(selector)} {"$where" => Code.new(selector)}
when Code when Code
{"$where" => selector} {"$where" => selector}
end end
end end
@ -267,47 +264,6 @@ module Mongo
@order || @explain || @hint || @snapshot @order || @explain || @hint || @snapshot
end end
def read_all
read_message_header
read_response_header
read_objects_off_wire
end
def read_objects_off_wire
while doc = next_object_on_wire
@cache << doc
end
end
def read_message_header
message = ByteBuffer.new
message.put_array(@db.receive_full(16).unpack("C*"))
unless message.size == 16 #HEADER_SIZE
raise "Short read for DB response header: expected #{16} bytes, saw #{message.size}"
end
message.rewind
size = message.get_int
request_id = message.get_int
response_to = message.get_int
op = message.get_int
end
def read_response_header
header_buf = ByteBuffer.new
header_buf.put_array(@db.receive_full(RESPONSE_HEADER_SIZE).unpack("C*"))
raise "Short read for DB response header; expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" unless header_buf.length == RESPONSE_HEADER_SIZE
header_buf.rewind
@result_flags = header_buf.get_int
@cursor_id = header_buf.get_long
@starting_from = header_buf.get_int
@n_remaining = header_buf.get_int
if @n_received
@n_received += @n_remaining
else
@n_received = @n_remaining
end
end
def num_remaining def num_remaining
refill_via_get_more if @cache.length == 0 refill_via_get_more if @cache.length == 0
@cache.length @cache.length
@ -320,59 +276,36 @@ module Mongo
num_remaining > 0 num_remaining > 0
end end
def next_object_on_wire
# if @n_remaining is 0 but we have a non-zero cursor, there are more
# to fetch, so do a GetMore operation, but don't do it here - do it
# when someone pulls an object out of the cache and it's empty
return nil if @n_remaining == 0
object_from_stream
end
def refill_via_get_more def refill_via_get_more
return if send_query_if_needed || @cursor_id.zero? return if send_query_if_needed || @cursor_id.zero?
@db._synchronize { message = ByteBuffer.new
message = ByteBuffer.new # Reserved.
# Reserved. message.put_int(0)
message.put_int(0)
# DB name. # DB name.
db_name = @admin ? 'admin' : @db.name db_name = @admin ? 'admin' : @db.name
BSON.serialize_cstr(message, "#{db_name}.#{@collection.name}") BSON.serialize_cstr(message, "#{db_name}.#{@collection.name}")
# Number of results to return; db decides for now. # Number of results to return; db decides for now.
message.put_int(0) message.put_int(0)
# Cursor id. # Cursor id.
message.put_long(@cursor_id) message.put_long(@cursor_id)
@db.send_message_with_operation_without_synchronize(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()") results, @n_received, @cursor_id = @db.receive_message_with_operation(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()")
read_all @cache += results
}
close_cursor_if_query_complete close_cursor_if_query_complete
end end
def object_from_stream # Run query first time we request an object from the wire
buf = ByteBuffer.new
buf.put_array(@db.receive_full(4).unpack("C*"))
buf.rewind
size = buf.get_int
buf.put_array(@db.receive_full(size - 4).unpack("C*"), 4)
@n_remaining -= 1
buf.rewind
BSON.new.deserialize(buf)
end
def send_query_if_needed def send_query_if_needed
# Run query first time we request an object from the wire
if @query_run if @query_run
false false
else else
message = construct_query_message message = construct_query_message
@db._synchronize { results, @n_received, @cursor_id = @db.receive_message_with_operation(Mongo::Constants::OP_QUERY, message,
@db.send_message_with_operation_without_synchronize(Mongo::Constants::OP_QUERY, message,
(query_log_message if @db.logger)) (query_log_message if @db.logger))
@query_run = true @cache += results
read_all @query_run = true
}
close_cursor_if_query_complete close_cursor_if_query_complete
true true
end end

View File

@ -26,6 +26,8 @@ module Mongo
# A Mongo database. # A Mongo database.
class DB class DB
STANDARD_HEADER_SIZE = 16
RESPONSE_HEADER_SIZE = 20
SYSTEM_NAMESPACE_COLLECTION = "system.namespaces" SYSTEM_NAMESPACE_COLLECTION = "system.namespaces"
SYSTEM_INDEX_COLLECTION = "system.indexes" SYSTEM_INDEX_COLLECTION = "system.indexes"
SYSTEM_PROFILE_COLLECTION = "system.profile" SYSTEM_PROFILE_COLLECTION = "system.profile"
@ -49,6 +51,8 @@ module Mongo
# The name of the database. # The name of the database.
attr_reader :name attr_reader :name
attr_reader :connection
# Host to which we are currently connected. # Host to which we are currently connected.
attr_reader :host attr_reader :host
# Port to which we are currently connected. # Port to which we are currently connected.
@ -133,6 +137,8 @@ module Mongo
raise InvalidName, "database name cannot be the empty string" raise InvalidName, "database name cannot be the empty string"
end end
@connection = options[:connection]
@name, @nodes = db_name, nodes @name, @nodes = db_name, nodes
@strict = options[:strict] @strict = options[:strict]
@pk_factory = options[:pk] @pk_factory = options[:pk]
@ -437,28 +443,79 @@ module Mongo
# +message+, and an optional formatted +log_message+. # +message+, and an optional formatted +log_message+.
# Sends the message to the databse, adding the necessary headers. # Sends the message to the databse, adding the necessary headers.
def send_message_with_operation(operation, message, log_message=nil) def send_message_with_operation(operation, message, log_message=nil)
message_with_headers = add_message_headers(operation, message).to_s
@logger.debug(" MONGODB #{log_message || message}") if @logger
@semaphore.synchronize do @semaphore.synchronize do
connect_to_master if !connected? && @auto_reconnect send_message_on_socket(message_with_headers)
begin
message_with_headers = add_message_headers(operation, message)
@logger.debug(" MONGODB #{log_message || message}") if @logger
@socket.print(message_with_headers.to_s)
@socket.flush
rescue => ex
close
raise ex
end
end end
end end
# Note: this is a temporary method. Will be removed in an upcoming # Note: this method is a stub. Will be completed in an upcoming refactoring.
# refactoring. def receive_message_with_operation(operation, message, log_message=nil)
def send_message_with_operation_without_synchronize(operation, message, log_message=nil) message_with_headers = add_message_headers(operation, message).to_s
@logger.debug(" MONGODB #{log_message || message}") if @logger
@semaphore.synchronize do
send_message_on_socket(message_with_headers)
receive
end
end
def receive
receive_header
number_received, cursor_id = receive_response_header
read_documents(number_received, cursor_id)
end
def receive_header
header = ByteBuffer.new
header.put_array(receive_data_on_socket(16).unpack("C*"))
unless header.size == STANDARD_HEADER_SIZE
raise "Short read for DB response header: " +
"expected #{STANDARD_HEADER_SIZE} bytes, saw #{header.size}"
end
header.rewind
size = header.get_int
request_id = header.get_int
response_to = header.get_int
op = header.get_int
end
def receive_response_header
header_buf = ByteBuffer.new
header_buf.put_array(receive_data_on_socket(RESPONSE_HEADER_SIZE).unpack("C*"))
if header_buf.length != RESPONSE_HEADER_SIZE
raise "Short read for DB response header; " +
"expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}"
end
header_buf.rewind
result_flags = header_buf.get_int
cursor_id = header_buf.get_long
starting_from = header_buf.get_int
number_remaining = header_buf.get_int
[number_remaining, cursor_id]
end
def read_documents(number_received, cursor_id)
docs = []
number_remaining = number_received
while number_remaining > 0 do
buf = ByteBuffer.new
buf.put_array(receive_data_on_socket(4).unpack("C*"))
buf.rewind
size = buf.get_int
buf.put_array(receive_data_on_socket(size - 4).unpack("C*"), 4)
number_remaining -= 1
buf.rewind
docs << BSON.new.deserialize(buf)
end
[docs, number_received, cursor_id]
end
# Sending a message on socket.
def send_message_on_socket(message_with_headers)
connect_to_master if !connected? && @auto_reconnect connect_to_master if !connected? && @auto_reconnect
begin begin
message_with_headers = add_message_headers(operation, message) @socket.print(message_with_headers)
@logger.debug(" MONGODB #{log_message || message}") if @logger
@socket.print(message_with_headers.to_s)
@socket.flush @socket.flush
rescue => ex rescue => ex
close close
@ -466,12 +523,15 @@ module Mongo
end end
end end
# Note: this method is a stub. Will be completed in an upcoming refactoring. # Receive data of specified length on socket.
def receive_message_with_operation(operation, message) def receive_data_on_socket(length)
@semaphore.synchronize do message = ""
while message.length < length do
chunk = @socket.recv(length - message.length)
raise "connection closed" unless chunk.length > 0
message += chunk
end end
message
end end
# Return +true+ if +doc+ contains an 'ok' field with the value 1. # Return +true+ if +doc+ contains an 'ok' field with the value 1.