From de5c078cec164094c7fdc4f6a545e64f1c534fd6 Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Wed, 21 Oct 2009 10:11:33 -0400 Subject: [PATCH] Finished removing Message classes. Made Cursor#close threadsafe. --- Rakefile | 2 +- lib/mongo.rb | 2 +- lib/mongo/collection.rb | 8 +-- lib/mongo/constants.rb | 15 +++++ lib/mongo/cursor.rb | 41 ++++++++---- lib/mongo/db.rb | 26 +++----- lib/mongo/message.rb | 20 ------ lib/mongo/message/kill_cursors_message.rb | 31 --------- lib/mongo/message/message.rb | 80 ----------------------- lib/mongo/message/message_header.rb | 45 ------------- lib/mongo/message/msg_message.rb | 29 -------- lib/mongo/message/opcodes.rb | 30 --------- lib/mongo/query.rb | 7 +- mongo-ruby-driver.gemspec | 10 +-- test/test_message.rb | 35 ---------- 15 files changed, 64 insertions(+), 317 deletions(-) create mode 100644 lib/mongo/constants.rb delete mode 100644 lib/mongo/message.rb delete mode 100644 lib/mongo/message/kill_cursors_message.rb delete mode 100644 lib/mongo/message/message.rb delete mode 100644 lib/mongo/message/message_header.rb delete mode 100644 lib/mongo/message/msg_message.rb delete mode 100644 lib/mongo/message/opcodes.rb delete mode 100644 test/test_message.rb diff --git a/Rakefile b/Rakefile index 8a59e49..ebee54a 100644 --- a/Rakefile +++ b/Rakefile @@ -18,7 +18,7 @@ gem_command = "gem1.9" if $0.match(/1\.9$/) # use gem1.9 if we used rake1.9 desc "Test the MongoDB Ruby driver." Rake::TestTask.new(:test) do |t| t.test_files = FileList['test/test*.rb'] - t.verbose = true + t.verbose = true end desc "Generate documentation" diff --git a/lib/mongo.rb b/lib/mongo.rb index 44eb9e4..93db0c7 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -6,8 +6,8 @@ require 'mongo/types/regexp_of_holding' require 'mongo/util/conversions' require 'mongo/errors' +require 'mongo/constants' require 'mongo/connection' -require 'mongo/message' require 'mongo/db' require 'mongo/cursor' require 'mongo/collection' diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index 3d073dc..448c9f8 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -229,7 +229,7 @@ module Mongo BSON.serialize_cstr(message, "#{@db.name}.#{@name}") message.put_int(0) message.put_array(BSON.new.serialize(selector, false).to_a) - @db.send_message_with_operation(OP_DELETE, message) + @db.send_message_with_operation(Mongo::Constants::OP_DELETE, message) end # Remove all records. @@ -259,7 +259,7 @@ module Mongo message.put_int(options[:upsert] ? 1 : 0) # 1 if a repsert operation (upsert) message.put_array(BSON.new.serialize(spec, false).to_a) message.put_array(BSON.new.serialize(document, false).to_a) - @db.send_message_with_operation(OP_UPDATE, message) + @db.send_message_with_operation(Mongo::Constants::OP_UPDATE, message) if options[:safe] && error=@db.error raise OperationFailure, error @@ -454,7 +454,7 @@ EOS private - # Sends an OP_INSERT message to the database. + # Sends an Mongo::Constants::OP_INSERT message to the database. # Takes an array of +documents+, an optional +collection_name+, and a # +check_keys+ setting. def insert_documents(documents, collection_name=@name, check_keys=true) @@ -462,7 +462,7 @@ EOS message.put_int(0) BSON.serialize_cstr(message, "#{@db.name}.#{collection_name}") documents.each { |doc| message.put_array(BSON.new.serialize(doc, check_keys).to_a) } - @db.send_message_with_operation(OP_INSERT, message) + @db.send_message_with_operation(Mongo::Constants::OP_INSERT, message) documents.collect { |o| o[:_id] || o['_id'] } end diff --git a/lib/mongo/constants.rb b/lib/mongo/constants.rb new file mode 100644 index 0000000..aa47e1f --- /dev/null +++ b/lib/mongo/constants.rb @@ -0,0 +1,15 @@ +module Mongo + module Constants + OP_REPLY = 1 + OP_MSG = 1000 + OP_UPDATE = 2001 + OP_INSERT = 2002 + OP_QUERY = 2004 + OP_GET_MORE = 2005 + OP_DELETE = 2006 + OP_KILL_CURSORS = 2007 + + OP_QUERY_SLAVE_OK = 4 + OP_QUERY_NO_CURSOR_TIMEOUT = 16 + end +end diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 14c354d..2121188 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -require 'mongo/message' require 'mongo/util/byte_buffer' require 'mongo/util/bson' @@ -173,17 +172,23 @@ module Mongo # Close the cursor. # - # Note: if a cursor is read until exhausted (read until OP_QUERY or - # OP_GETMORE returns zero for the cursor id), there is no need to + # Note: if a cursor is read until exhausted (read until Mongo::Constants::OP_QUERY or + # Mongo::Constants::OP_GETMORE returns zero for the cursor id), there is no need to # close it by calling this method. # # Collection#find takes an optional block argument which can be used to # ensure that your cursors get closed. See the documentation for # Collection#find for details. def close - @db.send_to_db(KillCursorsMessage.new(@cursor_id)) if @cursor_id + if @cursor_id + message = ByteBuffer.new + message.put_int(0) + message.put_int(1) + message.put_long(@cursor_id) + @db.send_message_with_operation(Mongo::Constants::OP_KILL_CURSORS, message) + end @cursor_id = 0 - @closed = true + @closed = true end # Returns true if this cursor is closed, false otherwise. @@ -204,7 +209,16 @@ module Mongo end def read_message_header - MessageHeader.new.read_header(@db) + message = ByteBuffer.new + message.put_array(@db.receive_full(16).unpack("C*")) + unless message.size == 16 #HEADER_SIZE + raise "Short read for DB response header: expected #{16} bytes, saw #{message.size}" + end + message.rewind + size = message.get_int + request_id = message.get_int + response_to = message.get_int + op = message.get_int end def read_response_header @@ -221,9 +235,6 @@ module Mongo else @n_received = @n_remaining end - if @query.number_to_return > 0 and @n_received >= @query.number_to_return - close() - end end def num_remaining @@ -262,9 +273,10 @@ module Mongo # Cursor id. message.put_long(@cursor_id) - @db.send_message_with_operation_without_synchronize(OP_GET_MORE, message) + @db.send_message_with_operation_without_synchronize(Mongo::Constants::OP_GET_MORE, message) read_all } + close_cursor_if_query_complete end def object_from_stream @@ -283,12 +295,13 @@ module Mongo if @query_run false else + message = construct_query_message(@query) @db._synchronize { - message = construct_query_message(@query) + @db.send_message_with_operation_without_synchronize(Mongo::Constants::OP_QUERY, message) @query_run = true - @db.send_message_with_operation_without_synchronize(OP_QUERY, message) read_all } + close_cursor_if_query_complete true end end @@ -337,6 +350,10 @@ module Mongo "DBResponse(flags=#@result_flags, cursor_id=#@cursor_id, start=#@starting_from)" end + def close_cursor_if_query_complete + close if @query.number_to_return > 0 && @n_received >= @query.number_to_return + end + def check_modifiable if @query_run || @closed raise InvalidOperation, "Cannot modify the query once it has been run or closed." diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index 09e1341..7b6d116 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -18,7 +18,6 @@ require 'socket' require 'digest/md5' require 'thread' require 'mongo/collection' -require 'mongo/message' require 'mongo/query' require 'mongo/util/ordered_hash.rb' require 'mongo/admin' @@ -358,11 +357,6 @@ module Mongo message end - # Send a MsgMessage to the database. - def send_message(msg) - send_to_db(MsgMessage.new(msg)) - end - # Returns a Cursor over the query results. # # Note that the query gets sent lazily; the cursor calls @@ -470,16 +464,16 @@ module Mongo end def send_message_with_operation_without_synchronize(operation, message) - connect_to_master if !connected? && @auto_reconnect - begin - message_with_headers = add_message_headers(operation, message) - @logger.debug(" MONGODB #{operation} #{message}") if @logger - @socket.print(message_with_headers.to_s) - @socket.flush - rescue => ex - close - raise ex - end + connect_to_master if !connected? && @auto_reconnect + begin + message_with_headers = add_message_headers(operation, message) + @logger.debug(" MONGODB #{operation} #{message}") if @logger + @socket.print(message_with_headers.to_s) + @socket.flush + rescue => ex + close + raise ex + end end def receive_message_with_operation(operation, message) diff --git a/lib/mongo/message.rb b/lib/mongo/message.rb deleted file mode 100644 index 41b1c7e..0000000 --- a/lib/mongo/message.rb +++ /dev/null @@ -1,20 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -%w(kill_cursors_message message_header - msg_message).each { |f| - require "mongo/message/#{f}" -} diff --git a/lib/mongo/message/kill_cursors_message.rb b/lib/mongo/message/kill_cursors_message.rb deleted file mode 100644 index c1f5597..0000000 --- a/lib/mongo/message/kill_cursors_message.rb +++ /dev/null @@ -1,31 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -require 'mongo/message/message' -require 'mongo/message/opcodes' - -module Mongo - - class KillCursorsMessage < Message - - def initialize(*cursors) - super(OP_KILL_CURSORS) - write_int(0) - write_int(cursors.length) - cursors.each { |c| write_long c } - end - end -end diff --git a/lib/mongo/message/message.rb b/lib/mongo/message/message.rb deleted file mode 100644 index 8a8cc9e..0000000 --- a/lib/mongo/message/message.rb +++ /dev/null @@ -1,80 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -require 'mongo/util/bson' -require 'mongo/util/byte_buffer' - -module Mongo - - class Message - - HEADER_SIZE = 16 # size, id, response_to, opcode - - @@class_req_id = 0 - - attr_reader :buf # for testing - - def initialize(op) - @op = op - @message_length = HEADER_SIZE - @data_length = 0 - @request_id = (@@class_req_id += 1) - @response_id = 0 - @buf = ByteBuffer.new - - @buf.put_int(16) # holder for length - @buf.put_int(@request_id) - @buf.put_int(0) # response_to - @buf.put_int(op) - end - - def write_int(i) - @buf.put_int(i) - update_message_length - end - - def write_long(i) - @buf.put_long(i) - update_message_length - end - - def write_string(s) - BSON.serialize_cstr(@buf, s) - update_message_length - end - - def write_doc(hash, check_keys=false) - @buf.put_array(BSON.new.serialize(hash, check_keys).to_a) - update_message_length - end - - def to_a - @buf.to_a - end - - def dump - @buf.dump - end - - # Do not call. Private, but kept public for testing. - def update_message_length - pos = @buf.position - @buf.put_int(@buf.size, 0) - @buf.position = pos - end - - end -end diff --git a/lib/mongo/message/message_header.rb b/lib/mongo/message/message_header.rb deleted file mode 100644 index 09c2691..0000000 --- a/lib/mongo/message/message_header.rb +++ /dev/null @@ -1,45 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -require 'mongo/util/byte_buffer' - -module Mongo - - class MessageHeader - - HEADER_SIZE = 16 - - def initialize() - @buf = ByteBuffer.new - end - - def read_header(db) - @buf.rewind - @buf.put_array(db.receive_full(HEADER_SIZE).unpack("C*")) - raise "Short read for DB response header: expected #{HEADER_SIZE} bytes, saw #{@buf.size}" unless @buf.size == HEADER_SIZE - @buf.rewind - @size = @buf.get_int - @request_id = @buf.get_int - @response_to = @buf.get_int - @op = @buf.get_int - self - end - - def dump - @buf.dump - end - end -end diff --git a/lib/mongo/message/msg_message.rb b/lib/mongo/message/msg_message.rb deleted file mode 100644 index 48370af..0000000 --- a/lib/mongo/message/msg_message.rb +++ /dev/null @@ -1,29 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -require 'mongo/message/message' -require 'mongo/message/opcodes' - -module Mongo - - class MsgMessage < Message - - def initialize(msg) - super(OP_MSG) - write_string(msg) - end - end -end diff --git a/lib/mongo/message/opcodes.rb b/lib/mongo/message/opcodes.rb deleted file mode 100644 index 4a19bc9..0000000 --- a/lib/mongo/message/opcodes.rb +++ /dev/null @@ -1,30 +0,0 @@ -# -- -# Copyright (C) 2008-2009 10gen Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ++ - -module Mongo - OP_REPLY = 1 # reply. responseTo is set. - OP_MSG = 1000 # generic msg command followed by a string - OP_UPDATE = 2001 # update object - OP_INSERT = 2002 - # GET_BY_OID = 2003 - OP_QUERY = 2004 - OP_GET_MORE = 2005 - OP_DELETE = 2006 - OP_KILL_CURSORS = 2007 - - OP_QUERY_SLAVE_OK = 4 - OP_QUERY_NO_CURSOR_TIMEOUT = 16 -end diff --git a/lib/mongo/query.rb b/lib/mongo/query.rb index af056c3..f686c2e 100644 --- a/lib/mongo/query.rb +++ b/lib/mongo/query.rb @@ -15,7 +15,6 @@ # ++ require 'mongo/collection' -require 'mongo/message' require 'mongo/types/code' module Mongo @@ -119,10 +118,10 @@ module Mongo end # Returns an integer indicating which query options have been selected. - # See http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-OPQUERY + # See http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY def query_opts - timeout = @timeout ? 0 : OP_QUERY_NO_CURSOR_TIMEOUT - slave_ok = @slave_ok ? OP_QUERY_SLAVE_OK : 0 + timeout = @timeout ? 0 : Mongo::Constants::OP_QUERY_NO_CURSOR_TIMEOUT + slave_ok = @slave_ok ? Mongo::Constants::OP_QUERY_SLAVE_OK : 0 slave_ok + timeout end diff --git a/mongo-ruby-driver.gemspec b/mongo-ruby-driver.gemspec index 95657cf..2471471 100644 --- a/mongo-ruby-driver.gemspec +++ b/mongo-ruby-driver.gemspec @@ -20,20 +20,13 @@ PACKAGE_FILES = ['README.rdoc', 'Rakefile', 'mongo-ruby-driver.gemspec', 'lib/mongo/admin.rb', 'lib/mongo/collection.rb', 'lib/mongo/connection.rb', + 'lib/mongo/constants.rb', 'lib/mongo/cursor.rb', 'lib/mongo/db.rb', 'lib/mongo/gridfs/chunk.rb', 'lib/mongo/gridfs/grid_store.rb', 'lib/mongo/gridfs.rb', 'lib/mongo/errors.rb', - 'lib/mongo/message/get_more_message.rb', - 'lib/mongo/message/kill_cursors_message.rb', - 'lib/mongo/message/message.rb', - 'lib/mongo/message/message_header.rb', - 'lib/mongo/message/msg_message.rb', - 'lib/mongo/message/opcodes.rb', - 'lib/mongo/message/query_message.rb', - 'lib/mongo/message.rb', 'lib/mongo/query.rb', 'lib/mongo/types/binary.rb', 'lib/mongo/types/code.rb', @@ -72,7 +65,6 @@ TEST_FILES = ['test/mongo-qa/_common.rb', 'test/test_db_api.rb', 'test/test_db_connection.rb', 'test/test_grid_store.rb', - 'test/test_message.rb', 'test/test_objectid.rb', 'test/test_ordered_hash.rb', 'test/test_threading.rb', diff --git a/test/test_message.rb b/test/test_message.rb deleted file mode 100644 index 4765658..0000000 --- a/test/test_message.rb +++ /dev/null @@ -1,35 +0,0 @@ -$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') -require 'mongo' -require 'test/unit' - -class MessageTest < Test::Unit::TestCase - - include Mongo - - def setup - @msg = Message.new(42) - end - - def test_initial_info - assert_equal Message::HEADER_SIZE, @msg.buf.length - @msg.write_long(1029) - @msg.buf.rewind - assert_equal Message::HEADER_SIZE + 8, @msg.buf.get_int - @msg.buf.get_int # skip message id - assert_equal 0, @msg.buf.get_int - assert_equal 42, @msg.buf.get_int - assert_equal 1029, @msg.buf.get_long - end - - def test_update_length - @msg.update_message_length - @msg.buf.rewind - assert_equal Message::HEADER_SIZE, @msg.buf.get_int - end - - def test_long_length - @msg.write_long(1027) - assert_equal Message::HEADER_SIZE + 8, @msg.buf.length - end - -end