From 989503edd62b5eccb2e52203886a82c8d530bffe Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Fri, 14 Jan 2011 16:16:20 -0500 Subject: [PATCH] Initial async commit (cremes) --- Rakefile | 5 + lib/mongo.rb | 1 + lib/mongo/async/async_collection.rb | 795 ++++++++++++++++++++++++++++ lib/mongo/async/async_cursor.rb | 275 ++++++++++ lib/mongo/cursor.rb | 9 +- lib/mongo/util/worker_pool.rb | 55 ++ 6 files changed, 1135 insertions(+), 5 deletions(-) create mode 100644 lib/mongo/async/async_collection.rb create mode 100644 lib/mongo/async/async_cursor.rb create mode 100644 lib/mongo/util/worker_pool.rb diff --git a/Rakefile b/Rakefile index 35a4c84..6558b2b 100644 --- a/Rakefile +++ b/Rakefile @@ -73,6 +73,11 @@ namespace :test do end end + Rake::TestTask.new(:async) do |t| + t.test_files = FileList['test/async/*_test.rb'] + t.verbose = true + end + desc "Run the replica set test suite" Rake::TestTask.new(:rs) do |t| t.test_files = FileList['test/replica_sets/*_test.rb'] diff --git a/lib/mongo.rb b/lib/mongo.rb index 0d1dea0..cdeca7a 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -57,6 +57,7 @@ require 'mongo/gridfs/grid' require 'mongo/gridfs/grid_io' if RUBY_PLATFORM =~ /java/ require 'mongo/gridfs/grid_io_fix' + require 'mongo/util/worker_pool' end require 'mongo/gridfs/grid_file_system' diff --git a/lib/mongo/async/async_collection.rb b/lib/mongo/async/async_collection.rb new file mode 100644 index 0000000..be04529 --- /dev/null +++ b/lib/mongo/async/async_collection.rb @@ -0,0 +1,795 @@ +# encoding: UTF-8 + +# -- +# Copyright (C) 2008-2010 10gen Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Mongo + + # A named collection of documents in a database. + class AsyncCollection < Collection + + def initialize(name, db, opts={}) + super + @workers = @connection.workers + end + + # Return a sub-collection of this collection by name. If 'users' is a collection, then + # 'users.comments' is a sub-collection of users. + # + # @param [String] name + # the collection to return + # + # @raise [Mongo::InvalidNSName] + # if passed an invalid collection name + # + # @return [Collection] + # the specified sub-collection + def [](name) + name = "#{self.name}.#{name}" + return Collection.new(name, db) if !db.strict? || db.collection_names.include?(name) + raise "Collection #{name} doesn't exist. Currently in strict mode." + end + + # Set a hint field for query optimizer. Hint may be a single field + # name, array of field names, or a hash (preferably an [OrderedHash]). + # If using MongoDB > 1.1, you probably don't ever need to set a hint. + # + # @param [String, Array, OrderedHash] hint a single field, an array of + # fields, or a hash specifying fields + def hint=(hint=nil) + @hint = normalize_hint_fields(hint) + self + end + + # Query the database. + # + # The +selector+ argument is a prototype document that all results must + # match. For example: + # + # collection.find({"hello" => "world"}) + # + # only matches documents that have a key "hello" with value "world". + # Matches can have other keys *in addition* to "hello". + # + # If given an optional block +find+ will yield a Cursor to that block, + # close the cursor, and then return nil. 
This guarantees that partially + # evaluated cursors will be closed. If given no block +find+ returns a + # cursor. + # + # @param [Hash] selector + # a document specifying elements which must be present for a + # document to be included in the result set. Note that in rare cases, + # (e.g., with $near queries), the order of keys will matter. To preserve + # key order on a selector, use an instance of BSON::OrderedHash (only applies + # to Ruby 1.8). + # + # @option opts [Array, Hash] :fields field names that should be returned in the result + # set ("_id" will be included unless explicitly excluded). By limiting results to a certain subset of fields, + # you can cut down on network traffic and decoding time. If using a Hash, keys should be field + # names and values should be either 1 or 0, depending on whether you want to include or exclude + # the given field. + # @option opts [Integer] :skip number of documents to skip from the beginning of the result set + # @option opts [Integer] :limit maximum number of documents to return + # @option opts [Array] :sort an array of [key, direction] pairs to sort by. Direction should + # be specified as Mongo::ASCENDING (or :ascending / :asc) or Mongo::DESCENDING (or :descending / :desc) + # @option opts [String, Array, OrderedHash] :hint hint for query optimizer, usually not necessary if using MongoDB > 1.1 + # @option opts [Boolean] :snapshot (false) if true, snapshot mode will be used for this query. + # Snapshot mode assures no duplicates are returned, or objects missed, which were present at both the start and + # end of the query's execution. For details see http://www.mongodb.org/display/DOCS/How+to+do+Snapshotting+in+the+Mongo+Database + # @option opts [Integer] :batch_size (100) the number of documents to be returned by the database per GETMORE operation. A value of 0 + # will let the database server decide how many results to return. This option can be ignored for most use cases. 
+ # @option opts [Boolean] :timeout (true) when +true+, the returned cursor will be subject to + # the normal cursor timeout behavior of the mongod process. When +false+, the returned cursor will never timeout. Note + # that disabling timeout will only work when #find is invoked with a block. This is to prevent any inadvertant failure to + # close the cursor, as the cursor is explicitly closed when block code finishes. + # + # @raise [ArgumentError] + # if timeout is set to false and find is not invoked in a block + # + # @raise [RuntimeError] + # if given unknown options + # + # @core find find-instance_method + def find(selector={}, opts={}) + fields = opts.delete(:fields) + fields = ["_id"] if fields && fields.empty? + skip = opts.delete(:skip) || skip || 0 + limit = opts.delete(:limit) || 0 + sort = opts.delete(:sort) + hint = opts.delete(:hint) + snapshot = opts.delete(:snapshot) + batch_size = opts.delete(:batch_size) + timeout = (opts.delete(:timeout) == false) ? false : true + + if timeout == false && !block_given? + raise ArgumentError, "Collection#find must be invoked with a block when timeout is disabled." + end + + if hint + hint = normalize_hint_fields(hint) + else + hint = @hint # assumed to be normalized already + end + + raise RuntimeError, "Unknown options [#{opts.inspect}]" unless opts.empty? + + cursor = Cursor.new(self, :selector => selector, :fields => fields, :skip => skip, :limit => limit, + :order => sort, :hint => hint, :snapshot => snapshot, :timeout => timeout, :batch_size => batch_size) + + if block_given? + yield cursor + cursor.close + nil + else + cursor + end + end + + # Return a single object from the database. + # + # @return [OrderedHash, Nil] + # a single document or nil if no result is found. + # + # @param [Hash, ObjectId, Nil] spec_or_object_id a hash specifying elements + # which must be present for a document to be included in the result set or an + # instance of ObjectId to be used as the value for an _id query. 
+ # If nil, an empty selector, {}, will be used. + # + # @option opts [Hash] + # any valid options that can be send to Collection#find + # + # @raise [TypeError] + # if the argument is of an improper type. + def find_one(spec_or_object_id=nil, opts={}) + spec = case spec_or_object_id + when nil + {} + when BSON::ObjectId + {:_id => spec_or_object_id} + when Hash + spec_or_object_id + else + raise TypeError, "spec_or_object_id must be an instance of ObjectId or Hash, or nil" + end + find(spec, opts.merge(:limit => -1)).next_document + end + + # Save a document to this collection. + # + # @param [Hash] doc + # the document to be saved. If the document already has an '_id' key, + # then an update (upsert) operation will be performed, and any existing + # document with that _id is overwritten. Otherwise an insert operation is performed. + # + # @return [ObjectId] the _id of the saved document. + # + # @option opts [Boolean, Hash] :safe (+false+) + # run the operation in safe mode, which run a getlasterror command on the + # database to report any assertion. In addition, a hash can be provided to + # run an fsync and/or wait for replication of the save (>= 1.5.1). See the options + # for DB#error. + # + # @raise [OperationFailure] when :safe mode fails. + # + # @see DB#remove for options that can be passed to :safe. + def save(doc, opts={}) + if doc.has_key?(:_id) || doc.has_key?('_id') + id = doc[:_id] || doc['_id'] + update({:_id => id}, doc, :upsert => true, :safe => opts.fetch(:safe, @safe)) + id + else + insert(doc, :safe => opts.fetch(:safe, @safe)) + end + end + + # Insert one or more documents into the collection. + # + # @param [Hash, Array] doc_or_docs + # a document (as a hash) or array of documents to be inserted. + # + # @return [ObjectId, Array] + # The _id of the inserted document or a list of _ids of all inserted documents. 
+ # + # @option opts [Boolean, Hash] :safe (+false+) + # run the operation in safe mode, which run a getlasterror command on the + # database to report any assertion. In addition, a hash can be provided to + # run an fsync and/or wait for replication of the insert (>= 1.5.1). Safe + # options provided here will override any safe options set on this collection, + # its database object, or the current connection. See the options on + # for DB#get_last_error. + # + # @see DB#remove for options that can be passed to :safe. + # + # @core insert insert-instance_method + def insert(doc_or_docs, opts={}) + doc_or_docs = [doc_or_docs] unless doc_or_docs.is_a?(Array) + doc_or_docs.collect! { |doc| @pk_factory.create_pk(doc) } + safe = opts.fetch(:safe, @safe) + result = insert_documents(doc_or_docs, @name, true, safe) + result.size > 1 ? result : result.first + end + alias_method :<<, :insert + + # Remove all documents from this collection. + # + # @param [Hash] selector + # If specified, only matching documents will be removed. + # + # @option opts [Boolean, Hash] :safe (+false+) + # run the operation in safe mode, which will run a getlasterror command on the + # database to report any assertion. In addition, a hash can be provided to + # run an fsync and/or wait for replication of the remove (>= 1.5.1). Safe + # options provided here will override any safe options set on this collection, + # its database, or the current connection. See the options for DB#get_last_error for more details. + # + # @example remove all documents from the 'users' collection: + # users.remove + # users.remove({}) + # + # @example remove only documents that have expired: + # users.remove({:expire => {"$lte" => Time.now}}) + # + # @return [Hash, true] Returns a Hash containing the last error object if running in safe mode. + # Otherwise, returns true. + # + # @raise [Mongo::OperationFailure] an exception will be raised iff safe mode is enabled + # and the operation fails. 
+ # + # @see DB#remove for options that can be passed to :safe. + # + # @core remove remove-instance_method + def remove(selector={}, opts={}) + # Initial byte is 0. + safe = opts.fetch(:safe, @safe) + message = BSON::ByteBuffer.new("\0\0\0\0") + BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@name}") + message.put_int(0) + message.put_binary(BSON::BSON_CODER.serialize(selector, false, true).to_s) + + @logger.debug("MONGODB #{@db.name}['#{@name}'].remove(#{selector.inspect})") if @logger + if safe + @connection.send_message_with_safe_check(Mongo::Constants::OP_DELETE, message, @db.name, nil, safe) + else + @connection.send_message(Mongo::Constants::OP_DELETE, message) + true + end + end + + # Update one or more documents in this collection. + # + # @param [Hash] selector + # a hash specifying elements which must be present for a document to be updated. Note: + # the update command currently updates only the first document matching the + # given selector. If you want all matching documents to be updated, be sure + # to specify :multi => true. + # @param [Hash] document + # a hash specifying the fields to be changed in the selected document, + # or (in the case of an upsert) the document to be inserted + # + # @option opts [Boolean] :upsert (+false+) if true, performs an upsert (update or insert) + # @option opts [Boolean] :multi (+false+) update all documents matching the selector, as opposed to + # just the first matching document. Note: only works in MongoDB 1.1.3 or later. + # @option opts [Boolean] :safe (+false+) + # If true, check that the save succeeded. OperationFailure + # will be raised on an error. Note that a safe check requires an extra + # round-trip to the database. Safe options provided here will override any safe + # options set on this collection, its database object, or the current collection. + # See the options for DB#get_last_error for details. 
+ # + # @return [Hash, true] Returns a Hash containing the last error object if running in safe mode. + # Otherwise, returns true. + # + # @core update update-instance_method + def update(selector, document, opts={}) + # Initial byte is 0. + safe = opts.fetch(:safe, @safe) + message = BSON::ByteBuffer.new("\0\0\0\0") + BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@name}") + update_options = 0 + update_options += 1 if opts[:upsert] + update_options += 2 if opts[:multi] + message.put_int(update_options) + message.put_binary(BSON::BSON_CODER.serialize(selector, false, true).to_s) + message.put_binary(BSON::BSON_CODER.serialize(document, false, true).to_s) + @logger.debug("MONGODB #{@db.name}['#{@name}'].update(#{selector.inspect}, #{document.inspect})") if @logger + if safe + @connection.send_message_with_safe_check(Mongo::Constants::OP_UPDATE, message, @db.name, nil, safe) + else + @connection.send_message(Mongo::Constants::OP_UPDATE, message, nil) + end + end + + # Create a new index. + # + # @param [String, Array] spec + # should be either a single field name or an array of + # [field name, direction] pairs. Directions should be specified + # as Mongo::ASCENDING, Mongo::DESCENDING, or Mongo::GEO2D. + # + # Note that geospatial indexing only works with versions of MongoDB >= 1.3.3+. Keep in mind, too, + # that in order to geo-index a given field, that field must reference either an array or a sub-object + # where the first two values represent x- and y-coordinates. Examples can be seen below. + # + # Also note that it is permissible to create compound indexes that include a geospatial index as + # long as the geospatial index comes first. + # + # If your code calls create_index frequently, you can use Collection#ensure_index to cache these calls + # and thereby prevent excessive round trips to the database. + # + # @option opts [Boolean] :unique (false) if true, this index will enforce a uniqueness constraint. 
+ # @option opts [Boolean] :background (false) indicate that the index should be built in the background. This + # feature is only available in MongoDB >= 1.3.2. + # @option opts [Boolean] :drop_dups (nil) If creating a unique index on a collection with pre-existing records, + # this option will keep the first document the database indexes and drop all subsequent with duplicate values. + # @option opts [Integer] :min (nil) specify the minimum longitude and latitude for a geo index. + # @option opts [Integer] :max (nil) specify the maximum longitude and latitude for a geo index. + # + # @example Creating a compound index: + # @posts.create_index([['subject', Mongo::ASCENDING], ['created_at', Mongo::DESCENDING]]) + # + # @example Creating a geospatial index: + # @restaurants.create_index([['location', Mongo::GEO2D]]) + # + # # Note that this will work only if 'location' represents x,y coordinates: + # {'location': [0, 50]} + # {'location': {'x' => 0, 'y' => 50}} + # {'location': {'latitude' => 0, 'longitude' => 50}} + # + # @example A geospatial index with alternate longitude and latitude: + # @restaurants.create_index([['location', Mongo::GEO2D]], :min => 500, :max => 500) + # + # @return [String] the name of the index created. + # + # @core indexes create_index-instance_method + def create_index(spec, opts={}) + opts[:dropDups] = opts.delete(:drop_dups) if opts[:drop_dups] + field_spec = parse_index_spec(spec) + name = opts.delete(:name) || generate_index_name(field_spec) + name = name.to_s if name + + generate_indexes(field_spec, name, opts) + name + end + + # Calls create_index and sets a flag to not do so again for another X minutes. + # this time can be specified as an option when initializing a Mongo::DB object as options[:cache_time] + # Any changes to an index will be propogated through regardless of cache time (e.g., a change of index direction) + # + # The parameters and options for this methods are the same as those for Collection#create_index. 
+ # + # @example Call sequence: + # Time t: @posts.ensure_index([['subject', Mongo::ASCENDING]) -- calls create_index and + # sets the 5 minute cache + # Time t+2min : @posts.ensure_index([['subject', Mongo::ASCENDING]) -- doesn't do anything + # Time t+3min : @posts.ensure_index([['something_else', Mongo::ASCENDING]) -- calls create_index + # and sets 5 minute cache + # Time t+10min : @posts.ensure_index([['subject', Mongo::ASCENDING]) -- calls create_index and + # resets the 5 minute counter + # + # @return [String] the name of the index. + def ensure_index(spec, opts={}) + now = Time.now.utc.to_i + field_spec = parse_index_spec(spec) + + name = opts.delete(:name) || generate_index_name(field_spec) + name = name.to_s if name + + if !@cache[name] || @cache[name] <= now + generate_indexes(field_spec, name, opts) + end + + # Reset the cache here in case there are any errors inserting. Best to be safe. + @cache[name] = now + @cache_time + name + end + + # Drop a specified index. + # + # @param [String] name + # + # @core indexes + def drop_index(name) + @cache[name.to_s] = nil + @db.drop_index(@name, name) + end + + # Drop all indexes. + # + # @core indexes + def drop_indexes + @cache = {} + + # Note: calling drop_indexes with no args will drop them all. + @db.drop_index(@name, '*') + end + + # Drop the entire collection. USE WITH CAUTION. + def drop + @db.drop_collection(@name) + end + + # Atomically update and return a document using MongoDB's findAndModify command. (MongoDB > 1.3.0) + # + # @option opts [Hash] :query ({}) a query selector document for matching the desired document. + # @option opts [Hash] :update (nil) the update operation to perform on the matched document. + # @option opts [Array, String, OrderedHash] :sort ({}) specify a sort option for the query using any + # of the sort options available for Cursor#sort. 
Sort order is important if the query will be matching + # multiple documents since only the first matching document will be updated and returned. + # @option opts [Boolean] :remove (false) If true, removes the the returned document from the collection. + # @option opts [Boolean] :new (false) If true, returns the updated document; otherwise, returns the document + # prior to update. + # + # @return [Hash] the matched document. + # + # @core findandmodify find_and_modify-instance_method + def find_and_modify(opts={}) + cmd = BSON::OrderedHash.new + cmd[:findandmodify] = @name + cmd.merge!(opts) + cmd[:sort] = Mongo::Support.format_order_clause(opts[:sort]) if opts[:sort] + + @db.command(cmd)['value'] + end + + # Perform a map/reduce operation on the current collection. + # + # @param [String, BSON::Code] map a map function, written in JavaScript. + # @param [String, BSON::Code] reduce a reduce function, written in JavaScript. + # + # @option opts [Hash] :query ({}) a query selector document, like what's passed to #find, to limit + # the operation to a subset of the collection. + # @option opts [Array] :sort ([]) an array of [key, direction] pairs to sort by. Direction should + # be specified as Mongo::ASCENDING (or :ascending / :asc) or Mongo::DESCENDING (or :descending / :desc) + # @option opts [Integer] :limit (nil) if passing a query, number of objects to return from the collection. + # @option opts [String, BSON::Code] :finalize (nil) a javascript function to apply to the result set after the + # map/reduce operation has finished. + # @option opts [String] :out (nil) the name of the output collection. If specified, the collection will not be treated as temporary. + # @option opts [Boolean] :keeptemp (false) if true, the generated collection will be persisted. default is false. + # @option opts [Boolean ] :verbose (false) if true, provides statistics on job execution time. 
+ # @option opts [Boolean] :raw (false) if true, return the raw result object from the map_reduce command, and not + # the instantiated collection that's returned by default. + # + # @return [Collection] a collection containing the results of the operation. + # + # @see http://www.mongodb.org/display/DOCS/MapReduce Offical MongoDB map/reduce documentation. + # + # @core mapreduce map_reduce-instance_method + def map_reduce(map, reduce, opts={}) + map = BSON::Code.new(map) unless map.is_a?(BSON::Code) + reduce = BSON::Code.new(reduce) unless reduce.is_a?(BSON::Code) + raw = opts.delete(:raw) + + hash = BSON::OrderedHash.new + hash['mapreduce'] = self.name + hash['map'] = map + hash['reduce'] = reduce + hash.merge! opts + + result = @db.command(hash) + unless Mongo::Support.ok?(result) + raise Mongo::OperationFailure, "map-reduce failed: #{result['errmsg']}" + end + + if raw + result + else + @db[result["result"]] + end + end + alias :mapreduce :map_reduce + + # Perform a group aggregation. + # + # @param [Hash] opts the options for this group operation. The minimum required are :initial + # and :reduce. + # + # @option opts [Array, String, Symbol] :key (nil) Either the name of a field or a list of fields to group by (optional). + # @option opts [String, BSON::Code] :keyf (nil) A JavaScript function to be used to generate the grouping keys (optional). + # @option opts [String, BSON::Code] :cond ({}) A document specifying a query for filtering the documents over + # which the aggregation is run (optional). + # @option opts [Hash] :initial the initial value of the aggregation counter object (required). + # @option opts [String, BSON::Code] :reduce (nil) a JavaScript aggregation function (required). + # @option opts [String, BSON::Code] :finalize (nil) a JavaScript function that receives and modifies + # each of the resultant grouped objects. Available only when group is run with command + # set to true. 
+ # + # @return [Array] the command response consisting of grouped items. + def group(key, condition={}, initial={}, reduce=nil, finalize=nil) + if key.is_a?(Hash) + return new_group(key) + else + warn "Collection#group no longer take a list of paramters. This usage is deprecated." + + "Check out the new API at http://api.mongodb.org/ruby/current/Mongo/Collection.html#group-instance_method" + end + + reduce = BSON::Code.new(reduce) unless reduce.is_a?(BSON::Code) + + group_command = { + "group" => { + "ns" => @name, + "$reduce" => reduce, + "cond" => condition, + "initial" => initial + } + } + + if key.is_a?(Symbol) + raise MongoArgumentError, "Group takes either an array of fields to group by or a JavaScript function" + + "in the form of a String or BSON::Code." + end + + unless key.nil? + if key.is_a? Array + key_type = "key" + key_value = {} + key.each { |k| key_value[k] = 1 } + else + key_type = "$keyf" + key_value = key.is_a?(BSON::Code) ? key : BSON::Code.new(key) + end + + group_command["group"][key_type] = key_value + end + + finalize = BSON::Code.new(finalize) if finalize.is_a?(String) + if finalize.is_a?(BSON::Code) + group_command['group']['finalize'] = finalize + end + + result = @db.command(group_command) + + if Mongo::Support.ok?(result) + result["retval"] + else + raise OperationFailure, "group command failed: #{result['errmsg']}" + end + end + + private + + def new_group(opts={}) + reduce = opts[:reduce] + finalize = opts[:finalize] + cond = opts.fetch(:cond, {}) + initial = opts[:initial] + + if !(reduce && initial) + raise MongoArgumentError, "Group requires at minimum values for initial and reduce." 
+ end + + cmd = { + "group" => { + "ns" => @name, + "$reduce" => reduce.to_bson_code, + "cond" => cond, + "initial" => initial + } + } + + if finalize + cmd['group']['finalize'] = finalize.to_bson_code + end + + if key = opts[:key] + if key.is_a?(String) || key.is_a?(Symbol) + key = [key] + end + key_value = {} + key.each { |k| key_value[k] = 1 } + cmd["group"]["key"] = key_value + elsif keyf = opts[:keyf] + cmd["group"]["$keyf"] = keyf.to_bson_code + end + + result = @db.command(cmd) + result["retval"] + end + + public + + # Return a list of distinct values for +key+ across all + # documents in the collection. The key may use dot notation + # to reach into an embedded object. + # + # @param [String, Symbol, OrderedHash] key or hash to group by. + # @param [Hash] query a selector for limiting the result set over which to group. + # + # @example Saving zip codes and ages and returning distinct results. + # @collection.save({:zip => 10010, :name => {:age => 27}}) + # @collection.save({:zip => 94108, :name => {:age => 24}}) + # @collection.save({:zip => 10010, :name => {:age => 27}}) + # @collection.save({:zip => 99701, :name => {:age => 24}}) + # @collection.save({:zip => 94108, :name => {:age => 27}}) + # + # @collection.distinct(:zip) + # [10010, 94108, 99701] + # @collection.distinct("name.age") + # [27, 24] + # + # # You may also pass a document selector as the second parameter + # # to limit the documents over which distinct is run: + # @collection.distinct("name.age", {"name.age" => {"$gt" => 24}}) + # [27] + # + # @return [Array] an array of distinct values. + def distinct(key, query=nil) + raise MongoArgumentError unless [String, Symbol].include?(key.class) + command = BSON::OrderedHash.new + command[:distinct] = @name + command[:key] = key.to_s + command[:query] = query + + @db.command(command)["values"] + end + + # Rename this collection. + # + # Note: If operating in auth mode, the client must be authorized as an admin to + # perform this operation. 
+ # + # @param [String] new_name the new name for this collection + # + # @return [String] the name of the new collection. + # + # @raise [Mongo::InvalidNSName] if +new_name+ is an invalid collection name. + def rename(new_name) + case new_name + when Symbol, String + else + raise TypeError, "new_name must be a string or symbol" + end + + new_name = new_name.to_s + + if new_name.empty? or new_name.include? ".." + raise Mongo::InvalidNSName, "collection names cannot be empty" + end + if new_name.include? "$" + raise Mongo::InvalidNSName, "collection names must not contain '$'" + end + if new_name.match(/^\./) or new_name.match(/\.$/) + raise Mongo::InvalidNSName, "collection names must not start or end with '.'" + end + + @db.rename_collection(@name, new_name) + @name = new_name + end + + # Get information on the indexes for this collection. + # + # @return [Hash] a hash where the keys are index names. + # + # @core indexes + def index_information + @db.index_information(@name) + end + + # Return a hash containing options that apply to this collection. + # For all possible keys and values, see DB#create_collection. + # + # @return [Hash] options that apply to this collection. + def options + @db.collections_info(@name).next_document['options'] + end + + # Return stats on the collection. Uses MongoDB's collstats command. + # + # @return [Hash] + def stats + @db.command({:collstats => @name}) + end + + # Get the number of documents in this collection. + # + # @return [Integer] + def count + find().count() + end + + alias :size :count + + protected + + def normalize_hint_fields(hint) + case hint + when String + {hint => 1} + when Hash + hint + when nil + nil + else + h = BSON::OrderedHash.new + hint.to_a.each { |k| h[k] = 1 } + h + end + end + + private + + def parse_index_spec(spec) + field_spec = BSON::OrderedHash.new + if spec.is_a?(String) || spec.is_a?(Symbol) + field_spec[spec.to_s] = 1 + elsif spec.is_a?(Array) && spec.all? 
{|field| field.is_a?(Array) } + spec.each do |f| + if [Mongo::ASCENDING, Mongo::DESCENDING, Mongo::GEO2D].include?(f[1]) + field_spec[f[0].to_s] = f[1] + else + raise MongoArgumentError, "Invalid index field #{f[1].inspect}; " + + "should be one of Mongo::ASCENDING (1), Mongo::DESCENDING (-1) or Mongo::GEO2D ('2d')." + end + end + else + raise MongoArgumentError, "Invalid index specification #{spec.inspect}; " + + "should be either a string, symbol, or an array of arrays." + end + field_spec + end + + def generate_indexes(field_spec, name, opts) + selector = { + :name => name, + :ns => "#{@db.name}.#{@name}", + :key => field_spec + } + selector.merge!(opts) + + begin + insert_documents([selector], Mongo::DB::SYSTEM_INDEX_COLLECTION, false, true) + + rescue Mongo::OperationFailure => e + if selector[:dropDups] && e.message =~ /^11000/ + # NOP. If the user is intentionally dropping dups, we can ignore duplicate key errors. + else + raise Mongo::OperationFailure, "Failed to create index #{selector.inspect} with the following error: " + + "#{e.message}" + end + end + + nil + end + + # Sends a Mongo::Constants::OP_INSERT message to the database. + # Takes an array of +documents+, an optional +collection_name+, and a + # +check_keys+ setting. + def insert_documents(documents, collection_name=@name, check_keys=true, safe=false) + # Initial byte is 0. 
=begin
Patch context retained verbatim from this chunk. The first method below starts
before the chunk and is therefore left untouched, as is generate_index_name,
whose enclosing class header is not visible here.

