diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index f17abe9..683c110 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -462,11 +462,13 @@ module Mongo # @param [Socket] socket a socket to use in lieu of checking out a new one. # @param [Boolean] command (false) indicate whether this is a command. If this is a command, # the message will be sent to the primary node. + # @param [Boolean] command (false) indicate whether the cursor should be exhausted. Set + # this to true only when the OP_QUERY_EXHAUST flag is set. # # @return [Array] # An array whose indexes include [0] documents returned, [1] number of document received, # and [3] a cursor_id. - def receive_message(operation, message, log_message=nil, socket=nil, command=false, read=:primary) + def receive_message(operation, message, log_message=nil, socket=nil, command=false, read=:primary, exhaust=false) request_id = add_message_headers(message, operation) packed_message = message.to_s begin @@ -489,7 +491,7 @@ module Mongo result = '' @safe_mutexes[sock].synchronize do send_message_on_socket(packed_message, sock) - result = receive(sock, request_id) + result = receive(sock, request_id, exhaust) end ensure if should_checkin @@ -785,21 +787,38 @@ module Mongo ## Low-level connection methods. - def receive(sock, expected_response) + def receive(sock, cursor_id, exhaust) begin - receive_header(sock, expected_response) - number_received, cursor_id = receive_response_header(sock) - read_documents(number_received, cursor_id, sock) + if exhaust + docs = [] + num_received = 0 + + while(cursor_id != 0) do + receive_header(sock, cursor_id, exhaust) + number_received, cursor_id = receive_response_header(sock) + new_docs, n = read_documents(number_received, sock) + docs += new_docs + num_received += n + end + + return [docs, num_received, cursor_id] + else + receive_header(sock, cursor_id, exhaust) + number_received, cursor_id = receive_response_header(sock) + docs, num_received = read_documents(number_received, sock) + + return [docs, num_received, cursor_id] + end rescue Mongo::ConnectionFailure => ex close raise ex end end - def receive_header(sock, expected_response) + def receive_header(sock, expected_response, exhaust=false) header = receive_message_on_socket(16, sock) size, request_id, response_to = header.unpack('VVV') - if expected_response != response_to + if !exhaust && expected_response != response_to raise Mongo::ConnectionFailure, "Expected response #{expected_response} but got #{response_to}" end @@ -831,7 +850,7 @@ module Mongo end end - def read_documents(number_received, cursor_id, sock) + def read_documents(number_received, sock) docs = [] number_remaining = number_received while number_remaining > 0 do @@ -841,7 +860,7 @@ module Mongo number_remaining -= 1 docs << BSON::BSON_CODER.deserialize(buf) end - [docs, number_received, cursor_id] + [docs, number_received] end # Constructs a getlasterror message. This method is used exclusively by diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 7941747..75332a2 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -115,7 +115,14 @@ module Mongo # # @return [Hash, Nil] the next document or Nil if no documents remain. def next - refresh if @cache.length == 0 + if @cache.length == 0 + if @query_run && (@options & OP_QUERY_EXHAUST != 0) + close + return nil + else + refresh + end + end doc = @cache.shift if doc && doc['$err'] @@ -428,7 +435,15 @@ module Mongo # Return the number of documents remaining for this cursor. def num_remaining - refresh if @cache.length == 0 + if @cache.length == 0 + if @query_run && (@options & OP_QUERY_EXHAUST != 0) + close + return 0 + else + refresh + end + end + @cache.length end @@ -470,7 +485,8 @@ module Mongo message = construct_query_message @connection.instrument(:find, instrument_payload) do results, @n_received, @cursor_id = @connection.receive_message( - Mongo::Constants::OP_QUERY, message, nil, @socket, @command, @read_preference) + Mongo::Constants::OP_QUERY, message, nil, @socket, @command, + @read_preference, @options & OP_QUERY_EXHAUST != 0) @returned += @n_received @cache += results @query_run = true diff --git a/test/cursor_test.rb b/test/cursor_test.rb index c78d4ce..b4c6267 100644 --- a/test/cursor_test.rb +++ b/test/cursor_test.rb @@ -50,6 +50,33 @@ class CursorTest < Test::Unit::TestCase end end + def test_exhaust + if @@version >= "2.0" + @@coll.remove + data = "1" * 100_000 + 10_000.times do |n| + @@coll.insert({:n => n, :data => data}) + end + + c = Cursor.new(@@coll) + c.add_option(OP_QUERY_EXHAUST) + assert_equal @@coll.count, c.to_a.size + assert c.closed? + + c = Cursor.new(@@coll) + c.add_option(OP_QUERY_EXHAUST) + 9999.times do + c.next + end + assert c.has_next? + assert c.next + assert !c.has_next? + assert c.closed? + + @@coll.remove + end + end + def test_inspect selector = {:a => 1} cursor = @@coll.find(selector)