Removed InsertMessage, RemoveMessage, and UpdateMessage, and refactored accordingly.

This commit is contained in:
Kyle Banker 2009-10-19 17:14:41 -04:00
parent f642b0b3bd
commit cced8ae5b7
12 changed files with 178 additions and 213 deletions

View File

@ -21,9 +21,9 @@ module Mongo
# A named collection of records in a database.
class Collection
attr_reader :db, :name, :hint
attr_reader :db, :name, :pk_factory, :hint
def initialize(db, name)
def initialize(db, name, pk_factory=nil)
case name
when Symbol, String
else
@ -42,7 +42,8 @@ module Mongo
raise InvalidName, "collection names must not start or end with '.'"
end
@db, @name = db, name
@db, @name = db, name
@pk_factory = pk_factory || ObjectID
@hint = nil
end
@ -200,21 +201,30 @@ module Mongo
# will be raised on an error. Checking for safety requires an extra
# round-trip to the database
def insert(doc_or_docs, options={})
doc_or_docs = [doc_or_docs] if !doc_or_docs.is_a?(Array)
res = @db.insert_into_db(@name, doc_or_docs)
doc_or_docs = [doc_or_docs] unless doc_or_docs.is_a?(Array)
doc_or_docs.collect! { |doc| @pk_factory.create_pk(doc) }
result = insert_documents(doc_or_docs)
if options.delete(:safe)
error = @db.error
if error
raise OperationFailure, error
end
end
res.size > 1 ? res : res.first
result.size > 1 ? result : result.first
end
alias_method :<<, :insert
# Remove the records that match +selector+.
def remove(selector={})
@db.remove_from_db(@name, selector)
# def remove(selector={})
# @db.remove_from_db(@name, selector)
# end
def remove(selector={}, check_keys=false)
message = ByteBuffer.new
message.put_int(0)
BSON.serialize_cstr(message, "#{@db.name}.#{@name}")
message.put_int(0)
message.put_array(BSON.new.serialize(selector, check_keys).to_a)
db.send_message_with_operation(OP_DELETE, message)
end
# Remove all records.
@ -236,19 +246,16 @@ module Mongo
# will be raised on an error. Checking for safety requires an extra
# round-trip to the database
def update(spec, document, options={})
upsert = options.delete(:upsert)
safe = options.delete(:safe)
message = ByteBuffer.new
message.put_int(0)
BSON.serialize_cstr(message, "#{@db.name}.#{@name}")
message.put_int(options[:upsert] ? 1 : 0) # 1 if a repsert operation (upsert)
message.put_array(BSON.new.serialize(spec, false).to_a)
message.put_array(BSON.new.serialize(document, false).to_a)
@db.send_message_with_operation(OP_UPDATE, message)
if upsert
@db.repsert_in_db(@name, spec, document)
else
@db.replace_in_db(@name, spec, document)
end
if safe
error = @db.error
if error
raise OperationFailure, error
end
if options[:safe] && error=@db.error
raise OperationFailure, error
end
end
@ -259,7 +266,20 @@ module Mongo
# +unique+ is an optional boolean indicating whether this index
# should enforce a uniqueness constraint.
def create_index(field_or_spec, unique=false)
@db.create_index(@name, field_or_spec, unique)
field_h = OrderedHash.new
if field_or_spec.is_a?(String) || field_or_spec.is_a?(Symbol)
field_h[field_or_spec.to_s] = 1
else
field_or_spec.each { |f| field_h[f[0].to_s] = f[1] }
end
name = generate_index_names(field_h)
sel = {
:name => name,
:ns => "#{@db.name}.#{@name}",
:key => field_h,
:unique => unique }
insert_documents([sel], Mongo::DB::SYSTEM_INDEX_COLLECTION, false)
name
end
# Drop index +name+.
@ -424,5 +444,27 @@ EOS
h
end
end
private
# Sends an OP_INSERT message to the database.
# Takes an array of +documents+, an optional +collection_name+, and a
# +check_keys+ setting.
def insert_documents(documents, collection_name=@name, check_keys=true)
message = ByteBuffer.new
message.put_int(0)
BSON.serialize_cstr(message, "#{@db.name}.#{collection_name}")
documents.each { |doc| message.put_array(BSON.new.serialize(doc, check_keys).to_a) }
@db.send_message_with_operation(OP_INSERT, message)
documents.collect { |o| o[:_id] || o['_id'] }
end
def generate_index_names(spec)
indexes = []
spec.each_pair do |field, direction|
indexes.push("#{field}_#{direction}")
end
indexes.join("_")
end
end
end