+      message = BSON::ByteBuffer.new("\0\0\0\0")
+      BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{collection_name}")
+      documents.each do |doc|
+        message.put_binary(BSON::BSON_CODER.serialize(doc, check_keys, true).to_s)
+      end
+      raise InvalidOperation, "Exceded maximum insert size of 16,000,000 bytes" if message.size > 16_000_000
+
+      @logger.debug("MONGODB #{@db.name}['#{collection_name}'].insert(#{documents.inspect})") if @logger
+      if safe
+        @connection.send_message_with_safe_check(Mongo::Constants::OP_INSERT, message, @db.name, nil, safe)
+      else
+        @connection.send_message(Mongo::Constants::OP_INSERT, message, nil)
+      end
+      documents.collect { |o| o[:_id] || o['_id'] }
+    end
+
+    def generate_index_name(spec)
+      indexes = []
+      spec.each_pair do |field, direction|
+        indexes.push("#{field}_#{direction}")
+      end
+      indexes.join("_")
+    end
+  end
+
+end
diff --git a/lib/mongo/async/async_cursor.rb b/lib/mongo/async/async_cursor.rb
new file mode 100644
index 0000000..01a7a04
--- /dev/null
+++ b/lib/mongo/async/async_cursor.rb
@@ -0,0 +1,275 @@
=end
# encoding: UTF-8

