API CHANGE: moving XGen::Mongo::Driver and XGen::Mongo to Mongo and XGen::Mongo::GridFS to GridFS

This commit is contained in:
Mike Dirolf 2009-08-20 10:50:48 -04:00
parent 040ba7c9c2
commit e65dd99667
63 changed files with 2291 additions and 2423 deletions

View File

@ -7,9 +7,9 @@ many more.
require 'mongo'
include XGen::Mongo::Driver
include Mongo
db = Mongo.new('localhost').db('sample-db')
db = Mongo::Mongo.new('localhost').db('sample-db')
coll = db.collection('test')
coll.clear
@ -79,8 +79,8 @@ Here is some simple example code:
require 'rubygems' # not required for Ruby 1.9
require 'mongo'
include XGen::Mongo::Driver
db = Mongo.new.db('my-db-name')
include Mongo
db = Mongo::Mongo.new.db('my-db-name')
things = db.collection('things')
things.clear
@ -149,8 +149,8 @@ using a PK factory lets you do so.
You can tell the Ruby Mongo driver how to create primary keys by passing in
the :pk option to the Mongo#db method.
include XGen::Mongo::Driver
db = Mongo.new.db('dbname', :pk => MyPKFactory.new)
include Mongo
db = Mongo::Mongo.new.db('dbname', :pk => MyPKFactory.new)
A primary key factory object must respond to :create_pk, which should take a
hash and return a hash which merges the original hash with any primary key
@ -164,7 +164,7 @@ Here is a sample primary key factory, taken from the tests:
class TestPKFactory
def create_pk(row)
row['_id'] ||= XGen::Mongo::Driver::ObjectID.new
row['_id'] ||= Mongo::ObjectID.new
row
end
end
@ -178,7 +178,7 @@ ActiveRecord-like framework for non-Rails apps) and the AR Mongo adapter code
def create_pk(row)
return row if row[:_id]
row.delete(:_id) # in case it exists but the value is nil
row['_id'] ||= XGen::Mongo::Driver::ObjectID.new
row['_id'] ||= Mongo::ObjectID.new
row
end
end
@ -205,7 +205,7 @@ completely harmless; strict mode is a programmer convenience only.
To turn on strict mode, either pass in :strict => true when obtaining a DB
object or call the :strict= method:
db = XGen::Mongo::Driver::Mongo.new.db('dbname', :strict => true)
db = Mongo::Mongo.new.db('dbname', :strict => true)
# I'm feeling lax
db.strict = false
# No, I'm not!

View File

@ -3,7 +3,7 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
include XGen::Mongo::Driver
include Mongo
TRIALS = 100000

View File

@ -7,14 +7,14 @@ require 'irb'
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
include XGen::Mongo::Driver
include Mongo
host = org_argv[0] || ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = org_argv[1] || ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = org_argv[1] || ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
dbnm = org_argv[2] || ENV['MONGO_RUBY_DRIVER_DB'] || 'ruby-mongo-console'
puts "Connecting to #{host}:#{port} (CONN) on with database #{dbnm} (DB)"
CONN = Mongo.new(host, port)
CONN = Mongo::Mongo.new(host, port)
DB = CONN.db(dbnm)
puts "Starting IRB session..."

View File

@ -5,7 +5,7 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
include XGen::Mongo::Driver
include Mongo
TRIALS = 2
PER_TRIAL = 5000
@ -50,9 +50,9 @@ def benchmark(str, proc, n, db, coll_name, object, setup=nil)
end
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
connection = Mongo.new(host, port)
connection = Mongo::Mongo.new(host, port)
connection.drop_database("benchmark")
db = connection.db('benchmark')

View File

@ -2,13 +2,13 @@ $LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
require 'pp'
include XGen::Mongo::Driver
include Mongo
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts "Connecting to #{host}:#{port}"
db = Mongo.new(host, port).db('ruby-mongo-examples')
db = Mongo::Mongo.new(host, port).db('ruby-mongo-examples')
coll = db.create_collection('test')
# Erase all records from collection, if any

View File

@ -4,10 +4,10 @@ $LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts "Connecting to #{host}:#{port}"
db = XGen::Mongo::Driver::Mongo.new(host, port).db('ruby-mongo-examples')
db = Mongo::Mongo.new(host, port).db('ruby-mongo-examples')
coll = db.collection('test')
coll.clear

View File

@ -9,13 +9,13 @@ end
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
include XGen::Mongo::Driver
include Mongo
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts ">> Connecting to #{host}:#{port}"
DB = Mongo.new(host, port).db('ruby-mongo-blog')
DB = Mongo::Mongo.new(host, port).db('ruby-mongo-blog')
LINE_SIZE = 120
puts "=" * LINE_SIZE

View File

@ -1,13 +1,13 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
include XGen::Mongo::Driver
include Mongo
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts "Connecting to #{host}:#{port}"
db = Mongo.new(host, port).db('ruby-mongo-examples')
db = Mongo::Mongo.new(host, port).db('ruby-mongo-examples')
db.drop_collection('test')
# A capped collection has a max size and optionally a max number of records.

View File

@ -2,13 +2,13 @@ $LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
require 'pp'
include XGen::Mongo::Driver
include Mongo
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts "Connecting to #{host}:#{port}"
db = Mongo.new(host, port).db('ruby-mongo-examples')
db = Mongo::Mongo.new(host, port).db('ruby-mongo-examples')
coll = db.collection('test')
# Erase all records from collection, if any

View File

@ -2,14 +2,14 @@ $LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
require 'mongo/gridfs'
include XGen::Mongo::Driver
include XGen::Mongo::GridFS
include Mongo
include GridFS
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts "Connecting to #{host}:#{port}"
db = Mongo.new(host, port).db('ruby-mongo-examples')
db = Mongo::Mongo.new(host, port).db('ruby-mongo-examples')
def dump(db, fname)
GridStore.open(db, fname, 'r') { |f| puts f.read }

View File

@ -7,13 +7,13 @@ end
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
include XGen::Mongo::Driver
include Mongo
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts ">> Connecting to #{host}:#{port}"
db = Mongo.new(host, port).db('ruby-mongo-index_test')
db = Mongo::Mongo.new(host, port).db('ruby-mongo-index_test')
puts ">> Dropping collection test"
begin
@ -76,7 +76,7 @@ end
puts ">> Dropping index"
begin
res = coll.drop_index "all"
res = coll.drop_index "all_1"
puts "dropped : #{res.inspect}"
rescue => e
puts "Error: #{e.errmsg}"

View File

@ -1,13 +1,13 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
include XGen::Mongo::Driver
include Mongo
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts "Connecting to #{host}:#{port}"
db = Mongo.new(host, port).db('ruby-mongo-examples')
db = Mongo::Mongo.new(host, port).db('ruby-mongo-examples')
coll = db.collection('test')
# Erase all records from collection, if any

View File

@ -2,13 +2,13 @@ $LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
require 'pp'
include XGen::Mongo::Driver
include Mongo
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts "Connecting to #{host}:#{port}"
db = Mongo.new(host, port).db('ruby-mongo-examples')
db = Mongo::Mongo.new(host, port).db('ruby-mongo-examples')
coll = db.collection('test')
# Remove all records, if any

View File

@ -1,13 +1,13 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
include XGen::Mongo::Driver
include Mongo
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts "Connecting to #{host}:#{port}"
db = Mongo.new(host, port).db('ruby-mongo-examples')
db = Mongo::Mongo.new(host, port).db('ruby-mongo-examples')
coll = db.collection('test')
# Erase all records from collection, if any

View File

@ -1,13 +1,13 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
include XGen::Mongo::Driver
include Mongo
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts "Connecting to #{host}:#{port}"
db = Mongo.new(host, port).db('ruby-mongo-examples')
db = Mongo::Mongo.new(host, port).db('ruby-mongo-examples')
db.drop_collection('does-not-exist')
db.create_collection('test')

View File

@ -2,13 +2,13 @@ $LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
require 'pp'
include XGen::Mongo::Driver
include Mongo
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
port = ENV['MONGO_RUBY_DRIVER_PORT'] || XGen::Mongo::Driver::Mongo::DEFAULT_PORT
port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::Mongo::DEFAULT_PORT
puts "Connecting to #{host}:#{port}"
db = Mongo.new(host, port).db('ruby-mongo-examples')
db = Mongo::Mongo.new(host, port).db('ruby-mongo-examples')
coll = db.collection('test')
# Remove all records, if any
@ -25,13 +25,8 @@ coll.insert('array' => [1, 2, 3],
'float' => 33.33333,
'regex' => /foobar/i,
'boolean' => true,
'$where' => Code.new('this.x == 3'),
'where' => Code.new('this.x == 3'),
'dbref' => DBRef.new(coll.name, ObjectID.new),
# NOTE: the undefined type is not saved to the database properly. This is a
# Mongo bug. However, the undefined type may go away completely.
# 'undef' => Undefined.new,
'null' => nil,
'symbol' => :zildjian)

View File

