From 5e3815bf69770851db63fda8bb6d02ba2a09aa83 Mon Sep 17 00:00:00 2001 From: Jim Menard Date: Fri, 21 Nov 2008 20:00:51 -0500 Subject: [PATCH] first commit --- README | 1 + Rakefile | 8 + examples/demo.rb | 16 ++ lib/mongo.rb | 5 + lib/mongo/collection.rb | 84 +++++++ lib/mongo/cursor.rb | 127 +++++++++++ lib/mongo/db.rb | 176 +++++++++++++++ lib/mongo/message.rb | 4 + lib/mongo/message/get_more_message.rb | 21 ++ lib/mongo/message/insert_message.rb | 20 ++ lib/mongo/message/kill_cursors_message.rb | 20 ++ lib/mongo/message/message.rb | 68 ++++++ lib/mongo/message/message_header.rb | 34 +++ lib/mongo/message/msg_message.rb | 17 ++ lib/mongo/message/opcodes.rb | 16 ++ lib/mongo/message/query_message.rb | 22 ++ lib/mongo/message/remove_message.rb | 20 ++ lib/mongo/message/update_message.rb | 21 ++ lib/mongo/mongo.rb | 45 ++++ lib/mongo/query.rb | 52 +++++ lib/mongo/util/bson.rb | 257 ++++++++++++++++++++++ lib/mongo/util/byte_buffer.rb | 132 +++++++++++ tests/test_byte_buffer.rb | 45 ++++ tests/test_db_connection.rb | 13 ++ tests/test_message.rb | 33 +++ 25 files changed, 1257 insertions(+) create mode 100644 README create mode 100644 Rakefile create mode 100644 examples/demo.rb create mode 100644 lib/mongo.rb create mode 100644 lib/mongo/collection.rb create mode 100644 lib/mongo/cursor.rb create mode 100644 lib/mongo/db.rb create mode 100644 lib/mongo/message.rb create mode 100644 lib/mongo/message/get_more_message.rb create mode 100644 lib/mongo/message/insert_message.rb create mode 100644 lib/mongo/message/kill_cursors_message.rb create mode 100644 lib/mongo/message/message.rb create mode 100644 lib/mongo/message/message_header.rb create mode 100644 lib/mongo/message/msg_message.rb create mode 100644 lib/mongo/message/opcodes.rb create mode 100644 lib/mongo/message/query_message.rb create mode 100644 lib/mongo/message/remove_message.rb create mode 100644 lib/mongo/message/update_message.rb create mode 100644 lib/mongo/mongo.rb create mode 100644 lib/mongo/query.rb create mode 100644 lib/mongo/util/bson.rb create mode 100644 lib/mongo/util/byte_buffer.rb create mode 100644 tests/test_byte_buffer.rb create mode 100644 tests/test_db_connection.rb create mode 100644 tests/test_message.rb diff --git a/README b/README new file mode 100644 index 0000000..0906b22 --- /dev/null +++ b/README @@ -0,0 +1 @@ +This is a simple pure-Ruby driver for the 10gen MongoDB. diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..53fdf76 --- /dev/null +++ b/Rakefile @@ -0,0 +1,8 @@ +require 'rake/testtask' + +task :default => [:test] + +# NOTE: some of the tests assume Mongo is running +Rake::TestTask.new do |t| + t.test_files = FileList['tests/test*.rb'] +end diff --git a/examples/demo.rb b/examples/demo.rb new file mode 100644 index 0000000..c1e1de5 --- /dev/null +++ b/examples/demo.rb @@ -0,0 +1,16 @@ +$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') +require 'mongo' + +include XGen::Mongo::Driver + +db = Mongo.new.db('ruby-mongo-demo') +coll = db.collection('test') +coll.clear + +doc = {'a' => 1} +coll.insert(doc) + +doc = {'a' => 2} +coll.insert(doc) + +coll.find().each { |doc| puts doc.inspect } diff --git a/lib/mongo.rb b/lib/mongo.rb new file mode 100644 index 0000000..a35e535 --- /dev/null +++ b/lib/mongo.rb @@ -0,0 +1,5 @@ +require 'mongo/mongo' +require 'mongo/message' +require 'mongo/db' +require 'mongo/cursor' +require 'mongo/collection' diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb new file mode 100644 index 0000000..7447941 --- /dev/null +++ b/lib/mongo/collection.rb @@ -0,0 +1,84 @@ +# Copyright (C) 2008 10gen Inc. +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License, version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +# for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +require 'mongo/query' + +module XGen + module Mongo + module Driver + class Collection + + attr_reader :db, :name + + def initialize(db, name) + @db = db + @name = name + end + + def find(selector={}, fields=nil, options={}) + fields = nil if fields && fields.empty? + @db.query(@name, Query.new(selector, fields, options[:offset] || 0, options[:limit] || 0, options[:sort])) + end + + def insert(*objects) + @db.insert_into_db(@name, objects) + end + + def remove(selector={}) + @db.remove_from_db(@name, selector) + end + + def clear + remove({}) + end + + def repsert(selector, obj) + @db.repsert_in_db(@name, selector, obj) + end + + def replace(selector, obj) + @db.replace_in_db(@name, selector, obj) + end + + def modify(selector, modifierObj) + raise "no object" unless modifierObj + raise "no selector" unless selector + @db.modify_in_db(@name, selector, modifierObj) + end + + def create_index(name, *fields) + @db.create_index(@name, name, fields) + end + + def drop_index(name) + @db.drop_index(@name, name) + end + + def drop_indexes + index_information.each { |info| @db.drop_index(@name, info.name) } + end + + def index_information + @db.index_information(@name) + end + + def count(selector={}) + @db.count(@name, selector || {}) + end + + end + end + end +end + diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb new file mode 100644 index 0000000..0ea1b07 --- /dev/null +++ b/lib/mongo/cursor.rb @@ -0,0 +1,127 @@ +# Copyright (C) 2008 10gen Inc. +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License, version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +# for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +require 'mongo/message' +require 'mongo/util/byte_buffer' +require 'mongo/util/bson' + +module XGen + module Mongo + module Driver + + class Cursor + + include Enumerable + + RESPONSE_HEADER_SIZE = 20 + + def initialize(db, collection) + @db, @collection = db, collection + @objects = [] + @closed = false + read_all + end + + def more? + num_remaining > 0 + end + + def next_object + refill_via_get_more if num_remaining == 0 + @objects.shift + end + + def each + while more? + yield next_object() + end + end + + def close + @db.send_to_db(KillCursorMessage(@cursor_id)) if @cursor_id + @objects = [] + @cursor_id = 0 + @closed = true + end + + protected + + def read_all + read_message_header + read_response_header + read_objects_off_wire + end + + def read_objects_off_wire + while doc = next_object_on_wire + @objects << doc + end + end + + def read_message_header + MessageHeader.new.read_header(@db.socket) + end + + def read_response_header + header_buf = ByteBuffer.new + header_buf.put_array(@db.socket.recv(RESPONSE_HEADER_SIZE).unpack("C*")) + raise "Short read for DB response header; expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" unless header_buf.length == RESPONSE_HEADER_SIZE + header_buf.rewind + @result_flags = header_buf.get_int + @cursor_id = header_buf.get_long + @starting_from = header_buf.get_int + @n_returned = header_buf.get_int + @n_remaining = @n_returned + end + + def num_remaining + refill_via_get_more if @objects.length == 0 + @objects.length + end + + private + + def next_object_on_wire + # if @n_remaining is 0 but we have a non-zero cursor, there are more + # to fetch, so do a GetMore operation, but don't do it here - do it + # when someone pulls an object out of the cache and it's empty + return nil if @n_remaining == 0 + object_from_stream + end + + def refill_via_get_more + return if @cursor_id == 0 + @db.send_to_db(GetMoreMessage.new(@db.name, @collection, @cursor_id)) + read_all + end + + def object_from_stream + buf = ByteBuffer.new + buf.put_array(@db.socket.recv(4).unpack("C*")) + buf.rewind + size = buf.get_int + buf.put_array(@db.socket.recv(size-4).unpack("C*"), 4) + @n_remaining -= 1 + buf.rewind + BSON.new.deserialize(buf) + end + + def to_s + "DBResponse(flags=#@result_flags, cursor_id=#@cursor_id, start=#@starting_from, n_returned=#@n_returned)" + end + end + end + end +end + diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb new file mode 100644 index 0000000..a3f6a64 --- /dev/null +++ b/lib/mongo/db.rb @@ -0,0 +1,176 @@ +# Copyright (C) 2008 10gen Inc. +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License, version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +# for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +require 'socket' +require 'mongo/collection' +require 'mongo/message' +require 'mongo/query' + +module XGen + module Mongo + module Driver + + class DB + SYSTEM_NAMESPACE_COLLECTION = "system.namespaces" + SYSTEM_INDEX_COLLECTION = "system.indexes" + SYSTEM_COMMAND_COLLECTION = "$cmd" + + attr_reader :name, :socket + + def initialize(db_name, host, port) + raise "Invalid DB name" if !db_name || (db_name && db_name.length > 0 && db_name.include?(".")) + @name, @host, @port = db_name, host, port + @socket = TCPSocket.new(@host, @port) + end + + def collection_names + names = collections_info.collect { |doc| doc['name'] || '' } + names.delete('') + names + end + + def collections_info(coll_name=nil) + selector = {} + selector[:name] = "#{@name}.#{coll_name}" if coll_name + query(SYSTEM_NAMESPACE_COLLECTION, Query.new(selector)) + end + + def create_collection(name, options={}) + # First check existence + return Collection.new(self, name) if collection_names.include?(name) + + # Create new collection + sel = {:create => name}.merge(options) + doc = db_command(sel) + o = doc['ok'] + return Collection.new(self, name) if o.kind_of?(Numeric) && (o.to_i == 1 || o.to_i == 0) + raise "Error creating collection: #{doc.inspect}" + end + + def admin + Admin.new(self) + end + + def collection(name) + create_collection(name) + end + + def drop_collection(name) + coll = collection(name) + return true if coll == nil + col.drop_indexes + + doc = db_command(:drop => name) + o = md['ok'] + return o.kind_of?(Numeric) && o.to_i == 1 + end + + def close + @socket.close + end + + def send_message(msg) + send_to_db(MsgMessage.new(msg)) + end + + def query(collection, query) + # TODO synchronize + send_to_db(QueryMessage.new(@name, collection, query)) + return Cursor.new(self, collection) + end + + def remove_from_db(collection, selector) + # TODO synchronize + send_to_db(RemoveMessage.new(@name, collection, selector)) + end + + def replace_in_db(collection, selector, obj) + # TODO synchronize + send_to_db(UpdateMessage.new(@name, collection, selector, obj, false)) + end + alias_method :modify_in_db, :replace_in_db + + def repsert_in_db(collection, selector, obj) + # TODO if PKInjector, inject + # TODO synchronize + send_to_db(UpdateMessage.new(@name, collection, selector, obj, true)) + obj + end + + def count(collection, selector) + doc = db_command(:count => collection, :query => selector) + o = doc['ok'] + return doc['n'].to_i if o.to_i == 1 + raise "Error with count command: #{doc.to_s}" unless o.kind_of?(Numeric) + end + + def drop_index(collection, name) + db_command(:deleteIndexes => collection, :index => name) + end + + def index_information(collection) + sel = {:ns => full_coll_name(collection)} + # TODO synchronize + query(SYSTEM_INDEX_COLLECTION, Query.new(sel)).collect { |row| + h = {:name => row['name']} + raise "Name of index on return from db was nil. Coll = #{full_coll_name(collection)}" unless h[:name] + + h[:keys] = row['keys'] + raise "Keys for index on return from db was nil. Coll = #{full_coll_name(collection)}" unless h[:keys] + + h[:ns] = row['ns'] + raise "Namespace for index on return from db was nil. Coll = #{full_coll_name(collection)}" unless h[:ns] + h[:ns].sub!(/.*\./, '') + raise "Error: ns != collection" unless h[:ns] == collection + + h + } + end + + def create_index(collection, name, fields) + sel = {:name => name, :ns => full_coll_name(collection)} + field_h = {} + fields.each { |f| field_h[f] = 1 } + sel['key'] = field_h + # TODO synchronize + send_to_db(InsertMessage.new(@name, SYSTEM_INDEX_COLLECTION, sel)) + end + + def insert_into_db(collection, objects) + # TODO synchronize + objects.each { |o| send_to_db(InsertMessage.new(@name, collection, o)) } + end + + def send_to_db(message) + @socket.print(message.buf.to_s) + end + + protected + + def full_coll_name(collection) + "#{@name}.#{collection}" + end + + def db_command(selector) + # TODO synchronize + q = Query.new(selector) + q.number_to_return = 1 + query(SYSTEM_COMMAND_COLLECTION, q).next_object + end + + end + end + end +end + diff --git a/lib/mongo/message.rb b/lib/mongo/message.rb new file mode 100644 index 0000000..461e35a --- /dev/null +++ b/lib/mongo/message.rb @@ -0,0 +1,4 @@ +%w(get_more_message insert_message kill_cursors_message message_header + msg_message query_message remove_message update_message).each { |f| + require "mongo/message/#{f}" +} diff --git a/lib/mongo/message/get_more_message.rb b/lib/mongo/message/get_more_message.rb new file mode 100644 index 0000000..15ee19f --- /dev/null +++ b/lib/mongo/message/get_more_message.rb @@ -0,0 +1,21 @@ +require 'mongo/message/message' +require 'mongo/message/opcodes' + +module XGen + module Mongo + module Driver + + class GetMoreMessage < Message + + def initialize(name, collection, cursor) + super(OP_GET_MORE) + write_int(0) + write_string("#{name}.#{collection}") + write_int(0) # num to return; leave it up to the db for now + write_long(cursor) + end + end + end + end +end + diff --git a/lib/mongo/message/insert_message.rb b/lib/mongo/message/insert_message.rb new file mode 100644 index 0000000..5d809ec --- /dev/null +++ b/lib/mongo/message/insert_message.rb @@ -0,0 +1,20 @@ +require 'mongo/message/message' +require 'mongo/message/opcodes' + +module XGen + module Mongo + module Driver + + class InsertMessage < Message + + def initialize(name, collection, *objs) + super(OP_INSERT) + write_int(0) + write_string("#{name}.#{collection}") + objs.each { |o| write_doc(o) } + end + end + end + end +end + diff --git a/lib/mongo/message/kill_cursors_message.rb b/lib/mongo/message/kill_cursors_message.rb new file mode 100644 index 0000000..4f9adfa --- /dev/null +++ b/lib/mongo/message/kill_cursors_message.rb @@ -0,0 +1,20 @@ +require 'mongo/message/message' +require 'mongo/message/opcodes' + +module XGen + module Mongo + module Driver + + class KillCursorsMessage < Message + + def initialize(*cursors) + super(OP_KILL_CURSORS) + write_int(0) + write_int(cursors.length) + cursors.each { |c| write_long c } + end + end + end + end +end + diff --git a/lib/mongo/message/message.rb b/lib/mongo/message/message.rb new file mode 100644 index 0000000..84ce7aa --- /dev/null +++ b/lib/mongo/message/message.rb @@ -0,0 +1,68 @@ +require 'mongo/util/bson' +require 'mongo/util/byte_buffer' + +module XGen + module Mongo + module Driver + + class Message + + HEADER_SIZE = 16 # size, id, response_to, opcode + + @@class_req_id = 0 + + attr_reader :buf # for testing + + def initialize(op) + @op = op + @message_length = HEADER_SIZE + @data_length = 0 + @request_id = (@@class_req_id += 1) + @response_id = 0 + @buf = ByteBuffer.new + + @buf.put_int(16) # holder for length + @buf.put_int(@@class_req_id += 1) + @buf.put_int(0) # response_to + @buf.put_int(op) + end + + def write_int(i) + @buf.put_int(i) + update_message_length + end + + def write_long(i) + @buf.put_long(i) + update_message_length + end + + def write_string(s) + BSON.serialize_cstr(@buf, s) + update_message_length + end + + def write_doc(hash) + @buf.put_array(BSON.new.serialize(hash).to_a) + update_message_length + end + + def to_a + @buf.to_a + end + + def dump + @buf.dump + end + + # Do not call. Private, but kept public for testing. + def update_message_length + pos = @buf.position + @buf.put_int(@buf.size, 0) + @buf.position = pos + end + + end + end + end +end diff --git a/lib/mongo/message/message_header.rb b/lib/mongo/message/message_header.rb new file mode 100644 index 0000000..19cb837 --- /dev/null +++ b/lib/mongo/message/message_header.rb @@ -0,0 +1,34 @@ +require 'mongo/util/byte_buffer' + +module XGen + module Mongo + module Driver + + class MessageHeader + + HEADER_SIZE = 16 + + def initialize() + @buf = ByteBuffer.new + end + + def read_header(socket) + @buf.rewind + @buf.put_array(socket.recv(HEADER_SIZE).unpack("C*")) + raise "Short read for DB response header: expected #{HEADER_SIZE} bytes, saw #{@buf.size}" unless @buf.size == HEADER_SIZE + @buf.rewind + @size = @buf.get_int + @request_id = @buf.get_int + @response_to = @buf.get_int + @op = @buf.get_int + self + end + + def dump + @buf.dump + end + end + end + end +end + diff --git a/lib/mongo/message/msg_message.rb b/lib/mongo/message/msg_message.rb new file mode 100644 index 0000000..a7f3702 --- /dev/null +++ b/lib/mongo/message/msg_message.rb @@ -0,0 +1,17 @@ +require 'mongo/message/message' +require 'mongo/message/opcodes' + +module XGen + module Mongo + module Driver + + class MsgMessage < Message + + def initialize(msg) + super(OP_MSG) + write_string(msg) + end + end + end + end +end diff --git a/lib/mongo/message/opcodes.rb b/lib/mongo/message/opcodes.rb new file mode 100644 index 0000000..9980400 --- /dev/null +++ b/lib/mongo/message/opcodes.rb @@ -0,0 +1,16 @@ +module XGen + module Mongo + module Driver + OP_REPLY = 1 # reply. responseTo is set. + OP_MSG = 1000 # generic msg command followed by a string + OP_UPDATE = 2001 # update object + OP_INSERT = 2002 + # GET_BY_OID = 2003 + OP_QUERY = 2004 + OP_GET_MORE = 2005 + OP_DELETE = 2006 + OP_KILL_CURSORS = 2007 + end + end +end + diff --git a/lib/mongo/message/query_message.rb b/lib/mongo/message/query_message.rb new file mode 100644 index 0000000..07c2f51 --- /dev/null +++ b/lib/mongo/message/query_message.rb @@ -0,0 +1,22 @@ +require 'mongo/message/message' +require 'mongo/message/opcodes' + +module XGen + module Mongo + module Driver + + class QueryMessage < Message + + def initialize(name, collection, query) + super(OP_QUERY) + write_int(0) + write_string("#{name}.#{collection}") + write_int(query.number_to_skip) + write_int(query.number_to_return) + write_doc(query.selector) + write_doc(query.fields) if query.fields + end + end + end + end +end diff --git a/lib/mongo/message/remove_message.rb b/lib/mongo/message/remove_message.rb new file mode 100644 index 0000000..abfba39 --- /dev/null +++ b/lib/mongo/message/remove_message.rb @@ -0,0 +1,20 @@ +require 'mongo/message/message' +require 'mongo/message/opcodes' + +module XGen + module Mongo + module Driver + + class RemoveMessage < Message + + def initialize(name, collection, sel) + super(OP_DELETE) + write_int(0) + write_string("#{name}.#{collection}") + write_int(0) # flags? + write_doc(sel) + end + end + end + end +end diff --git a/lib/mongo/message/update_message.rb b/lib/mongo/message/update_message.rb new file mode 100644 index 0000000..f1e9fb6 --- /dev/null +++ b/lib/mongo/message/update_message.rb @@ -0,0 +1,21 @@ +require 'mongo/message/message' +require 'mongo/message/opcodes' + +module XGen + module Mongo + module Driver + + class UpdateMessage < Message + + def initialize(name, collection, sel, obj, repsert) + super(OP_UPDATE) + write_int(0) + write_string("#{name}.#{collection}") + write_int(repsert ? 1 : 0) # 1 if a repsert operation (upsert) + write_doc(sel) + write_doc(obj) + end + end + end + end +end diff --git a/lib/mongo/mongo.rb b/lib/mongo/mongo.rb new file mode 100644 index 0000000..773c1b4 --- /dev/null +++ b/lib/mongo/mongo.rb @@ -0,0 +1,45 @@ +# Copyright (C) 2008 10gen Inc. +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License, version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +# for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +require 'mongo/db' + +module XGen + module Mongo + module Driver + + class Mongo + + DEFAULT_PORT = 27017 + + def initialize(host='localhost', port=DEFAULT_PORT) + @host, @port = host, port + end + + def db(db_name) + XGen::Mongo::Driver::DB.new(db_name, @host, @port) + end + + def clone_database(from) + raise "not implemented" + end + + def copy_database(from_host, from_db, to_db) + raise "not implemented" + end + + end + end + end +end + diff --git a/lib/mongo/query.rb b/lib/mongo/query.rb new file mode 100644 index 0000000..259e762 --- /dev/null +++ b/lib/mongo/query.rb @@ -0,0 +1,52 @@ +# Copyright (C) 2008 10gen Inc. +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License, version 3, as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +# for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +require 'socket' +require 'mongo/collection' +require 'mongo/message' + +module XGen + module Mongo + module Driver + + class Query + + attr_accessor :number_to_skip, :number_to_return, :order_by + attr_reader :selector, :fields # writers defined below + + def initialize(sel={}, return_fields=nil, number_to_skip=0, number_to_return=0, order_by=nil) + @number_to_skip, @number_to_return, @order_by = number_to_skip, number_to_return, order_by + self.selector = sel + self.fields = return_fields + end + + def selector=(sel) + @selector = case selector + when nil + {} + when String + {"$where" => "function() { return #{sel}; }"} + when Hash + sel + end + end + + def fields=(val) + @fields = val + @fields = nil if @fields && @fields.empty? + end + end + end + end +end diff --git a/lib/mongo/util/bson.rb b/lib/mongo/util/bson.rb new file mode 100644 index 0000000..84ffbe6 --- /dev/null +++ b/lib/mongo/util/bson.rb @@ -0,0 +1,257 @@ +require 'mongo/util/byte_buffer' + +class BSON + + EOO = 0 # x + MAXKEY = -1 # x + NUMBER = 1 # x t + STRING = 2 # x t + OBJECT = 3 # x t + ARRAY = 4 + BINARY = 5 + UNDEFINED = 6 + OID = 7 # x + BOOLEAN = 8 # x t + DATE = 9 # x t + NULL = 10 # x t + REGEX = 11 + REF = 12 + CODE = 13 + SYMBOL = 14 + CODE_W_SCOPE = 15 + NUMBER_INT = 16 + + def self.serialize_cstr(buf, val) + buf.put_array(val.to_s.unpack("C*") + [0]) + end + + def initialize + @private_buf = ByteBuffer.new + @buf = ByteBuffer.new + end + + def to_a + @buf.to_a + end + + def serialize(obj) + raise "Document is null" unless obj + + @buf.rewind + # put in a placeholder for the total size + @buf.put_int(0) + + obj.each {|k, v| + type = bson_type(v, k) + case type + when STRING, CODE + serialize_string_element(@buf, k, v, type) + when NUMBER, NUMBER_INT + serialize_number_element(@buf, k, v, type) + when OBJECT + serialize_object_element(@buf, k, v) + when OID + serialize_oid_element(@buf, k, v) + when BOOLEAN + serialize_boolean_element(@buf, k, v) + when DATE + serialize_date_element(@buf, k, v) + when NULL + serialize_null_element(@buf, k) + else + raise "Unhandled Type #{type}" + end + } + serialize_eoo_element(@buf) + @buf.put_int(@buf.size, 0) + self + end + + def deserialize(buf) + @buf = ByteBuffer.new(buf.to_a) + @buf.rewind + @buf.get_int # eat message size + doc = {} + while @buf.more? + type = @buf.get + case type + when STRING + key = deserialize_element_name(@buf) + doc[key] = deserialize_string_data(@buf) + when NUMBER + key = deserialize_element_name(@buf) + doc[key] = deserialize_number_data(@buf) + when NUMBER_INT + key = deserialize_element_name(@buf) + doc[key] = deserialize_number_int_data(@buf) + when OID + key = deserialize_element_name(@buf) + doc[key] = deserialize_oid_data(@buf) + when OBJECT + key = deserialize_element_name(@buf) + doc[key] = deserialize_object_data(@buf) + when BOOLEAN + key = deserialize_element_name(@buf) + doc[key] = deserialize_boolean_data(@buf) + when DATE + key = deserialize_element_name(@buf) + doc[key] = deserialize_date_data(@buf) + when NULL + key = deserialize_element_name(@buf) + doc[key] = nil + when EOO + break + else + raise "Unknown type #{type}, key = #{key}" + end + end + @buf.rewind + doc + end + + def hex_dump + str = '' + @buf.to_a.each_with_index { |b,i| + if (i % 8) == 0 + str << "\n" if i > 0 + str << '%4d: ' % i + else + str << ' ' + end + str << '%02X' % b + } + str + end + + def deserialize_date_data(buf) + Time.at(buf.get_long) + end + + def deserialize_boolean_data(buf) + buf.get == 1 + end + + def deserialize_number_data(buf) + buf.get_double + end + + def deserialize_number_int_data(buf) + buf.get_int + end + + def deserialize_object_data(buf) + size = buf.get_int + buf.position -= 4 + BSON.new.deserialize(buf.get(size)) + end + + def deserialize_string_data(buf) + len = buf.get_int + bytes = buf.get(len) + bytes[0..-2].pack("C*") + end + + def deserialize_oid_data + ObjectID.new(buf.get(12).pack("C*")) + end + + def serialize_eoo_element(buf) + buf.put(EOO) + end + + def serialize_null_element(buf, key) + buf.put(NULL) + self.class.serialize_cstr(buf, key) + end + + def serialize_boolean_element(buf, key, val) + buf.put(BOOLEAN) + self.class.serialize_cstr(buf, key) + buf.put(val ? 1 : 0) + end + + def serialize_date_element(buf, key, val) + buf.put(DATE) + self.class.serialize_cstr(buf, key) + buf.put_long(val.to_i) + end + + def serialize_number_element(buf, key, val, type) + buf.put(type) + self.class.serialize_cstr(buf, key) + if type == NUMBER + buf.put_double(val) + else + buf.put_int(val) + end + end + + def serialize_object_element(buf, key, val) + buf.put(OBJECT) + self.class.serialize_cstr(buf, key) + BSON.new(buf).serialize(val) + end + + def serialize_oid_element(buf, key, val) + buf.put(OID) + self.class.serialize_cstr(buf, key) + buf.put_array(val.to_a) + end + + def serialize_string_element(buf, key, val, type) + buf.put(type) + self.class.serialize_cstr(buf, key) + + # Make a hole for the length + len_pos = buf.position + buf.put_int(0) + + # Save the string + start_pos = buf.position + self.class.serialize_cstr(buf, val) + end_pos = buf.position + + # Put the string size in front + buf.put_int(end_pos - start_pos - 1, len_pos) + + # Go back to where we were + buf.position = end_pos + end + + def deserialize_element_name(buf) + chars = "" + while 1 + b = buf.get + break if b == 0 + chars << b.chr + end + chars + end + + def bson_type(o, key) + case o + when nil + NULL + when Integer + NUMBER_INT + when Numeric + NUMBER + when String + # magic awful stuff - the DB requires that a where clause is sent as CODE + key == "$where" ? CODE : STRING + when Array + ARRAY + when ObjectID + OID + when true, false + Boolean + when Time + DATE + when Hash + OBJECT + else + raise "Unknown type of object: #{o.class.name}" + end + end + +end diff --git a/lib/mongo/util/byte_buffer.rb b/lib/mongo/util/byte_buffer.rb new file mode 100644 index 0000000..55ef39c --- /dev/null +++ b/lib/mongo/util/byte_buffer.rb @@ -0,0 +1,132 @@ +class ByteBuffer + + attr_reader :order + + def initialize(initial_data=[]) + @buf = initial_data + @cursor = 0 + self.order = :little_endian + end + + # +endianness+ should be :little_endian or :big_endian. Default is :little_endian + def order=(endianness) + @order = endianness + @int_pack_order = endianness == :little_endian ? 'V' : 'N' + @double_pack_order = endianness == :little_endian ? 'E' : 'G' + end + + def rewind + @cursor = 0 + end + + def position + @cursor + end + + def position=(val) + @cursor = val + end + + def clear + @buf = [] + rewind + end + + def size + @buf.size + end + alias_method :length, :size + + def put(byte, offset=nil) + @cursor = offset if offset + @buf[@cursor] = byte + @cursor += 1 + end + + def put_array(array, offset=nil) + @cursor = offset if offset + @buf[@cursor, array.length] = array + @cursor += array.length + end + + def put_int(i, offset=nil) + put_array([i].pack(@int_pack_order).split(//).collect{|c| c[0]}, offset) + end + + def put_long(i, offset=nil) + offset = @cursor unless offset + if @int_pack_order == 'N' + put_int(i >> 32, offset) + put_int(i & 0xffffffff, offset + 4) + else + put_int(i & 0xffffffff, offset) + put_int(i >> 32, offset + 4) + end + end + + def put_double(d, offset=nil) + put_array([d].pack(@double_pack_order).split(//), offset) + end + + # If +size+ == 1, returns one byte. Else returns array of bytes of length + # +size+. + def get(len=1) + check_read_length(len) + start = @cursor + @cursor += len + if len == 1 + @buf[start] + else + @buf[start, len] + end + end + + def get_int + check_read_length(4) + vals = "" + (@cursor..@cursor+3).each { |i| vals << @buf[i].chr } + @cursor += 4 + vals.unpack(@int_pack_order)[0] + end + + def get_long + i1 = get_int + i2 = get_int + if @int_pack_order == 'N' + (i1 << 32) + i2 + else + (i2 << 32) + i1 + end + end + + def get_double + check_read_length(8) + vals = "" + (@cursor..@cursor+7).each { |i| vals << @buf[i].chr } + @cursor += 8 + vals.unpack(@double_pack_order)[0] + end + + def more? + @cursor < @buf.size + end + + def to_a + @buf + end + + def to_s + @buf.pack("C*") + end + + def dump + @buf.each_with_index { |c, i| $stderr.puts "#{'%04d' % i}: #{'%02x' % c} #{'%03o' % c} #{'%s' % c.chr} #{'%3d' % c}" } + end + + private + + def check_read_length(len) + raise "attempt to read past end of buffer" if @cursor + len > @buf.length + end + +end diff --git a/tests/test_byte_buffer.rb b/tests/test_byte_buffer.rb new file mode 100644 index 0000000..63ac6d9 --- /dev/null +++ b/tests/test_byte_buffer.rb @@ -0,0 +1,45 @@ +$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') +require 'mongo' +require 'test/unit' + +class ByteBufferTest < Test::Unit::TestCase + + def setup + @buf = ByteBuffer.new + end + + def test_empty + assert_equal 0, @buf.length + end + + def test_length + @buf.put_int 3 + assert_equal 4, @buf.length + end + + def test_default_order + assert_equal :little_endian, @buf.order + end + + def test_long_length + @buf.put_long 1027 + assert_equal 8, @buf.length + end + + def test_get_long + @buf.put_long 1027 + @buf.rewind + assert_equal 1027, @buf.get_long + end + + def test_rewrite + @buf.put_int(0) + @buf.rewind + @buf.put_int(1027) + assert_equal 4, @buf.length + @buf.rewind + assert_equal 1027, @buf.get_int + assert_equal 4, @buf.position + end + +end diff --git a/tests/test_db_connection.rb b/tests/test_db_connection.rb new file mode 100644 index 0000000..53a48ac --- /dev/null +++ b/tests/test_db_connection.rb @@ -0,0 +1,13 @@ +$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') +require 'mongo' +require 'test/unit' + +# NOTE: assumes Mongo is running +class DBConnectionTest < Test::Unit::TestCase + + def test_no_exceptions + db = XGen::Mongo::Driver::Mongo.new.db('ruby-mongo-demo') + coll = db.collection('test') + coll.clear + end +end diff --git a/tests/test_message.rb b/tests/test_message.rb new file mode 100644 index 0000000..5eac81c --- /dev/null +++ b/tests/test_message.rb @@ -0,0 +1,33 @@ +$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') +require 'mongo' +require 'test/unit' + +class MessageTest < Test::Unit::TestCase + + def setup + @msg = XGen::Mongo::Driver::Message.new(42) + end + + def test_initial_info + assert_equal XGen::Mongo::Driver::Message::HEADER_SIZE, @msg.buf.length + @msg.write_long(1029) + @msg.buf.rewind + assert_equal XGen::Mongo::Driver::Message::HEADER_SIZE + 8, @msg.buf.get_int + @msg.buf.get_int # skip message id + assert_equal 0, @msg.buf.get_int + assert_equal 42, @msg.buf.get_int + assert_equal 1029, @msg.buf.get_long + end + + def test_update_length + @msg.update_message_length + @msg.buf.rewind + assert_equal XGen::Mongo::Driver::Message::HEADER_SIZE, @msg.buf.get_int + end + + def test_long_length + @msg.write_long(1027) + assert_equal XGen::Mongo::Driver::Message::HEADER_SIZE + 8, @msg.buf.length + end + +end