# Copyright (C) 2008-2010 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo

  # A cursor with async enhancements.
  #
  # Each public operation takes an options hash. When +opts[:async]+ is truthy
  # the operation is queued on the connection's worker pool and its outcome is
  # reported to a callback as +(exception, result)+. The callback is
  # +opts[:callback]+ or, when that is absent, the block given to the method.
  # Without +:async+ the call runs synchronously.
  class AsyncCursor < Cursor
    include Mongo::Conversions
    include Enumerable

    # @param [Collection] collection forwarded unchanged to Cursor#initialize
    # @param [Hash] opts forwarded unchanged to Cursor#initialize
    def initialize(collection, opts={})
      super

      # Pool that #async_exec enqueues work on.
      # NOTE(review): AsyncCollection obtains its pool via @connection.workers;
      # confirm which accessor Connection actually exposes.
      @worker_pool = @connection.worker_pool
    end

    # Get the next document specified by the cursor options.
    #
    # @param [Hash] opts pass :async => true to run on the worker pool;
    #   :callback may carry the proc to notify instead of a block
    #
    # @raise [ArgumentError] in async mode when neither :callback nor a block is given
    #
    # @return [Hash, Nil] the next document or Nil if no documents remain
    #   (synchronous mode only).
    def next_document(opts={}, &blk)
      if opts[:async]
        async_exec(method(:next_document), nil, opts[:callback] || blk)
      else
        super()
      end
    end

    # Determine whether this cursor has any remaining results.
    #
    # @param [Hash] opts pass :async => true to run on the worker pool;
    #   :callback may carry the proc to notify instead of a block
    #
    # @raise [ArgumentError] in async mode when neither :callback nor a block is given
    #
    # @return [Boolean] (synchronous mode only)
    def has_next?(opts={}, &blk)
      if opts[:async]
        async_exec(method(:has_next?), nil, opts[:callback] || blk)
      else
        super()
      end
    end

    # Get the size of the result set for this query.
    #
    # @param [Boolean] skip_and_limit whether or not to take notice of skip and limit
    # @param [Hash] opts pass :async => true to run on the worker pool;
    #   :callback may carry the proc to notify instead of a block
    #
    # @return [Integer] the number of objects in the result set for this query
    #   (synchronous mode only).
    #
    # @raise [OperationFailure] on a database error.
    # @raise [ArgumentError] in async mode when neither :callback nor a block is given
    def count(skip_and_limit = false, opts={}, &blk)
      if opts[:async]
        async_exec(method(:count), [skip_and_limit], opts[:callback] || blk)
      else
        # Explicit argument list: a bare `super` would also forward +opts+.
        # NOTE(review): assumes Cursor#count accepts only skip_and_limit -- confirm.
        super(skip_and_limit)
      end
    end

    # Iterate over each document in this cursor, yielding it to the given
    # block.
    #
    # Iterating over an entire cursor will close it.
    #
    # In async mode the callback is invoked once per document as
    # +(exception, document)+.
    #
    # @yield passes each document to a block for processing.
    #
    # @raise [ArgumentError] in async mode when neither :callback nor a block is given
    #
    # @example if 'comments' represents a collection of comments:
    #   comments.find.each do |doc|
    #     puts doc['user']
    #   end
    def each(opts={}, &blk)
      if opts[:async]
        # merge rather than ||= so the caller's hash is left untouched
        async_each(opts.merge(:callback => opts[:callback] || blk))
      else
        super()
      end
    end

    # Receive all the documents from this cursor as an array of hashes.
    #
    # Notes:
    #
    # If you've already started iterating over the cursor, the array returned
    # by this method contains only the remaining documents. See Cursor#rewind! if you
    # need to reset the cursor.
    #
    # Use of this method is discouraged - in most cases, it's much more
    # efficient to retrieve documents as you need them by iterating over the cursor.
    #
    # In async mode the callback is invoked once as +(nil, rows)+ when the
    # cursor is exhausted, or as +(exception, nil)+ on failure.
    #
    # @raise [ArgumentError] in async mode when neither :callback nor a block is given
    #
    # @return [Array] an array of documents (synchronous mode only).
    def to_a(opts={}, &blk)
      if opts[:async]
        async_to_a(opts.merge(:callback => opts[:callback] || blk))
      else
        # NOTE(review): assumes Cursor#to_a takes no arguments -- confirm.
        super()
      end
    end

    # Get the explain plan for this cursor.
    #
    # @param [Hash] opts pass :async => true to run on the worker pool;
    #   :callback may carry the proc to notify instead of a block
    #
    # @raise [ArgumentError] in async mode when neither :callback nor a block is given
    #
    # @return [Hash] a document containing the explain plan for this cursor
    #   (synchronous mode only).
    #
    # @core explain explain-instance_method
    def explain(opts={}, &blk)
      if opts[:async]
        async_exec(method(:explain), nil, opts[:callback] || blk)
      else
        # NOTE(review): assumes Cursor#explain takes no arguments -- confirm.
        super()
      end
    end

    # Close the cursor.
    #
    # Note: if a cursor is read until exhausted (read until Mongo::Constants::OP_QUERY or
    # Mongo::Constants::OP_GETMORE returns zero for the cursor id), there is no need to
    # close it manually.
    #
    # Note also: Collection#find takes an optional block argument which can be used to
    # ensure that your cursors get closed.
    #
    # @raise [ArgumentError] in async mode when neither :callback nor a block is given
    #
    # @return [True] (synchronous mode only)
    def close(opts={}, &blk)
      if opts[:async]
        async_exec(method(:close), nil, opts[:callback] || blk)
      else
        # A zero or missing cursor id means there is nothing to kill server-side.
        if @cursor_id && @cursor_id != 0
          message = BSON::ByteBuffer.new([0, 0, 0, 0])
          message.put_int(1)
          message.put_long(@cursor_id)
          @logger.debug("MONGODB cursor.close #{@cursor_id}") if @logger
          @connection.send_message(Mongo::Constants::OP_KILL_CURSORS, message, nil)
        end
        @cursor_id = 0
        @closed    = true
      end
    end

    private

    # Returns the number of documents available in the current batch without
    # causing a blocking call.
    #
    # When it returns 0, the next call to #has_next? or #next_document will
    # block as it goes to the server to fetch more documents.
    def num_in_batch
      @cache.length
    end

    # Return the number of documents remaining for this cursor.
    def num_remaining
      refresh if @cache.length == 0
      @cache.length
    end

    # Fetch the next batch for this cursor and append it to the local cache.
    # Does nothing further when the initial query was just sent or the cursor
    # id is zero.
    def refresh
      return if send_initial_query || @cursor_id.zero?
      message = BSON::ByteBuffer.new([0, 0, 0, 0])

      # DB name.
      BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@collection.name}")

      # Number of results to return.
      if @limit > 0
        limit = @limit - @returned
        if @batch_size > 0
          limit = limit < @batch_size ? limit : @batch_size
        end
        message.put_int(limit)
      else
        message.put_int(@batch_size)
      end

      # Cursor id.
      message.put_long(@cursor_id)
      @logger.debug("MONGODB cursor.refresh() for cursor #{@cursor_id}") if @logger
      results, @n_received, @cursor_id = @connection.receive_message(
        Mongo::Constants::OP_GET_MORE, message, nil, @socket, @command)
      @returned += @n_received
      @cache += results
      close_cursor_if_query_complete
    end

    ## Async helper methods

    # Hand every document of the current batch to the callback, then ask the
    # worker pool whether more remain and, if so, repeat for the next batch.
    #
    # @param [Hash] opts :callback (required) and :num_returned (running count
    #   of documents already delivered, defaults to 0)
    def async_each(opts)
      callback     = opts[:callback]
      num_returned = opts[:num_returned] || 0
      ensure_callback!(callback)

      # Only documents already in the local batch are consumed here; the batch
      # size is read once, before any document is taken.
      num_in_batch.times do
        break unless @limit <= 0 || num_returned < @limit

        exception, doc = nil, nil
        begin
          doc = next_document
        rescue => e
          exception = e
        end

        callback.call(exception, doc)
        num_returned += 1
      end

      # Runs with the outcome of has_next?: iterate the next batch when more
      # documents exist, otherwise the cursor is exhausted.
      return_to_each = Proc.new do |error, more|
        if error
          # pass the exception through to the block for handling
          callback.call(error, nil)
        elsif more
          async_each(:callback => callback, :num_returned => num_returned)
        end
      end

      async_exec(method(:has_next?), nil, return_to_each)
    end

    # Accumulate every document into an array batch by batch and deliver the
    # complete array to the callback once the cursor is exhausted.
    #
    # @param [Hash] opts :callback (required), :rows (documents gathered so
    #   far) and :num_returned (running count, defaults to 0)
    def async_to_a(opts)
      rows         = opts[:rows] || []
      callback     = opts[:callback]
      num_returned = opts[:num_returned] || 0
      ensure_callback!(callback)

      # loop through all the docs in this batch since we know that many
      # documents are available without blocking
      num_in_batch.times do
        break unless @limit <= 0 || num_returned < @limit

        begin
          rows << next_document
        rescue => e
          callback.call(e, nil)
        end

        num_returned += 1
      end

      # Runs with the outcome of has_next?: gather the next batch when more
      # documents exist, otherwise hand over the collected rows.
      return_to_a = Proc.new do |error, more|
        if error
          # pass the exception through to the block for handling
          callback.call(error, nil)
        elsif more
          async_to_a(:callback => callback, :rows => rows, :num_returned => num_returned)
        else
          callback.call(nil, rows)
        end
      end

      async_exec(method(:has_next?), nil, return_to_a)
    end

    # Queue +command+ on the worker pool.
    #
    # @param [#call] command the callable to run on a worker
    # @param [Array, nil] args positional arguments for +command+; nil means none
    # @param [#call] callback notified with (exception, result)
    #
    # @raise [ArgumentError] when +callback+ is missing
    def async_exec(command, args, callback)
      ensure_callback!(callback)

      # +args+ already is the argument list, so it must not be wrapped again.
      @worker_pool.enqueue(command, Array(args), callback)
    end

    # Fail in the calling thread rather than later on a worker thread.
    def ensure_callback!(callback)
      raise ArgumentError, "Must pass a :callback proc or a block when executing in async mode!" unless callback
    end
  end