@ -275,7 +275,7 @@ static int write_element_allow_id(VALUE key, VALUE value, VALUE extra, int allow
case T_STRING:
{
if (strcmp(rb_class2name(RBASIC(value)->klass),
"XGen::Mongo::Driver::Code") == 0) {
"Mongo::Code") == 0) {
int start_position, length_location, length, total_length;
write_name_and_type(buffer, key, 0x0F);
@ -314,7 +314,7 @@ static int write_element_allow_id(VALUE key, VALUE value, VALUE extra, int allow
{
// TODO there has to be a better way to do these checks...
const char* cls = rb_class2name(RBASIC(value)->klass);
if (strcmp(cls, "XGen::Mongo::Driver::Binary") == 0 ||
if (strcmp(cls, "Mongo::Binary") == 0 ||
strcmp(cls, "ByteBuffer") == 0) {
const char subtype = strcmp(cls, "ByteBuffer") ?
(const char)FIX2INT(rb_funcall(value, rb_intern("subtype"), 0)) : 2;
@ -333,7 +333,7 @@ static int write_element_allow_id(VALUE key, VALUE value, VALUE extra, int allow
buffer_write_bytes(buffer, RSTRING_PTR(string_data), length);
break;
}
if (strcmp(cls, "XGen::Mongo::Driver::ObjectID") == 0) {
if (strcmp(cls, "Mongo::ObjectID") == 0) {
VALUE as_array = rb_funcall(value, rb_intern("to_a"), 0);
int i;
write_name_and_type(buffer, key, 0x07);
@ -343,7 +343,7 @@ static int write_element_allow_id(VALUE key, VALUE value, VALUE extra, int allow
}
break;
}
if (strcmp(cls, "XGen::Mongo::Driver::DBRef") == 0) {
if (strcmp(cls, "Mongo::DBRef") == 0) {
int start_position, length_location, obj_length;
VALUE ns, oid;
write_name_and_type(buffer, key, 0x03);
@ -364,7 +364,7 @@ static int write_element_allow_id(VALUE key, VALUE value, VALUE extra, int allow
memcpy(buffer->buffer + length_location, &obj_length, 4);
break;
}
if (strcmp(cls, "XGen::Mongo::Driver::Undefined") == 0) {
if (strcmp(cls, "Mongo::Undefined") == 0) {
write_name_and_type(buffer, key, 0x0A); // just use nil type
break;
}
@ -736,25 +736,22 @@ static VALUE method_deserialize(VALUE self, VALUE bson) {
}
void Init_cbson() {
VALUE driver, CBson;
VALUE mongo, CBson;
Time = rb_const_get(rb_cObject, rb_intern("Time"));
driver = rb_const_get(rb_const_get(rb_const_get(rb_cObject,
rb_intern("XGen")),
rb_intern("Mongo")),
rb_intern("Driver"));
mongo = rb_const_get(rb_cObject, rb_intern("Mongo"));
rb_require("mongo/types/binary");
Binary = rb_const_get(driver, rb_intern("Binary"));
Binary = rb_const_get(mongo, rb_intern("Binary"));
rb_require("mongo/types/objectid");
ObjectID = rb_const_get(driver, rb_intern("ObjectID"));
ObjectID = rb_const_get(mongo, rb_intern("ObjectID"));
rb_require("mongo/types/dbref");
DBRef = rb_const_get(driver, rb_intern("DBRef"));
DBRef = rb_const_get(mongo, rb_intern("DBRef"));
rb_require("mongo/types/code");
Code = rb_const_get(driver, rb_intern("Code"));
Code = rb_const_get(mongo, rb_intern("Code"));
rb_require("mongo/types/regexp_of_holding");
RegexpOfHolding = rb_const_get(driver, rb_intern("RegexpOfHolding"));
RegexpOfHolding = rb_const_get(mongo, rb_intern("RegexpOfHolding"));
rb_require("mongo/errors");
InvalidName = rb_const_get(driver, rb_intern("InvalidName"));
InvalidName = rb_const_get(mongo, rb_intern("InvalidName"));
rb_require("mongo/util/ordered_hash");
OrderedHash = rb_const_get(rb_cObject, rb_intern("OrderedHash"));

View File

@ -12,9 +12,7 @@ require 'mongo/cursor'
require 'mongo/collection'
require 'mongo/admin'
module XGen
module Mongo
ASCENDING = 1
DESCENDING = -1
end
module Mongo
ASCENDING = 1
DESCENDING = -1
end

View File

@ -16,72 +16,68 @@
require 'mongo/util/ordered_hash'
module XGen
module Mongo
module Driver
module Mongo
# Provide administrative database methods: those having to do with
# profiling and validation.
class Admin
# Provide administrative database methods: those having to do with
# profiling and validation.
class Admin
def initialize(db)
@db = db
end
# Return the current database profiling level.
def profiling_level
oh = OrderedHash.new
oh[:profile] = -1
doc = @db.db_command(oh)
raise "Error with profile command: #{doc.inspect}" unless @db.ok?(doc) && doc['was'].kind_of?(Numeric)
case doc['was'].to_i
when 0
:off
when 1
:slow_only
when 2
:all
else
raise "Error: illegal profiling level value #{doc['was']}"
end
end
# Set database profiling level to :off, :slow_only, or :all.
def profiling_level=(level)
oh = OrderedHash.new
oh[:profile] = case level
when :off
0
when :slow_only
1
when :all
2
else
raise "Error: illegal profiling level value #{level}"
end
doc = @db.db_command(oh)
raise "Error with profile command: #{doc.inspect}" unless @db.ok?(doc)
end
# Return an array contining current profiling information from the
# database.
def profiling_info
@db.query(Collection.new(@db, DB::SYSTEM_PROFILE_COLLECTION), Query.new({})).to_a
end
# Validate a named collection by raising an exception if there is a
# problem or returning an interesting hash (see especially the
# 'result' string value) if all is well.
def validate_collection(name)
doc = @db.db_command(:validate => name)
raise "Error with validate command: #{doc.inspect}" unless @db.ok?(doc)
result = doc['result']
raise "Error with validation data: #{doc.inspect}" unless result.kind_of?(String)
raise "Error: invalid collection #{name}: #{doc.inspect}" if result =~ /\b(exception|corrupt)\b/i
doc
end
def initialize(db)
@db = db
end
# Return the current database profiling level.
def profiling_level
oh = OrderedHash.new
oh[:profile] = -1
doc = @db.db_command(oh)
raise "Error with profile command: #{doc.inspect}" unless @db.ok?(doc) && doc['was'].kind_of?(Numeric)
case doc['was'].to_i
when 0
:off
when 1
:slow_only
when 2
:all
else
raise "Error: illegal profiling level value #{doc['was']}"
end
end
# Set database profiling level to :off, :slow_only, or :all.
def profiling_level=(level)
oh = OrderedHash.new
oh[:profile] = case level
when :off
0
when :slow_only
1
when :all
2
else
raise "Error: illegal profiling level value #{level}"
end
doc = @db.db_command(oh)
raise "Error with profile command: #{doc.inspect}" unless @db.ok?(doc)
end
# Return an array contining current profiling information from the
# database.
def profiling_info
@db.query(Collection.new(@db, DB::SYSTEM_PROFILE_COLLECTION), Query.new({})).to_a
end
# Validate a named collection by raising an exception if there is a
# problem or returning an interesting hash (see especially the
# 'result' string value) if all is well.
def validate_collection(name)
doc = @db.db_command(:validate => name)
raise "Error with validate command: #{doc.inspect}" unless @db.ok?(doc)
result = doc['result']
raise "Error with validation data: #{doc.inspect}" unless result.kind_of?(String)
raise "Error: invalid collection #{name}: #{doc.inspect}" if result =~ /\b(exception|corrupt)\b/i
doc
end
end
end

View File

@ -16,320 +16,318 @@
require 'mongo/query'
module XGen
module Mongo
module Driver
module Mongo
# A named collection of records in a database.
class Collection
# A named collection of records in a database.
class Collection
attr_reader :db, :name, :hint
attr_reader :db, :name, :hint
def initialize(db, name)
case name
when Symbol, String
else
raise TypeError, "new_name must be a string or symbol"
end
def initialize(db, name)
case name
when Symbol, String
else
raise TypeError, "new_name must be a string or symbol"
end
name = name.to_s
name = name.to_s
if name.empty? or name.include? ".."
raise InvalidName, "collection names cannot be empty"
end
if name.include? "$" and not name.match(/^\$cmd/)
raise InvalidName, "collection names must not contain '$'"
end
if name.match(/^\./) or name.match(/\.$/)
raise InvalidName, "collection names must not start or end with '.'"
end
if name.empty? or name.include? ".."
raise InvalidName, "collection names cannot be empty"
end
if name.include? "$" and not name.match(/^\$cmd/)
raise InvalidName, "collection names must not contain '$'"
end
if name.match(/^\./) or name.match(/\.$/)
raise InvalidName, "collection names must not start or end with '.'"
end
@db, @name = db, name
@hint = nil
@db, @name = db, name
@hint = nil
end
# Get a sub-collection of this collection by name.
#
# Raises InvalidName if an invalid collection name is used.
#
# :name :: the name of the collection to get
def [](name)
name = "#{self.name}.#{name}"
return Collection.new(self, name) if !db.strict? || db.collection_names.include?(name)
raise "Collection #{name} doesn't exist. Currently in strict mode."
end
# Set hint fields to use and return +self+. hint may be a single field
# name, array of field names, or a hash (preferably an OrderedHash).
# May be +nil+.
def hint=(hint)
@hint = normalize_hint_fields(hint)
self
end
# Query the database.
#
# The +selector+ argument is a prototype document that all results must
# match. For example:
#
# collection.find({"hello" => "world"})
#
# only matches documents that have a key "hello" with value "world".
# Matches can have other keys *in addition* to "hello".
#
# If given an optional block +find+ will yield a Cursor to that block,
# close the cursor, and then return nil. This guarantees that partially
# evaluated cursors will be closed. If given no block +find+ returns a
# cursor.
#
# :selector :: A document (hash) specifying elements which must be
# present for a document to be included in the result set.
#
# Options:
# :fields :: Array of field names that should be returned in the result
# set ("_id" will always be included). By limiting results
# to a certain subset of fields you can cut down on network
# traffic and decoding time.
# :offset :: Start at this record when returning records
# :limit :: Maximum number of records to return
# :sort :: Either hash of field names as keys and 1/-1 as values; 1 ==
# ascending, -1 == descending, or array of field names (all
# assumed to be sorted in ascending order).
# :hint :: See #hint. This option overrides the collection-wide value.
# :snapshot :: If true, snapshot mode will be used for this query.
# Snapshot mode assures no duplicates are returned, or
# objects missed, which were preset at both the start and
# end of the query's execution. For details see
# http://www.mongodb.org/display/DOCS/How+to+do+Snapshotting+in+the+Mongo+Database
def find(selector={}, options={})
fields = options.delete(:fields)
fields = ["_id"] if fields && fields.empty?
offset = options.delete(:offset) || 0
limit = options.delete(:limit) || 0
sort = options.delete(:sort)
hint = options.delete(:hint)
snapshot = options.delete(:snapshot)
if hint
hint = normalize_hint_fields(hint)
else
hint = @hint # assumed to be normalized already
end
raise RuntimeError, "Unknown options [#{options.inspect}]" unless options.empty?
cursor = @db.query(self, Query.new(selector, fields, offset, limit, sort, hint, snapshot))
if block_given?
yield cursor
cursor.close()
nil
else
cursor
end
end
# Get a single object from the database.
#
# Raises TypeError if the argument is of an improper type. Returns a
# single document (hash), or nil if no result is found.
#
# :spec_or_object_id :: a hash specifying elements which must be
# present for a document to be included in the result set OR an
# instance of ObjectID to be used as the value for an _id query.
# if nil an empty spec, {}, will be used.
# :options :: options, as passed to Collection#find
def find_one(spec_or_object_id=nil, options={})
spec = case spec_or_object_id
when nil
{}
when ObjectID
{:_id => spec_or_object_id}
when Hash
spec_or_object_id
else
raise TypeError, "spec_or_object_id must be an instance of ObjectID or Hash, or nil"
end
find(spec, options.merge(:limit => -1)).next_object
end
# DEPRECATED - use find_one instead
#
# Find the first record that matches +selector+. See #find.
def find_first(selector={}, options={})
warn "Collection#find_first is deprecated and will be removed. Please use Collection#find_one instead."
find_one(selector, options)
end
# Save a document in this collection.
#
# If +to_save+ already has an '_id' then an update (upsert) operation
# is performed and any existing document with that _id is overwritten.
# Otherwise an insert operation is performed. Returns the _id of the
# saved document.
#
# :to_save :: the document (a hash) to be saved
#
# Options:
# :safe :: if true, check that the save succeeded. OperationFailure
# will be raised on an error. Checking for safety requires an extra
# round-trip to the database
def save(to_save, options={})
if id = to_save[:_id] || to_save['_id']
update({:_id => id}, to_save, :upsert => true, :safe => options.delete(:safe))
id
else
insert(to_save, :safe => options.delete(:safe))
end
end
# Insert a document(s) into this collection.
#
# "<<" is aliased to this method. Returns the _id of the inserted
# document or a list of _ids of the inserted documents. The object(s)
# may have been modified by the database's PK factory, if it has one.
#
# :doc_or_docs :: a document (as a hash) or Array of documents to be
# inserted
#
# Options:
# :safe :: if true, check that the insert succeeded. OperationFailure
# will be raised on an error. Checking for safety requires an extra
# round-trip to the database
def insert(doc_or_docs, options={})
doc_or_docs = [doc_or_docs] if !doc_or_docs.is_a?(Array)
res = @db.insert_into_db(@name, doc_or_docs)
if options.delete(:safe)
error = @db.error
if error
raise OperationFailure, error
end
end
res.size > 1 ? res : res.first
end
alias_method :<<, :insert
# Get a sub-collection of this collection by name.
#
# Raises InvalidName if an invalid collection name is used.
#
# :name :: the name of the collection to get
def [](name)
name = "#{self.name}.#{name}"
return Collection.new(self, name) if !db.strict? || db.collection_names.include?(name)
raise "Collection #{name} doesn't exist. Currently in strict mode."
# Remove the records that match +selector+.
def remove(selector={})
@db.remove_from_db(@name, selector)
end
# Remove all records.
def clear
remove({})
end
# DEPRECATED - use update(... :upsert => true) instead
#
# Update records that match +selector+ by applying +obj+ as an update.
# If no match, inserts (???).
def repsert(selector, obj)
warn "Collection#repsert is deprecated and will be removed. Please use Collection#update instead."
update(selector, obj, :upsert => true)
end
# DEPRECATED - use update(... :upsert => false) instead
#
# Update records that match +selector+ by applying +obj+ as an update.
def replace(selector, obj)
warn "Collection#replace is deprecated and will be removed. Please use Collection#update instead."
update(selector, obj)
end
# DEPRECATED - use update(... :upsert => false) instead
#
# Update records that match +selector+ by applying +obj+ as an update.
# Both +selector+ and +modifier_obj+ are required.
def modify(selector, modifier_obj)
warn "Collection#modify is deprecated and will be removed. Please use Collection#update instead."
update(selector, modifier_obj)
end
# Update a document(s) in this collection.
#
# :spec :: a hash specifying elements which must be present for
# a document to be updated
# :document :: a hash specifying the fields to be changed in the
# selected document(s), or (in the case of an upsert) the document to
# be inserted
#
# Options:
# :upsert :: if true, perform an upsert operation
# :safe :: if true, check that the update succeeded. OperationFailure
# will be raised on an error. Checking for safety requires an extra
# round-trip to the database
def update(spec, document, options={})
upsert = options.delete(:upsert)
safe = options.delete(:safe)
if upsert
@db.repsert_in_db(@name, spec, document)
else
@db.replace_in_db(@name, spec, document)
end
if safe
error = @db.error
if error
raise OperationFailure, error
end
end
end
# Set hint fields to use and return +self+. hint may be a single field
# name, array of field names, or a hash (preferably an OrderedHash).
# May be +nil+.
def hint=(hint)
@hint = normalize_hint_fields(hint)
self
# Create a new index. +field_or_spec+
# should be either a single field name or a Array of [field name,
# direction] pairs. Directions should be specified as
# Mongo::ASCENDING or Mongo::DESCENDING.
# +unique+ is an optional boolean indicating whether this index
# should enforce a uniqueness constraint.
def create_index(field_or_spec, unique=false)
@db.create_index(@name, field_or_spec, unique)
end
# Drop index +name+.
def drop_index(name)
@db.drop_index(@name, name)
end
# Drop all indexes.
def drop_indexes
# just need to call drop indexes with no args; will drop them all
@db.drop_index(@name, '*')
end
# Drop the entire collection. USE WITH CAUTION.
def drop
@db.drop_collection(@name)
end
# Perform a query similar to an SQL group by operation.
#
# Returns an array of grouped items.
#
# :keys :: Array of fields to group by
# :condition :: specification of rows to be considered (as a 'find'
# query specification)
# :initial :: initial value of the aggregation counter object
# :reduce :: aggregation function as a JavaScript string
# :command :: if true, run the group as a command instead of in an
# eval - it is likely that this option will eventually be
# deprecated and all groups will be run as commands
def group(keys, condition, initial, reduce, command=false)
if command
hash = {}
keys.each do |k|
hash[k] = 1
end
# Query the database.
#
# The +selector+ argument is a prototype document that all results must
# match. For example:
#
# collection.find({"hello" => "world"})
#
# only matches documents that have a key "hello" with value "world".
# Matches can have other keys *in addition* to "hello".
#
# If given an optional block +find+ will yield a Cursor to that block,
# close the cursor, and then return nil. This guarantees that partially
# evaluated cursors will be closed. If given no block +find+ returns a
# cursor.
#
# :selector :: A document (hash) specifying elements which must be
# present for a document to be included in the result set.
#
# Options:
# :fields :: Array of field names that should be returned in the result
# set ("_id" will always be included). By limiting results
# to a certain subset of fields you can cut down on network
# traffic and decoding time.
# :offset :: Start at this record when returning records
# :limit :: Maximum number of records to return
# :sort :: Either hash of field names as keys and 1/-1 as values; 1 ==
# ascending, -1 == descending, or array of field names (all
# assumed to be sorted in ascending order).
# :hint :: See #hint. This option overrides the collection-wide value.
# :snapshot :: If true, snapshot mode will be used for this query.
# Snapshot mode assures no duplicates are returned, or
# objects missed, which were preset at both the start and
# end of the query's execution. For details see
# http://www.mongodb.org/display/DOCS/How+to+do+Snapshotting+in+the+Mongo+Database
def find(selector={}, options={})
fields = options.delete(:fields)
fields = ["_id"] if fields && fields.empty?
offset = options.delete(:offset) || 0
limit = options.delete(:limit) || 0
sort = options.delete(:sort)
hint = options.delete(:hint)
snapshot = options.delete(:snapshot)
if hint
hint = normalize_hint_fields(hint)
else
hint = @hint # assumed to be normalized already
end
raise RuntimeError, "Unknown options [#{options.inspect}]" unless options.empty?
cursor = @db.query(self, Query.new(selector, fields, offset, limit, sort, hint, snapshot))
if block_given?
yield cursor
cursor.close()
nil
else
cursor
end
result = @db.db_command({"group" =>
{
"ns" => @name,
"$reduce" => Code.new(reduce),
"key" => hash,
"cond" => condition,
"initial" => initial}})
if result["ok"] == 1
return result["retval"]
else
raise OperationFailure, "group command failed: #{result['errmsg']}"
end
# Get a single object from the database.
#
# Raises TypeError if the argument is of an improper type. Returns a
# single document (hash), or nil if no result is found.
#
# :spec_or_object_id :: a hash specifying elements which must be
# present for a document to be included in the result set OR an
# instance of ObjectID to be used as the value for an _id query.
# if nil an empty spec, {}, will be used.
# :options :: options, as passed to Collection#find
def find_one(spec_or_object_id=nil, options={})
spec = case spec_or_object_id
when nil
{}
when ObjectID
{:_id => spec_or_object_id}
when Hash
spec_or_object_id
else
raise TypeError, "spec_or_object_id must be an instance of ObjectID or Hash, or nil"
end
find(spec, options.merge(:limit => -1)).next_object
end
# DEPRECATED - use find_one instead
#
# Find the first record that matches +selector+. See #find.
def find_first(selector={}, options={})
warn "Collection#find_first is deprecated and will be removed. Please use Collection#find_one instead."
find_one(selector, options)
end
# Save a document in this collection.
#
# If +to_save+ already has an '_id' then an update (upsert) operation
# is performed and any existing document with that _id is overwritten.
# Otherwise an insert operation is performed. Returns the _id of the
# saved document.
#
# :to_save :: the document (a hash) to be saved
#
# Options:
# :safe :: if true, check that the save succeeded. OperationFailure
# will be raised on an error. Checking for safety requires an extra
# round-trip to the database
def save(to_save, options={})
if id = to_save[:_id] || to_save['_id']
update({:_id => id}, to_save, :upsert => true, :safe => options.delete(:safe))
id
else
insert(to_save, :safe => options.delete(:safe))
end
end
# Insert a document(s) into this collection.
#
# "<<" is aliased to this method. Returns the _id of the inserted
# document or a list of _ids of the inserted documents. The object(s)
# may have been modified by the database's PK factory, if it has one.
#
# :doc_or_docs :: a document (as a hash) or Array of documents to be
# inserted
#
# Options:
# :safe :: if true, check that the insert succeeded. OperationFailure
# will be raised on an error. Checking for safety requires an extra
# round-trip to the database
def insert(doc_or_docs, options={})
doc_or_docs = [doc_or_docs] if !doc_or_docs.is_a?(Array)
res = @db.insert_into_db(@name, doc_or_docs)
if options.delete(:safe)
error = @db.error
if error
raise OperationFailure, error
end
end
res.size > 1 ? res : res.first
end
alias_method :<<, :insert
# Remove the records that match +selector+.
def remove(selector={})
@db.remove_from_db(@name, selector)
end
# Remove all records.
def clear
remove({})
end
# DEPRECATED - use update(... :upsert => true) instead
#
# Update records that match +selector+ by applying +obj+ as an update.
# If no match, inserts (???).
def repsert(selector, obj)
warn "Collection#repsert is deprecated and will be removed. Please use Collection#update instead."
update(selector, obj, :upsert => true)
end
# DEPRECATED - use update(... :upsert => false) instead
#
# Update records that match +selector+ by applying +obj+ as an update.
def replace(selector, obj)
warn "Collection#replace is deprecated and will be removed. Please use Collection#update instead."
update(selector, obj)
end
# DEPRECATED - use update(... :upsert => false) instead
#
# Update records that match +selector+ by applying +obj+ as an update.
# Both +selector+ and +modifier_obj+ are required.
def modify(selector, modifier_obj)
warn "Collection#modify is deprecated and will be removed. Please use Collection#update instead."
update(selector, modifier_obj)
end
# Update a document(s) in this collection.
#
# :spec :: a hash specifying elements which must be present for
# a document to be updated
# :document :: a hash specifying the fields to be changed in the
# selected document(s), or (in the case of an upsert) the document to
# be inserted
#
# Options:
# :upsert :: if true, perform an upsert operation
# :safe :: if true, check that the update succeeded. OperationFailure
# will be raised on an error. Checking for safety requires an extra
# round-trip to the database
def update(spec, document, options={})
upsert = options.delete(:upsert)
safe = options.delete(:safe)
if upsert
@db.repsert_in_db(@name, spec, document)
else
@db.replace_in_db(@name, spec, document)
end
if safe
error = @db.error
if error
raise OperationFailure, error
end
end
end
# Create a new index. +field_or_spec+
# should be either a single field name or a Array of [field name,
# direction] pairs. Directions should be specified as
# XGen::Mongo::ASCENDING or XGen::Mongo::DESCENDING.
# +unique+ is an optional boolean indicating whether this index
# should enforce a uniqueness constraint.
def create_index(field_or_spec, unique=false)
@db.create_index(@name, field_or_spec, unique)
end
# Drop index +name+.
def drop_index(name)
@db.drop_index(@name, name)
end
# Drop all indexes.
def drop_indexes
# just need to call drop indexes with no args; will drop them all
@db.drop_index(@name, '*')
end
# Drop the entire collection. USE WITH CAUTION.
def drop
@db.drop_collection(@name)
end
# Perform a query similar to an SQL group by operation.
#
# Returns an array of grouped items.
#
# :keys :: Array of fields to group by
# :condition :: specification of rows to be considered (as a 'find'
# query specification)
# :initial :: initial value of the aggregation counter object
# :reduce :: aggregation function as a JavaScript string
# :command :: if true, run the group as a command instead of in an
# eval - it is likely that this option will eventually be
# deprecated and all groups will be run as commands
def group(keys, condition, initial, reduce, command=false)
if command
hash = {}
keys.each do |k|
hash[k] = 1
end
result = @db.db_command({"group" =>
{
"ns" => @name,
"$reduce" => Code.new(reduce),
"key" => hash,
"cond" => condition,
"initial" => initial}})
if result["ok"] == 1
return result["retval"]
else
raise OperationFailure, "group command failed: #{result['errmsg']}"
end
end
group_function = <<EOS
end
group_function = <<EOS
function () {
var c = db[ns].find(condition);
var map = new Map();
@ -354,88 +352,85 @@ function () {
return {"result": map.values()};
}
EOS
return @db.eval(Code.new(group_function,
{
"ns" => @name,
"keys" => keys,
"condition" => condition,
"initial" => initial
}))["result"]
end
return @db.eval(Code.new(group_function,
{
"ns" => @name,
"keys" => keys,
"condition" => condition,
"initial" => initial
}))["result"]
end
# Rename this collection.
#
# If operating in auth mode, client must be authorized as an admin to
# perform this operation. Raises +InvalidName+ if +new_name+ is an invalid
# collection name.
#
# :new_name :: new name for this collection
def rename(new_name)
case new_name
when Symbol, String
else
raise TypeError, "new_name must be a string or symbol"
end
# Rename this collection.
#
# If operating in auth mode, client must be authorized as an admin to
# perform this operation. Raises +InvalidName+ if +new_name+ is an invalid
# collection name.
#
# :new_name :: new name for this collection
def rename(new_name)
case new_name
when Symbol, String
else
raise TypeError, "new_name must be a string or symbol"
end
new_name = new_name.to_s
new_name = new_name.to_s
if new_name.empty? or new_name.include? ".."
raise InvalidName, "collection names cannot be empty"
end
if new_name.include? "$"
raise InvalidName, "collection names must not contain '$'"
end
if new_name.match(/^\./) or new_name.match(/\.$/)
raise InvalidName, "collection names must not start or end with '.'"
end
if new_name.empty? or new_name.include? ".."
raise InvalidName, "collection names cannot be empty"
end
if new_name.include? "$"
raise InvalidName, "collection names must not contain '$'"
end
if new_name.match(/^\./) or new_name.match(/\.$/)
raise InvalidName, "collection names must not start or end with '.'"
end
@db.rename_collection(@name, new_name)
end
@db.rename_collection(@name, new_name)
end
# Get information on the indexes for the collection +collection_name+.
# Returns a hash where the keys are index names (as returned by
# Collection#create_index and the values are lists of [key, direction]
# pairs specifying the index (as passed to Collection#create_index).
def index_information
@db.index_information(@name)
end
# Get information on the indexes for the collection +collection_name+.
# Returns a hash where the keys are index names (as returned by
# Collection#create_index and the values are lists of [key, direction]
# pairs specifying the index (as passed to Collection#create_index).
def index_information
@db.index_information(@name)
end
# Return a hash containing options that apply to this collection.
# 'create' will be the collection name. For the other possible keys
# and values, see DB#create_collection.
def options
@db.collections_info(@name).next_object()['options']
end
# Return a hash containing options that apply to this collection.
# 'create' will be the collection name. For the other possible keys
# and values, see DB#create_collection.
def options
@db.collections_info(@name).next_object()['options']
end
# Get the number of documents in this collection.
#
# Specifying a +selector+ is DEPRECATED and will be removed. Please use
# find(selector).count() instead.
def count(selector=nil)
if selector
warn "specifying a selector for Collection#count is deprecated and will be removed. Please use Collection.find(selector).count instead."
end
find(selector || {}).count()
end
# Get the number of documents in this collection.
#
# Specifying a +selector+ is DEPRECATED and will be removed. Please use
# find(selector).count() instead.
def count(selector=nil)
if selector
warn "specifying a selector for Collection#count is deprecated and will be removed. Please use Collection.find(selector).count instead."
end
find(selector || {}).count()
end
protected
protected
def normalize_hint_fields(hint)
case hint
when String
{hint => 1}
when Hash
hint
when nil
nil
else
h = OrderedHash.new
hint.to_a.each { |k| h[k] = 1 }
h
end
end
def normalize_hint_fields(hint)
case hint
when String
{hint => 1}
when Hash
hint
when nil
nil
else
h = OrderedHash.new
hint.to_a.each { |k| h[k] = 1 }
h
end
end
end
end

View File

@ -18,232 +18,228 @@ require 'mongo/message'
require 'mongo/util/byte_buffer'
require 'mongo/util/bson'
module XGen
module Mongo
module Driver
module Mongo
# A cursor over query results. Returned objects are hashes.
class Cursor
# A cursor over query results. Returned objects are hashes.
class Cursor
include Enumerable
include Enumerable
RESPONSE_HEADER_SIZE = 20
RESPONSE_HEADER_SIZE = 20
attr_reader :db, :collection, :query
attr_reader :db, :collection, :query
def initialize(db, collection, query, admin=false)
@db, @collection, @query, @admin = db, collection, query, admin
@num_to_return = @query.number_to_return || 0
@cache = []
@closed = false
@can_call_to_a = true
@query_run = false
@rows = nil
end
def initialize(db, collection, query, admin=false)
@db, @collection, @query, @admin = db, collection, query, admin
@num_to_return = @query.number_to_return || 0
@cache = []
@closed = false
@can_call_to_a = true
@query_run = false
@rows = nil
end
def closed?; @closed; end
def closed?; @closed; end
# Internal method, not for general use. Return +true+ if there are
# more records to retrieve. We do not check @num_to_return; #each is
# responsible for doing that.
def more?
num_remaining > 0
end
# Internal method, not for general use. Return +true+ if there are
# more records to retrieve. We do not check @num_to_return; #each is
# responsible for doing that.
def more?
num_remaining > 0
end
# Return the next object or nil if there are no more. Raises an error
# if necessary.
def next_object
refill_via_get_more if num_remaining == 0
o = @cache.shift
# Return the next object or nil if there are no more. Raises an error
# if necessary.
def next_object
refill_via_get_more if num_remaining == 0
o = @cache.shift
if o && o['$err']
err = o['$err']
if o && o['$err']
err = o['$err']
# If the server has stopped being the master (e.g., it's one of a
# pair but it has died or something like that) then we close that
# connection. If the db has auto connect option and a pair of
# servers, next request will re-open on master server.
@db.close if err == "not master"
# If the server has stopped being the master (e.g., it's one of a
# pair but it has died or something like that) then we close that
# connection. If the db has auto connect option and a pair of
# servers, next request will re-open on master server.
@db.close if err == "not master"
raise err
end
o
end
# Get the size of the results set for this query.
#
# Returns the number of objects in the results set for this query. Does
# not take limit and skip into account. Raises OperationFailure on a
# database error.
def count
command = OrderedHash["count", @collection.name,
"query", @query.selector]
response = @db.db_command(command)
return response['n'].to_i if response['ok'] == 1
return 0 if response['errmsg'] == "ns missing"
raise OperationFailure, "Count failed: #{response['errmsg']}"
end
# Iterate over each object, yielding it to the given block. At most
# @num_to_return records are returned (or all of them, if
# @num_to_return is 0).
#
# If #to_a has already been called then this method uses the array
# that we store internally. In that case, #each can be called multiple
# times because it re-uses that array.
#
# You can call #each after calling #to_a (multiple times even, because
# it will use the internally-stored array), but you can't call #to_a
# after calling #each unless you also called it before calling #each.
# If you try to do that, an error will be raised.
def each
if @rows # Already turned into an array
@rows.each { |row| yield row }
else
num_returned = 0
while more? && (@num_to_return <= 0 || num_returned < @num_to_return)
yield next_object()
num_returned += 1
end
@can_call_to_a = false
end
end
# Return all of the rows (up to the +num_to_return+ value specified in
# #new) as an array. Calling this multiple times will work fine; it
# always returns the same array.
#
# Don't use this if you're expecting large amounts of data, of course.
# All of the returned rows are kept in an array stored in this object
# so it can be reused.
#
# You can call #each after calling #to_a (multiple times even, because
# it will use the internally-stored array), but you can't call #to_a
# after calling #each unless you also called it before calling #each.
# If you try to do that, an error will be raised.
def to_a
return @rows if @rows
raise "can't call Cursor#to_a after calling Cursor#each" unless @can_call_to_a
@rows = []
num_returned = 0
while more? && (@num_to_return <= 0 || num_returned < @num_to_return)
@rows << next_object()
num_returned += 1
end
@rows
end
# Returns an explain plan record.
def explain
old_val = @query.explain
@query.explain = true
c = Cursor.new(@db, @collection, @query)
explanation = c.next_object
c.close
@query.explain = old_val
explanation
end
# Close the cursor.
#
# Note: if a cursor is read until exhausted (read until OP_QUERY or
# OP_GETMORE returns zero for the cursor id), there is no need to
# close it by calling this method.
def close
@db.send_to_db(KillCursorsMessage.new(@cursor_id)) if @cursor_id
@cache = []
@cursor_id = 0
@closed = true
end
protected
def read_all
read_message_header
read_response_header
read_objects_off_wire
end
def read_objects_off_wire
while doc = next_object_on_wire
@cache << doc
end
end
def read_message_header
MessageHeader.new.read_header(@db)
end
def read_response_header
header_buf = ByteBuffer.new
header_buf.put_array(@db.receive_full(RESPONSE_HEADER_SIZE).unpack("C*"))
raise "Short read for DB response header; expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" unless header_buf.length == RESPONSE_HEADER_SIZE
header_buf.rewind
@result_flags = header_buf.get_int
@cursor_id = header_buf.get_long
@starting_from = header_buf.get_int
@n_returned = header_buf.get_int
@n_remaining = @n_returned
end
def num_remaining
refill_via_get_more if @cache.length == 0
@cache.length
end
private
def next_object_on_wire
send_query_if_needed
# if @n_remaining is 0 but we have a non-zero cursor, there are more
# to fetch, so do a GetMore operation, but don't do it here - do it
# when someone pulls an object out of the cache and it's empty
return nil if @n_remaining == 0
object_from_stream
end
def refill_via_get_more
if send_query_if_needed or @cursor_id == 0
return
end
@db._synchronize {
@db.send_to_db(GetMoreMessage.new(@admin ? 'admin' : @db.name, @collection.name, @cursor_id))
read_all
}
end
def object_from_stream
buf = ByteBuffer.new
buf.put_array(@db.receive_full(4).unpack("C*"))
buf.rewind
size = buf.get_int
buf.put_array(@db.receive_full(size - 4).unpack("C*"), 4)
@n_remaining -= 1
buf.rewind
BSON.new.deserialize(buf)
end
def send_query_if_needed
# Run query first time we request an object from the wire
if @query_run
false
else
@db._synchronize {
@db.send_query_message(QueryMessage.new(@admin ? 'admin' : @db.name, @collection.name, @query))
@query_run = true
read_all
}
true
end
end
def to_s
"DBResponse(flags=#@result_flags, cursor_id=#@cursor_id, start=#@starting_from, n_returned=#@n_returned)"
end
raise err
end
o
end
# Get the size of the results set for this query.
#
# Returns the number of objects in the results set for this query. Does
# not take limit and skip into account. Raises OperationFailure on a
# database error.
def count
command = OrderedHash["count", @collection.name,
"query", @query.selector]
response = @db.db_command(command)
return response['n'].to_i if response['ok'] == 1
return 0 if response['errmsg'] == "ns missing"
raise OperationFailure, "Count failed: #{response['errmsg']}"
end
# Iterate over each object, yielding it to the given block. At most
# @num_to_return records are returned (or all of them, if
# @num_to_return is 0).
#
# If #to_a has already been called then this method uses the array
# that we store internally. In that case, #each can be called multiple
# times because it re-uses that array.
#
# You can call #each after calling #to_a (multiple times even, because
# it will use the internally-stored array), but you can't call #to_a
# after calling #each unless you also called it before calling #each.
# If you try to do that, an error will be raised.
def each
if @rows # Already turned into an array
@rows.each { |row| yield row }
else
num_returned = 0
while more? && (@num_to_return <= 0 || num_returned < @num_to_return)
yield next_object()
num_returned += 1
end
@can_call_to_a = false
end
end
# Return all of the rows (up to the +num_to_return+ value specified in
# #new) as an array. Calling this multiple times will work fine; it
# always returns the same array.
#
# Don't use this if you're expecting large amounts of data, of course.
# All of the returned rows are kept in an array stored in this object
# so it can be reused.
#
# You can call #each after calling #to_a (multiple times even, because
# it will use the internally-stored array), but you can't call #to_a
# after calling #each unless you also called it before calling #each.
# If you try to do that, an error will be raised.
def to_a
return @rows if @rows
raise "can't call Cursor#to_a after calling Cursor#each" unless @can_call_to_a
@rows = []
num_returned = 0
while more? && (@num_to_return <= 0 || num_returned < @num_to_return)
@rows << next_object()
num_returned += 1
end
@rows
end
# Returns an explain plan record.
def explain
old_val = @query.explain
@query.explain = true
c = Cursor.new(@db, @collection, @query)
explanation = c.next_object
c.close
@query.explain = old_val
explanation
end
# Close the cursor.
#
# Note: if a cursor is read until exhausted (read until OP_QUERY or
# OP_GETMORE returns zero for the cursor id), there is no need to
# close it by calling this method.
def close
@db.send_to_db(KillCursorsMessage.new(@cursor_id)) if @cursor_id
@cache = []
@cursor_id = 0
@closed = true
end
protected
def read_all
read_message_header
read_response_header
read_objects_off_wire
end
def read_objects_off_wire
while doc = next_object_on_wire
@cache << doc
end
end
def read_message_header
MessageHeader.new.read_header(@db)
end
def read_response_header
header_buf = ByteBuffer.new
header_buf.put_array(@db.receive_full(RESPONSE_HEADER_SIZE).unpack("C*"))
raise "Short read for DB response header; expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" unless header_buf.length == RESPONSE_HEADER_SIZE
header_buf.rewind
@result_flags = header_buf.get_int
@cursor_id = header_buf.get_long
@starting_from = header_buf.get_int
@n_returned = header_buf.get_int
@n_remaining = @n_returned
end
def num_remaining
refill_via_get_more if @cache.length == 0
@cache.length
end
private
def next_object_on_wire
send_query_if_needed
# if @n_remaining is 0 but we have a non-zero cursor, there are more
# to fetch, so do a GetMore operation, but don't do it here - do it
# when someone pulls an object out of the cache and it's empty
return nil if @n_remaining == 0
object_from_stream
end
def refill_via_get_more
if send_query_if_needed or @cursor_id == 0
return
end
@db._synchronize {
@db.send_to_db(GetMoreMessage.new(@admin ? 'admin' : @db.name, @collection.name, @cursor_id))
read_all
}
end
def object_from_stream
buf = ByteBuffer.new
buf.put_array(@db.receive_full(4).unpack("C*"))
buf.rewind
size = buf.get_int
buf.put_array(@db.receive_full(size - 4).unpack("C*"), 4)
@n_remaining -= 1
buf.rewind
BSON.new.deserialize(buf)
end
def send_query_if_needed
# Run query first time we request an object from the wire
if @query_run
false
else
@db._synchronize {
@db.send_query_message(QueryMessage.new(@admin ? 'admin' : @db.name, @collection.name, @query))
@query_run = true
read_all
}
true
end
end
def to_s
"DBResponse(flags=#@result_flags, cursor_id=#@cursor_id, start=#@starting_from, n_returned=#@n_returned)"
end
end
end

File diff suppressed because it is too large Load Diff

View File

@ -14,14 +14,10 @@
# Exceptions raised by the MongoDB driver.
module XGen
module Mongo
module Driver
# Raised when a database operation fails.
class OperationFailure < RuntimeError; end
module Mongo
# Raised when a database operation fails.
class OperationFailure < RuntimeError; end
# Raised when an invalid name is used.
class InvalidName < RuntimeError; end
end
end
# Raised when an invalid name is used.
class InvalidName < RuntimeError; end
end

View File

@ -19,78 +19,74 @@ require 'mongo/util/byte_buffer'
require 'mongo/util/ordered_hash'
module XGen
module Mongo
module GridFS
module GridFS
# A chunk stores a portion of GridStore data.
class Chunk
# A chunk stores a portion of GridStore data.
class Chunk
DEFAULT_CHUNK_SIZE = 1024 * 256
DEFAULT_CHUNK_SIZE = 1024 * 256
attr_reader :object_id, :chunk_number
attr_accessor :data
attr_reader :object_id, :chunk_number
attr_accessor :data
def initialize(file, mongo_object={})
@file = file
@object_id = mongo_object['_id'] || XGen::Mongo::Driver::ObjectID.new
@chunk_number = mongo_object['n'] || 0
def initialize(file, mongo_object={})
@file = file
@object_id = mongo_object['_id'] || Mongo::ObjectID.new
@chunk_number = mongo_object['n'] || 0
@data = ByteBuffer.new
case mongo_object['data']
when String
mongo_object['data'].each_byte { |b| @data.put(b) }
when ByteBuffer
@data.put_array(mongo_object['data'].to_a)
when Array
@data.put_array(mongo_object['data'])
when nil
else
raise "illegal chunk format; data is #{mongo_object['data'] ? (' ' + mongo_object['data'].class.name) : 'nil'}"
end
@data.rewind
end
@data = ByteBuffer.new
case mongo_object['data']
when String
mongo_object['data'].each_byte { |b| @data.put(b) }
when ByteBuffer
@data.put_array(mongo_object['data'].to_a)
when Array
@data.put_array(mongo_object['data'])
when nil
else
raise "illegal chunk format; data is #{mongo_object['data'] ? (' ' + mongo_object['data'].class.name) : 'nil'}"
end
@data.rewind
end
def pos; @data.position; end
def pos=(pos); @data.position = pos; end
def eof?; !@data.more?; end
def pos; @data.position; end
def pos=(pos); @data.position = pos; end
def eof?; !@data.more?; end
def size; @data.size; end
alias_method :length, :size
# Erase all data after current position.
def truncate
if @data.position < @data.length
curr_data = @data
@data = ByteBuffer.new
@data.put_array(curr_data.to_a[0...curr_data.position])
end
end
def getc
@data.more? ? @data.get : nil
end
def putc(byte)
@data.put(byte)
end
def save
coll = @file.chunk_collection
coll.remove({'_id' => @object_id})
coll.insert(to_mongo_object)
end
def to_mongo_object
h = OrderedHash.new
h['_id'] = @object_id
h['files_id'] = @file.files_id
h['n'] = @chunk_number
h['data'] = data
h
end
def size; @data.size; end
alias_method :length, :size
# Erase all data after current position.
def truncate
if @data.position < @data.length
curr_data = @data
@data = ByteBuffer.new
@data.put_array(curr_data.to_a[0...curr_data.position])
end
end
def getc
@data.more? ? @data.get : nil
end
def putc(byte)
@data.put(byte)
end
def save
coll = @file.chunk_collection
coll.remove({'_id' => @object_id})
coll.insert(to_mongo_object)
end
def to_mongo_object
h = OrderedHash.new
h['_id'] = @object_id
h['files_id'] = @file.files_id
h['n'] = @chunk_number
h['data'] = data
h
end
end
end

View File

@ -18,451 +18,447 @@ require 'mongo/types/objectid'
require 'mongo/util/ordered_hash'
require 'mongo/gridfs/chunk'
module XGen
module Mongo
module GridFS
module GridFS
# GridStore is an IO-like object that provides input and output for
# streams of data to Mongo. See Mongo's documentation about GridFS for
# storage implementation details.
# GridStore is an IO-like object that provides input and output for
# streams of data to Mongo. See Mongo's documentation about GridFS for
# storage implementation details.
#
# Example code:
#
# require 'mongo/gridfs'
# GridStore.open(database, 'filename', 'w') { |f|
# f.puts "Hello, world!"
# }
# GridStore.open(database, 'filename, 'r') { |f|
# puts f.read # => Hello, world!\n
# }
# GridStore.open(database, 'filename', 'w+') { |f|
# f.puts "But wait, there's more!"
# }
# GridStore.open(database, 'filename, 'r') { |f|
# puts f.read # => Hello, world!\nBut wait, there's more!\n
# }
class GridStore
DEFAULT_ROOT_COLLECTION = 'fs'
DEFAULT_CONTENT_TYPE = 'text/plain'
include Enumerable
attr_accessor :filename
# Array of strings; may be +nil+
attr_accessor :aliases
# Default is DEFAULT_CONTENT_TYPE
attr_accessor :content_type
attr_accessor :metadata
attr_reader :files_id
# Time that the file was first saved.
attr_reader :upload_date
attr_reader :chunk_size
attr_accessor :lineno
attr_reader :md5
class << self
def exist?(db, name, root_collection=DEFAULT_ROOT_COLLECTION)
db.collection("#{root_collection}.files").find({'filename' => name}).next_object != nil
end
def open(db, name, mode, options={})
gs = self.new(db, name, mode, options)
result = nil
begin
result = yield gs if block_given?
ensure
gs.close
end
result
end
def read(db, name, length=nil, offset=nil)
GridStore.open(db, name, 'r') { |gs|
gs.seek(offset) if offset
gs.read(length)
}
end
# List the contains of all GridFS files stored in the given db and
# root collection.
#
# Example code:
# :db :: the database to use
#
# require 'mongo/gridfs'
# GridStore.open(database, 'filename', 'w') { |f|
# f.puts "Hello, world!"
# }
# GridStore.open(database, 'filename, 'r') { |f|
# puts f.read # => Hello, world!\n
# }
# GridStore.open(database, 'filename', 'w+') { |f|
# f.puts "But wait, there's more!"
# }
# GridStore.open(database, 'filename, 'r') { |f|
# puts f.read # => Hello, world!\nBut wait, there's more!\n
# }
class GridStore
# :root_collection :: the root collection to use
def list(db, root_collection=DEFAULT_ROOT_COLLECTION)
db.collection("#{root_collection}.files").find().map { |f|
f['filename']
}
end
DEFAULT_ROOT_COLLECTION = 'fs'
DEFAULT_CONTENT_TYPE = 'text/plain'
def readlines(db, name, separator=$/)
GridStore.open(db, name, 'r') { |gs|
gs.readlines(separator)
}
end
include Enumerable
def unlink(db, *names)
names.each { |name|
gs = GridStore.new(db, name)
gs.send(:delete_chunks)
gs.collection.remove('_id' => gs.files_id)
}
end
alias_method :delete, :unlink
attr_accessor :filename
end
# Array of strings; may be +nil+
attr_accessor :aliases
#---
# ================================================================
#+++
# Default is DEFAULT_CONTENT_TYPE
attr_accessor :content_type
# Mode may only be 'r', 'w', or 'w+'.
#
# Options. Descriptions start with a list of the modes for which that
# option is legitimate.
#
# :root :: (r, w, w+) Name of root collection to use, instead of
# DEFAULT_ROOT_COLLECTION.
#
# :metadata:: (w, w+) A hash containing any data you want persisted as
# this file's metadata. See also metadata=
#
# :chunk_size :: (w) Sets chunk size for files opened for writing
# See also chunk_size= which may only be called before
# any data is written.
#
# :content_type :: (w) Default value is DEFAULT_CONTENT_TYPE. See
# also #content_type=
def initialize(db, name, mode='r', options={})
@db, @filename, @mode = db, name, mode
@root = options[:root] || DEFAULT_ROOT_COLLECTION
attr_accessor :metadata
doc = collection.find({'filename' => @filename}).next_object
if doc
@files_id = doc['_id']
@content_type = doc['contentType']
@chunk_size = doc['chunkSize']
@upload_date = doc['uploadDate']
@aliases = doc['aliases']
@length = doc['length']
@metadata = doc['metadata']
@md5 = doc['md5']
else
@files_id = Mongo::ObjectID.new
@content_type = DEFAULT_CONTENT_TYPE
@chunk_size = Chunk::DEFAULT_CHUNK_SIZE
@length = 0
end
attr_reader :files_id
case mode
when 'r'
@curr_chunk = nth_chunk(0)
@position = 0
when 'w'
chunk_collection.create_index([['files_id', Mongo::ASCENDING], ['n', Mongo::ASCENDING]])
delete_chunks
@curr_chunk = Chunk.new(self, 'n' => 0)
@content_type = options[:content_type] if options[:content_type]
@chunk_size = options[:chunk_size] if options[:chunk_size]
@metadata = options[:metadata] if options[:metadata]
@position = 0
when 'w+'
chunk_collection.create_index([['files_id', Mongo::ASCENDING], ['n', Mongo::ASCENDING]])
@curr_chunk = nth_chunk(last_chunk_number) || Chunk.new(self, 'n' => 0) # might be empty
@curr_chunk.pos = @curr_chunk.data.length if @curr_chunk
@metadata = options[:metadata] if options[:metadata]
@position = @length
else
raise "error: illegal mode #{mode}"
end
# Time that the file was first saved.
attr_reader :upload_date
@lineno = 0
@pushback_byte = nil
end
attr_reader :chunk_size
def collection
@db.collection("#{@root}.files")
end
attr_accessor :lineno
# Returns collection used for storing chunks. Depends on value of
# @root.
def chunk_collection
@db.collection("#{@root}.chunks")
end
attr_reader :md5
# Change chunk size. Can only change if the file is opened for write
# and no data has yet been written.
def chunk_size=(size)
unless @mode[0] == ?w && @position == 0 && @upload_date == nil
raise "error: can only change chunk size if open for write and no data written."
end
@chunk_size = size
end
class << self
def exist?(db, name, root_collection=DEFAULT_ROOT_COLLECTION)
db.collection("#{root_collection}.files").find({'filename' => name}).next_object != nil
end
def open(db, name, mode, options={})
gs = self.new(db, name, mode, options)
result = nil
begin
result = yield gs if block_given?
ensure
gs.close
end
result
end
def read(db, name, length=nil, offset=nil)
GridStore.open(db, name, 'r') { |gs|
gs.seek(offset) if offset
gs.read(length)
}
end
# List the contains of all GridFS files stored in the given db and
# root collection.
#
# :db :: the database to use
#
# :root_collection :: the root collection to use
def list(db, root_collection=DEFAULT_ROOT_COLLECTION)
db.collection("#{root_collection}.files").find().map { |f|
f['filename']
}
end
def readlines(db, name, separator=$/)
GridStore.open(db, name, 'r') { |gs|
gs.readlines(separator)
}
end
def unlink(db, *names)
names.each { |name|
gs = GridStore.new(db, name)
gs.send(:delete_chunks)
gs.collection.remove('_id' => gs.files_id)
}
end
alias_method :delete, :unlink
#---
# ================ reading ================
#+++
def getc
if @pushback_byte
byte = @pushback_byte
@pushback_byte = nil
@position += 1
byte
elsif eof?
nil
else
if @curr_chunk.eof?
@curr_chunk = nth_chunk(@curr_chunk.chunk_number + 1)
end
#---
# ================================================================
#+++
# Mode may only be 'r', 'w', or 'w+'.
#
# Options. Descriptions start with a list of the modes for which that
# option is legitimate.
#
# :root :: (r, w, w+) Name of root collection to use, instead of
# DEFAULT_ROOT_COLLECTION.
#
# :metadata:: (w, w+) A hash containing any data you want persisted as
# this file's metadata. See also metadata=
#
# :chunk_size :: (w) Sets chunk size for files opened for writing
# See also chunk_size= which may only be called before
# any data is written.
#
# :content_type :: (w) Default value is DEFAULT_CONTENT_TYPE. See
# also #content_type=
def initialize(db, name, mode='r', options={})
@db, @filename, @mode = db, name, mode
@root = options[:root] || DEFAULT_ROOT_COLLECTION
doc = collection.find({'filename' => @filename}).next_object
if doc
@files_id = doc['_id']
@content_type = doc['contentType']
@chunk_size = doc['chunkSize']
@upload_date = doc['uploadDate']
@aliases = doc['aliases']
@length = doc['length']
@metadata = doc['metadata']
@md5 = doc['md5']
else
@files_id = XGen::Mongo::Driver::ObjectID.new
@content_type = DEFAULT_CONTENT_TYPE
@chunk_size = Chunk::DEFAULT_CHUNK_SIZE
@length = 0
end
case mode
when 'r'
@curr_chunk = nth_chunk(0)
@position = 0
when 'w'
chunk_collection.create_index([['files_id', XGen::Mongo::ASCENDING], ['n', XGen::Mongo::ASCENDING]])
delete_chunks
@curr_chunk = Chunk.new(self, 'n' => 0)
@content_type = options[:content_type] if options[:content_type]
@chunk_size = options[:chunk_size] if options[:chunk_size]
@metadata = options[:metadata] if options[:metadata]
@position = 0
when 'w+'
chunk_collection.create_index([['files_id', XGen::Mongo::ASCENDING], ['n', XGen::Mongo::ASCENDING]])
@curr_chunk = nth_chunk(last_chunk_number) || Chunk.new(self, 'n' => 0) # might be empty
@curr_chunk.pos = @curr_chunk.data.length if @curr_chunk
@metadata = options[:metadata] if options[:metadata]
@position = @length
else
raise "error: illegal mode #{mode}"
end
@lineno = 0
@pushback_byte = nil
end
def collection
@db.collection("#{@root}.files")
end
# Returns collection used for storing chunks. Depends on value of
# @root.
def chunk_collection
@db.collection("#{@root}.chunks")
end
# Change chunk size. Can only change if the file is opened for write
# and no data has yet been written.
def chunk_size=(size)
unless @mode[0] == ?w && @position == 0 && @upload_date == nil
raise "error: can only change chunk size if open for write and no data written."
end
@chunk_size = size
end
#---
# ================ reading ================
#+++
def getc
if @pushback_byte
byte = @pushback_byte
@pushback_byte = nil
@position += 1
byte
elsif eof?
nil
else
if @curr_chunk.eof?
@curr_chunk = nth_chunk(@curr_chunk.chunk_number + 1)
end
@position += 1
@curr_chunk.getc
end
end
def gets(separator=$/)
str = ''
byte = self.getc
return nil if byte == nil # EOF
while byte != nil
s = byte.chr
str << s
break if s == separator
byte = self.getc
end
@lineno += 1
str
end
def read(len=nil, buf=nil)
buf ||= ''
byte = self.getc
while byte != nil && (len == nil || len > 0)
buf << byte.chr
len -= 1 if len
byte = self.getc if (len == nil || len > 0)
end
buf
end
def readchar
byte = self.getc
raise EOFError.new if byte == nil
byte
end
def readline(separator=$/)
line = gets
raise EOFError.new if line == nil
line
end
def readlines(separator=$/)
read.split(separator).collect { |line| "#{line}#{separator}" }
end
def each
line = gets
while line
yield line
line = gets
end
end
alias_method :each_line, :each
def each_byte
byte = self.getc
while byte
yield byte
byte = self.getc
end
end
def ungetc(byte)
@pushback_byte = byte
@position -= 1
end
#---
# ================ writing ================
#+++
def putc(byte)
if @curr_chunk.pos == @chunk_size
prev_chunk_number = @curr_chunk.chunk_number
@curr_chunk.save
@curr_chunk = Chunk.new(self, 'n' => prev_chunk_number + 1)
end
@position += 1
@curr_chunk.putc(byte)
end
def print(*objs)
objs = [$_] if objs == nil || objs.empty?
objs.each { |obj|
str = obj.to_s
str.each_byte { |byte| self.putc(byte) }
}
nil
end
def puts(*objs)
if objs == nil || objs.empty?
self.putc(10)
else
print(*objs.collect{ |obj|
str = obj.to_s
str << "\n" unless str =~ /\n$/
str
})
end
nil
end
def <<(obj)
write(obj.to_s)
end
# Writes +string+ as bytes and returns the number of bytes written.
def write(string)
raise "#@filename not opened for write" unless @mode[0] == ?w
count = 0
string.each_byte { |byte|
self.putc byte
count += 1
}
count
end
# A no-op.
def flush
end
#---
# ================ status ================
#+++
def eof
raise IOError.new("stream not open for reading") unless @mode[0] == ?r
@position >= @length
end
alias_method :eof?, :eof
#---
# ================ positioning ================
#+++
def rewind
if @curr_chunk.chunk_number != 0
if @mode[0] == ?w
delete_chunks
@curr_chunk = Chunk.new(self, 'n' => 0)
else
@curr_chunk == nth_chunk(0)
end
end
@curr_chunk.pos = 0
@lineno = 0
@position = 0
end
def seek(pos, whence=IO::SEEK_SET)
target_pos = case whence
when IO::SEEK_CUR
@position + pos
when IO::SEEK_END
@length + pos
when IO::SEEK_SET
pos
end
new_chunk_number = (target_pos / @chunk_size).to_i
if new_chunk_number != @curr_chunk.chunk_number
@curr_chunk.save if @mode[0] == ?w
@curr_chunk = nth_chunk(new_chunk_number)
end
@position = target_pos
@curr_chunk.pos = @position % @chunk_size
0
end
def tell
@position
end
#---
# ================ closing ================
#+++
def close
if @mode[0] == ?w
if @curr_chunk
@curr_chunk.truncate
@curr_chunk.save if @curr_chunk.pos > 0
end
files = collection
if @upload_date
files.remove('_id' => @files_id)
else
@upload_date = Time.now
end
files.insert(to_mongo_object)
end
@db = nil
end
def closed?
@db == nil
end
#---
# ================ protected ================
#+++
protected
def to_mongo_object
h = OrderedHash.new
h['_id'] = @files_id
h['filename'] = @filename
h['contentType'] = @content_type
h['length'] = @curr_chunk ? @curr_chunk.chunk_number * @chunk_size + @curr_chunk.pos : 0
h['chunkSize'] = @chunk_size
h['uploadDate'] = @upload_date
h['aliases'] = @aliases
h['metadata'] = @metadata
md5_command = OrderedHash.new
md5_command['filemd5'] = @files_id
md5_command['root'] = @root
h['md5'] = @db.db_command(md5_command)['md5']
h
end
def delete_chunks
chunk_collection.remove({'files_id' => @files_id}) if @files_id
@curr_chunk = nil
end
def nth_chunk(n)
mongo_chunk = chunk_collection.find({'files_id' => @files_id, 'n' => n}).next_object
Chunk.new(self, mongo_chunk || {})
end
def last_chunk_number
(@length / @chunk_size).to_i
end
@position += 1
@curr_chunk.getc
end
end
def gets(separator=$/)
str = ''
byte = self.getc
return nil if byte == nil # EOF
while byte != nil
s = byte.chr
str << s
break if s == separator
byte = self.getc
end
@lineno += 1
str
end
def read(len=nil, buf=nil)
buf ||= ''
byte = self.getc
while byte != nil && (len == nil || len > 0)
buf << byte.chr
len -= 1 if len
byte = self.getc if (len == nil || len > 0)
end
buf
end
def readchar
byte = self.getc
raise EOFError.new if byte == nil
byte
end
def readline(separator=$/)
line = gets
raise EOFError.new if line == nil
line
end
def readlines(separator=$/)
read.split(separator).collect { |line| "#{line}#{separator}" }
end
def each
line = gets
while line
yield line
line = gets
end
end
alias_method :each_line, :each
def each_byte
byte = self.getc
while byte
yield byte
byte = self.getc
end
end
def ungetc(byte)
@pushback_byte = byte
@position -= 1
end
#---
# ================ writing ================
#+++
def putc(byte)
if @curr_chunk.pos == @chunk_size
prev_chunk_number = @curr_chunk.chunk_number
@curr_chunk.save
@curr_chunk = Chunk.new(self, 'n' => prev_chunk_number + 1)
end
@position += 1
@curr_chunk.putc(byte)
end
def print(*objs)
objs = [$_] if objs == nil || objs.empty?
objs.each { |obj|
str = obj.to_s
str.each_byte { |byte| self.putc(byte) }
}
nil
end
def puts(*objs)
if objs == nil || objs.empty?
self.putc(10)
else
print(*objs.collect{ |obj|
str = obj.to_s
str << "\n" unless str =~ /\n$/
str
})
end
nil
end
def <<(obj)
write(obj.to_s)
end
# Writes +string+ as bytes and returns the number of bytes written.
def write(string)
raise "#@filename not opened for write" unless @mode[0] == ?w
count = 0
string.each_byte { |byte|
self.putc byte
count += 1
}
count
end
# A no-op.
def flush
end
#---
# ================ status ================
#+++
def eof
raise IOError.new("stream not open for reading") unless @mode[0] == ?r
@position >= @length
end
alias_method :eof?, :eof
#---
# ================ positioning ================
#+++
def rewind
if @curr_chunk.chunk_number != 0
if @mode[0] == ?w
delete_chunks
@curr_chunk = Chunk.new(self, 'n' => 0)
else
@curr_chunk == nth_chunk(0)
end
end
@curr_chunk.pos = 0
@lineno = 0
@position = 0
end
def seek(pos, whence=IO::SEEK_SET)
target_pos = case whence
when IO::SEEK_CUR
@position + pos
when IO::SEEK_END
@length + pos
when IO::SEEK_SET
pos
end
new_chunk_number = (target_pos / @chunk_size).to_i
if new_chunk_number != @curr_chunk.chunk_number
@curr_chunk.save if @mode[0] == ?w
@curr_chunk = nth_chunk(new_chunk_number)
end
@position = target_pos
@curr_chunk.pos = @position % @chunk_size
0
end
def tell
@position
end
#---
# ================ closing ================
#+++
def close
if @mode[0] == ?w
if @curr_chunk
@curr_chunk.truncate
@curr_chunk.save if @curr_chunk.pos > 0
end
files = collection
if @upload_date
files.remove('_id' => @files_id)
else
@upload_date = Time.now
end
files.insert(to_mongo_object)
end
@db = nil
end
def closed?
@db == nil
end
#---
# ================ protected ================
#+++
protected
def to_mongo_object
h = OrderedHash.new
h['_id'] = @files_id
h['filename'] = @filename
h['contentType'] = @content_type
h['length'] = @curr_chunk ? @curr_chunk.chunk_number * @chunk_size + @curr_chunk.pos : 0
h['chunkSize'] = @chunk_size
h['uploadDate'] = @upload_date
h['aliases'] = @aliases
h['metadata'] = @metadata
md5_command = OrderedHash.new
md5_command['filemd5'] = @files_id
md5_command['root'] = @root
h['md5'] = @db.db_command(md5_command)['md5']
h
end
def delete_chunks
chunk_collection.remove({'files_id' => @files_id}) if @files_id
@curr_chunk = nil
end
def nth_chunk(n)
mongo_chunk = chunk_collection.find({'files_id' => @files_id, 'n' => n}).next_object
Chunk.new(self, mongo_chunk || {})
end
def last_chunk_number
(@length / @chunk_size).to_i
end
end
end

View File

@ -17,21 +17,16 @@
require 'mongo/message/message'
require 'mongo/message/opcodes'
module XGen
module Mongo
module Driver
module Mongo
class GetMoreMessage < Message
class GetMoreMessage < Message
def initialize(db_name, collection_name, cursor)
super(OP_GET_MORE)
write_int(0)
write_string("#{db_name}.#{collection_name}")
write_int(0) # num to return; leave it up to the db for now
write_long(cursor)
end
end
def initialize(db_name, collection_name, cursor)
super(OP_GET_MORE)
write_int(0)
write_string("#{db_name}.#{collection_name}")
write_int(0) # num to return; leave it up to the db for now
write_long(cursor)
end
end
end

View File

@ -17,19 +17,15 @@
require 'mongo/message/message'
require 'mongo/message/opcodes'
module XGen
module Mongo
module Driver
module Mongo
class InsertMessage < Message
class InsertMessage < Message
def initialize(db_name, collection_name, check_keys=true, *objs)
super(OP_INSERT)
write_int(0)
write_string("#{db_name}.#{collection_name}")
objs.each { |o| write_doc(o, check_keys) }
end
end
def initialize(db_name, collection_name, check_keys=true, *objs)
super(OP_INSERT)
write_int(0)
write_string("#{db_name}.#{collection_name}")
objs.each { |o| write_doc(o, check_keys) }
end
end
end

View File

@ -17,20 +17,15 @@
require 'mongo/message/message'
require 'mongo/message/opcodes'
module XGen
module Mongo
module Driver
module Mongo
class KillCursorsMessage < Message
class KillCursorsMessage < Message
def initialize(*cursors)
super(OP_KILL_CURSORS)
write_int(0)
write_int(cursors.length)
cursors.each { |c| write_long c }
end
end
def initialize(*cursors)
super(OP_KILL_CURSORS)
write_int(0)
write_int(cursors.length)
cursors.each { |c| write_long c }
end
end
end

View File

@ -17,68 +17,64 @@
require 'mongo/util/bson'
require 'mongo/util/byte_buffer'
module XGen
module Mongo
module Driver
module Mongo
class Message
class Message
HEADER_SIZE = 16 # size, id, response_to, opcode
HEADER_SIZE = 16 # size, id, response_to, opcode
@@class_req_id = 0
@@class_req_id = 0
attr_reader :buf # for testing
attr_reader :buf # for testing
def initialize(op)
@op = op
@message_length = HEADER_SIZE
@data_length = 0
@request_id = (@@class_req_id += 1)
@response_id = 0
@buf = ByteBuffer.new
def initialize(op)
@op = op
@message_length = HEADER_SIZE
@data_length = 0
@request_id = (@@class_req_id += 1)
@response_id = 0
@buf = ByteBuffer.new
@buf.put_int(16) # holder for length
@buf.put_int(@request_id)
@buf.put_int(0) # response_to
@buf.put_int(op)
end
def write_int(i)
@buf.put_int(i)
update_message_length
end
def write_long(i)
@buf.put_long(i)
update_message_length
end
def write_string(s)
BSON.serialize_cstr(@buf, s)
update_message_length
end
def write_doc(hash, check_keys=false)
@buf.put_array(BSON.new.serialize(hash, check_keys).to_a)
update_message_length
end
def to_a
@buf.to_a
end
def dump
@buf.dump
end
# Do not call. Private, but kept public for testing.
def update_message_length
pos = @buf.position
@buf.put_int(@buf.size, 0)
@buf.position = pos
end
end
@buf.put_int(16) # holder for length
@buf.put_int(@request_id)
@buf.put_int(0) # response_to
@buf.put_int(op)
end
def write_int(i)
@buf.put_int(i)
update_message_length
end
def write_long(i)
@buf.put_long(i)
update_message_length
end
def write_string(s)
BSON.serialize_cstr(@buf, s)
update_message_length
end
def write_doc(hash, check_keys=false)
@buf.put_array(BSON.new.serialize(hash, check_keys).to_a)
update_message_length
end
def to_a
@buf.to_a
end
def dump
@buf.dump
end
# Do not call. Private, but kept public for testing.
def update_message_length
pos = @buf.position
@buf.put_int(@buf.size, 0)
@buf.position = pos
end
end
end

View File

@ -16,35 +16,30 @@
require 'mongo/util/byte_buffer'
module XGen
module Mongo
module Driver
module Mongo
class MessageHeader
class MessageHeader
HEADER_SIZE = 16
HEADER_SIZE = 16
def initialize()
@buf = ByteBuffer.new
end
def initialize()
@buf = ByteBuffer.new
end
def read_header(db)
@buf.rewind
@buf.put_array(db.receive_full(HEADER_SIZE).unpack("C*"))
raise "Short read for DB response header: expected #{HEADER_SIZE} bytes, saw #{@buf.size}" unless @buf.size == HEADER_SIZE
@buf.rewind
@size = @buf.get_int
@request_id = @buf.get_int
@response_to = @buf.get_int
@op = @buf.get_int
self
end
def read_header(db)
@buf.rewind
@buf.put_array(db.receive_full(HEADER_SIZE).unpack("C*"))
raise "Short read for DB response header: expected #{HEADER_SIZE} bytes, saw #{@buf.size}" unless @buf.size == HEADER_SIZE
@buf.rewind
@size = @buf.get_int
@request_id = @buf.get_int
@response_to = @buf.get_int
@op = @buf.get_int
self
end
def dump
@buf.dump
end
end
def dump
@buf.dump
end
end
end

View File

@ -17,17 +17,13 @@
require 'mongo/message/message'
require 'mongo/message/opcodes'
module XGen
module Mongo
module Driver
module Mongo
class MsgMessage < Message
class MsgMessage < Message
def initialize(msg)
super(OP_MSG)
write_string(msg)
end
end
def initialize(msg)
super(OP_MSG)
write_string(msg)
end
end
end

View File

@ -14,19 +14,14 @@
# limitations under the License.
# ++
module XGen
module Mongo
module Driver
OP_REPLY = 1 # reply. responseTo is set.
OP_MSG = 1000 # generic msg command followed by a string
OP_UPDATE = 2001 # update object
OP_INSERT = 2002
# GET_BY_OID = 2003
OP_QUERY = 2004
OP_GET_MORE = 2005
OP_DELETE = 2006
OP_KILL_CURSORS = 2007
end
end
module Mongo
OP_REPLY = 1 # reply. responseTo is set.
OP_MSG = 1000 # generic msg command followed by a string
OP_UPDATE = 2001 # update object
OP_INSERT = 2002
# GET_BY_OID = 2003
OP_QUERY = 2004
OP_GET_MORE = 2005
OP_DELETE = 2006
OP_KILL_CURSORS = 2007
end

View File

@ -18,60 +18,56 @@ require 'mongo/message/message'
require 'mongo/message/opcodes'
require 'mongo/util/ordered_hash'
module XGen
module Mongo
module Driver
module Mongo
class QueryMessage < Message
class QueryMessage < Message
attr_reader :query
attr_reader :query
def initialize(db_name, collection_name, query)
super(OP_QUERY)
@query = query
write_int(0)
write_string("#{db_name}.#{collection_name}")
write_int(query.number_to_skip)
write_int(-query.number_to_return) # Negative means hard limit
sel = query.selector
if query.contains_special_fields
sel = OrderedHash.new
sel['query'] = query.selector
if query.order_by && query.order_by.length > 0
sel['orderby'] = case query.order_by
def initialize(db_name, collection_name, query)
super(OP_QUERY)
@query = query
write_int(0)
write_string("#{db_name}.#{collection_name}")
write_int(query.number_to_skip)
write_int(-query.number_to_return) # Negative means hard limit
sel = query.selector
if query.contains_special_fields
sel = OrderedHash.new
sel['query'] = query.selector
if query.order_by && query.order_by.length > 0
sel['orderby'] = case query.order_by
when String
{query.order_by => 1}
when Array
h = OrderedHash.new
query.order_by.each { |ob|
case ob
when String
{query.order_by => 1}
when Array
h = OrderedHash.new
query.order_by.each { |ob|
case ob
when String
h[ob] = 1
when Hash # should have one entry; will handle all
ob.each { |k,v| h[k] = v }
else
raise "illegal query order_by value #{query.order_by.inspect}"
end
}
h
when Hash # Should be an ordered hash, but this message doesn't care
query.order_by
h[ob] = 1
when Hash # should have one entry; will handle all
ob.each { |k,v| h[k] = v }
else
raise "illegal order_by: is a #{query.order_by.class.name}, must be String, Array, Hash, or OrderedHash"
raise "illegal query order_by value #{query.order_by.inspect}"
end
end
sel['$hint'] = query.hint if query.hint && query.hint.length > 0
sel['$explain'] = true if query.explain
sel['$snapshot'] = true if query.snapshot
end
write_doc(sel)
write_doc(query.fields) if query.fields
end
def first_key(key)
@first_key = key
}
h
when Hash # Should be an ordered hash, but this message doesn't care
query.order_by
else
raise "illegal order_by: is a #{query.order_by.class.name}, must be String, Array, Hash, or OrderedHash"
end
end
sel['$hint'] = query.hint if query.hint && query.hint.length > 0
sel['$explain'] = true if query.explain
sel['$snapshot'] = true if query.snapshot
end
write_doc(sel)
write_doc(query.fields) if query.fields
end
def first_key(key)
@first_key = key
end
end
end

View File

@ -17,20 +17,16 @@
require 'mongo/message/message'
require 'mongo/message/opcodes'
module XGen
module Mongo
module Driver
module Mongo
class RemoveMessage < Message
class RemoveMessage < Message
def initialize(db_name, collection_name, sel)
super(OP_DELETE)
write_int(0)
write_string("#{db_name}.#{collection_name}")
write_int(0) # flags?
write_doc(sel)
end
end
def initialize(db_name, collection_name, sel)
super(OP_DELETE)
write_int(0)
write_string("#{db_name}.#{collection_name}")
write_int(0) # flags?
write_doc(sel)
end
end
end

View File

@ -17,21 +17,17 @@
require 'mongo/message/message'
require 'mongo/message/opcodes'
module XGen
module Mongo
module Driver
module Mongo
class UpdateMessage < Message
class UpdateMessage < Message
def initialize(db_name, collection_name, sel, obj, repsert)
super(OP_UPDATE)
write_int(0)
write_string("#{db_name}.#{collection_name}")
write_int(repsert ? 1 : 0) # 1 if a repsert operation (upsert)
write_doc(sel)
write_doc(obj)
end
end
def initialize(db_name, collection_name, sel, obj, repsert)
super(OP_UPDATE)
write_int(0)
write_string("#{db_name}.#{collection_name}")
write_int(repsert ? 1 : 0) # 1 if a repsert operation (upsert)
write_doc(sel)
write_doc(obj)
end
end
end

View File

@ -16,149 +16,144 @@
require 'mongo/db'
module XGen
module Mongo
module Driver
module Mongo
# Represents a Mongo database server.
class Mongo
# Represents a Mongo database server.
class Mongo
DEFAULT_PORT = 27017
DEFAULT_PORT = 27017
# Create a Mongo database server instance. You specify either one or a
# pair of servers. If one, you also say if connecting to a slave is
# OK. In either case, the host default is "localhost" and port default
# is DEFAULT_PORT.
#
# If you specify a pair, pair_or_host is a hash with two keys :left
# and :right. Each key maps to either
# * a server name, in which case port is DEFAULT_PORT
# * a port number, in which case server is "localhost"
# * an array containing a server name and a port number in that order
#
# +options+ are passed on to each DB instance:
#
# :slave_ok :: Only used if one host is specified. If false, when
# connecting to that host/port a DB object will check to
# see if the server is the master. If it is not, an error
# is thrown.
#
# :auto_reconnect :: If a DB connection gets closed (for example, we
# have a server pair and saw the "not master"
# error, which closes the connection), then
# automatically try to reconnect to the master or
# to the single server we have been given. Defaults
# to +false+.
#
# Since that's so confusing, here are a few examples:
#
# Mongo.new # localhost, DEFAULT_PORT, !slave
# Mongo.new("localhost") # localhost, DEFAULT_PORT, !slave
# Mongo.new("localhost", 3000) # localhost, 3000, slave not ok
# # localhost, 3000, slave ok
# Mongo.new("localhost", 3000, :slave_ok => true)
# # localhost, DEFAULT_PORT, auto reconnect
# Mongo.new(nil, nil, :auto_reconnect => true)
#
# # A pair of servers. DB will always talk to the master. On socket
# # error or "not master" error, we will auto-reconnect to the
# # current master.
# Mongo.new({:left => ["db1.example.com", 3000],
# :right => "db2.example.com"}, # DEFAULT_PORT
# nil, :auto_reconnect => true)
#
# # Here, :right is localhost/DEFAULT_PORT. No auto-reconnect.
# Mongo.new({:left => ["db1.example.com", 3000]})
#
# When a DB object first connects to a pair, it will find the master
# instance and connect to that one.
def initialize(pair_or_host=nil, port=nil, options={})
@pair = case pair_or_host
when String
[[pair_or_host, port ? port.to_i : DEFAULT_PORT]]
when Hash
connections = []
connections << pair_val_to_connection(pair_or_host[:left])
connections << pair_val_to_connection(pair_or_host[:right])
connections
when nil
[['localhost', DEFAULT_PORT]]
end
@options = options
end
# Create a Mongo database server instance. You specify either one or a
# pair of servers. If one, you also say if connecting to a slave is
# OK. In either case, the host default is "localhost" and port default
# is DEFAULT_PORT.
#
# If you specify a pair, pair_or_host is a hash with two keys :left
# and :right. Each key maps to either
# * a server name, in which case port is DEFAULT_PORT
# * a port number, in which case server is "localhost"
# * an array containing a server name and a port number in that order
#
# +options+ are passed on to each DB instance:
#
# :slave_ok :: Only used if one host is specified. If false, when
# connecting to that host/port a DB object will check to
# see if the server is the master. If it is not, an error
# is thrown.
#
# :auto_reconnect :: If a DB connection gets closed (for example, we
# have a server pair and saw the "not master"
# error, which closes the connection), then
# automatically try to reconnect to the master or
# to the single server we have been given. Defaults
# to +false+.
#
# Since that's so confusing, here are a few examples:
#
# Mongo.new # localhost, DEFAULT_PORT, !slave
# Mongo.new("localhost") # localhost, DEFAULT_PORT, !slave
# Mongo.new("localhost", 3000) # localhost, 3000, slave not ok
# # localhost, 3000, slave ok
# Mongo.new("localhost", 3000, :slave_ok => true)
# # localhost, DEFAULT_PORT, auto reconnect
# Mongo.new(nil, nil, :auto_reconnect => true)
#
# # A pair of servers. DB will always talk to the master. On socket
# # error or "not master" error, we will auto-reconnect to the
# # current master.
# Mongo.new({:left => ["db1.example.com", 3000],
# :right => "db2.example.com"}, # DEFAULT_PORT
# nil, :auto_reconnect => true)
#
# # Here, :right is localhost/DEFAULT_PORT. No auto-reconnect.
# Mongo.new({:left => ["db1.example.com", 3000]})
#
# When a DB object first connects to a pair, it will find the master
# instance and connect to that one.
def initialize(pair_or_host=nil, port=nil, options={})
@pair = case pair_or_host
when String
[[pair_or_host, port ? port.to_i : DEFAULT_PORT]]
when Hash
connections = []
connections << pair_val_to_connection(pair_or_host[:left])
connections << pair_val_to_connection(pair_or_host[:right])
connections
when nil
[['localhost', DEFAULT_PORT]]
end
@options = options
end
# Return the XGen::Mongo::Driver::DB named +db_name+. The slave_ok and
# auto_reconnect options passed in via #new may be overridden here.
# See DB#new for other options you can pass in.
def db(db_name, options={})
XGen::Mongo::Driver::DB.new(db_name, @pair, @options.merge(options))
end
# Return the Mongo::DB named +db_name+. The slave_ok and
# auto_reconnect options passed in via #new may be overridden here.
# See DB#new for other options you can pass in.
def db(db_name, options={})
DB.new(db_name, @pair, @options.merge(options))
end
# Returns a hash containing database names as keys and disk space for
# each as values.
def database_info
doc = single_db_command('admin', :listDatabases => 1)
h = {}
doc['databases'].each { |db|
h[db['name']] = db['sizeOnDisk'].to_i
}
h
end
# Returns a hash containing database names as keys and disk space for
# each as values.
def database_info
doc = single_db_command('admin', :listDatabases => 1)
h = {}
doc['databases'].each { |db|
h[db['name']] = db['sizeOnDisk'].to_i
}
h
end
# Returns an array of database names.
def database_names
database_info.keys
end
# Returns an array of database names.
def database_names
database_info.keys
end
# Not implemented.
def clone_database(from)
raise "not implemented"
end
# Not implemented.
def clone_database(from)
raise "not implemented"
end
# Not implemented.
def copy_database(from_host, from_db, to_db)
raise "not implemented"
end
# Not implemented.
def copy_database(from_host, from_db, to_db)
raise "not implemented"
end
# Drops the database +name+.
def drop_database(name)
single_db_command(name, :dropDatabase => 1)
end
# Drops the database +name+.
def drop_database(name)
single_db_command(name, :dropDatabase => 1)
end
protected
# Turns an array containing a host name string and a
# port number integer into a [host, port] pair array.
def pair_val_to_connection(a)
case a
when nil
['localhost', DEFAULT_PORT]
when String
[a, DEFAULT_PORT]
when Integer
['localhost', a]
when Array
a
end
end
# Send cmd (a hash, possibly ordered) to the admin database and return
# the answer. Raises an error unless the return is "ok" (DB#ok?
# returns +true+).
def single_db_command(db_name, cmd)
db = nil
begin
db = db(db_name)
doc = db.db_command(cmd)
raise "error retrieving database info: #{doc.inspect}" unless db.ok?(doc)
doc
ensure
db.close if db
end
end
protected
# Turns an array containing a host name string and a
# port number integer into a [host, port] pair array.
def pair_val_to_connection(a)
case a
when nil
['localhost', DEFAULT_PORT]
when String
[a, DEFAULT_PORT]
when Integer
['localhost', a]
when Array
a
end
end
# Send cmd (a hash, possibly ordered) to the admin database and return
# the answer. Raises an error unless the return is "ok" (DB#ok?
# returns +true+).
def single_db_command(db_name, cmd)
db = nil
begin
db = db(db_name)
doc = db.db_command(cmd)
raise "error retrieving database info: #{doc.inspect}" unless db.ok?(doc)
doc
ensure
db.close if db
end
end
end
end

View File

@ -18,102 +18,98 @@ require 'mongo/collection'
require 'mongo/message'
require 'mongo/types/code'
module XGen
module Mongo
module Driver
module Mongo
# A query against a collection. A query's selector is a hash. See the
# Mongo documentation for query details.
class Query
# A query against a collection. A query's selector is a hash. See the
# Mongo documentation for query details.
class Query
attr_accessor :number_to_skip, :number_to_return, :order_by, :snapshot
# If true, $explain will be set in QueryMessage that uses this query.
attr_accessor :explain
# Either +nil+ or a hash (preferably an OrderedHash).
attr_accessor :hint
attr_reader :selector # writer defined below
attr_accessor :number_to_skip, :number_to_return, :order_by, :snapshot
# If true, $explain will be set in QueryMessage that uses this query.
attr_accessor :explain
# Either +nil+ or a hash (preferably an OrderedHash).
attr_accessor :hint
attr_reader :selector # writer defined below
# sel :: A hash describing the query. See the Mongo docs for details.
#
# return_fields :: If not +nil+, a single field name or an array of
# field names. Only those fields will be returned.
# (Called :fields in calls to Collection#find.)
#
# number_to_skip :: Number of records to skip before returning
# records. (Called :offset in calls to
# Collection#find.) Default is 0.
#
# number_to_return :: Max number of records to return. (Called :limit
# in calls to Collection#find.) Default is 0 (all
# records).
#
# order_by :: If not +nil+, specifies record sort order. May be a
# String, Hash, OrderedHash, or Array. If a string, the
# results will be ordered by that field in ascending
# order. If an array, it should be an array of field names
# which will all be sorted in ascending order. If a hash,
# it may be either a regular Hash or an OrderedHash. The
# keys should be field names, and the values should be 1
# (ascending) or -1 (descending). Note that if it is a
# regular Hash then sorting by more than one field
# probably will not be what you intend because key order
# is not preserved. (order_by is called :sort in calls to
# Collection#find.)
#
# hint :: If not +nil+, specifies query hint fields. Must be either
# +nil+ or a hash (preferably an OrderedHash). See
# Collection#hint.
def initialize(sel={}, return_fields=nil, number_to_skip=0, number_to_return=0, order_by=nil, hint=nil, snapshot=nil)
@number_to_skip, @number_to_return, @order_by, @hint, @snapshot =
number_to_skip, number_to_return, order_by, hint, snapshot
@explain = nil
self.selector = sel
self.fields = return_fields
end
# Set query selector hash. If sel is Code/string, it will be used as a
# $where clause. (See Mongo docs for details.)
def selector=(sel)
@selector = case sel
when nil
{}
when Code
{"$where" => sel}
when String
{"$where" => Code.new(sel)}
when Hash
sel
end
end
# Set fields to return. If +val+ is +nil+ or empty, all fields will be
# returned.
def fields=(val)
@fields = val
@fields = nil if @fields && @fields.empty?
end
def fields
case @fields
when String
{@fields => 1}
when Array
if @fields.length == 0
nil
else
h = {}
@fields.each { |field| h[field] = 1 }
h
end
else # nil, anything else
nil
end
end
def contains_special_fields
(@order_by != nil && @order_by.length > 0) || @explain || @hint || @snapshot
# sel :: A hash describing the query. See the Mongo docs for details.
#
# return_fields :: If not +nil+, a single field name or an array of
# field names. Only those fields will be returned.
# (Called :fields in calls to Collection#find.)
#
# number_to_skip :: Number of records to skip before returning
# records. (Called :offset in calls to
# Collection#find.) Default is 0.
#
# number_to_return :: Max number of records to return. (Called :limit
# in calls to Collection#find.) Default is 0 (all
# records).
#
# order_by :: If not +nil+, specifies record sort order. May be a
# String, Hash, OrderedHash, or Array. If a string, the
# results will be ordered by that field in ascending
# order. If an array, it should be an array of field names
# which will all be sorted in ascending order. If a hash,
# it may be either a regular Hash or an OrderedHash. The
# keys should be field names, and the values should be 1
# (ascending) or -1 (descending). Note that if it is a
# regular Hash then sorting by more than one field
# probably will not be what you intend because key order
# is not preserved. (order_by is called :sort in calls to
# Collection#find.)
#
# hint :: If not +nil+, specifies query hint fields. Must be either
# +nil+ or a hash (preferably an OrderedHash). See
# Collection#hint.
def initialize(sel={}, return_fields=nil, number_to_skip=0, number_to_return=0, order_by=nil, hint=nil, snapshot=nil)
@number_to_skip, @number_to_return, @order_by, @hint, @snapshot =
number_to_skip, number_to_return, order_by, hint, snapshot
@explain = nil
self.selector = sel
self.fields = return_fields
end
# Set query selector hash. If sel is Code/string, it will be used as a
# $where clause. (See Mongo docs for details.)
def selector=(sel)
@selector = case sel
when nil
{}
when Code
{"$where" => sel}
when String
{"$where" => Code.new(sel)}
when Hash
sel
end
end
# Set fields to return. If +val+ is +nil+ or empty, all fields will be
# returned.
def fields=(val)
@fields = val
@fields = nil if @fields && @fields.empty?
end
def fields
case @fields
when String
{@fields => 1}
when Array
if @fields.length == 0
nil
else
h = {}
@fields.each { |field| h[field] = 1 }
h
end
else # nil, anything else
nil
end
end
def contains_special_fields
(@order_by != nil && @order_by.length > 0) || @explain || @hint || @snapshot
end
end
end

View File

@ -16,27 +16,23 @@
require 'mongo/util/byte_buffer'
module XGen
module Mongo
module Driver
module Mongo
# An array of binary bytes with a Mongo subtype value.
class Binary < ByteBuffer
# An array of binary bytes with a Mongo subtype value.
class Binary < ByteBuffer
SUBTYPE_BYTES = 0x02
SUBTYPE_UUID = 0x03
SUBTYPE_MD5 = 0x05
SUBTYPE_USER_DEFINED = 0x80
SUBTYPE_BYTES = 0x02
SUBTYPE_UUID = 0x03
SUBTYPE_MD5 = 0x05
SUBTYPE_USER_DEFINED = 0x80
# One of the SUBTYPE_* constants. Default is SUBTYPE_BYTES.
attr_accessor :subtype
# One of the SUBTYPE_* constants. Default is SUBTYPE_BYTES.
attr_accessor :subtype
def initialize(initial_data=[], subtype=SUBTYPE_BYTES)
super(initial_data)
@subtype = subtype
end
end
def initialize(initial_data=[], subtype=SUBTYPE_BYTES)
super(initial_data)
@subtype = subtype
end
end
end

View File

@ -14,21 +14,17 @@
# limitations under the License.
# ++
module XGen
module Mongo
module Driver
module Mongo
# JavaScript code to be evaluated by MongoDB
class Code < String
# Hash mapping identifiers to their values
attr_accessor :scope
# JavaScript code to be evaluated by MongoDB
class Code < String
# Hash mapping identifiers to their values
attr_accessor :scope
def initialize(code, scope={})
super(code)
@scope = scope
end
end
def initialize(code, scope={})
super(code)
@scope = scope
end
end
end

View File

@ -14,24 +14,20 @@
# limitations under the License.
# ++
module XGen
module Mongo
module Driver
module Mongo
class DBRef
class DBRef
attr_reader :namespace, :object_id
attr_reader :namespace, :object_id
def initialize(namespace, object_id)
@namespace, @object_id =
namespace, object_id
end
def to_s
"ns: #{namespace}, id: #{object_id}"
end
end
def initialize(namespace, object_id)
@namespace, @object_id =
namespace, object_id
end
def to_s
"ns: #{namespace}, id: #{object_id}"
end
end
end

View File

@ -17,121 +17,117 @@
require 'mutex_m'
require 'mongo/util/byte_buffer'
module XGen
module Mongo
module Driver
module Mongo
# Implementation of the Babble OID. Object ids are not required by
# Mongo, but they make certain operations more efficient.
#
# The driver does not automatically assign ids to records that are
# inserted. (An upcoming feature will allow you to give an id "factory"
# to a database and/or a collection.)
#
# 12 bytes
# ---
# 0 time
# 1
# 2
# 3
# 4 machine
# 5
# 6
# 7 pid
# 8
# 9 inc
# 10
# 11
class ObjectID
# Implementation of the Babble OID. Object ids are not required by
# Mongo, but they make certain operations more efficient.
#
# The driver does not automatically assign ids to records that are
# inserted. (An upcoming feature will allow you to give an id "factory"
# to a database and/or a collection.)
#
# 12 bytes
# ---
# 0 time
# 1
# 2
# 3
# 4 machine
# 5
# 6
# 7 pid
# 8
# 9 inc
# 10
# 11
class ObjectID
MACHINE = ( val = rand(0x1000000); [val & 0xff, (val >> 8) & 0xff, (val >> 16) & 0xff] )
PID = ( val = rand(0x10000); [val & 0xff, (val >> 8) & 0xff]; )
MACHINE = ( val = rand(0x1000000); [val & 0xff, (val >> 8) & 0xff, (val >> 16) & 0xff] )
PID = ( val = rand(0x10000); [val & 0xff, (val >> 8) & 0xff]; )
# The string representation of an OID is different than its internal
# and BSON byte representations. The BYTE_ORDER here maps
# internal/BSON byte position (the index in BYTE_ORDER) to the
# position of the two hex characters representing that byte in the
# string representation. For example, the 0th BSON byte corresponds to
# the (0-based) 7th pair of hex chars in the string.
BYTE_ORDER = [7, 6, 5, 4, 3, 2, 1, 0, 11, 10, 9, 8]
# The string representation of an OID is different than its internal
# and BSON byte representations. The BYTE_ORDER here maps
# internal/BSON byte position (the index in BYTE_ORDER) to the
# position of the two hex characters representing that byte in the
# string representation. For example, the 0th BSON byte corresponds to
# the (0-based) 7th pair of hex chars in the string.
BYTE_ORDER = [7, 6, 5, 4, 3, 2, 1, 0, 11, 10, 9, 8]
LOCK = Object.new
LOCK.extend Mutex_m
LOCK = Object.new
LOCK.extend Mutex_m
@@index_time = Time.new.to_i
@@index = 0
@@index_time = Time.new.to_i
@@index = 0
# Given a string representation of an ObjectID, return a new ObjectID
# with that value.
def self.from_string(str)
raise "illegal ObjectID format" unless legal?(str)
data = []
BYTE_ORDER.each_with_index { |string_position, data_index|
data[data_index] = str[string_position * 2, 2].to_i(16)
}
self.new(data)
end
def self.legal?(str)
len = BYTE_ORDER.length * 2
str =~ /([0-9a-f]+)/i
match = $1
str && str.length == len && match == str
end
# +data+ is an array of bytes. If nil, a new id will be generated.
# The time +t+ is only used for testing; leave it nil.
def initialize(data=nil, t=nil)
@data = data || generate_id(t)
end
def eql?(other)
@data == other.instance_variable_get("@data")
end
alias_method :==, :eql?
def to_a
@data.dup
end
def to_s
str = ' ' * 24
BYTE_ORDER.each_with_index { |string_position, data_index|
str[string_position * 2, 2] = '%02x' % @data[data_index]
}
str
end
# (Would normally be private, but isn't so we can test it.)
def generate_id(t=nil)
t ||= Time.new.to_i
buf = ByteBuffer.new
buf.put_int(t & 0xffffffff)
buf.put_array(MACHINE)
buf.put_array(PID)
i = index_for_time(t)
buf.put(i & 0xff)
buf.put((i >> 8) & 0xff)
buf.put((i >> 16) & 0xff)
buf.rewind
buf.to_a.dup
end
# (Would normally be private, but isn't so we can test it.)
def index_for_time(t)
LOCK.mu_synchronize {
if t != @@index_time
@@index = 0
@@index_time = t
end
retval = @@index
@@index += 1
retval
}
end
end
# Given a string representation of an ObjectID, return a new ObjectID
# with that value.
def self.from_string(str)
raise "illegal ObjectID format" unless legal?(str)
data = []
BYTE_ORDER.each_with_index { |string_position, data_index|
data[data_index] = str[string_position * 2, 2].to_i(16)
}
self.new(data)
end
def self.legal?(str)
len = BYTE_ORDER.length * 2
str =~ /([0-9a-f]+)/i
match = $1
str && str.length == len && match == str
end
# +data+ is an array of bytes. If nil, a new id will be generated.
# The time +t+ is only used for testing; leave it nil.
def initialize(data=nil, t=nil)
@data = data || generate_id(t)
end
def eql?(other)
@data == other.instance_variable_get("@data")
end
alias_method :==, :eql?
def to_a
@data.dup
end
def to_s
str = ' ' * 24
BYTE_ORDER.each_with_index { |string_position, data_index|
str[string_position * 2, 2] = '%02x' % @data[data_index]
}
str
end
# (Would normally be private, but isn't so we can test it.)
def generate_id(t=nil)
t ||= Time.new.to_i
buf = ByteBuffer.new
buf.put_int(t & 0xffffffff)
buf.put_array(MACHINE)
buf.put_array(PID)
i = index_for_time(t)
buf.put(i & 0xff)
buf.put((i >> 8) & 0xff)
buf.put((i >> 16) & 0xff)
buf.rewind
buf.to_a.dup
end
# (Would normally be private, but isn't so we can test it.)
def index_for_time(t)
LOCK.mu_synchronize {
if t != @@index_time
@@index = 0
@@index_time = t
end
retval = @@index
@@index += 1
retval
}
end
end
end

View File

@ -14,31 +14,27 @@
# limitations under the License.
# ++
module XGen
module Mongo
module Driver
module Mongo
# A Regexp that can hold on to extra options and ignore them. Mongo
# regexes may contain option characters beyond 'i', 'm', and 'x'. (Note
# that Mongo only uses those three, but that regexes coming from other
# languages may store different option characters.)
#
# Note that you do not have to use this class at all if you wish to
# store regular expressions in Mongo. The Mongo and Ruby regex option
# flags are the same. Storing regexes is discouraged, in any case.
class RegexpOfHolding < Regexp
# A Regexp that can hold on to extra options and ignore them. Mongo
# regexes may contain option characters beyond 'i', 'm', and 'x'. (Note
# that Mongo only uses those three, but that regexes coming from other
# languages may store different option characters.)
#
# Note that you do not have to use this class at all if you wish to
# store regular expressions in Mongo. The Mongo and Ruby regex option
# flags are the same. Storing regexes is discouraged, in any case.
class RegexpOfHolding < Regexp
attr_accessor :extra_options_str
# +str+ and +options+ are the same as Regexp. +extra_options_str+
# contains all the other flags that were in Mongo but we do not use or
# understand.
def initialize(str, options, extra_options_str)
super(str, options)
@extra_options_str = extra_options_str
end
end
attr_accessor :extra_options_str
# +str+ and +options+ are the same as Regexp. +extra_options_str+
# contains all the other flags that were in Mongo but we do not use or
# understand.
def initialize(str, options, extra_options_str)
super(str, options)
@extra_options_str = extra_options_str
end
end
end

View File

@ -14,19 +14,15 @@
# limitations under the License.
# ++
module XGen
module Mongo
module Driver
module Mongo
# DEPRECATED - the ruby driver converts the BSON undefined type to nil,
# and saves this type as nil
class Undefined < Object
# DEPRECATED - the ruby driver converts the BSON undefined type to nil,
# and saves this type as nil
class Undefined < Object
def initialize
super
warn "the Undefined type is deprecated and will be removed - BSON undefineds get implicitely converted to nil now"
end
end
def initialize
super
warn "the Undefined type is deprecated and will be removed - BSON undefineds get implicitely converted to nil now"
end
end
end

View File

@ -26,7 +26,7 @@ require 'mongo/types/undefined'
# A BSON seralizer/deserializer.
class BSON
include XGen::Mongo::Driver
include Mongo
MINKEY = -1
EOO = 0

View File

@ -21,7 +21,7 @@ require 'mongo'
# an OrderedHash.
class XMLToRuby
include XGen::Mongo::Driver
include Mongo
def xml_to_ruby(io)
doc = REXML::Document.new(io)

View File

@ -5,4 +5,4 @@ DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 27017
DEFAULT_DB = 'driver_test_framework'
include XGen::Mongo::Driver
include Mongo

View File

@ -3,9 +3,9 @@
require File.join(File.dirname(__FILE__), '_common.rb')
require 'mongo/gridfs'
include XGen::Mongo::GridFS
include GridFS
db = Mongo.new(DEFAULT_HOST, DEFAULT_PORT).db(DEFAULT_DB)
db = Mongo::Mongo.new(DEFAULT_HOST, DEFAULT_PORT).db(DEFAULT_DB)
input_file = ARGV[0]

View File

@ -3,9 +3,9 @@
require File.join(File.dirname(__FILE__), '_common.rb')
require 'mongo/gridfs'
include XGen::Mongo::GridFS
include GridFS
db = Mongo.new(DEFAULT_HOST, DEFAULT_PORT).db(DEFAULT_DB)
db = Mongo::Mongo.new(DEFAULT_HOST, DEFAULT_PORT).db(DEFAULT_DB)
input_file = ARGV[0]
output_file = ARGV[1]

View File

@ -2,9 +2,9 @@
require File.join(File.dirname(__FILE__), '_common.rb')
include XGen::Mongo
include Mongo
db = Mongo.new(DEFAULT_HOST, DEFAULT_PORT).db(DEFAULT_DB)
db = Mongo::Mongo.new(DEFAULT_HOST, DEFAULT_PORT).db(DEFAULT_DB)
x = db.collection('x')
y = db.collection('y')

View File

@ -5,7 +5,7 @@ require 'test/unit'
# NOTE: assumes Mongo is running
class AdminTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include Mongo
@@db = Mongo.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT).db('ruby-mongo-test')

View File

@ -5,7 +5,7 @@ require 'test/unit'
class BSONTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include Mongo
def setup
# We don't pass a DB to the constructor, even though we are about to test
@ -85,7 +85,7 @@ class BSONTest < Test::Unit::TestCase
assert_equal doc, doc2
r = doc2['doc']
assert_kind_of XGen::Mongo::Driver::RegexpOfHolding, r
assert_kind_of RegexpOfHolding, r
assert_equal '', r.extra_options_str
r.extra_options_str << 'zywcab'
@ -99,7 +99,7 @@ class BSONTest < Test::Unit::TestCase
assert_equal doc, doc2
r = doc2['doc']
assert_kind_of XGen::Mongo::Driver::RegexpOfHolding, r
assert_kind_of RegexpOfHolding, r
assert_equal 'abcwyz', r.extra_options_str # must be sorted
end

View File

@ -5,8 +5,8 @@ require 'mongo/gridfs'
class ChunkTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include XGen::Mongo::GridFS
include Mongo
include GridFS
@@db = Mongo.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT).db('ruby-mongo-utils-test')

View File

@ -20,8 +20,7 @@ require 'test/unit'
# NOTE: assumes Mongo is running
class TestCollection < Test::Unit::TestCase
include XGen::Mongo
include XGen::Mongo::Driver
include Mongo
@@db = Mongo.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT).db('ruby-mongo-test')

View File

@ -5,7 +5,7 @@ require 'test/unit'
# NOTE: assumes Mongo is running
class CursorTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include Mongo
@@db = Mongo.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT).db('ruby-mongo-test')

View File

@ -5,7 +5,7 @@ require 'test/unit'
class TestPKFactory
def create_pk(row)
row['_id'] ||= XGen::Mongo::Driver::ObjectID.new
row['_id'] ||= Mongo::ObjectID.new
row
end
end
@ -13,7 +13,7 @@ end
# NOTE: assumes Mongo is running
class DBTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include Mongo
@@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
@@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT
@ -85,7 +85,7 @@ class DBTest < Test::Unit::TestCase
assert_not_nil oid
assert_equal insert_id, oid
oid = XGen::Mongo::Driver::ObjectID.new
oid = ObjectID.new
data = {'_id' => oid, 'name' => 'Barney', 'age' => 41}
coll.insert(data)
row = coll.find_one({'name' => data['name']})

View File

@ -4,8 +4,7 @@ require 'test/unit'
# NOTE: assumes Mongo is running
class DBAPITest < Test::Unit::TestCase
include XGen::Mongo
include XGen::Mongo::Driver
include Mongo
@@db = Mongo.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT).db('ruby-mongo-test')
@ -817,18 +816,4 @@ class DBAPITest < Test::Unit::TestCase
@@db.collection("test").find({}, :snapshot => true, :sort => 'a').to_a
end
end
# TODO this test fails with error message "Undefed Before end of object"
# That is a database error. The undefined type may go away.
# def test_insert_undefined
# doc = {'undef' => Undefined.new}
# @@coll.clear
# @@coll.insert(doc)
# p @@db.error # DEBUG
# assert_equal 1, @@coll.count
# row = @@coll.find().next_object
# assert_not_nil row
# end
end

View File

@ -5,7 +5,7 @@ require 'test/unit'
# NOTE: assumes Mongo is running
class DBConnectionTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include Mongo
def test_no_exceptions
host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'

View File

@ -5,8 +5,8 @@ require 'mongo/gridfs'
class GridStoreTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include XGen::Mongo::GridFS
include Mongo
include GridFS
@@db = Mongo.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT).db('ruby-mongo-test')

View File

@ -4,7 +4,7 @@ require 'test/unit'
class MessageTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include Mongo
def setup
@msg = Message.new(42)

View File

@ -5,7 +5,7 @@ require 'test/unit'
# NOTE: assumes Mongo is running
class MongoTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include Mongo
def setup
@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'

View File

@ -4,7 +4,7 @@ require 'test/unit'
class ObjectIDTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include Mongo
def setup
@t = 42

View File

@ -13,7 +13,7 @@ require 'test/unit'
# of this project), then we find the BSON test files there and use those, too.
class RoundTripTest < Test::Unit::TestCase
include XGen::Mongo::Driver
include Mongo
@@ruby = nil

View File

@ -4,7 +4,7 @@ require 'test/unit'
class TestThreading < Test::Unit::TestCase
include XGen::Mongo::Driver
include Mongo
@@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
@@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Mongo::DEFAULT_PORT