diff --git a/lib/mongo.rb b/lib/mongo.rb index 2ab18df..44eb9e4 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -3,6 +3,8 @@ require 'mongo/types/dbref' require 'mongo/types/objectid' require 'mongo/types/regexp_of_holding' +require 'mongo/util/conversions' + require 'mongo/errors' require 'mongo/connection' require 'mongo/message' diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index cbd3df1..14c354d 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -20,6 +20,7 @@ module Mongo # A cursor over query results. Returned objects are hashes. class Cursor + include Mongo::Conversions include Enumerable @@ -246,11 +247,22 @@ module Mongo end def refill_via_get_more - if send_query_if_needed or @cursor_id == 0 - return - end + return if send_query_if_needed || @cursor_id.zero? @db._synchronize { - @db.send_to_db(GetMoreMessage.new(@admin ? 'admin' : @db.name, @collection.name, @cursor_id)) + message = ByteBuffer.new + # Reserved. + message.put_int(0) + + # DB name. + db_name = @admin ? 'admin' : @db.name + BSON.serialize_cstr(message, "#{db_name}.#{@collection.name}") + + # Number of results to return; db decides for now. + message.put_int(0) + + # Cursor id. + message.put_long(@cursor_id) + @db.send_message_with_operation_without_synchronize(OP_GET_MORE, message) read_all } end @@ -272,14 +284,55 @@ module Mongo false else @db._synchronize { - @db.send_query_message(QueryMessage.new(@admin ? 'admin' : @db.name, @collection.name, @query)) + message = construct_query_message(@query) @query_run = true + @db.send_message_with_operation_without_synchronize(OP_QUERY, message) read_all } true end end + def construct_query_message(query) + message = ByteBuffer.new + message.put_int(query.query_opts) + db_name = @admin ? 'admin' : @db.name + BSON.serialize_cstr(message, "#{db_name}.#{@collection.name}") + message.put_int(query.number_to_skip) + message.put_int(query.number_to_return) + sel = query.selector + if query.contains_special_fields + sel = add_special_query_fields(sel, query) + end + message.put_array(BSON.new.serialize(sel).to_a) + message.put_array(BSON.new.serialize(query.fields).to_a) if query.fields + message + end + + def add_special_query_fields(sel, query) + sel = OrderedHash.new + sel['query'] = query.selector + order_by = query.order_by + sel['orderby'] = get_query_order_by(order_by) if order_by + sel['$hint'] = query.hint if query.hint && query.hint.length > 0 + sel['$explain'] = true if query.explain + sel['$snapshot'] = true if query.snapshot + sel + end + + def get_query_order_by(order_by) + case order_by + when String then string_as_sort_parameters(order_by) + when Symbol then symbol_as_sort_parameters(order_by) + when Array then array_as_sort_parameters(order_by) + when Hash # Should be an ordered hash, but this message doesn't care + warn_if_deprecated(order_by) + order_by + else + raise InvalidSortValueError, "Illegal order_by, '#{query.order_by.class.name}'; must be String, Array, Hash, or OrderedHash" + end + end + def to_s "DBResponse(flags=#@result_flags, cursor_id=#@cursor_id, start=#@starting_from)" end diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index df7efb8..09e1341 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -372,11 +372,6 @@ module Mongo Cursor.new(self, collection, query, admin) end - # Used by a Cursor to lazily send the query to the database. - def send_query_message(query_message) - send_to_db(query_message) - end - # Dereference a DBRef, getting the document it points to. def dereference(dbref) collection(dbref.namespace).find_one("_id" => dbref.object_id) @@ -460,7 +455,7 @@ module Mongo # Takes a MongoDB opcode, +operation+, and a message of class ByteBuffer, # +message+, and sends the message to the databse, adding the necessary headers. def send_message_with_operation(operation, message) - _synchronize do + @semaphore.synchronize do connect_to_master if !connected? && @auto_reconnect begin message_with_headers = add_message_headers(operation, message) @@ -474,6 +469,26 @@ module Mongo end end + def send_message_with_operation_without_synchronize(operation, message) + connect_to_master if !connected? && @auto_reconnect + begin + message_with_headers = add_message_headers(operation, message) + @logger.debug(" MONGODB #{operation} #{message}") if @logger + @socket.print(message_with_headers.to_s) + @socket.flush + rescue => ex + close + raise ex + end + end + + def receive_message_with_operation(operation, message) + @semaphore.synchronize do + + + end + end + # Return +true+ if +doc+ contains an 'ok' field with the value 1. def ok?(doc) ok = doc['ok'] diff --git a/lib/mongo/errors.rb b/lib/mongo/errors.rb index 7fd6b43..d4ce9ef 100644 --- a/lib/mongo/errors.rb +++ b/lib/mongo/errors.rb @@ -31,5 +31,5 @@ module Mongo class InvalidName < RuntimeError; end # Raised when the client supplies an invalid value to sort by. - class InvalidSortValueError < RuntimeError; end + class InvalidSortValueError < MongoRubyError; end end diff --git a/lib/mongo/message.rb b/lib/mongo/message.rb index 928915d..41b1c7e 100644 --- a/lib/mongo/message.rb +++ b/lib/mongo/message.rb @@ -14,7 +14,7 @@ # limitations under the License. # ++ -%w(get_more_message kill_cursors_message message_header - msg_message query_message).each { |f| +%w(kill_cursors_message message_header + msg_message).each { |f| require "mongo/message/#{f}" } diff --git a/lib/mongo/message/get_more_message.rb b/lib/mongo/message/get_more_message.rb deleted file mode 100644 index f45198b..0000000 --- a/lib/mongo/message/get_more_message.rb +++ /dev/null @@ -1,32 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -require 'mongo/message/message' -require 'mongo/message/opcodes' - -module Mongo - - class GetMoreMessage < Message - - def initialize(db_name, collection_name, cursor) - super(OP_GET_MORE) - write_int(0) - write_string("#{db_name}.#{collection_name}") - write_int(0) # num to return; leave it up to the db for now - write_long(cursor) - end - end -end diff --git a/lib/mongo/message/query_message.rb b/lib/mongo/message/query_message.rb deleted file mode 100644 index bfcb86f..0000000 --- a/lib/mongo/message/query_message.rb +++ /dev/null @@ -1,69 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -require 'mongo/message/message' -require 'mongo/message/opcodes' -require 'mongo/util/conversions' -require 'mongo/util/ordered_hash' - -module Mongo - class QueryMessage < Message - include Mongo::Conversions - - attr_reader :query - - def initialize(db_name, collection_name, query) - super(OP_QUERY) - @query = query - @collection_name = collection_name - write_int(@query.query_opts) - write_string("#{db_name}.#{collection_name}") - write_int(query.number_to_skip) - write_int(query.number_to_return) - sel = query.selector - if query.contains_special_fields - sel = OrderedHash.new - sel['query'] = query.selector - if query.order_by - order_by = query.order_by - sel['orderby'] = case order_by - when String then string_as_sort_parameters(order_by) - when Symbol then symbol_as_sort_parameters(order_by) - when Array then array_as_sort_parameters(order_by) - when Hash # Should be an ordered hash, but this message doesn't care - warn_if_deprecated(order_by) - order_by - else - raise InvalidSortValueError.new("illegal order_by: is a #{query.order_by.class.name}, must be String, Array, Hash, or OrderedHash") - end - end - sel['$hint'] = query.hint if query.hint && query.hint.length > 0 - sel['$explain'] = true if query.explain - sel['$snapshot'] = true if query.snapshot - end - write_doc(sel) - write_doc(query.fields) if query.fields - end - - def first_key(key) - @first_key = key - end - - def to_s - "db.#{@collection_name}.#{@query}" - end - end -end diff --git a/test/messages/test_query_message.rb b/test/messages/test_query_message.rb deleted file mode 100644 index 572a1f9..0000000 --- a/test/messages/test_query_message.rb +++ /dev/null @@ -1,58 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') -require 'mongo' -require 'test/unit' - -class TestQueryMessage < Test::Unit::TestCase - - include Mongo - - def test_timeout_opcodes - @timeout = true - @query = Query.new({}, nil, 0, 0, nil, nil, nil, @timeout) - @query_message = QueryMessage.new('db', 'collection', @query) - buf = @query_message.buf.instance_variable_get(:@buf) - assert_equal 0, buf[16] - - - @timeout = false - @query = Query.new({}, nil, 0, 0, nil, nil, nil, @timeout) - @query_message = QueryMessage.new('db', 'collection', @query) - buf = @query_message.buf.instance_variable_get(:@buf) - assert_equal 16, buf[16] - end - - def test_timeout_opcodes - @timeout = true - @slave_ok = true - @query = Query.new({}, nil, 0, 0, nil, nil, nil, @timeout, @slave_ok) - @query_message = QueryMessage.new('db', 'collection', @query) - buf = @query_message.buf.instance_variable_get(:@buf) - assert_equal 4, buf[16] - - - @timeout = false - @slave_ok = true - @query = Query.new({}, nil, 0, 0, nil, nil, nil, @timeout, @slave_ok) - @query_message = QueryMessage.new('db', 'collection', @query) - buf = @query_message.buf.instance_variable_get(:@buf) - assert_equal 20, buf[16] - end - - -end diff --git a/test/test_connection.rb b/test/test_connection.rb index 790bd58..ea88468 100644 --- a/test/test_connection.rb +++ b/test/test_connection.rb @@ -61,10 +61,8 @@ class TestConnection < Test::Unit::TestCase logger = Logger.new(output) logger.level = Logger::DEBUG db = Connection.new(@host, @port, :logger => logger).db('ruby-mongo-test') - db['test'].find().to_a - assert output.string.include?("db.test.find") - assert !output.string.include?("db.test.remove") + assert output.string.include?("2004") end def test_connection_logger