diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index a120a40..acb7b03 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -18,9 +18,6 @@ require 'mongo/message' require 'mongo/util/byte_buffer' require 'mongo/util/bson' -require 'logger' -LOG = Logger.new('recv_file.log', 'daily') - module XGen module Mongo module Driver @@ -163,15 +160,12 @@ module XGen end def read_message_header - MessageHeader.new.read_header(@db.socket) + MessageHeader.new.read_header(@db) end def read_response_header header_buf = ByteBuffer.new - read = @db.socket.recv(RESPONSE_HEADER_SIZE) - header_buf.put_array(read.unpack("C*")) - LOG.debug "resp head: #{read.inspect}\n" - raise "BAD SIZE" unless read.length == RESPONSE_HEADER_SIZE + header_buf.put_array(@db.receive_full(RESPONSE_HEADER_SIZE).unpack("C*")) raise "Short read for DB response header; expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" unless header_buf.length == RESPONSE_HEADER_SIZE header_buf.rewind @result_flags = header_buf.get_int @@ -206,21 +200,10 @@ module XGen def object_from_stream buf = ByteBuffer.new - read = @db.socket.recv(4) - LOG.debug "size: #{read.inspect}\n" - raise "BAD SIZE" unless read.length == 4 - buf.put_array(read.unpack("C*")) + buf.put_array(@db.receive_full(4).unpack("C*")) buf.rewind size = buf.get_int - # TODO debugging here for a bit - begin - blah = @db.socket.recv(size-4) - buf.put_array(blah.unpack("C*"), 4) - LOG.debug "body: #{blah.inspect}\n" - raise "BAD SIZE" unless blah.length == size-4 - rescue => ex - raise "#{ex.class}: #{ex.message} ***size was #{size}*size string was #{read.inspect}*res_flags #{@result_flags}*cursor_id #{@cursor_id}*starting_from #{@starting_from}*n_returned #{@n_returned}***" - end + buf.put_array(@db.receive_full(size - 4).unpack("C*"), 4) @n_remaining -= 1 buf.rewind BSON.new(@db).deserialize(buf) diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index 6fa7329..212c1ec 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -301,6 +301,16 @@ module XGen @socket != nil end + def receive_full(length) + message = "" + while message.length < length do + chunk = @socket.recv(length - message.length) + raise "connection closed" unless chunk.length > 0 + message += chunk + end + message + end + # Send a MsgMessage to the database. def send_message(msg) send_to_db(MsgMessage.new(msg)) diff --git a/lib/mongo/message/message_header.rb b/lib/mongo/message/message_header.rb index 8696c0b..cf1ffc0 100644 --- a/lib/mongo/message/message_header.rb +++ b/lib/mongo/message/message_header.rb @@ -28,12 +28,9 @@ module XGen @buf = ByteBuffer.new end - def read_header(socket) + def read_header(db) @buf.rewind - read = socket.recv(HEADER_SIZE) - @buf.put_array(read.unpack("C*")) - LOG.debug "header: #{read.inspect}\n" - raise "BAD SIZE" unless read.length == HEADER_SIZE + @buf.put_array(db.receive_full(HEADER_SIZE).unpack("C*")) raise "Short read for DB response header: expected #{HEADER_SIZE} bytes, saw #{@buf.size}" unless @buf.size == HEADER_SIZE @buf.rewind @size = @buf.get_int