Commit f668678fd1: Allow connection pools to grow if needed. All this minimizes the number of locks required and reduces the waiting time for these locks.

# encoding: UTF-8

# Copyright (C) 2008-2011 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo

  # A cursor over query results. Returned objects are hashes.
  class Cursor
    include Enumerable
    include Mongo::Constants
    include Mongo::Conversions
    include Mongo::Logging

    attr_reader :collection, :selector, :fields,
      :order, :hint, :snapshot, :timeout,
      :full_collection_name, :transformer,
      :options, :cursor_id

    # Create a new cursor.
    #
    # Note: cursors are created when executing queries using [Collection#find] and other
    # similar methods. Application developers shouldn't have to create cursors manually.
    #
    # @return [Cursor]
    #
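    # @example Obtaining a cursor from a query (sketch; assumes 'comments' is an existing Mongo::Collection)
    #   cursor = comments.find('user' => 'bob')   # Collection#find builds the cursor for you
    #   cursor.is_a?(Mongo::Cursor)               #=> true
    #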
    # @core cursors constructor_details
    def initialize(collection, opts={})
      @cursor_id = nil

      @db = collection.db
      @collection = collection
      @connection = @db.connection
      @logger = @connection.logger

      # Query selector
      @selector = opts[:selector] || {}

      # Special operators that form part of $query
      @order = opts[:order]
      @explain = opts[:explain]
      @hint = opts[:hint]
      @snapshot = opts[:snapshot]
      @max_scan = opts.fetch(:max_scan, nil)
      @return_key = opts.fetch(:return_key, nil)
      @show_disk_loc = opts.fetch(:show_disk_loc, nil)

      # Wire-protocol settings
      @fields = convert_fields_for_query(opts[:fields])
      @skip = opts[:skip] || 0
      @limit = opts[:limit] || 0
      @tailable = opts[:tailable] || false
      @timeout = opts.fetch(:timeout, true)
      @options = 0

      # Use this socket for the query
      @socket = opts[:socket]

      @closed = false
      @query_run = false

      @transformer = opts[:transformer]
      if value = opts[:read]
        Mongo::Support.validate_read_preference(value)
      else
        value = collection.read_preference
      end
      @read_preference = value.is_a?(Hash) ? value.dup : value
      batch_size(opts[:batch_size] || 0)

      @full_collection_name = "#{@collection.db.name}.#{@collection.name}"
      @cache = []
      @returned = 0

      if(!@timeout)
        add_option(OP_QUERY_NO_CURSOR_TIMEOUT)
      end
      if(@connection.slave_ok?)
        add_option(OP_QUERY_SLAVE_OK)
      end
      if(@tailable)
        add_option(OP_QUERY_TAILABLE)
      end

      if @collection.name =~ /^\$cmd/ || @collection.name =~ /^system/
        @command = true
      else
        @command = false
      end

      @checkin_read_pool = false
      @checkin_connection = false
      @read_pool = nil
    end

    # Guess whether the cursor is alive on the server.
    #
    # Note that this method only checks whether we have
    # a cursor id. The cursor may still have timed out
    # on the server. This will be indicated in the next
    # call to Cursor#next.
    #
    # @return [Boolean]
    def alive?
      @cursor_id && @cursor_id != 0
    end

    # Get the next document specified by the cursor options.
    #
    # @return [Hash, Nil] the next document or Nil if no documents remain.
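    #
    # @example Manual iteration (sketch; assumes 'comments' is an existing Mongo::Collection)
    #   cursor = comments.find
    #   doc = cursor.next_document
    #   puts doc['user'] if doc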
    def next
      if @cache.length == 0
        if @query_run && (@options & OP_QUERY_EXHAUST != 0)
          close
          return nil
        else
          refresh
        end
      end
      doc = @cache.shift

      if doc && doc['$err']
        err = doc['$err']

        # If the server has stopped being the master (e.g., it's one of a
        # pair but it has died or something like that) then we close that
        # connection. The next request will re-open on the master server.
        if err == "not master"
          @connection.close
          raise ConnectionFailure.new(err, doc['code'], doc)
        end

        raise OperationFailure.new(err, doc['code'], doc)
      end

      if @transformer.nil?
        doc
      else
        @transformer.call(doc) if doc
      end
    end
    alias :next_document :next

    # Reset this cursor on the server. Cursor options, such as the
    # query string and the values for skip and limit, are preserved.
    def rewind!
      close
      @cache.clear
      @cursor_id = nil
      @closed = false
      @query_run = false
      @n_received = nil
      true
    end

    # Determine whether this cursor has any remaining results.
    #
    # @return [Boolean]
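    #
    # @example Looping until the cursor is exhausted (sketch; assumes 'comments' is an existing Mongo::Collection)
    #   cursor = comments.find
    #   while cursor.has_next?
    #     puts cursor.next['user']
    #   end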
    def has_next?
      num_remaining > 0
    end

    # Get the size of the result set for this query.
    #
    # @param [Boolean] skip_and_limit whether or not to take notice of skip and limit
    #
    # @return [Integer] the number of objects in the result set for this query.
    #
    # @raise [OperationFailure] on a database error.
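    #
    # @example Counting matches (sketch; assumes 'comments' is an existing Mongo::Collection)
    #   comments.find('user' => 'bob').count                   # ignores skip and limit
    #   comments.find('user' => 'bob').limit(10).count(true)   # honors skip and limit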
    def count(skip_and_limit = false)
      command = BSON::OrderedHash["count", @collection.name, "query", @selector]

      if skip_and_limit
        command.merge!(BSON::OrderedHash["limit", @limit]) if @limit != 0
        command.merge!(BSON::OrderedHash["skip", @skip]) if @skip != 0
      end

      command.merge!(BSON::OrderedHash["fields", @fields])

      response = @db.command(command)
      return response['n'].to_i if Mongo::Support.ok?(response)
      return 0 if response['errmsg'] == "ns missing"
      raise OperationFailure.new("Count failed: #{response['errmsg']}", response['code'], response)
    end

    # Sort this cursor's results.
    #
    # This method overrides any sort order specified in the Collection#find
    # method, and only the last sort applied has an effect.
    #
    # @param [Symbol, Array] key_or_list either 1) a key to sort by or 2)
    #   an array of [key, direction] pairs to sort by. Direction should
    #   be specified as Mongo::ASCENDING (or :ascending / :asc) or Mongo::DESCENDING (or :descending / :desc)
    #
    # @raise [InvalidOperation] if this cursor has already been used.
    #
    # @raise [InvalidSortValueError] if the specified order is invalid.
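    #
    # @example Sorting results (sketch; assumes 'comments' is an existing Mongo::Collection)
    #   comments.find.sort(:created_at, Mongo::DESCENDING)
    #   comments.find.sort([[:user, :asc], [:created_at, :desc]])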
    def sort(key_or_list, direction=nil)
      check_modifiable

      if !direction.nil?
        order = [[key_or_list, direction]]
      else
        order = key_or_list
      end

      @order = order
      self
    end

    # Limit the number of results to be returned by this cursor.
    #
    # This method overrides any limit specified in the Collection#find method,
    # and only the last limit applied has an effect.
    #
    # @return [Integer] the current number_to_return if no parameter is given.
    #
    # @raise [InvalidOperation] if this cursor has already been used.
    #
    # @core limit limit-instance_method
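    #
    # @example Limiting results (sketch; assumes 'comments' is an existing Mongo::Collection)
    #   comments.find.limit(25)   # return at most 25 documents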
    def limit(number_to_return=nil)
      return @limit unless number_to_return
      check_modifiable

      @limit = number_to_return
      self
    end

    # Skips the first +number_to_skip+ results of this cursor.
    # Returns the current number_to_skip if no parameter is given.
    #
    # This method overrides any skip specified in the Collection#find method,
    # and only the last skip applied has an effect.
    #
    # @return [Integer]
    #
    # @raise [InvalidOperation] if this cursor has already been used.
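    #
    # @example Paging with skip and limit (sketch; assumes 'comments' is an existing Mongo::Collection)
    #   comments.find.skip(100).limit(25)   # documents 101-125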
    def skip(number_to_skip=nil)
      return @skip unless number_to_skip
      check_modifiable

      @skip = number_to_skip
      self
    end

    # Set the batch size for server responses.
    #
    # Note that the batch size will take effect only on queries
    # where the number to be returned is greater than 100.
    #
    # @param [Integer] size either 0 or some integer greater than 1. If 0,
    #   the server will determine the batch size.
    #
    # @return [Cursor]
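    #
    # @example Setting the batch size (sketch; assumes 'comments' is an existing Mongo::Collection)
    #   comments.find.batch_size(500)   # ask the server for 500 documents per round trip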
    def batch_size(size=nil)
      return @batch_size unless size
      check_modifiable
      if size < 0 || size == 1
        raise ArgumentError, "Invalid value for batch_size #{size}; must be 0 or > 1."
      else
        @batch_size = @limit != 0 && size > @limit ? @limit : size
      end

      self
    end

    # Iterate over each document in this cursor, yielding it to the given
    # block.
    #
    # Iterating over an entire cursor will close it.
    #
    # @yield passes each document to a block for processing.
    #
    # @example if 'comments' represents a collection of comments:
    #   comments.find.each do |doc|
    #     puts doc['user']
    #   end
    def each
      while doc = self.next
        yield doc
      end
    end

    # Receive all the documents from this cursor as an array of hashes.
    #
    # Notes:
    #
    # If you've already started iterating over the cursor, the array returned
    # by this method contains only the remaining documents. See Cursor#rewind! if you
    # need to reset the cursor.
    #
    # Use of this method is discouraged; in most cases, it's much more
    # efficient to retrieve documents as you need them by iterating over the cursor.
    #
    # @return [Array] an array of documents.
    def to_a
      super
    end

    # Get the explain plan for this cursor.
    #
    # @return [Hash] a document containing the explain plan for this cursor.
    #
    # @core explain explain-instance_method
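    #
    # @example Inspecting the plan (sketch; assumes 'comments' is an existing Mongo::Collection)
    #   plan = comments.find('user' => 'bob').explain
    #   plan['millis']   # query time as reported by the server's explain output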
    def explain
      c = Cursor.new(@collection,
        query_options_hash.merge(:limit => -@limit.abs, :explain => true))
      explanation = c.next_document
      c.close

      explanation
    end

    # Close the cursor.
    #
    # Note: if a cursor is read until exhausted (read until Mongo::Constants::OP_QUERY or
    # Mongo::Constants::OP_GETMORE returns zero for the cursor id), there is no need to
    # close it manually.
    #
    # Note also: Collection#find takes an optional block argument which can be used to
    # ensure that your cursors get closed.
    #
    # @return [True]
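    #
    # @example Letting Collection#find's block form close the cursor for you (sketch; 'comments' is a hypothetical collection)
    #   comments.find('user' => 'bob') do |cursor|
    #     cursor.each { |doc| puts doc['user'] }
    #   end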
    def close
      if @cursor_id && @cursor_id != 0
        message = BSON::ByteBuffer.new([0, 0, 0, 0])
        message.put_int(1)
        message.put_long(@cursor_id)
        log(:debug, "Cursor#close #{@cursor_id}")
        @connection.send_message(Mongo::Constants::OP_KILL_CURSORS, message, :connection => :reader)
      end
      @cursor_id = 0
      @closed = true
    end

    # Is this cursor closed?
    #
    # @return [Boolean]
    def closed?
      @closed
    end

    # Returns an integer indicating which query options have been selected.
    #
    # @return [Integer]
    #
    # @see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY
    #   The MongoDB wire protocol.
    def query_opts
      warn "The method Cursor#query_opts has been deprecated " +
        "and will be removed in v2.0. Use Cursor#options instead."
      @options
    end

    # Add an option to the query options bitfield.
    #
    # @param opt a valid query option
    #
    # @raise InvalidOperation if this method is run after the cursor has been
    #   iterated for the first time.
    #
    # @return [Integer] the current value of the options bitfield for this cursor.
    #
    # @see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY
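    #
    # @example Disabling the idle-cursor timeout before the query runs (sketch; 'comments' is a hypothetical collection)
    #   cursor = comments.find
    #   cursor.add_option(Mongo::Constants::OP_QUERY_NO_CURSOR_TIMEOUT)
    #   cursor.options & Mongo::Constants::OP_QUERY_NO_CURSOR_TIMEOUT   #=> non-zero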
    def add_option(opt)
      check_modifiable

      @options |= opt
      @options
    end

    # Remove an option from the query options bitfield.
    #
    # @param opt a valid query option
    #
    # @raise InvalidOperation if this method is run after the cursor has been
    #   iterated for the first time.
    #
    # @return [Integer] the current value of the options bitfield for this cursor.
    #
    # @see http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY
    def remove_option(opt)
      check_modifiable

      @options &= ~opt
      @options
    end

    # Get the query options for this Cursor.
    #
    # @return [Hash]
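    #
    # @example Cloning a cursor's settings, as #explain does internally (sketch)
    #   opts = cursor.query_options_hash
    #   copy = Mongo::Cursor.new(cursor.collection, opts.merge(:limit => 5))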
    def query_options_hash
      { :selector => @selector,
        :fields => @fields,
        :skip => @skip,
        :limit => @limit,
        :order => @order,
        :hint => @hint,
        :snapshot => @snapshot,
        :timeout => @timeout,
        :max_scan => @max_scan,
        :return_key => @return_key,
        :show_disk_loc => @show_disk_loc }
    end

    # Clean output for inspect.
    def inspect
      "<Mongo::Cursor:0x#{object_id.to_s(16)} namespace='#{@db.name}.#{@collection.name}' " +
        "@selector=#{@selector.inspect} @cursor_id=#{@cursor_id}>"
    end

    private

    # Convert the +:fields+ parameter from a single field name or an array
    # of field names to a hash, with the field names for keys and '1' for each
    # value.
    def convert_fields_for_query(fields)
      case fields
      when String, Symbol
        {fields => 1}
      when Array
        return nil if fields.length.zero?
        fields.each_with_object({}) { |field, hash| hash[field] = 1 }
      when Hash
        return fields
      end
    end

    # Return the number of documents remaining for this cursor.
    def num_remaining
      if @cache.length == 0
        if @query_run && (@options & OP_QUERY_EXHAUST != 0)
          close
          return 0
        else
          refresh
        end
      end

      @cache.length
    end

    # Refresh the documents in @cache. This means either
    # sending the initial query or sending a GET_MORE operation.
    def refresh
      if !@query_run
        send_initial_query
      elsif !@cursor_id.zero?
        send_get_more
      end
    end

    def send_initial_query
      message = construct_query_message
      payload = instrument_payload if @logger
      sock = @socket || checkout_socket_from_connection
      instrument(:find, payload) do
        begin
          results, @n_received, @cursor_id = @connection.receive_message(
            Mongo::Constants::OP_QUERY, message, nil, sock, @command,
            nil, @options & OP_QUERY_EXHAUST != 0)
        rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
          force_checkin_socket(sock)
          raise ex
        end
        checkin_socket(sock) unless @socket
        @returned += @n_received
        @cache += results
        @query_run = true
        close_cursor_if_query_complete
      end
    end

    def send_get_more
      message = BSON::ByteBuffer.new([0, 0, 0, 0])

      # DB name.
      BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@collection.name}")

      # Number of results to return.
      if @limit > 0
        limit = @limit - @returned
        if @batch_size > 0
          limit = limit < @batch_size ? limit : @batch_size
        end
        message.put_int(limit)
      else
        message.put_int(@batch_size)
      end

      # Cursor id.
      message.put_long(@cursor_id)
      log(:debug, "cursor.refresh() for cursor #{@cursor_id}") if @logger
      sock = @socket || checkout_socket_for_op_get_more

      begin
        results, @n_received, @cursor_id = @connection.receive_message(
          Mongo::Constants::OP_GET_MORE, message, nil, sock, @command, nil)
      rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
        force_checkin_socket(sock)
        raise ex
      end
      checkin_socket(sock) unless @socket
      @returned += @n_received
      @cache += results
      close_cursor_if_query_complete
    end

    def checkout_socket_from_connection
      @checkin_connection = true
      if @command || @read_preference == :primary
        @connection.get_local_writer
      else
        @read_pool = @connection.read_pool
        @connection.get_local_reader
      end
    end

    def checkout_socket_for_op_get_more
      if @read_pool && (@read_pool != @connection.read_pool)
        checkout_socket_from_read_pool
      else
        checkout_socket_from_connection
      end
    end

    def checkout_socket_from_read_pool
      new_pool = @connection.secondary_pools.detect do |pool|
        pool.host == @read_pool.host && pool.port == @read_pool.port
      end
      if new_pool
        @read_pool = new_pool
        sock = new_pool.checkout
        @checkin_read_pool = true
        return sock
      else
        raise Mongo::OperationFailure, "Failure to continue iterating " +
          "cursor because the replica set member persisting this " +
          "cursor at #{@read_pool.host_string} cannot be found."
      end
    end

    def checkin_socket(sock)
      if @checkin_read_pool
        @read_pool.checkin(sock)
        @checkin_read_pool = false
      elsif @checkin_connection
        @connection.local_socket_done(sock)
        @checkin_connection = false
      end
    end

    def force_checkin_socket(sock)
      if @checkin_read_pool
        @read_pool.checkin(sock)
        @checkin_read_pool = false
      else
        @connection.checkin(sock)
        @checkin_connection = false
      end
    end

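    # Build the body of an OP_QUERY wire-protocol message (the connection layer
    # adds the standard message header). The layout mirrors the wire protocol
    # referenced above: an int32 of option flags, the full collection name as a
    # cstring, int32 numberToSkip, int32 numberToReturn, the BSON query document,
    # and, optionally, a BSON field selector.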
    def construct_query_message
      message = BSON::ByteBuffer.new
      message.put_int(@options)
      BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@collection.name}")
      message.put_int(@skip)
      message.put_int(@limit)
      spec = query_contains_special_fields? ? construct_query_spec : @selector
      message.put_binary(BSON::BSON_CODER.serialize(spec, false).to_s)
      message.put_binary(BSON::BSON_CODER.serialize(@fields, false).to_s) if @fields
      message
    end

    def instrument_payload
      log = { :database => @db.name, :collection => @collection.name, :selector => selector }
      log[:fields] = @fields if @fields
      log[:skip] = @skip if @skip && (@skip != 0)
      log[:limit] = @limit if @limit && (@limit != 0)
      log[:order] = @order if @order
      log
    end

    def construct_query_spec
      return @selector if @selector.has_key?('$query')
      spec = BSON::OrderedHash.new
      spec['$query'] = @selector
      spec['$orderby'] = Mongo::Support.format_order_clause(@order) if @order
      spec['$hint'] = @hint if @hint && @hint.length > 0
      spec['$explain'] = true if @explain
      spec['$snapshot'] = true if @snapshot
      spec['$maxscan'] = @max_scan if @max_scan
      spec['$returnKey'] = true if @return_key
      spec['$showDiskLoc'] = true if @show_disk_loc
      spec
    end

    # Returns true if the query contains order, explain, hint, or snapshot.
    def query_contains_special_fields?
      @order || @explain || @hint || @snapshot
    end

    def close_cursor_if_query_complete
      if @limit > 0 && @returned >= @limit
        close
      end
    end

    def check_modifiable
      if @query_run || @closed
        raise InvalidOperation, "Cannot modify the query once it has been run or closed."
      end
    end
  end
end