RUBY-312 Enable exhaust-mode queries.

This commit is contained in:
Kyle Banker 2011-09-07 10:02:10 -04:00
parent 75ff1aa633
commit 1c439df278
3 changed files with 75 additions and 13 deletions

View File

@ -462,11 +462,13 @@ module Mongo
# @param [Socket] socket a socket to use in lieu of checking out a new one.
# @param [Boolean] command (false) indicate whether this is a command. If this is a command,
# the message will be sent to the primary node.
# @param [Boolean] command (false) indicate whether the cursor should be exhausted. Set
# this to true only when the OP_QUERY_EXHAUST flag is set.
#
# @return [Array]
# An array whose indexes include [0] documents returned, [1] number of document received,
# and [3] a cursor_id.
def receive_message(operation, message, log_message=nil, socket=nil, command=false, read=:primary)
def receive_message(operation, message, log_message=nil, socket=nil, command=false, read=:primary, exhaust=false)
request_id = add_message_headers(message, operation)
packed_message = message.to_s
begin
@ -489,7 +491,7 @@ module Mongo
result = ''
@safe_mutexes[sock].synchronize do
send_message_on_socket(packed_message, sock)
result = receive(sock, request_id)
result = receive(sock, request_id, exhaust)
end
ensure
if should_checkin
@ -785,21 +787,38 @@ module Mongo
## Low-level connection methods.
def receive(sock, expected_response)
def receive(sock, cursor_id, exhaust)
begin
receive_header(sock, expected_response)
number_received, cursor_id = receive_response_header(sock)
read_documents(number_received, cursor_id, sock)
if exhaust
docs = []
num_received = 0
while(cursor_id != 0) do
receive_header(sock, cursor_id, exhaust)
number_received, cursor_id = receive_response_header(sock)
new_docs, n = read_documents(number_received, sock)
docs += new_docs
num_received += n
end
return [docs, num_received, cursor_id]
else
receive_header(sock, cursor_id, exhaust)
number_received, cursor_id = receive_response_header(sock)
docs, num_received = read_documents(number_received, sock)
return [docs, num_received, cursor_id]
end
rescue Mongo::ConnectionFailure => ex
close
raise ex
end
end
def receive_header(sock, expected_response)
def receive_header(sock, expected_response, exhaust=false)
header = receive_message_on_socket(16, sock)
size, request_id, response_to = header.unpack('VVV')
if expected_response != response_to
if !exhaust && expected_response != response_to
raise Mongo::ConnectionFailure, "Expected response #{expected_response} but got #{response_to}"
end
@ -831,7 +850,7 @@ module Mongo
end
end
def read_documents(number_received, cursor_id, sock)
def read_documents(number_received, sock)
docs = []
number_remaining = number_received
while number_remaining > 0 do
@ -841,7 +860,7 @@ module Mongo
number_remaining -= 1
docs << BSON::BSON_CODER.deserialize(buf)
end
[docs, number_received, cursor_id]
[docs, number_received]
end
# Constructs a getlasterror message. This method is used exclusively by

View File

@ -115,7 +115,14 @@ module Mongo
#
# @return [Hash, Nil] the next document or Nil if no documents remain.
def next
refresh if @cache.length == 0
if @cache.length == 0
if @query_run && (@options & OP_QUERY_EXHAUST != 0)
close
return nil
else
refresh
end
end
doc = @cache.shift
if doc && doc['$err']
@ -428,7 +435,15 @@ module Mongo
# Return the number of documents remaining for this cursor.
def num_remaining
refresh if @cache.length == 0
if @cache.length == 0
if @query_run && (@options & OP_QUERY_EXHAUST != 0)
close
return 0
else
refresh
end
end
@cache.length
end
@ -470,7 +485,8 @@ module Mongo
message = construct_query_message
@connection.instrument(:find, instrument_payload) do
results, @n_received, @cursor_id = @connection.receive_message(
Mongo::Constants::OP_QUERY, message, nil, @socket, @command, @read_preference)
Mongo::Constants::OP_QUERY, message, nil, @socket, @command,
@read_preference, @options & OP_QUERY_EXHAUST != 0)
@returned += @n_received
@cache += results
@query_run = true

View File

@ -50,6 +50,33 @@ class CursorTest < Test::Unit::TestCase
end
end
def test_exhaust
if @@version >= "2.0"
@@coll.remove
data = "1" * 100_000
10_000.times do |n|
@@coll.insert({:n => n, :data => data})
end
c = Cursor.new(@@coll)
c.add_option(OP_QUERY_EXHAUST)
assert_equal @@coll.count, c.to_a.size
assert c.closed?
c = Cursor.new(@@coll)
c.add_option(OP_QUERY_EXHAUST)
9999.times do
c.next
end
assert c.has_next?
assert c.next
assert !c.has_next?
assert c.closed?
@@coll.remove
end
end
def test_inspect
selector = {:a => 1}
cursor = @@coll.find(selector)