end

=begin
Patch context retained verbatim from this chunk (hunks for lib/mongo/cursor.rb).

diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb
index 4b3c5d3..84831b0 100644
--- a/lib/mongo/cursor.rb
+++ b/lib/mongo/cursor.rb
@@ -223,11 +223,8 @@ module Mongo
     #     puts doc['user']
     #   end
     def each
-      #num_returned = 0
-      #while has_next? && (@limit <= 0 || num_returned < @limit)
       while doc = next_document
-        yield doc #next_document
-        #num_returned += 1
+        yield doc
       end
     end

@@ -285,7 +282,9 @@ module Mongo
     # Is this cursor closed?
     #
     # @return [Boolean]
-    def closed?; @closed; end
+    def closed?
+      @closed || (@cursor_id && @cursor_id.zero?)
+    end

     # Returns an integer indicating which query options have been selected.
=end
#
# Patch header retained from the original chunk (the hunk line count refers
# to the file as first committed):
#
#   diff --git a/lib/mongo/util/worker_pool.rb b/lib/mongo/util/worker_pool.rb
#   new file mode 100644
#   index 0000000..bbd91c3
#   --- /dev/null
#   +++ b/lib/mongo/util/worker_pool.rb
#   @@ -0,0 +1,55 @@

# Queue is provided by 'thread' on older rubies; the require is harmless
# where Queue is built in.
require 'thread'

