do all recv's in a loop. remove debugging cruft
This commit is contained in:
parent
8b18502cd0
commit
79b3df3c9e
|
@ -18,9 +18,6 @@ require 'mongo/message'
|
|||
require 'mongo/util/byte_buffer'
|
||||
require 'mongo/util/bson'
|
||||
|
||||
require 'logger'
|
||||
LOG = Logger.new('recv_file.log', 'daily')
|
||||
|
||||
module XGen
|
||||
module Mongo
|
||||
module Driver
|
||||
|
@ -163,15 +160,12 @@ module XGen
|
|||
end
|
||||
|
||||
def read_message_header
|
||||
MessageHeader.new.read_header(@db.socket)
|
||||
MessageHeader.new.read_header(@db)
|
||||
end
|
||||
|
||||
def read_response_header
|
||||
header_buf = ByteBuffer.new
|
||||
read = @db.socket.recv(RESPONSE_HEADER_SIZE)
|
||||
header_buf.put_array(read.unpack("C*"))
|
||||
LOG.debug "resp head: #{read.inspect}\n"
|
||||
raise "BAD SIZE" unless read.length == RESPONSE_HEADER_SIZE
|
||||
header_buf.put_array(@db.receive_full(RESPONSE_HEADER_SIZE).unpack("C*"))
|
||||
raise "Short read for DB response header; expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" unless header_buf.length == RESPONSE_HEADER_SIZE
|
||||
header_buf.rewind
|
||||
@result_flags = header_buf.get_int
|
||||
|
@ -206,21 +200,10 @@ module XGen
|
|||
|
||||
def object_from_stream
|
||||
buf = ByteBuffer.new
|
||||
read = @db.socket.recv(4)
|
||||
LOG.debug "size: #{read.inspect}\n"
|
||||
raise "BAD SIZE" unless read.length == 4
|
||||
buf.put_array(read.unpack("C*"))
|
||||
buf.put_array(@db.receive_full(4).unpack("C*"))
|
||||
buf.rewind
|
||||
size = buf.get_int
|
||||
# TODO debugging here for a bit
|
||||
begin
|
||||
blah = @db.socket.recv(size-4)
|
||||
buf.put_array(blah.unpack("C*"), 4)
|
||||
LOG.debug "body: #{blah.inspect}\n"
|
||||
raise "BAD SIZE" unless blah.length == size-4
|
||||
rescue => ex
|
||||
raise "#{ex.class}: #{ex.message} ***size was #{size}*size string was #{read.inspect}*res_flags #{@result_flags}*cursor_id #{@cursor_id}*starting_from #{@starting_from}*n_returned #{@n_returned}***"
|
||||
end
|
||||
buf.put_array(@db.receive_full(size - 4).unpack("C*"), 4)
|
||||
@n_remaining -= 1
|
||||
buf.rewind
|
||||
BSON.new(@db).deserialize(buf)
|
||||
|
|
|
@ -301,6 +301,16 @@ module XGen
|
|||
@socket != nil
|
||||
end
|
||||
|
||||
def receive_full(length)
|
||||
message = ""
|
||||
while message.length < length do
|
||||
chunk = @socket.recv(length - message.length)
|
||||
raise "connection closed" unless chunk.length > 0
|
||||
message += chunk
|
||||
end
|
||||
message
|
||||
end
|
||||
|
||||
# Send a MsgMessage to the database.
|
||||
def send_message(msg)
|
||||
send_to_db(MsgMessage.new(msg))
|
||||
|
|
|
@ -28,12 +28,9 @@ module XGen
|
|||
@buf = ByteBuffer.new
|
||||
end
|
||||
|
||||
def read_header(socket)
|
||||
def read_header(db)
|
||||
@buf.rewind
|
||||
read = socket.recv(HEADER_SIZE)
|
||||
@buf.put_array(read.unpack("C*"))
|
||||
LOG.debug "header: #{read.inspect}\n"
|
||||
raise "BAD SIZE" unless read.length == HEADER_SIZE
|
||||
@buf.put_array(db.receive_full(HEADER_SIZE).unpack("C*"))
|
||||
raise "Short read for DB response header: expected #{HEADER_SIZE} bytes, saw #{@buf.size}" unless @buf.size == HEADER_SIZE
|
||||
@buf.rewind
|
||||
@size = @buf.get_int
|
||||
|
|
Loading…
Reference in New Issue