minor: Cursor cleanup
This commit is contained in:
parent
1c439df278
commit
75941ad2a3
|
@ -25,7 +25,7 @@ module Mongo
|
||||||
attr_reader :collection, :selector, :fields,
|
attr_reader :collection, :selector, :fields,
|
||||||
:order, :hint, :snapshot, :timeout,
|
:order, :hint, :snapshot, :timeout,
|
||||||
:full_collection_name, :transformer,
|
:full_collection_name, :transformer,
|
||||||
:options
|
:options, :cursor_id
|
||||||
|
|
||||||
# Create a new cursor.
|
# Create a new cursor.
|
||||||
#
|
#
|
||||||
|
@ -282,11 +282,8 @@ module Mongo
|
||||||
# puts doc['user']
|
# puts doc['user']
|
||||||
# end
|
# end
|
||||||
def each
|
def each
|
||||||
#num_returned = 0
|
while doc = self.next
|
||||||
#while has_next? && (@limit <= 0 || num_returned < @limit)
|
yield doc
|
||||||
while doc = next_document
|
|
||||||
yield doc #next_document
|
|
||||||
#num_returned += 1
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -345,7 +342,9 @@ module Mongo
|
||||||
# Is this cursor closed?
|
# Is this cursor closed?
|
||||||
#
|
#
|
||||||
# @return [Boolean]
|
# @return [Boolean]
|
||||||
def closed?; @closed; end
|
def closed?
|
||||||
|
@closed
|
||||||
|
end
|
||||||
|
|
||||||
# Returns an integer indicating which query options have been selected.
|
# Returns an integer indicating which query options have been selected.
|
||||||
#
|
#
|
||||||
|
@ -413,7 +412,7 @@ module Mongo
|
||||||
# Clean output for inspect.
|
# Clean output for inspect.
|
||||||
def inspect
|
def inspect
|
||||||
"<Mongo::Cursor:0x#{object_id.to_s(16)} namespace='#{@db.name}.#{@collection.name}' " +
|
"<Mongo::Cursor:0x#{object_id.to_s(16)} namespace='#{@db.name}.#{@collection.name}' " +
|
||||||
"@selector=#{@selector.inspect}>"
|
"@selector=#{@selector.inspect} @cursor_id=#{@cursor_id}>"
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
@ -447,8 +446,31 @@ module Mongo
|
||||||
@cache.length
|
@cache.length
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Refresh the documents in @cache. This means either
|
||||||
|
# sending the initial query or sending a GET_MORE operation.
|
||||||
def refresh
|
def refresh
|
||||||
return if send_initial_query || @cursor_id.zero?
|
if !@query_run
|
||||||
|
send_initial_query
|
||||||
|
elsif !@cursor_id.zero?
|
||||||
|
send_get_more
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_initial_query
|
||||||
|
message = construct_query_message
|
||||||
|
payload = instrument_payload if @connection.logger
|
||||||
|
@connection.instrument(:find, payload) do
|
||||||
|
results, @n_received, @cursor_id = @connection.receive_message(
|
||||||
|
Mongo::Constants::OP_QUERY, message, nil, @socket, @command,
|
||||||
|
@read_preference, @options & OP_QUERY_EXHAUST != 0)
|
||||||
|
@returned += @n_received
|
||||||
|
@cache += results
|
||||||
|
@query_run = true
|
||||||
|
close_cursor_if_query_complete
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_get_more
|
||||||
message = BSON::ByteBuffer.new([0, 0, 0, 0])
|
message = BSON::ByteBuffer.new([0, 0, 0, 0])
|
||||||
|
|
||||||
# DB name.
|
# DB name.
|
||||||
|
@ -475,27 +497,6 @@ module Mongo
|
||||||
close_cursor_if_query_complete
|
close_cursor_if_query_complete
|
||||||
end
|
end
|
||||||
|
|
||||||
# Run query the first time we request an object from the wire
|
|
||||||
# TODO: should we be calling instrument_payload even if logging
|
|
||||||
# is disabled?
|
|
||||||
def send_initial_query
|
|
||||||
if @query_run
|
|
||||||
false
|
|
||||||
else
|
|
||||||
message = construct_query_message
|
|
||||||
@connection.instrument(:find, instrument_payload) do
|
|
||||||
results, @n_received, @cursor_id = @connection.receive_message(
|
|
||||||
Mongo::Constants::OP_QUERY, message, nil, @socket, @command,
|
|
||||||
@read_preference, @options & OP_QUERY_EXHAUST != 0)
|
|
||||||
@returned += @n_received
|
|
||||||
@cache += results
|
|
||||||
@query_run = true
|
|
||||||
close_cursor_if_query_complete
|
|
||||||
end
|
|
||||||
true
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def construct_query_message
|
def construct_query_message
|
||||||
message = BSON::ByteBuffer.new
|
message = BSON::ByteBuffer.new
|
||||||
message.put_int(@options)
|
message.put_int(@options)
|
||||||
|
@ -510,10 +511,10 @@ module Mongo
|
||||||
|
|
||||||
def instrument_payload
|
def instrument_payload
|
||||||
log = { :database => @db.name, :collection => @collection.name, :selector => selector }
|
log = { :database => @db.name, :collection => @collection.name, :selector => selector }
|
||||||
log[:fields] = @fields if @fields
|
log[:fields] = @fields if @fields
|
||||||
log[:skip] = @skip if @skip && (@skip != 0)
|
log[:skip] = @skip if @skip && (@skip != 0)
|
||||||
log[:limit] = @limit if @limit && (@limit != 0)
|
log[:limit] = @limit if @limit && (@limit != 0)
|
||||||
log[:order] = @order if @order
|
log[:order] = @order if @order
|
||||||
log
|
log
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -536,10 +537,6 @@ module Mongo
|
||||||
@order || @explain || @hint || @snapshot
|
@order || @explain || @hint || @snapshot
|
||||||
end
|
end
|
||||||
|
|
||||||
def to_s
|
|
||||||
"DBResponse(flags=#@result_flags, cursor_id=#@cursor_id, start=#@starting_from)"
|
|
||||||
end
|
|
||||||
|
|
||||||
def close_cursor_if_query_complete
|
def close_cursor_if_query_complete
|
||||||
if @limit > 0 && @returned >= @limit
|
if @limit > 0 && @returned >= @limit
|
||||||
close
|
close
|
||||||
|
|
|
@ -81,7 +81,7 @@ class CursorTest < Test::Unit::TestCase
|
||||||
selector = {:a => 1}
|
selector = {:a => 1}
|
||||||
cursor = @@coll.find(selector)
|
cursor = @@coll.find(selector)
|
||||||
assert_equal "<Mongo::Cursor:0x#{cursor.object_id.to_s(16)} namespace='#{@@db.name}.#{@@coll.name}' " +
|
assert_equal "<Mongo::Cursor:0x#{cursor.object_id.to_s(16)} namespace='#{@@db.name}.#{@@coll.name}' " +
|
||||||
"@selector=#{selector.inspect}>", cursor.inspect
|
"@selector=#{selector.inspect} @cursor_id=#{cursor.cursor_id}>", cursor.inspect
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_explain
|
def test_explain
|
||||||
|
|
Loading…
Reference in New Issue