# Copyright (C) 2008-2009 10gen Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. require 'mongo/util/byte_buffer' require 'mongo/util/bson' module Mongo # A cursor over query results. Returned objects are hashes. class Cursor include Mongo::Conversions include Enumerable RESPONSE_HEADER_SIZE = 20 attr_reader :db, :collection, :query # Create a new cursor. # # Should not be called directly by application developers. def initialize(db, collection, query, admin=false) @db, @collection, @query, @admin = db, collection, query, admin @cache = [] @closed = false @query_run = false end # Return the next object or nil if there are no more. Raises an error # if necessary. def next_object refill_via_get_more if num_remaining == 0 o = @cache.shift if o && o['$err'] err = o['$err'] # If the server has stopped being the master (e.g., it's one of a # pair but it has died or something like that) then we close that # connection. If the db has auto connect option and a pair of # servers, next request will re-open on master server. @db.close if err == "not master" raise err end o end # Get the size of the results set for this query. # # Returns the number of objects in the results set for this query. Does # not take limit and skip into account. Raises OperationFailure on a # database error. def count command = OrderedHash["count", @collection.name, "query", @query.selector, "fields", @query.fields()] response = @db.db_command(command) return response['n'].to_i if response['ok'] == 1 return 0 if response['errmsg'] == "ns missing" raise OperationFailure, "Count failed: #{response['errmsg']}" end # Sort this cursor's result # # Takes either a single key and a direction, or an array of [key, # direction] pairs. Directions should be specified as Mongo::ASCENDING # or Mongo::DESCENDING (or :ascending or :descending) (or :asc or :desc). # # Raises InvalidOperation if this cursor has already been used. Raises # InvalidSortValueError if specified order is invalid. # # This method overrides any sort order specified in the Collection#find # method, and only the last sort applied has an effect def sort(key_or_list, direction=nil) check_modifiable if !direction.nil? order = [[key_or_list, direction]] else order = key_or_list end @query.order_by = order self end # Limits the number of results to be returned by this cursor. # # Raises InvalidOperation if this cursor has already been used. # # This method overrides any limit specified in the Collection#find method, # and only the last limit applied has an effect. def limit(number_to_return) check_modifiable raise ArgumentError, "limit requires an integer" unless number_to_return.is_a? Integer @query.number_to_return = number_to_return self end # Skips the first +number_to_skip+ results of this cursor. # # Raises InvalidOperation if this cursor has already been used. # # This method overrides any skip specified in the Collection#find method, # and only the last skip applied has an effect. def skip(number_to_skip) check_modifiable raise ArgumentError, "skip requires an integer" unless number_to_skip.is_a? Integer @query.number_to_skip = number_to_skip self end # Iterate over each document in this cursor, yielding it to the given # block. # # Iterating over an entire cursor will close it. def each num_returned = 0 while more? && (@query.number_to_return <= 0 || num_returned < @query.number_to_return) yield next_object() num_returned += 1 end end # Return all of the documents in this cursor as an array of hashes. # # Raises InvalidOperation if this cursor has already been used (including # any previous calls to this method). # # Use of this method is discouraged - iterating over a cursor is much # more efficient in most cases. def to_a raise InvalidOperation, "can't call Cursor#to_a on a used cursor" if @query_run rows = [] num_returned = 0 while more? && (@query.number_to_return <= 0 || num_returned < @query.number_to_return) rows << next_object() num_returned += 1 end rows end # Returns an explain plan record for this cursor. def explain limit = @query.number_to_return @query.explain = true @query.number_to_return = -limit.abs c = Cursor.new(@db, @collection, @query) explanation = c.next_object c.close @query.explain = false @query.number_to_return = limit explanation end # Close the cursor. # # Note: if a cursor is read until exhausted (read until Mongo::Constants::OP_QUERY or # Mongo::Constants::OP_GETMORE returns zero for the cursor id), there is no need to # close it by calling this method. # # Collection#find takes an optional block argument which can be used to # ensure that your cursors get closed. See the documentation for # Collection#find for details. def close if @cursor_id message = ByteBuffer.new message.put_int(0) message.put_int(1) message.put_long(@cursor_id) @db.send_message_with_operation(Mongo::Constants::OP_KILL_CURSORS, message) end @cursor_id = 0 @closed = true end # Returns true if this cursor is closed, false otherwise. def closed?; @closed; end private def read_all read_message_header read_response_header read_objects_off_wire end def read_objects_off_wire while doc = next_object_on_wire @cache << doc end end def read_message_header message = ByteBuffer.new message.put_array(@db.receive_full(16).unpack("C*")) unless message.size == 16 #HEADER_SIZE raise "Short read for DB response header: expected #{16} bytes, saw #{message.size}" end message.rewind size = message.get_int request_id = message.get_int response_to = message.get_int op = message.get_int end def read_response_header header_buf = ByteBuffer.new header_buf.put_array(@db.receive_full(RESPONSE_HEADER_SIZE).unpack("C*")) raise "Short read for DB response header; expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" unless header_buf.length == RESPONSE_HEADER_SIZE header_buf.rewind @result_flags = header_buf.get_int @cursor_id = header_buf.get_long @starting_from = header_buf.get_int @n_remaining = header_buf.get_int if @n_received @n_received += @n_remaining else @n_received = @n_remaining end end def num_remaining refill_via_get_more if @cache.length == 0 @cache.length end # Internal method, not for general use. Return +true+ if there are # more records to retrieve. We do not check @query.number_to_return; # #each is responsible for doing that. def more? num_remaining > 0 end def next_object_on_wire # if @n_remaining is 0 but we have a non-zero cursor, there are more # to fetch, so do a GetMore operation, but don't do it here - do it # when someone pulls an object out of the cache and it's empty return nil if @n_remaining == 0 object_from_stream end def refill_via_get_more return if send_query_if_needed || @cursor_id.zero? @db._synchronize { message = ByteBuffer.new # Reserved. message.put_int(0) # DB name. db_name = @admin ? 'admin' : @db.name BSON.serialize_cstr(message, "#{db_name}.#{@collection.name}") # Number of results to return; db decides for now. message.put_int(0) # Cursor id. message.put_long(@cursor_id) @db.send_message_with_operation_without_synchronize(Mongo::Constants::OP_GET_MORE, message) read_all } close_cursor_if_query_complete end def object_from_stream buf = ByteBuffer.new buf.put_array(@db.receive_full(4).unpack("C*")) buf.rewind size = buf.get_int buf.put_array(@db.receive_full(size - 4).unpack("C*"), 4) @n_remaining -= 1 buf.rewind BSON.new.deserialize(buf) end def send_query_if_needed # Run query first time we request an object from the wire if @query_run false else message = construct_query_message(@query) @db._synchronize { @db.send_message_with_operation_without_synchronize(Mongo::Constants::OP_QUERY, message) @query_run = true read_all } close_cursor_if_query_complete true end end def construct_query_message(query) message = ByteBuffer.new message.put_int(query.query_opts) db_name = @admin ? 'admin' : @db.name BSON.serialize_cstr(message, "#{db_name}.#{@collection.name}") message.put_int(query.number_to_skip) message.put_int(query.number_to_return) sel = query.selector if query.contains_special_fields sel = add_special_query_fields(sel, query) end message.put_array(BSON.new.serialize(sel).to_a) message.put_array(BSON.new.serialize(query.fields).to_a) if query.fields message end def add_special_query_fields(sel, query) sel = OrderedHash.new sel['query'] = query.selector order_by = query.order_by sel['orderby'] = get_query_order_by(order_by) if order_by sel['$hint'] = query.hint if query.hint && query.hint.length > 0 sel['$explain'] = true if query.explain sel['$snapshot'] = true if query.snapshot sel end def get_query_order_by(order_by) case order_by when String then string_as_sort_parameters(order_by) when Symbol then symbol_as_sort_parameters(order_by) when Array then array_as_sort_parameters(order_by) when Hash # Should be an ordered hash, but this message doesn't care warn_if_deprecated(order_by) order_by else raise InvalidSortValueError, "Illegal order_by, '#{query.order_by.class.name}'; must be String, Array, Hash, or OrderedHash" end end def to_s "DBResponse(flags=#@result_flags, cursor_id=#@cursor_id, start=#@starting_from)" end def close_cursor_if_query_complete close if @query.number_to_return > 0 && @n_received >= @query.number_to_return end def check_modifiable if @query_run || @closed raise InvalidOperation, "Cannot modify the query once it has been run or closed." end end end end