From cced8ae5b769ad3d706030b744b5565e9829d3e9 Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Mon, 19 Oct 2009 17:14:41 -0400 Subject: [PATCH] Removed InsertMessage, RemoveMessage, and UpdateMessage, and refactored accordingly. --- lib/mongo/collection.rb | 84 +++++++++++++----- lib/mongo/db.rb | 132 ++++++++++++---------------- lib/mongo/message.rb | 4 +- lib/mongo/message/insert_message.rb | 37 -------- lib/mongo/message/remove_message.rb | 37 -------- lib/mongo/message/update_message.rb | 38 -------- lib/mongo/types/objectid.rb | 5 ++ lib/mongo/util/byte_buffer.rb | 12 +++ test/test_byte_buffer.rb | 14 +++ test/test_collection.rb | 16 ++++ test/test_db.rb | 2 +- test/test_objectid.rb | 10 +++ 12 files changed, 178 insertions(+), 213 deletions(-) delete mode 100644 lib/mongo/message/insert_message.rb delete mode 100644 lib/mongo/message/remove_message.rb delete mode 100644 lib/mongo/message/update_message.rb diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index 00f54b4..1a4a262 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -21,9 +21,9 @@ module Mongo # A named collection of records in a database. class Collection - attr_reader :db, :name, :hint + attr_reader :db, :name, :pk_factory, :hint - def initialize(db, name) + def initialize(db, name, pk_factory=nil) case name when Symbol, String else @@ -42,7 +42,8 @@ module Mongo raise InvalidName, "collection names must not start or end with '.'" end - @db, @name = db, name + @db, @name = db, name + @pk_factory = pk_factory || ObjectID @hint = nil end @@ -200,21 +201,30 @@ module Mongo # will be raised on an error. Checking for safety requires an extra # round-trip to the database def insert(doc_or_docs, options={}) - doc_or_docs = [doc_or_docs] if !doc_or_docs.is_a?(Array) - res = @db.insert_into_db(@name, doc_or_docs) + doc_or_docs = [doc_or_docs] unless doc_or_docs.is_a?(Array) + doc_or_docs.collect! { |doc| @pk_factory.create_pk(doc) } + result = insert_documents(doc_or_docs) if options.delete(:safe) error = @db.error if error raise OperationFailure, error end end - res.size > 1 ? res : res.first + result.size > 1 ? result : result.first end alias_method :<<, :insert # Remove the records that match +selector+. - def remove(selector={}) - @db.remove_from_db(@name, selector) + # def remove(selector={}) + # @db.remove_from_db(@name, selector) + # end + def remove(selector={}, check_keys=false) + message = ByteBuffer.new + message.put_int(0) + BSON.serialize_cstr(message, "#{@db.name}.#{@name}") + message.put_int(0) + message.put_array(BSON.new.serialize(selector, check_keys).to_a) + db.send_message_with_operation(OP_DELETE, message) end # Remove all records. @@ -236,19 +246,16 @@ module Mongo # will be raised on an error. Checking for safety requires an extra # round-trip to the database def update(spec, document, options={}) - upsert = options.delete(:upsert) - safe = options.delete(:safe) + message = ByteBuffer.new + message.put_int(0) + BSON.serialize_cstr(message, "#{@db.name}.#{@name}") + message.put_int(options[:upsert] ? 1 : 0) # 1 if a repsert operation (upsert) + message.put_array(BSON.new.serialize(spec, false).to_a) + message.put_array(BSON.new.serialize(document, false).to_a) + @db.send_message_with_operation(OP_UPDATE, message) - if upsert - @db.repsert_in_db(@name, spec, document) - else - @db.replace_in_db(@name, spec, document) - end - if safe - error = @db.error - if error - raise OperationFailure, error - end + if options[:safe] && error=@db.error + raise OperationFailure, error end end @@ -259,7 +266,20 @@ module Mongo # +unique+ is an optional boolean indicating whether this index # should enforce a uniqueness constraint. def create_index(field_or_spec, unique=false) - @db.create_index(@name, field_or_spec, unique) + field_h = OrderedHash.new + if field_or_spec.is_a?(String) || field_or_spec.is_a?(Symbol) + field_h[field_or_spec.to_s] = 1 + else + field_or_spec.each { |f| field_h[f[0].to_s] = f[1] } + end + name = generate_index_names(field_h) + sel = { + :name => name, + :ns => "#{@db.name}.#{@name}", + :key => field_h, + :unique => unique } + insert_documents([sel], Mongo::DB::SYSTEM_INDEX_COLLECTION, false) + name end # Drop index +name+. @@ -424,5 +444,27 @@ EOS h end end + + private + + # Sends an OP_INSERT message to the database. + # Takes an array of +documents+, an optional +collection_name+, and a + # +check_keys+ setting. + def insert_documents(documents, collection_name=@name, check_keys=true) + message = ByteBuffer.new + message.put_int(0) + BSON.serialize_cstr(message, "#{@db.name}.#{collection_name}") + documents.each { |doc| message.put_array(BSON.new.serialize(doc, check_keys).to_a) } + @db.send_message_with_operation(OP_INSERT, message) + documents.collect { |o| o[:_id] || o['_id'] } + end + + def generate_index_names(spec) + indexes = [] + spec.each_pair do |field, direction| + indexes.push("#{field}_#{direction}") + end + indexes.join("_") + end end end diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index 3917fc8..df7efb8 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -34,6 +34,9 @@ module Mongo SYSTEM_USER_COLLECTION = "system.users" SYSTEM_COMMAND_COLLECTION = "$cmd" + # Counter for generating unique request ids. + @@current_request_id = 0 + # Strict mode enforces collection existence checks. When +true+, # asking for a collection that does not exist or trying to create a # collection that already exists raises an error. @@ -213,7 +216,7 @@ module Mongo # specified, an array of length 1 is returned. def collections_info(coll_name=nil) selector = {} - selector[:name] = full_coll_name(coll_name) if coll_name + selector[:name] = full_collection_name(coll_name) if coll_name query(Collection.new(self, SYSTEM_NAMESPACE_COLLECTION), Query.new(selector)) end @@ -245,7 +248,7 @@ module Mongo oh[:create] = name doc = db_command(oh.merge(options || {})) ok = doc['ok'] - return Collection.new(self, name) if ok.kind_of?(Numeric) && (ok.to_i == 1 || ok.to_i == 0) + return Collection.new(self, name, @pk_factory) if ok.kind_of?(Numeric) && (ok.to_i == 1 || ok.to_i == 0) raise "Error creating collection: #{doc.inspect}" end @@ -257,7 +260,7 @@ module Mongo # new collection. If +strict+ is true, will raise an error if # collection +name+ does not already exists. def collection(name) - return Collection.new(self, name) if !strict? || collection_names.include?(name) + return Collection.new(self, name, @pk_factory) if !strict? || collection_names.include?(name) raise "Collection #{name} doesn't exist. Currently in strict mode." end alias_method :[], :collection @@ -374,33 +377,6 @@ module Mongo send_to_db(query_message) end - # Remove the records that match +selector+ from +collection_name+. - # Normally called by Collection#remove or Collection#clear. - def remove_from_db(collection_name, selector) - _synchronize { - send_to_db(RemoveMessage.new(@name, collection_name, selector)) - } - end - - # Update records in +collection_name+ that match +selector+ by - # applying +obj+ as an update. Normally called by Collection#replace. - def replace_in_db(collection_name, selector, obj) - _synchronize { - send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, false)) - } - end - - # Update records in +collection_name+ that match +selector+ by - # applying +obj+ as an update. If no match, inserts (???). Normally - # called by Collection#repsert. - def repsert_in_db(collection_name, selector, obj) - _synchronize { - obj = @pk_factory.create_pk(obj) if @pk_factory - send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, true)) - obj - } - end - # Dereference a DBRef, getting the document it points to. def dereference(dbref) collection(dbref.namespace).find_one("_id" => dbref.object_id) @@ -449,7 +425,7 @@ module Mongo # the values are lists of [key, direction] pairs specifying the index # (as passed to Collection#create_index). def index_information(collection_name) - sel = {:ns => full_coll_name(collection_name)} + sel = {:ns => full_collection_name(collection_name)} info = {} query(Collection.new(self, SYSTEM_INDEX_COLLECTION), Query.new(sel)).each { |index| info[index['name']] = index['key'].to_a @@ -464,42 +440,7 @@ module Mongo # by Collection#create_index. If +unique+ is true the index will # enforce a uniqueness constraint. def create_index(collection_name, field_or_spec, unique=false) - field_h = OrderedHash.new - if field_or_spec.is_a?(String) || field_or_spec.is_a?(Symbol) - field_h[field_or_spec.to_s] = 1 - else - field_or_spec.each { |f| field_h[f[0].to_s] = f[1] } - end - name = gen_index_name(field_h) - sel = { - :name => name, - :ns => full_coll_name(collection_name), - :key => field_h, - :unique => unique - } - _synchronize { - send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, false, sel)) - } - name - end - - # Insert +objects+ into +collection_name+. Normally called by - # Collection#insert. Returns a new array containing the _ids - # of the inserted documents. - def insert_into_db(collection_name, objects) - _synchronize { - if @pk_factory - objects.collect! { |o| - @pk_factory.create_pk(o) - } - else - objects = objects.collect do |o| - o[:_id] || o['_id'] ? o : o.merge!(:_id => ObjectID.new) - end - end - send_to_db(InsertMessage.new(@name, collection_name, true, *objects)) - objects.collect { |o| o[:_id] || o['_id'] } - } + self.collection(collection_name).create_index(field_or_spec, unique) end def send_to_db(message) @@ -514,8 +455,23 @@ module Mongo end end - def full_coll_name(collection_name) - "#{@name}.#{collection_name}" + # Sends a message to MongoDB. + # + # Takes a MongoDB opcode, +operation+, and a message of class ByteBuffer, + # +message+, and sends the message to the databse, adding the necessary headers. + def send_message_with_operation(operation, message) + _synchronize do + connect_to_master if !connected? && @auto_reconnect + begin + message_with_headers = add_message_headers(operation, message) + @logger.debug(" MONGODB #{message}") if @logger + @socket.print(message_with_headers.to_s) + @socket.flush + rescue => ex + close + raise ex + end + end end # Return +true+ if +doc+ contains an 'ok' field with the value 1. @@ -543,18 +499,40 @@ module Mongo @semaphore.synchronize &block end + def full_collection_name(collection_name) + "#{@name}.#{collection_name}" + end + private + # Prepares a message for transmission to MongoDB by + # constructing a valid message header. + def add_message_headers(operation, message) + headers = ByteBuffer.new + + # Message size. + headers.put_int(16 + message.size) + + # Unique request id. + headers.put_int(get_request_id) + + # Response id. + headers.put_int(0) + + # Opcode. + headers.put_int(operation) + message.prepend!(headers) + end + + # Increments and then returns the next available request id. + # Note: this method should be called from within a lock. + def get_request_id + @@current_request_id += 1 + @@current_request_id + end + def hash_password(username, plaintext) Digest::MD5.hexdigest("#{username}:mongo:#{plaintext}") end - - def gen_index_name(spec) - temp = [] - spec.each_pair { |field, direction| - temp = temp.push("#{field}_#{direction}") - } - return temp.join("_") - end end end diff --git a/lib/mongo/message.rb b/lib/mongo/message.rb index 9b70342..928915d 100644 --- a/lib/mongo/message.rb +++ b/lib/mongo/message.rb @@ -14,7 +14,7 @@ # limitations under the License. # ++ -%w(get_more_message insert_message kill_cursors_message message_header - msg_message query_message remove_message update_message).each { |f| +%w(get_more_message kill_cursors_message message_header + msg_message query_message).each { |f| require "mongo/message/#{f}" } diff --git a/lib/mongo/message/insert_message.rb b/lib/mongo/message/insert_message.rb deleted file mode 100644 index 420c2a4..0000000 --- a/lib/mongo/message/insert_message.rb +++ /dev/null @@ -1,37 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -require 'mongo/message/message' -require 'mongo/message/opcodes' - -module Mongo - - class InsertMessage < Message - - def initialize(db_name, collection_name, check_keys=true, *objs) - @collection_name = collection_name - @objs = objs - super(OP_INSERT) - write_int(0) - write_string("#{db_name}.#{collection_name}") - objs.each { |o| write_doc(o, check_keys) } - end - - def to_s - "db.#{@collection_name}.insert(#{@objs.inspect})" - end - end -end diff --git a/lib/mongo/message/remove_message.rb b/lib/mongo/message/remove_message.rb deleted file mode 100644 index 54cb3bc..0000000 --- a/lib/mongo/message/remove_message.rb +++ /dev/null @@ -1,37 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -require 'mongo/message/message' -require 'mongo/message/opcodes' - -module Mongo - - class RemoveMessage < Message - - def initialize(db_name, collection_name, sel) - @collection_name = collection_name - super(OP_DELETE) - write_int(0) - write_string("#{db_name}.#{collection_name}") - write_int(0) # flags? - write_doc(sel) - end - - def to_s - "#{@collection_name}.clear()" - end - end -end diff --git a/lib/mongo/message/update_message.rb b/lib/mongo/message/update_message.rb deleted file mode 100644 index fa8c62d..0000000 --- a/lib/mongo/message/update_message.rb +++ /dev/null @@ -1,38 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -require 'mongo/message/message' -require 'mongo/message/opcodes' - -module Mongo - - class UpdateMessage < Message - - def initialize(db_name, collection_name, sel, obj, repsert) - @collection_name = collection_name - super(OP_UPDATE) - write_int(0) - write_string("#{db_name}.#{collection_name}") - write_int(repsert ? 1 : 0) # 1 if a repsert operation (upsert) - write_doc(sel) - write_doc(obj) - end - - def to_s - "db.#{@collection_name}.update(#{@sel.inspect}, #{@obj.inspect})" - end - end -end diff --git a/lib/mongo/types/objectid.rb b/lib/mongo/types/objectid.rb index b28bb94..e701d2a 100644 --- a/lib/mongo/types/objectid.rb +++ b/lib/mongo/types/objectid.rb @@ -42,6 +42,11 @@ module Mongo str && str.length == len && match == str end + # Adds a primary key to the given document if needed. + def self.create_pk(doc) + doc[:_id] || doc['_id'] ? doc : doc.merge!(:_id => self.new) + end + # +data+ is an array of bytes. If nil, a new id will be generated. def initialize(data=nil) @data = data || generate diff --git a/lib/mongo/util/byte_buffer.rb b/lib/mongo/util/byte_buffer.rb index 7b539cb..6b24bf5 100644 --- a/lib/mongo/util/byte_buffer.rb +++ b/lib/mongo/util/byte_buffer.rb @@ -54,6 +54,18 @@ class ByteBuffer end alias_method :length, :size + # Appends a second ByteBuffer object, +buffer+, to the current buffer. + def append!(buffer) + @buf = @buf + buffer.to_a + self + end + + # Prepends a second ByteBuffer object, +buffer+, to the current buffer. + def prepend!(buffer) + @buf = buffer.to_a + @buf + self + end + def put(byte, offset=nil) @cursor = offset if offset @buf[@cursor] = byte diff --git a/test/test_byte_buffer.rb b/test/test_byte_buffer.rb index 0590778..86da23f 100644 --- a/test/test_byte_buffer.rb +++ b/test/test_byte_buffer.rb @@ -66,4 +66,18 @@ class ByteBufferTest < Test::Unit::TestCase assert_equal 4, @buf.position end + def test_prepend_byte_buffer + @buf.put_int(4) + new_buf = ByteBuffer.new([5, 0, 0, 0]) + @buf.prepend!(new_buf) + assert_equal [5, 0, 0, 0, 4, 0, 0, 0], @buf.to_a + end + + def test_append_byte_buffer + @buf.put_int(4) + new_buf = ByteBuffer.new([5, 0, 0, 0]) + @buf.append!(new_buf) + assert_equal [4, 0, 0, 0, 5, 0, 0, 0], @buf.to_a + end + end diff --git a/test/test_collection.rb b/test/test_collection.rb index 97945a3..65806b4 100644 --- a/test/test_collection.rb +++ b/test/test_collection.rb @@ -30,6 +30,22 @@ class TestCollection < Test::Unit::TestCase @@test.drop() end + def test_optional_pk_factory + @coll_default_pk = @@db.collection('stuff') + assert_equal Mongo::ObjectID, @coll_default_pk.pk_factory + @coll_default_pk = @@db.create_collection('more-stuff') + assert_equal Mongo::ObjectID, @coll_default_pk.pk_factory + + # Create a db with a pk_factory. + @db = Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost', + ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT).db('ruby-mongo-test', :pk => Object.new) + @coll = @db.collection('coll-with-pk') + assert @coll.pk_factory.is_a?(Object) + + @coll = @db.create_collection('created_coll_with_pk') + assert @coll.pk_factory.is_a?(Object) + end + def test_collection assert_raise InvalidName do @@db["te$t"] diff --git a/test/test_db.rb b/test/test_db.rb index 9f1dfd2..a5de5e6 100644 --- a/test/test_db.rb +++ b/test/test_db.rb @@ -61,7 +61,7 @@ class DBTest < Test::Unit::TestCase def test_full_coll_name coll = @@db.collection('test') - assert_equal 'ruby-mongo-test.test', @@db.full_coll_name(coll.name) + assert_equal 'ruby-mongo-test.test', @@db.full_collection_name(coll.name) end def test_collection_names diff --git a/test/test_objectid.rb b/test/test_objectid.rb index ccd03df..462579b 100644 --- a/test/test_objectid.rb +++ b/test/test_objectid.rb @@ -10,6 +10,16 @@ class ObjectIDTest < Test::Unit::TestCase @o = ObjectID.new() end + def test_create_pk_method + doc = {:name => 'Mongo'} + doc = ObjectID.create_pk(doc) + assert doc[:_id] + + doc = {:name => 'Mongo', :_id => '12345'} + doc = ObjectID.create_pk(doc) + assert_equal '12345', doc[:_id] + end + def test_different a = ObjectID.new b = ObjectID.new