View File

@ -34,6 +34,9 @@ module Mongo
SYSTEM_USER_COLLECTION = "system.users"
SYSTEM_COMMAND_COLLECTION = "$cmd"
# Counter for generating unique request ids.
@@current_request_id = 0
# Strict mode enforces collection existence checks. When +true+,
# asking for a collection that does not exist or trying to create a
# collection that already exists raises an error.
@ -213,7 +216,7 @@ module Mongo
# specified, an array of length 1 is returned.
def collections_info(coll_name=nil)
selector = {}
selector[:name] = full_coll_name(coll_name) if coll_name
selector[:name] = full_collection_name(coll_name) if coll_name
query(Collection.new(self, SYSTEM_NAMESPACE_COLLECTION), Query.new(selector))
end
@ -245,7 +248,7 @@ module Mongo
oh[:create] = name
doc = db_command(oh.merge(options || {}))
ok = doc['ok']
return Collection.new(self, name) if ok.kind_of?(Numeric) && (ok.to_i == 1 || ok.to_i == 0)
return Collection.new(self, name, @pk_factory) if ok.kind_of?(Numeric) && (ok.to_i == 1 || ok.to_i == 0)
raise "Error creating collection: #{doc.inspect}"
end
@ -257,7 +260,7 @@ module Mongo
# new collection. If +strict+ is true, will raise an error if
# collection +name+ does not already exists.
def collection(name)
return Collection.new(self, name) if !strict? || collection_names.include?(name)
return Collection.new(self, name, @pk_factory) if !strict? || collection_names.include?(name)
raise "Collection #{name} doesn't exist. Currently in strict mode."
end
alias_method :[], :collection
@ -374,33 +377,6 @@ module Mongo
send_to_db(query_message)
end
# Remove the records that match +selector+ from +collection_name+.
# Normally called by Collection#remove or Collection#clear.
def remove_from_db(collection_name, selector)
_synchronize {
send_to_db(RemoveMessage.new(@name, collection_name, selector))
}
end
# Update records in +collection_name+ that match +selector+ by
# applying +obj+ as an update. Normally called by Collection#replace.
def replace_in_db(collection_name, selector, obj)
_synchronize {
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, false))
}
end
# Update records in +collection_name+ that match +selector+ by
# applying +obj+ as an update. If no match, inserts (???). Normally
# called by Collection#repsert.
def repsert_in_db(collection_name, selector, obj)
_synchronize {
obj = @pk_factory.create_pk(obj) if @pk_factory
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, true))
obj
}
end
# Dereference a DBRef, getting the document it points to.
def dereference(dbref)
collection(dbref.namespace).find_one("_id" => dbref.object_id)
@ -449,7 +425,7 @@ module Mongo
# the values are lists of [key, direction] pairs specifying the index
# (as passed to Collection#create_index).
def index_information(collection_name)
sel = {:ns => full_coll_name(collection_name)}
sel = {:ns => full_collection_name(collection_name)}
info = {}
query(Collection.new(self, SYSTEM_INDEX_COLLECTION), Query.new(sel)).each { |index|
info[index['name']] = index['key'].to_a
@ -464,42 +440,7 @@ module Mongo
# by Collection#create_index. If +unique+ is true the index will
# enforce a uniqueness constraint.
def create_index(collection_name, field_or_spec, unique=false)
field_h = OrderedHash.new
if field_or_spec.is_a?(String) || field_or_spec.is_a?(Symbol)
field_h[field_or_spec.to_s] = 1
else
field_or_spec.each { |f| field_h[f[0].to_s] = f[1] }
end
name = gen_index_name(field_h)
sel = {
:name => name,
:ns => full_coll_name(collection_name),
:key => field_h,
:unique => unique
}
_synchronize {
send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, false, sel))
}
name
end
# Insert +objects+ into +collection_name+. Normally called by
# Collection#insert. Returns a new array containing the _ids
# of the inserted documents.
def insert_into_db(collection_name, objects)
_synchronize {
if @pk_factory
objects.collect! { |o|
@pk_factory.create_pk(o)
}
else
objects = objects.collect do |o|
o[:_id] || o['_id'] ? o : o.merge!(:_id => ObjectID.new)
end
end
send_to_db(InsertMessage.new(@name, collection_name, true, *objects))
objects.collect { |o| o[:_id] || o['_id'] }
}
self.collection(collection_name).create_index(field_or_spec, unique)
end
def send_to_db(message)
@ -514,8 +455,23 @@ module Mongo
end
end
def full_coll_name(collection_name)
"#{@name}.#{collection_name}"
# Sends a message to MongoDB.
#
# Takes a MongoDB opcode, +operation+, and a message of class ByteBuffer,
# +message+, and sends the message to the databse, adding the necessary headers.
def send_message_with_operation(operation, message)
_synchronize do
connect_to_master if !connected? && @auto_reconnect
begin
message_with_headers = add_message_headers(operation, message)
@logger.debug(" MONGODB #{message}") if @logger
@socket.print(message_with_headers.to_s)
@socket.flush
rescue => ex
close
raise ex
end
end
end
# Return +true+ if +doc+ contains an 'ok' field with the value 1.
@ -543,18 +499,40 @@ module Mongo
@semaphore.synchronize &block
end
def full_collection_name(collection_name)
"#{@name}.#{collection_name}"
end
private
# Prepares a message for transmission to MongoDB by
# constructing a valid message header.
def add_message_headers(operation, message)
headers = ByteBuffer.new
# Message size.
headers.put_int(16 + message.size)
# Unique request id.
headers.put_int(get_request_id)
# Response id.
headers.put_int(0)
# Opcode.
headers.put_int(operation)
message.prepend!(headers)
end
# Increments and then returns the next available request id.
# Note: this method should be called from within a lock.
def get_request_id
@@current_request_id += 1
@@current_request_id
end
def hash_password(username, plaintext)
Digest::MD5.hexdigest("#{username}:mongo:#{plaintext}")
end
def gen_index_name(spec)
temp = []
spec.each_pair { |field, direction|
temp = temp.push("#{field}_#{direction}")
}
return temp.join("_")
end
end
end

