Lots of fixes.
- Introduced OrderedHash because db commands require that the command key be first. - Fixed BSON (de)serialization of object ids. - Simplified @coll.drop_indexes. - Renamed some method parameters to make their type more clear (for example, "collection" became "collection_name"). - Got the index_information test working.
This commit is contained in:
parent
067ec66812
commit
7669900126
3
README
3
README
@ -52,6 +52,9 @@ type
|
||||
|
||||
= To Do
|
||||
|
||||
* Study src/main/ed/db/{dbcollection,dbcursor,db}.js in the Babble code.
|
||||
That's what I should be writing to.
|
||||
|
||||
* Capped collection support.
|
||||
|
||||
* More code comments. More text in this file.
|
||||
|
@ -67,7 +67,8 @@ module XGen
|
||||
end
|
||||
|
||||
def drop_indexes
|
||||
index_information.each { |info| @db.drop_index(@name, info.name) }
|
||||
# just need to call drop indexes with no args; will drop them all
|
||||
@db.drop_index(@name, '*')
|
||||
end
|
||||
|
||||
def index_information
|
||||
|
@ -17,6 +17,7 @@ require 'mongo/objectid'
|
||||
require 'mongo/collection'
|
||||
require 'mongo/message'
|
||||
require 'mongo/query'
|
||||
require 'mongo/util/ordered_hash.rb'
|
||||
|
||||
module XGen
|
||||
module Mongo
|
||||
@ -52,8 +53,9 @@ module XGen
|
||||
return Collection.new(self, name) if collection_names.include?(name)
|
||||
|
||||
# Create new collection
|
||||
sel = {:create => name}.merge(options)
|
||||
doc = db_command(sel)
|
||||
oh = OrderedHash.new
|
||||
oh[:create] = name
|
||||
doc = db_command(oh.merge(options))
|
||||
o = doc['ok']
|
||||
return Collection.new(self, name) if o.kind_of?(Numeric) && (o.to_i == 1 || o.to_i == 0)
|
||||
raise "Error creating collection: #{doc.inspect}"
|
||||
@ -89,86 +91,103 @@ module XGen
|
||||
send_to_db(MsgMessage.new(msg))
|
||||
end
|
||||
|
||||
def query(collection, query)
|
||||
def query(collection_name, query)
|
||||
# TODO synchronize
|
||||
send_to_db(QueryMessage.new(@name, collection, query))
|
||||
return Cursor.new(self, collection)
|
||||
send_to_db(QueryMessage.new(@name, collection_name, query))
|
||||
return Cursor.new(self, collection_name)
|
||||
end
|
||||
|
||||
def remove_from_db(collection, selector)
|
||||
def remove_from_db(collection_name, selector)
|
||||
# TODO synchronize
|
||||
send_to_db(RemoveMessage.new(@name, collection, selector))
|
||||
send_to_db(RemoveMessage.new(@name, collection_name, selector))
|
||||
end
|
||||
|
||||
def replace_in_db(collection, selector, obj)
|
||||
def replace_in_db(collection_name, selector, obj)
|
||||
# TODO synchronize
|
||||
send_to_db(UpdateMessage.new(@name, collection, selector, obj, false))
|
||||
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, false))
|
||||
end
|
||||
alias_method :modify_in_db, :replace_in_db
|
||||
|
||||
def repsert_in_db(collection, selector, obj)
|
||||
def repsert_in_db(collection_name, selector, obj)
|
||||
# TODO if PKInjector, inject
|
||||
# TODO synchronize
|
||||
send_to_db(UpdateMessage.new(@name, collection, selector, obj, true))
|
||||
send_to_db(UpdateMessage.new(@name, collection_name, selector, obj, true))
|
||||
obj
|
||||
end
|
||||
|
||||
def count(collection, selector)
|
||||
doc = db_command(:count => collection, :query => selector)
|
||||
def count(collection_name, selector)
|
||||
oh = OrderedHash.new
|
||||
oh[:count] = collection_name
|
||||
oh[:query] = selector
|
||||
doc = db_command(oh)
|
||||
o = doc['ok']
|
||||
return doc['n'].to_i if o.to_i == 1
|
||||
raise "Error with count command: #{doc.inspect}"
|
||||
end
|
||||
|
||||
def drop_index(collection, name)
|
||||
db_command(:deleteIndexes => collection, :index => name)
|
||||
def drop_index(collection_name, name)
|
||||
oh = OrderedHash.new
|
||||
oh[:deleteIndexes] = collection_name
|
||||
oh[:index] = name
|
||||
doc = db_command(oh)
|
||||
o = doc['ok']
|
||||
raise "Error with drop_index command: #{doc.inspect}" unless o.kind_of?(Numeric) && o.to_i == 1
|
||||
end
|
||||
|
||||
def index_information(collection)
|
||||
sel = {:ns => full_coll_name(collection)}
|
||||
def index_information(collection_name)
|
||||
sel = {:ns => full_coll_name(collection_name)}
|
||||
# TODO synchronize
|
||||
query(SYSTEM_INDEX_COLLECTION, Query.new(sel)).collect { |row|
|
||||
h = {:name => row['name']}
|
||||
raise "Name of index on return from db was nil. Coll = #{full_coll_name(collection)}" unless h[:name]
|
||||
raise "Name of index on return from db was nil. Coll = #{full_coll_name(collection_name)}" unless h[:name]
|
||||
|
||||
h[:keys] = row['key']
|
||||
raise "Keys for index on return from db was nil. Coll = #{full_coll_name(collection)}" unless h[:keys]
|
||||
raise "Keys for index on return from db was nil. Coll = #{full_coll_name(collection_name)}" unless h[:keys]
|
||||
|
||||
h[:ns] = row['ns']
|
||||
raise "Namespace for index on return from db was nil. Coll = #{full_coll_name(collection)}" unless h[:ns]
|
||||
raise "Namespace for index on return from db was nil. Coll = #{full_coll_name(collection_name)}" unless h[:ns]
|
||||
h[:ns].sub!(/.*\./, '')
|
||||
raise "Error: ns != collection" unless h[:ns] == collection
|
||||
raise "Error: ns != collection" unless h[:ns] == collection_name
|
||||
|
||||
h
|
||||
}
|
||||
end
|
||||
|
||||
def create_index(collection, name, fields)
|
||||
sel = {:name => name, :ns => full_coll_name(collection)}
|
||||
def create_index(collection_name, index_name, fields)
|
||||
sel = {:name => index_name, :ns => full_coll_name(collection_name)}
|
||||
field_h = {}
|
||||
fields.each { |f| field_h[f] = 1 }
|
||||
sel['key'] = field_h
|
||||
sel[:key] = field_h
|
||||
# TODO synchronize
|
||||
send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, sel))
|
||||
end
|
||||
|
||||
def insert_into_db(collection, objects)
|
||||
def insert_into_db(collection_name, objects)
|
||||
objects.each { |o| o['_id'] ||= ObjectID.new }
|
||||
# TODO synchronize
|
||||
objects.each { |o| send_to_db(InsertMessage.new(@name, collection, o)) }
|
||||
objects.each { |o| send_to_db(InsertMessage.new(@name, collection_name, o)) }
|
||||
end
|
||||
|
||||
def send_to_db(message)
|
||||
@socket.print(message.buf.to_s)
|
||||
end
|
||||
|
||||
def full_coll_name(collection)
|
||||
"#{@name}.#{collection}"
|
||||
def full_coll_name(collection_name)
|
||||
"#{@name}.#{collection_name}"
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
# DB commands need to be ordered, so selector must be an OrderedHash
|
||||
# (or a Hash with only one element). What DB commands really need is
|
||||
# that the "command" key be first.
|
||||
def db_command(selector)
|
||||
if !selector.kind_of?(OrderedHash)
|
||||
if !selector.kind_of?(Hash) || selector.keys.length > 1
|
||||
raise "db_command must be given an OrderedHash when there is more than one key"
|
||||
end
|
||||
end
|
||||
|
||||
# TODO synchronize
|
||||
q = Query.new(selector)
|
||||
q.number_to_return = 1
|
||||
|
@ -7,10 +7,10 @@ module XGen
|
||||
|
||||
class GetMoreMessage < Message
|
||||
|
||||
def initialize(name, collection, cursor)
|
||||
def initialize(db_name, collection_name, cursor)
|
||||
super(OP_GET_MORE)
|
||||
write_int(0)
|
||||
write_string("#{name}.#{collection}")
|
||||
write_string("#{db_name}.#{collection_name}")
|
||||
write_int(0) # num to return; leave it up to the db for now
|
||||
write_long(cursor)
|
||||
end
|
||||
|
@ -7,14 +7,13 @@ module XGen
|
||||
|
||||
class InsertMessage < Message
|
||||
|
||||
def initialize(name, collection, *objs)
|
||||
def initialize(db_name, collection_name, *objs)
|
||||
super(OP_INSERT)
|
||||
write_int(0)
|
||||
write_string("#{name}.#{collection}")
|
||||
write_string("#{db_name}.#{collection_name}")
|
||||
objs.each { |o| write_doc(o) }
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -7,15 +7,19 @@ module XGen
|
||||
|
||||
class QueryMessage < Message
|
||||
|
||||
def initialize(name, collection, query)
|
||||
def initialize(db_name, collection_name, query)
|
||||
super(OP_QUERY)
|
||||
write_int(0)
|
||||
write_string("#{name}.#{collection}")
|
||||
write_string("#{db_name}.#{collection_name}")
|
||||
write_int(query.number_to_skip)
|
||||
write_int(query.number_to_return)
|
||||
write_doc(query.selector)
|
||||
write_doc(query.fields) if query.fields
|
||||
end
|
||||
|
||||
def first_key(key)
|
||||
@first_key = key
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -7,10 +7,10 @@ module XGen
|
||||
|
||||
class RemoveMessage < Message
|
||||
|
||||
def initialize(name, collection, sel)
|
||||
def initialize(db_name, collection_name, sel)
|
||||
super(OP_DELETE)
|
||||
write_int(0)
|
||||
write_string("#{name}.#{collection}")
|
||||
write_string("#{db_name}.#{collection_name}")
|
||||
write_int(0) # flags?
|
||||
write_doc(sel)
|
||||
end
|
||||
|
@ -7,10 +7,10 @@ module XGen
|
||||
|
||||
class UpdateMessage < Message
|
||||
|
||||
def initialize(name, collection, sel, obj, repsert)
|
||||
def initialize(db_name, collection_name, sel, obj, repsert)
|
||||
super(OP_UPDATE)
|
||||
write_int(0)
|
||||
write_string("#{name}.#{collection}")
|
||||
write_string("#{db_name}.#{collection_name}")
|
||||
write_int(repsert ? 1 : 0) # 1 if a repsert operation (upsert)
|
||||
write_doc(sel)
|
||||
write_doc(obj)
|
||||
|
@ -20,8 +20,6 @@ module XGen
|
||||
|
||||
class ObjectID
|
||||
|
||||
UUID_STRING_LENGTH = 24
|
||||
|
||||
@@uuid_generator = UUID.new
|
||||
|
||||
# String UUID
|
||||
|
@ -156,7 +156,10 @@ class BSON
|
||||
end
|
||||
|
||||
def deserialize_oid_data(buf)
|
||||
XGen::Mongo::Driver::ObjectID.new(buf.get(XGen::Mongo::Driver::ObjectID::UUID_STRING_LENGTH).pack("C*"))
|
||||
high_bytes = buf.get_long
|
||||
low_bytes = buf.get_int
|
||||
hexval = (high_bytes << 32) + low_bytes
|
||||
XGen::Mongo::Driver::ObjectID.new('%012x' % hexval)
|
||||
end
|
||||
|
||||
def serialize_eoo_element(buf)
|
||||
@ -199,7 +202,12 @@ class BSON
|
||||
def serialize_oid_element(buf, key, val)
|
||||
buf.put(OID)
|
||||
self.class.serialize_cstr(buf, key)
|
||||
buf.put_array(val.to_s.unpack("C*"))
|
||||
|
||||
hexval = val.to_s.hex
|
||||
high_bytes = hexval >> 32
|
||||
low_bytes = hexval && 0xffffffff
|
||||
buf.put_long(high_bytes)
|
||||
buf.put_int(low_bytes)
|
||||
end
|
||||
|
||||
def serialize_string_element(buf, key, val, type)
|
||||
|
26
lib/mongo/util/ordered_hash.rb
Normal file
26
lib/mongo/util/ordered_hash.rb
Normal file
@ -0,0 +1,26 @@
|
||||
class OrderedHash < Hash
|
||||
|
||||
attr_accessor :ordered_keys
|
||||
|
||||
def keys
|
||||
@ordered_keys || []
|
||||
end
|
||||
|
||||
def []=(key, value)
|
||||
@ordered_keys ||= []
|
||||
@ordered_keys << key unless @ordered_keys.include?(key)
|
||||
super(key, value)
|
||||
end
|
||||
|
||||
def each
|
||||
@ordered_keys ||= []
|
||||
@ordered_keys.each { |k| yield k, self[k] }
|
||||
end
|
||||
|
||||
def merge(other)
|
||||
@ordered_keys ||= []
|
||||
@ordered_keys += other.keys # unordered if not an OrderedHash
|
||||
super(other)
|
||||
end
|
||||
|
||||
end
|
@ -14,7 +14,7 @@ class DBAPITest < Test::Unit::TestCase
|
||||
end
|
||||
|
||||
def teardown
|
||||
@coll.clear unless @db.socket.closed?
|
||||
@coll.clear unless @coll == nil || @db.socket.closed?
|
||||
end
|
||||
|
||||
def test_clear
|
||||
@ -30,9 +30,9 @@ class DBAPITest < Test::Unit::TestCase
|
||||
assert_equal 3, @coll.count
|
||||
docs = @coll.find().collect
|
||||
assert_equal 3, docs.length
|
||||
assert docs.include?('a' => 1)
|
||||
assert docs.include?('a' => 2)
|
||||
assert docs.include?('b' => 3)
|
||||
assert docs.detect { |row| row['a'] == 1 }
|
||||
assert docs.detect { |row| row['a'] == 2 }
|
||||
assert docs.detect { |row| row['b'] == 3 }
|
||||
end
|
||||
|
||||
def test_close
|
||||
@ -49,6 +49,7 @@ class DBAPITest < Test::Unit::TestCase
|
||||
def test_drop_collection
|
||||
assert @db.drop_collection(@coll.name), "drop of collection #{@coll.name} failed"
|
||||
assert !@db.collection_names.include?(@coll_full_name)
|
||||
@coll = nil
|
||||
end
|
||||
|
||||
def test_collection_names
|
||||
@ -79,15 +80,13 @@ class DBAPITest < Test::Unit::TestCase
|
||||
assert_equal @coll_full_name, @db.full_coll_name(@coll.name)
|
||||
end
|
||||
|
||||
# FIXME
|
||||
# def test_index_information
|
||||
# list = @db.index_information(@coll.name)
|
||||
# assert_equal 0, list.length
|
||||
def test_index_information
|
||||
@db.create_index(@coll.name, 'index_name', ['a'])
|
||||
list = @db.index_information(@coll.name)
|
||||
assert_equal 1, list.length
|
||||
|
||||
# @db.create_index(@coll, 'index_name', {'a' => 1})
|
||||
# $stderr.puts @db.create_index(@coll, 'index_name', {'a' => 1}).to_s # DEBUG
|
||||
# list = @db.index_information(@coll.name)
|
||||
# $stderr.puts "list = #{list.inspect}" # DEBUG
|
||||
# assert_equal 1, list.length
|
||||
# end
|
||||
info = list[0]
|
||||
assert_equal 'index_name', info[:name]
|
||||
assert_equal 1, info[:keys]['a']
|
||||
end
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user