module Mongo
  module Async

    # A fixed-size pool of threads that execute queued commands and report
    # each outcome to a callback.
    #
    # Every job is a triple of command, arguments and callback. A worker calls
    # the command with the arguments, then calls the callback with
    # +(exception, result)+; +exception+ is nil when the command succeeded and
    # +result+ is nil when it raised.
    class WorkerPool
      # @return [Queue] the threads spawned for this pool
      attr_reader :workers

      # @param [Integer] size number of worker threads to start
      def initialize(size)
        @jobs    = Queue.new
        @workers = Queue.new

        @size = size
        spawn_pool(@size)
      end

      # Queue a command for execution on one of the workers.
      #
      # @param [#call] command the callable to execute
      # @param [Array, nil] args positional arguments for +command+; nil
      #   entries are dropped and a nil list means "no arguments"
      # @param [#call] callback invoked on the worker thread with
      #   (exception, result)
      #
      # @raise [ArgumentError] if +command+ or +callback+ is not callable
      def enqueue(command, args, callback)
        # Reject bad jobs in the caller's thread; on a worker they would raise
        # and, with abort_on_exception set, bring the whole process down.
        raise ArgumentError, "command must respond to #call" unless command.respond_to?(:call)
        raise ArgumentError, "callback must respond to #call" unless callback.respond_to?(:call)

        # call compact to remove any nil arguments
        @jobs << [command, (args || []).compact, callback]
      end

      private

      # Start +size+ threads that process jobs until the process exits.
      def spawn_pool(size)
        size.times do
          @workers << Thread.new do
            # Set from inside the thread so the flag is in force before the
            # first job can possibly run.
            Thread.current.abort_on_exception = true

            # `while true` rather than `loop`: loop would silently swallow a
            # StopIteration raised by a callback.
            do_work while true
          end
        end
      end

      # Run a single job.
      def do_work
        # blocks on #pop until a job is available
        command, cmd_args, callback = @jobs.pop
        exception, result = nil, nil

        # Call the original command with its arguments; capture
        # and save any result and/or exception
        begin
          result = command.call(*cmd_args)
        rescue => e
          exception = e
        end

        # Execute the callback and pass in the exception and result;
        # in successful cases, the exception should be nil
        callback.call(exception, result)
      end
    end # WorkerPool

  end # Async
end # Mongo