RUBY-180 Fix batch size
This commit is contained in:
parent
b339871eb9
commit
36c71ef48e
|
@ -50,19 +50,20 @@ module Mongo
|
||||||
@explain = options[:explain]
|
@explain = options[:explain]
|
||||||
@socket = options[:socket]
|
@socket = options[:socket]
|
||||||
@tailable = options[:tailable] || false
|
@tailable = options[:tailable] || false
|
||||||
@batch_size = options[:batch_size] || 0
|
batch_size(options[:batch_size] || 0)
|
||||||
|
|
||||||
@full_collection_name = "#{@collection.db.name}.#{@collection.name}"
|
@full_collection_name = "#{@collection.db.name}.#{@collection.name}"
|
||||||
@cache = []
|
@cache = []
|
||||||
@closed = false
|
@closed = false
|
||||||
@query_run = false
|
@query_run = false
|
||||||
|
@returned = 0
|
||||||
end
|
end
|
||||||
|
|
||||||
# Get the next document specified the cursor options.
|
# Get the next document specified the cursor options.
|
||||||
#
|
#
|
||||||
# @return [Hash, Nil] the next document or Nil if no documents remain.
|
# @return [Hash, Nil] the next document or Nil if no documents remain.
|
||||||
def next_document
|
def next_document
|
||||||
refresh if num_remaining == 0
|
refresh if @cache.length == 0#empty?# num_remaining == 0
|
||||||
doc = @cache.shift
|
doc = @cache.shift
|
||||||
|
|
||||||
if doc && doc['$err']
|
if doc && doc['$err']
|
||||||
|
@ -179,6 +180,26 @@ module Mongo
|
||||||
self
|
self
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Set the batch size for server responses.
|
||||||
|
#
|
||||||
|
# Note that the batch size will take effect only on queries
|
||||||
|
# where the number to be returned is greater than 100.
|
||||||
|
#
|
||||||
|
# @param [Integer] size either 0 or some integer greater than 1. If 0,
|
||||||
|
# the server will determine the batch size.
|
||||||
|
#
|
||||||
|
# @return [Cursor]
|
||||||
|
def batch_size(size=0)
|
||||||
|
check_modifiable
|
||||||
|
if size < 0 || size == 1
|
||||||
|
raise ArgumentError, "Invalid value for batch_size #{size}; must be 0 or > 1."
|
||||||
|
else
|
||||||
|
@batch_size = size > @limit ? @limit : size
|
||||||
|
end
|
||||||
|
|
||||||
|
self
|
||||||
|
end
|
||||||
|
|
||||||
# Iterate over each document in this cursor, yielding it to the given
|
# Iterate over each document in this cursor, yielding it to the given
|
||||||
# block.
|
# block.
|
||||||
#
|
#
|
||||||
|
@ -191,10 +212,11 @@ module Mongo
|
||||||
# puts doc['user']
|
# puts doc['user']
|
||||||
# end
|
# end
|
||||||
def each
|
def each
|
||||||
num_returned = 0
|
#num_returned = 0
|
||||||
while has_next? && (@limit <= 0 || num_returned < @limit)
|
#while has_next? && (@limit <= 0 || num_returned < @limit)
|
||||||
yield next_document
|
while doc = next_document
|
||||||
num_returned += 1
|
yield doc #next_document
|
||||||
|
#num_returned += 1
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -220,7 +242,7 @@ module Mongo
|
||||||
#
|
#
|
||||||
# @core explain explain-instance_method
|
# @core explain explain-instance_method
|
||||||
def explain
|
def explain
|
||||||
c = Cursor.new(@collection, query_options_hash.merge(:limit => -@limit.abs, :explain => true))
|
c = Cursor.new(@collection, query_opti/ns_hash.merge(:limit => -@limit.abs, :explain => true))
|
||||||
explanation = c.next_document
|
explanation = c.next_document
|
||||||
c.close
|
c.close
|
||||||
|
|
||||||
|
@ -327,7 +349,7 @@ module Mongo
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Return a number of documents remaining for this cursor.
|
# Return the number of documents remaining for this cursor.
|
||||||
def num_remaining
|
def num_remaining
|
||||||
refresh if @cache.length == 0
|
refresh if @cache.length == 0
|
||||||
@cache.length
|
@cache.length
|
||||||
|
@ -341,13 +363,22 @@ module Mongo
|
||||||
BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@collection.name}")
|
BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@collection.name}")
|
||||||
|
|
||||||
# Number of results to return.
|
# Number of results to return.
|
||||||
message.put_int(@batch_size)
|
if @limit
|
||||||
|
if (@returned + @batch_size) > @limit
|
||||||
|
message.put_int(@limit - @returned)
|
||||||
|
else
|
||||||
|
message.put_int(@batch_size)
|
||||||
|
end
|
||||||
|
else
|
||||||
|
message.put_int(@batch_size)
|
||||||
|
end
|
||||||
|
|
||||||
# Cursor id.
|
# Cursor id.
|
||||||
message.put_long(@cursor_id)
|
message.put_long(@cursor_id)
|
||||||
@logger.debug("MONGODB cursor.refresh() for cursor #{@cursor_id}") if @logger
|
@logger.debug("MONGODB cursor.refresh() for cursor #{@cursor_id}") if @logger
|
||||||
results, @n_received, @cursor_id = @connection.receive_message(Mongo::Constants::OP_GET_MORE,
|
results, @n_received, @cursor_id = @connection.receive_message(Mongo::Constants::OP_GET_MORE,
|
||||||
message, nil, @socket)
|
message, nil, @socket)
|
||||||
|
@returned += @n_received
|
||||||
@cache += results
|
@cache += results
|
||||||
close_cursor_if_query_complete
|
close_cursor_if_query_complete
|
||||||
end
|
end
|
||||||
|
@ -360,6 +391,7 @@ module Mongo
|
||||||
message = construct_query_message
|
message = construct_query_message
|
||||||
@logger.debug query_log_message if @logger
|
@logger.debug query_log_message if @logger
|
||||||
results, @n_received, @cursor_id = @connection.receive_message(Mongo::Constants::OP_QUERY, message, nil, @socket)
|
results, @n_received, @cursor_id = @connection.receive_message(Mongo::Constants::OP_QUERY, message, nil, @socket)
|
||||||
|
@returned += @n_received
|
||||||
@cache += results
|
@cache += results
|
||||||
@query_run = true
|
@query_run = true
|
||||||
close_cursor_if_query_complete
|
close_cursor_if_query_complete
|
||||||
|
@ -406,7 +438,9 @@ module Mongo
|
||||||
end
|
end
|
||||||
|
|
||||||
def close_cursor_if_query_complete
|
def close_cursor_if_query_complete
|
||||||
close if @limit > 0 && @n_received >= @limit
|
if @limit > 0 && @n_received >= @limit
|
||||||
|
close
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def check_modifiable
|
def check_modifiable
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
require 'test/test_helper'
|
||||||
|
require 'logger'
|
||||||
|
|
||||||
|
class CursorTest < Test::Unit::TestCase
|
||||||
|
|
||||||
|
include Mongo
|
||||||
|
|
||||||
|
@@connection = Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
|
||||||
|
ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT)
|
||||||
|
@@db = @@connection.db(MONGO_TEST_DB)
|
||||||
|
@@coll = @@db.collection('test')
|
||||||
|
@@version = @@connection.server_version
|
||||||
|
|
||||||
|
def setup
|
||||||
|
@@coll.remove
|
||||||
|
@@coll.insert('a' => 1) # collection not created until it's used
|
||||||
|
@@coll_full_name = "#{MONGO_TEST_DB}.test"
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_valid_batch_sizes
|
||||||
|
assert_raise ArgumentError do
|
||||||
|
@@coll.find({}, :batch_size => 1, :limit => 5)
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_raise ArgumentError do
|
||||||
|
@@coll.find({}, :batch_size => -1, :limit => 5)
|
||||||
|
end
|
||||||
|
|
||||||
|
assert @@coll.find({}, :batch_size => 0, :limit => 5)
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_batch_size
|
||||||
|
@@coll.remove
|
||||||
|
200.times do |n|
|
||||||
|
@@coll.insert({:a => n})
|
||||||
|
end
|
||||||
|
|
||||||
|
list = @@coll.find({}, :batch_size => 2, :limit => 6).to_a
|
||||||
|
assert_equal 6, list.length
|
||||||
|
|
||||||
|
list = @@coll.find({}, :batch_size => 100, :limit => 101).to_a
|
||||||
|
assert_equal 101, list.length
|
||||||
|
end
|
||||||
|
end
|
|
@ -428,4 +428,5 @@ class CursorTest < Test::Unit::TestCase
|
||||||
cursor.rewind!
|
cursor.rewind!
|
||||||
assert_equal 100, cursor.map {|doc| doc }.length
|
assert_equal 100, cursor.map {|doc| doc }.length
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
end
|
||||||
|
|
Loading…
Reference in New Issue