View File

@ -14,7 +14,7 @@
# limitations under the License.
# ++
%w(get_more_message insert_message kill_cursors_message message_header
msg_message query_message remove_message update_message).each { |f|
%w(get_more_message kill_cursors_message message_header
msg_message query_message).each { |f|
require "mongo/message/#{f}"
}

View File

@ -1,37 +0,0 @@
# --
# Copyright (C) 2008-2009 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ++
require 'mongo/message/message'
require 'mongo/message/opcodes'
module Mongo
class InsertMessage < Message
def initialize(db_name, collection_name, check_keys=true, *objs)
@collection_name = collection_name
@objs = objs
super(OP_INSERT)
write_int(0)
write_string("#{db_name}.#{collection_name}")
objs.each { |o| write_doc(o, check_keys) }
end
def to_s
"db.#{@collection_name}.insert(#{@objs.inspect})"
end
end
end

View File

@ -1,37 +0,0 @@
# --
# Copyright (C) 2008-2009 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ++
require 'mongo/message/message'
require 'mongo/message/opcodes'
module Mongo
class RemoveMessage < Message
def initialize(db_name, collection_name, sel)
@collection_name = collection_name
super(OP_DELETE)
write_int(0)
write_string("#{db_name}.#{collection_name}")
write_int(0) # flags?
write_doc(sel)
end
def to_s
"#{@collection_name}.clear()"
end
end
end

View File

@ -1,38 +0,0 @@
# --
# Copyright (C) 2008-2009 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ++
require 'mongo/message/message'
require 'mongo/message/opcodes'
module Mongo
class UpdateMessage < Message
def initialize(db_name, collection_name, sel, obj, repsert)
@collection_name = collection_name
super(OP_UPDATE)
write_int(0)
write_string("#{db_name}.#{collection_name}")
write_int(repsert ? 1 : 0) # 1 if a repsert operation (upsert)
write_doc(sel)
write_doc(obj)
end
def to_s
"db.#{@collection_name}.update(#{@sel.inspect}, #{@obj.inspect})"
end
end
end

View File

@ -42,6 +42,11 @@ module Mongo
str && str.length == len && match == str
end
# Adds a primary key to the given document if needed.
def self.create_pk(doc)
doc[:_id] || doc['_id'] ? doc : doc.merge!(:_id => self.new)
end
# +data+ is an array of bytes. If nil, a new id will be generated.
def initialize(data=nil)
@data = data || generate

View File

@ -54,6 +54,18 @@ class ByteBuffer
end
alias_method :length, :size
# Appends a second ByteBuffer object, +buffer+, to the current buffer.
def append!(buffer)
@buf = @buf + buffer.to_a
self
end
# Prepends a second ByteBuffer object, +buffer+, to the current buffer.
def prepend!(buffer)
@buf = buffer.to_a + @buf
self
end
def put(byte, offset=nil)
@cursor = offset if offset
@buf[@cursor] = byte

View File

@ -66,4 +66,18 @@ class ByteBufferTest < Test::Unit::TestCase
assert_equal 4, @buf.position
end
def test_prepend_byte_buffer
@buf.put_int(4)
new_buf = ByteBuffer.new([5, 0, 0, 0])
@buf.prepend!(new_buf)
assert_equal [5, 0, 0, 0, 4, 0, 0, 0], @buf.to_a
end
def test_append_byte_buffer
@buf.put_int(4)
new_buf = ByteBuffer.new([5, 0, 0, 0])
@buf.append!(new_buf)
assert_equal [4, 0, 0, 0, 5, 0, 0, 0], @buf.to_a
end
end

View File

@ -30,6 +30,22 @@ class TestCollection < Test::Unit::TestCase
@@test.drop()
end
def test_optional_pk_factory
@coll_default_pk = @@db.collection('stuff')
assert_equal Mongo::ObjectID, @coll_default_pk.pk_factory
@coll_default_pk = @@db.create_collection('more-stuff')
assert_equal Mongo::ObjectID, @coll_default_pk.pk_factory
# Create a db with a pk_factory.
@db = Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT).db('ruby-mongo-test', :pk => Object.new)
@coll = @db.collection('coll-with-pk')
assert @coll.pk_factory.is_a?(Object)
@coll = @db.create_collection('created_coll_with_pk')
assert @coll.pk_factory.is_a?(Object)
end
def test_collection
assert_raise InvalidName do
@@db["te$t"]

View File

@ -61,7 +61,7 @@ class DBTest < Test::Unit::TestCase
def test_full_coll_name
coll = @@db.collection('test')
assert_equal 'ruby-mongo-test.test', @@db.full_coll_name(coll.name)
assert_equal 'ruby-mongo-test.test', @@db.full_collection_name(coll.name)
end
def test_collection_names

View File

@ -10,6 +10,16 @@ class ObjectIDTest < Test::Unit::TestCase
@o = ObjectID.new()
end
def test_create_pk_method
doc = {:name => 'Mongo'}
doc = ObjectID.create_pk(doc)
assert doc[:_id]
doc = {:name => 'Mongo', :_id => '12345'}
doc = ObjectID.create_pk(doc)
assert_equal '12345', doc[:_id]
end
def test_different
a = ObjectID.new
b = ObjectID.new