diff --git a/bin/standard_benchmark b/bin/standard_benchmark index 410135a..a977da9 100755 --- a/bin/standard_benchmark +++ b/bin/standard_benchmark @@ -64,11 +64,11 @@ def benchmark(str, n, coll_name, data, create_index=false) coll.create_index('x') if create_index profile(str) do times = [] - GC.start - t1 = Time.now - n.times { |i| yield(coll, i) } - t2 = Time.now - times << t2 - t1 + GC.start + t1 = Time.now + n.times { |i| yield(coll, i) } + t2 = Time.now + times << t2 - t1 report(str, times.min) end end diff --git a/lib/mongo.rb b/lib/mongo.rb index 787de49..f2a65e0 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -31,5 +31,5 @@ module Mongo ASCENDING = 1 DESCENDING = -1 - VERSION = "0.17.2" + VERSION = "0.18" end diff --git a/lib/mongo/admin.rb b/lib/mongo/admin.rb index 5141efa..66e6f63 100644 --- a/lib/mongo/admin.rb +++ b/lib/mongo/admin.rb @@ -30,7 +30,7 @@ module Mongo def profiling_level oh = OrderedHash.new oh[:profile] = -1 - doc = @db.db_command(oh) + doc = @db.command(oh) raise "Error with profile command: #{doc.inspect}" unless @db.ok?(doc) && doc['was'].kind_of?(Numeric) case doc['was'].to_i when 0 @@ -57,7 +57,7 @@ module Mongo else raise "Error: illegal profiling level value #{level}" end - doc = @db.db_command(oh) + doc = @db.command(oh) raise "Error with profile command: #{doc.inspect}" unless @db.ok?(doc) end @@ -71,7 +71,7 @@ module Mongo # problem or returning an interesting hash (see especially the # 'result' string value) if all is well. def validate_collection(name) - doc = @db.db_command(:validate => name) + doc = @db.command(:validate => name) raise "Error with validate command: #{doc.inspect}" unless @db.ok?(doc) result = doc['result'] raise "Error with validation data: #{doc.inspect}" unless result.kind_of?(String) diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index ce6aaa6..704d629 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -41,6 +41,7 @@ module Mongo end @db, @name = db, name + @connection = @db.connection @pk_factory = pk_factory || ObjectID @hint = nil end @@ -222,7 +223,7 @@ module Mongo BSON.serialize_cstr(message, "#{@db.name}.#{@name}") message.put_int(0) message.put_array(BSON_SERIALIZER.serialize(selector, false).unpack("C*")) - @db.send_message_with_operation(Mongo::Constants::OP_DELETE, message, + @connection.send_message_with_operation(Mongo::Constants::OP_DELETE, message, "db.#{@db.name}.remove(#{selector.inspect})") end @@ -261,10 +262,10 @@ module Mongo message.put_array(BSON_SERIALIZER.serialize(selector, false).unpack("C*")) message.put_array(BSON_SERIALIZER.serialize(document, false).unpack("C*")) if options[:safe] - @db.send_message_with_safe_check(Mongo::Constants::OP_UPDATE, message, + @connection.send_message_with_safe_check(Mongo::Constants::OP_UPDATE, message, @db.name, "db.#{@name}.update(#{selector.inspect}, #{document.inspect})") else - @db.send_message_with_operation(Mongo::Constants::OP_UPDATE, message, + @connection.send_message_with_operation(Mongo::Constants::OP_UPDATE, message, "db.#{@name}.update(#{selector.inspect}, #{document.inspect})") end end @@ -333,7 +334,7 @@ module Mongo reduce = Code.new(reduce) end - result = @db.db_command({"group" => + result = @db.command({"group" => { "ns" => @name, "$reduce" => reduce, @@ -406,7 +407,7 @@ EOS command[:distinct] = @name command[:key] = key.to_s - @db.db_command(command)["values"] + @db.command(command)["values"] end # Rename this collection. @@ -488,10 +489,10 @@ EOS BSON.serialize_cstr(message, "#{@db.name}.#{collection_name}") documents.each { |doc| message.put_array(BSON_SERIALIZER.serialize(doc, check_keys).unpack("C*")) } if safe - @db.send_message_with_safe_check(Mongo::Constants::OP_INSERT, message, + @connection.send_message_with_safe_check(Mongo::Constants::OP_INSERT, message, @db.name, "db.#{collection_name}.insert(#{documents.inspect})") else - @db.send_message_with_operation(Mongo::Constants::OP_INSERT, message, + @connection.send_message_with_operation(Mongo::Constants::OP_INSERT, message, "db.#{collection_name}.insert(#{documents.inspect})") end documents.collect { |o| o[:_id] || o['_id'] } diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index e28650b..d2ba204 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -14,12 +14,27 @@ # limitations under the License. # ++ +require 'set' +require 'socket' +require 'monitor' + module Mongo # A connection to MongoDB. class Connection DEFAULT_PORT = 27017 + STANDARD_HEADER_SIZE = 16 + RESPONSE_HEADER_SIZE = 20 + + attr_reader :logger, :size, :host, :port, :nodes, :sockets, :checked_out, :reserved_connections + + + def slave_ok?; @slave_ok; end + def auto_reconnect?; @auto_reconnect; end + + # Counter for generating unique request ids. + @@current_request_id = 0 # Create a Mongo database server instance. You specify either one or a # pair of servers. If one, you also say if connecting to a slave is @@ -69,41 +84,50 @@ module Mongo # When a DB object first connects to a pair, it will find the master # instance and connect to that one. def initialize(pair_or_host=nil, port=nil, options={}) - @pair = case pair_or_host - when String - [[pair_or_host, port ? port.to_i : DEFAULT_PORT]] - when Hash - connections = [] - connections << pair_val_to_connection(pair_or_host[:left]) - connections << pair_val_to_connection(pair_or_host[:right]) - connections - when nil - [['localhost', DEFAULT_PORT]] - end + @nodes = format_pair(pair_or_host) - @options = options + # Host and port of current master. + @host = @port = nil + + # Lock for request ids. + @id_lock = Mutex.new + + # Lock for checking master. + @master_lock = Mutex.new + + # Pool size and timeout. + @size = options[:pool_size] || 1 + @timeout = options[:timeout] || 1.0 + + # Cache of reserved sockets mapped to threads + @reserved_connections = {} + + # Mutex for synchronizing pool access + @connection_mutex = Monitor.new + + # Condition variable for signal and wait + @queue = @connection_mutex.new_cond + + @sockets = [] + @checked_out = [] + + # Slave ok can be true only if one node is specified + @auto_reconnect = options[:auto_reconnect] + @slave_ok = options[:slave_ok] && @nodes.length == 1 + @logger = options[:logger] || nil + @options = options + + should_connect = options[:connect].nil? ? true : options[:connect] + connect_to_master if should_connect end - # Return the Mongo::DB named +db_name+. The slave_ok and - # auto_reconnect options passed in via #new may be overridden here. - # See DB#new for other options you can pass in. - def db(db_name, options={}) - DB.new(db_name, @pair, @options.merge(options)) - end - - def logger - @options[:logger] - end - - # Returns a hash containing database names as keys and disk space for - # each as values. + # Returns a hash with all database names and their respective sizes on + # disk. def database_info - doc = single_db_command('admin', :listDatabases => 1) - h = {} - doc['databases'].each { |db| - h[db['name']] = db['sizeOnDisk'].to_i - } - h + doc = self['admin'].command(:listDatabases => 1) + returning({}) do |info| + doc['databases'].each { |db| info[db['name']] = db['sizeOnDisk'].to_i } + end end # Returns an array of database names. @@ -111,9 +135,21 @@ module Mongo database_info.keys end + # Return the database named +db_name+. The slave_ok and + # auto_reconnect options passed in via #new may be overridden here. + # See DB#new for other options you can pass in. + def db(db_name, options={}) + DB.new(db_name, self, options.merge(:logger => @logger)) + end + + # Return the database named +db_name+. + def [](db_name) + DB.new(db_name, self, :logger => @logger) + end + # Drops the database +name+. def drop_database(name) - single_db_command(name, :dropDatabase => 1) + self[name].command(:dropDatabase => 1) end # Copies the database +from+ on the local server to +to+ on the specified +host+. @@ -124,7 +160,35 @@ module Mongo oh[:fromhost] = host oh[:fromdb] = from oh[:todb] = to - single_db_command('admin', oh) + self["admin"].command(oh) + end + + # Increments and returns the next available request id. + def get_request_id + request_id = '' + @id_lock.synchronize do + request_id = @@current_request_id += 1 + end + request_id + end + + # Prepares a message for transmission to MongoDB by + # constructing a valid message header. + def add_message_headers(operation, message) + headers = ByteBuffer.new + + # Message size. + headers.put_int(16 + message.size) + + # Unique request id. + headers.put_int(get_request_id) + + # Response id. + headers.put_int(0) + + # Opcode. + headers.put_int(operation) + message.prepend!(headers) end # Return the build information for the current connection. @@ -132,14 +196,375 @@ module Mongo db("admin").command({:buildinfo => 1}, {:admin => true, :check_response => true}) end - # Returns the build version of the current server, using - # a ServerVersion object for comparability. + # Get the build version of the current server. + # Returns a ServerVersion object for comparability. def server_version ServerVersion.new(server_info["version"]) end - protected + ## Connections and pooling ## + + # Sends a message to MongoDB. + # + # Takes a MongoDB opcode, +operation+, a message of class ByteBuffer, + # +message+, and an optional formatted +log_message+. + def send_message(operation, message, log_message=nil) + @logger.debug(" MONGODB #{log_message || message}") if @logger + + packed_message = pack_message(operation, message) + socket = checkout + send_message_on_socket(packed_message, socket) + end + + # Sends a message to MongoDB and returns the response. + # + # Takes a MongoDB opcode, +operation+, a message of class ByteBuffer, + # +message+, an optional formatted +log_message+, and an optional + # socket. + def receive_message(operation, message, log_msg=nil, sock=nil) + @logger.debug(" MONGODB #{log_message || message}") if @logger + packed_message = pack_message(operation, message) + + # This code is used only if we're checking for master. + if sock + @master_lock.synchronize do + response = send_and_receive(packed_message, sock) + end + else + socket = checkout + response = send_and_receive(packed_message, socket) + end + response + end + + # Sends a message to MongoDB. + # + # Takes a MongoDB opcode, +operation+, a message of class ByteBuffer, + # +message+, and an optional formatted +log_message+. + # Sends the message to the databse, adding the necessary headers. + def send_message_with_operation(operation, message, log_message=nil) + @logger.debug(" MONGODB #{log_message || message}") if @logger + packed_message = pack_message(operation, message) + socket = checkout + response = send_message_on_socket(packed_message, socket) + checkin(socket) + response + end + + # Sends a message to the database, waits for a response, and raises + # and exception if the operation has failed. + def send_message_with_safe_check(operation, message, db_name, log_message=nil) + message_with_headers = add_message_headers(operation, message) + message_with_check = last_error_message(db_name) + @logger.debug(" MONGODB #{log_message || message}") if @logger + sock = checkout + msg = message_with_headers.append!(message_with_check).to_s + send_message_on_socket(msg, sock) + docs, num_received, cursor_id = receive(sock) + if num_received == 1 && error = docs[0]['err'] + raise Mongo::OperationFailure, error + end + checkin(sock) + [docs, num_received, cursor_id] + end + + # Send a message to the database and waits for the response. + def receive_message_with_operation(operation, message, log_message=nil, socket=nil) + message_with_headers = add_message_headers(operation, message).to_s + @logger.debug(" MONGODB #{log_message || message}") if @logger + sock = socket || checkout + + send_message_on_socket(message_with_headers, sock) + receive(sock) + end + + # Creates a new socket and tries to connect to master. + # If successful, sets @host and @port to master and returns the socket. + def connect_to_master + @host = @port = nil + for node_pair in @nodes + host, port = *node_pair + begin + socket = TCPSocket.new(host, port) + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + + result = self['admin'].command({:ismaster => 1}, false, false, socket) + if result['ok'] == 1 && ((is_master = result['ismaster'] == 1) || @slave_ok) + @host, @port = host, port + end + + # Note: slave_ok can be true only when connecting to a single node. + if !@slave_ok && !is_master + raise ConfigurationError, "Trying to connect directly to slave; " + + "if this is what you want, specify :slave_ok => true." + end + + break if is_master || @slave_ok + rescue SocketError, SystemCallError, IOError => ex + socket.close if socket + false + end + end + raise ConnectionError, "failed to connect to any given host:port" unless socket + end + + def master? + doc = self['admin'].command(:ismaster => 1) + doc['ok'] == 1 && doc['ismaster'] == 1 + end + + # Returns a string of the form "host:port" that points to the master + # database. Works even if this is the master database. + def master + doc = self['admin'].command(:ismaster => 1) + if doc['ok'] == 1 && doc['ismaster'] == 1 + "#@host:#@port" + elsif doc['remote'] + doc['remote'] + else + raise "Error retrieving master database: #{doc.inspect}" + end + end + + def connected? + @sockets.detect do |sock| + sock.is_a? Socket + end || (@host && @port) + end + + # Close the connection to the database. + def close + @sockets.each do |sock| + sock.close + end + @host = @port = nil + @sockets.clear + @checked_out.clear + @reserved_connections.clear + end + + # Get a socket from the pool, mapped to the current thread. + def checkout + if sock = @reserved_connections[Thread.current.object_id] + sock + else + sock = obtain_socket + @reserved_connections[Thread.current.object_id] = sock + end + sock + end + + # Return a socket to the pool. + def checkin(socket) + @connection_mutex.synchronize do + @checked_out.delete(socket) + @reserved_connections.delete Thread.current.object_id + @queue.signal + end + end + + # Releases connection for any dead threads. + # Called when the connection pool grows too large + # and we need additional sockets. + def clear_stale_cached_connections! + keys = Set.new(@reserved_connections.keys) + + Thread.list.each do |thread| + keys.delete(thread.object_id) if thread.alive? + end + + keys.each do |key| + next unless @reserved_connections.has_key?(key) + checkin(@reserved_connections[key]) + @reserved_connections.delete(key) + end + end + + # Adds a new socket to the pool and checks it out. + # + # This method is called exclusively from #obtain_socket; + # therefore, it runs within a mutex, as it must. + def checkout_new_socket + socket = TCPSocket.new(@host, @port) + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + @sockets << socket + @checked_out << socket + socket + end + + # Checks out the first available socket from the pool. + # + # This method is called exclusively from #obtain_socket; + # therefore, it runs within a mutex, as it must. + def checkout_existing_socket + socket = (@sockets - @checked_out).first + @checked_out << socket + socket + end + + # Check out an existing socket or create a new socket if the maximum + # pool size has not been exceeded. Otherwise, wait for the next + # available socket. + def obtain_socket + @connection_mutex.synchronize do + + # NOTE: Not certain that this is the best place for reconnect + connect_to_master if !connected? && @auto_reconnect + loop do + socket = if @checked_out.size < @sockets.size + checkout_existing_socket + elsif @sockets.size < @size + checkout_new_socket + end + + return socket if socket + # No connections available; wait. + if @queue.wait(@timeout) + next + else + # Try to clear out any stale threads to free up some connections + clear_stale_cached_connections! + if @size == @sockets.size + raise ConnectionTimeoutError, "could not obtain connection within " + + "#{@timeout} seconds. The max pool size is currently #{@size}; " + + "consider increasing it." + end + end # if + end # loop + end #sync + end + + def receive(sock) + receive_header(sock) + number_received, cursor_id = receive_response_header(sock) + read_documents(number_received, cursor_id, sock) + end + + def receive_header(sock) + header = ByteBuffer.new + header.put_array(receive_message_on_socket(16, sock).unpack("C*")) + unless header.size == STANDARD_HEADER_SIZE + raise "Short read for DB response header: " + + "expected #{STANDARD_HEADER_SIZE} bytes, saw #{header.size}" + end + header.rewind + size = header.get_int + request_id = header.get_int + response_to = header.get_int + op = header.get_int + end + + def receive_response_header(sock) + header_buf = ByteBuffer.new + header_buf.put_array(receive_message_on_socket(RESPONSE_HEADER_SIZE, sock).unpack("C*")) + if header_buf.length != RESPONSE_HEADER_SIZE + raise "Short read for DB response header; " + + "expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" + end + header_buf.rewind + result_flags = header_buf.get_int + cursor_id = header_buf.get_long + starting_from = header_buf.get_int + number_remaining = header_buf.get_int + [number_remaining, cursor_id] + end + + def read_documents(number_received, cursor_id, sock) + docs = [] + number_remaining = number_received + while number_remaining > 0 do + buf = ByteBuffer.new + buf.put_array(receive_message_on_socket(4, sock).unpack("C*")) + buf.rewind + size = buf.get_int + buf.put_array(receive_message_on_socket(size - 4, sock).unpack("C*"), 4) + number_remaining -= 1 + buf.rewind + docs << BSON.new.deserialize(buf) + end + [docs, number_received, cursor_id] + end + + def last_error_message(db_name) + message = ByteBuffer.new + message.put_int(0) + BSON.serialize_cstr(message, "#{db_name}.$cmd") + message.put_int(0) + message.put_int(-1) + message.put_array(BSON_SERIALIZER.serialize({:getlasterror => 1}, false).unpack("C*")) + add_message_headers(Mongo::Constants::OP_QUERY, message) + end + + # Prepares a message for transmission to MongoDB by + # constructing a message header with a new request id. + def pack_message(operation, message) + headers = ByteBuffer.new + + # Message size. + headers.put_int(16 + message.size) + + # Unique request id. + headers.put_int(get_request_id) + + # Response id. + headers.put_int(0) + + # Opcode. + headers.put_int(operation) + message.prepend!(headers) + message.to_s + end + + #def send_and_receive + # send_message_on_socket(packed_message, socket) + # receive_message_on_socket() + #end + + # Low-level method for sending a message on a socket. + # Requires a packed message and an available socket, + def send_message_on_socket(packed_message, socket) + #socket will be connected to master when we receive it + #begin + socket.send(packed_message, 0) + #rescue => ex + # close + # need to find a way to release the socket here + # checkin(socket) + # raise ex + #end + end + + # Low-level method for receiving data from socket. + # Requires length and an available socket. + def receive_message_on_socket(length, socket) + message = "" + while message.length < length do + chunk = socket.recv(length - message.length) + raise "connection closed" unless chunk.length > 0 + message += chunk + end + message + end + + + ## Private helper methods + + # Returns an array of host-port pairs. + def format_pair(pair_or_host) + case pair_or_host + when String + [[pair_or_host, port ? port.to_i : DEFAULT_PORT]] + when Hash + connections = [] + connections << pair_val_to_connection(pair_or_host[:left]) + connections << pair_val_to_connection(pair_or_host[:right]) + connections + when nil + [['localhost', DEFAULT_PORT]] + end + end + # Turns an array containing a host name string and a # port number integer into a [host, port] pair array. def pair_val_to_connection(a) @@ -154,20 +579,5 @@ module Mongo a end end - - # Send cmd (a hash, possibly ordered) to the admin database and return - # the answer. Raises an error unless the return is "ok" (DB#ok? - # returns +true+). - def single_db_command(db_name, cmd) - db = nil - begin - db = db(db_name) - doc = db.db_command(cmd) - raise "error retrieving database info: #{doc.inspect}" unless db.ok?(doc) - doc - ensure - db.close if db - end - end end end diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index ce4adc7..4bba2cb 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -32,6 +32,7 @@ module Mongo def initialize(collection, options={}) @db = collection.db @collection = collection + @connection = @db.connection @selector = convert_selector_for_query(options[:selector]) @fields = convert_fields_for_query(options[:fields]) @@ -43,8 +44,9 @@ module Mongo @snapshot = options[:snapshot] @timeout = options[:timeout] || false @explain = options[:explain] + @socket = options[:socket] - @full_collection_name = "#{@collection.db.name}.#{@collection.name}" + @full_collection_name = "#{@collection.db.name}.#{@collection.name}" @cache = [] @closed = false @query_run = false @@ -83,7 +85,7 @@ module Mongo command = OrderedHash["count", @collection.name, "query", @selector, "fields", @fields] - response = @db.db_command(command) + response = @db.command(command) return response['n'].to_i if response['ok'] == 1 return 0 if response['errmsg'] == "ns missing" raise OperationFailure, "Count failed: #{response['errmsg']}" @@ -199,7 +201,7 @@ module Mongo message.put_int(0) message.put_int(1) message.put_long(@cursor_id) - @db.send_message_with_operation(Mongo::Constants::OP_KILL_CURSORS, message, "cursor.close()") + @connection.send_message_with_operation(Mongo::Constants::OP_KILL_CURSORS, message, "cursor.close()") end @cursor_id = 0 @closed = true @@ -212,7 +214,7 @@ module Mongo # See http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY def query_opts timeout = @timeout ? 0 : Mongo::Constants::OP_QUERY_NO_CURSOR_TIMEOUT - slave_ok = @db.slave_ok? ? Mongo::Constants::OP_QUERY_SLAVE_OK : 0 + slave_ok = @connection.slave_ok? ? Mongo::Constants::OP_QUERY_SLAVE_OK : 0 slave_ok + timeout end @@ -294,7 +296,7 @@ module Mongo # Cursor id. message.put_long(@cursor_id) - results, @n_received, @cursor_id = @db.receive_message_with_operation(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()") + results, @n_received, @cursor_id = @connection.receive_message_with_operation(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()", @socket) @cache += results close_cursor_if_query_complete end @@ -305,8 +307,8 @@ module Mongo false else message = construct_query_message - results, @n_received, @cursor_id = @db.receive_message_with_operation(Mongo::Constants::OP_QUERY, message, - (query_log_message if @db.logger)) + results, @n_received, @cursor_id = @connection.receive_message_with_operation(Mongo::Constants::OP_QUERY, message, + (query_log_message if @connection.logger), @socket) @cache += results @query_run = true close_cursor_if_query_complete diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index 4033903..c39630c 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -27,8 +27,6 @@ module Mongo # A Mongo database. class DB - STANDARD_HEADER_SIZE = 16 - RESPONSE_HEADER_SIZE = 20 SYSTEM_NAMESPACE_COLLECTION = "system.namespaces" SYSTEM_INDEX_COLLECTION = "system.indexes" SYSTEM_PROFILE_COLLECTION = "system.profile" @@ -39,11 +37,10 @@ module Mongo @@current_request_id = 0 # Strict mode enforces collection existence checks. When +true+, - # asking for a collection that does not exist or trying to create a - # collection that already exists raises an error. + # asking for a collection that does not exist, or trying to create a + # collection that already exists, raises an error. # - # Strict mode is off (+false+) by default. Its value can be changed at - # any time. + # Strict mode is disabled by default, but enabled (+true+) at any time. attr_writer :strict # Returns the value of the +strict+ flag. @@ -52,26 +49,16 @@ module Mongo # The name of the database. attr_reader :name + # The Mongo::Connection instance connecting to the MongoDB server. attr_reader :connection - - # Host to which we are currently connected. - attr_reader :host - # Port to which we are currently connected. - attr_reader :port - + # An array of [host, port] pairs. attr_reader :nodes - # The database's socket. For internal (and Cursor) use only. - attr_reader :socket - - # The logger instance if :logger is passed to initialize + # The logger instance if :logger is passed to initialize. attr_reader :logger - def slave_ok?; @slave_ok; end - - # A primary key factory object (or +nil+). See the README.doc file or - # DB#new for details. + # The primary key factory object (or +nil+). attr_reader :pk_factory def pk_factory=(pk_factory) @@ -91,8 +78,7 @@ module Mongo # Options: # # :strict :: If true, collections must exist to be accessed and must - # not exist to be created. See #collection and - # #create_collection. + # not exist to be created. See #collection and #create_collection. # # :pk :: A primary key factory object that must respond to :create_pk, # which should take a hash and return a hash which merges the @@ -119,72 +105,18 @@ module Mongo # When a DB object first connects to a pair, it will find the master # instance and connect to that one. On socket error or if we recieve a # "not master" error, we again find the master of the pair. - def initialize(db_name, nodes, options={}) - case db_name - when Symbol, String - else - raise TypeError, "db_name must be a string or symbol" - end - - [" ", ".", "$", "/", "\\"].each do |invalid_char| - if db_name.include? invalid_char - raise InvalidName, "database names cannot contain the character '#{invalid_char}'" - end - end - if db_name.empty? - raise InvalidName, "database name cannot be the empty string" - end - - @connection = options[:connection] - - @name, @nodes = db_name, nodes - @strict = options[:strict] + def initialize(db_name, connection, options={}) + @name = validate_db_name(db_name) + @connection = connection + @strict = options[:strict] @pk_factory = options[:pk] - @slave_ok = options[:slave_ok] && @nodes.length == 1 # only OK if one node - if options[:auto_reconnect] - warn(":auto_reconnect is deprecated. henceforth, any time an operation fails, " + - "the driver will attempt to reconnect master on subsequent operations.") - end - @semaphore = Mutex.new - @socket = nil - @logger = options[:logger] - @network_timeout = 20 - connect_to_master - end - - def connect_to_master - close if @socket - @host = @port = nil - @nodes.detect { |hp| - @host, @port = *hp - begin - @socket = TCPSocket.new(@host, @port) - @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - - # Check for master. Can't call master? because it uses mutex, - # which may already be in use during this call. - semaphore_is_locked = @semaphore.locked? - @semaphore.unlock if semaphore_is_locked - is_master = master? - @semaphore.lock if semaphore_is_locked - - if @nodes.length == 1 && !is_master && !@slave_ok - raise ConfigurationError, "Trying to connect directly to slave; if this is what you want, specify :slave_ok => true." - end - is_master || @slave_ok - rescue SocketError, SystemCallError, IOError => ex - close if @socket - false - end - } - raise ConnectionFailure, "error: failed to connect to any given host:port" unless @socket end # Returns true if +username+ has +password+ in # +SYSTEM_USER_COLLECTION+. +name+ is username, +password+ is # plaintext password. def authenticate(username, password) - doc = db_command(:getnonce => 1) + doc = command(:getnonce => 1) raise "error retrieving nonce: #{doc}" unless ok?(doc) nonce = doc['nonce'] @@ -193,12 +125,12 @@ module Mongo auth['user'] = username auth['nonce'] = nonce auth['key'] = Digest::MD5.hexdigest("#{nonce}#{username}#{hash_password(username, password)}") - ok?(db_command(auth)) + ok?(command(auth)) end # Deauthorizes use for this database for this connection. def logout - doc = db_command(:logout => 1) + doc = command(:logout => 1) raise "error logging out: #{doc.inspect}" unless ok?(doc) end @@ -252,7 +184,7 @@ module Mongo # Create new collection oh = OrderedHash.new oh[:create] = name - doc = db_command(oh.merge(options || {})) + doc = command(oh.merge(options || {})) ok = doc['ok'] return Collection.new(self, name, @pk_factory) if ok.kind_of?(Numeric) && (ok.to_i == 1 || ok.to_i == 0) raise "Error creating collection: #{doc.inspect}" @@ -276,20 +208,20 @@ module Mongo def drop_collection(name) return true unless collection_names.include?(name) - ok?(db_command(:drop => name)) + ok?(command(:drop => name)) end # Returns the error message from the most recently executed database # operation for this connection, or +nil+ if there was no error. def error - doc = db_command(:getlasterror => 1) + doc = command(:getlasterror => 1) raise "error retrieving last error: #{doc}" unless ok?(doc) doc['err'] end # Get status information from the last operation on this connection. def last_status - db_command(:getlasterror => 1) + command(:getlasterror => 1) end # Returns +true+ if an error was caused by the most recently executed @@ -303,7 +235,7 @@ module Mongo # Only returns errors that have occured since the last call to # DB#reset_error_history - returns +nil+ if there is no such error. def previous_error - error = db_command(:getpreverror => 1) + error = command(:getpreverror => 1) if error["err"] error else @@ -316,42 +248,7 @@ module Mongo # Calls to DB#previous_error will only return errors that have occurred # since the most recent call to this method. def reset_error_history - db_command(:reseterror => 1) - end - - # Returns true if this database is a master (or is not paired with any - # other database), false if it is a slave. - def master? - doc = db_command(:ismaster => 1) - is_master = doc['ismaster'] - ok?(doc) && is_master.kind_of?(Numeric) && is_master.to_i == 1 - end - - # Returns a string of the form "host:port" that points to the master - # database. Works even if this is the master database. - def master - doc = db_command(:ismaster => 1) - is_master = doc['ismaster'] - raise "Error retrieving master database: #{doc.inspect}" unless ok?(doc) && is_master.kind_of?(Numeric) - case is_master.to_i - when 1 - "#@host:#@port" - else - doc['remote'] - end - end - - # Close the connection to the database. - def close - if @socket - s = @socket - @socket = nil - s.close - end - end - - def connected? - @socket != nil + command(:reseterror => 1) end # Returns a Cursor over the query results. @@ -380,7 +277,7 @@ module Mongo oh = OrderedHash.new oh[:$eval] = code oh[:args] = args - doc = db_command(oh) + doc = command(oh) return doc['retval'] if ok?(doc) raise OperationFailure, "eval failed: #{doc['errmsg']}" end @@ -391,7 +288,7 @@ module Mongo oh = OrderedHash.new oh[:renameCollection] = "#{@name}.#{from}" oh[:to] = "#{@name}.#{to}" - doc = db_command(oh, true) + doc = command(oh, true) raise "Error renaming collection: #{doc.inspect}" unless ok?(doc) end @@ -401,7 +298,7 @@ module Mongo oh = OrderedHash.new oh[:deleteIndexes] = collection_name oh[:index] = name - doc = db_command(oh) + doc = command(oh) raise "Error with drop_index command: #{doc.inspect}" unless ok?(doc) end @@ -428,59 +325,7 @@ module Mongo def create_index(collection_name, field_or_spec, unique=false) self.collection(collection_name).create_index(field_or_spec, unique) end - - # Sends a message to MongoDB. - # - # Takes a MongoDB opcode, +operation+, a message of class ByteBuffer, - # +message+, and an optional formatted +log_message+. - # Sends the message to the databse, adding the necessary headers. - def send_message_with_operation(operation, message, log_message=nil) - message_with_headers = add_message_headers(operation, message).to_s - @logger.debug(" MONGODB #{log_message || message}") if @logger - @semaphore.synchronize do - send_message_on_socket(message_with_headers) - end - end - - def send_message_with_operation_raw(operation, message, log_message=nil) - message_with_headers = add_message_headers_raw(operation, message) - @logger.debug(" MONGODB #{log_message || message}") if @logger - @semaphore.synchronize do - send_message_on_socket(message_with_headers) - end - end - - # Sends a message to the database, waits for a response, and raises - # and exception if the operation has failed. - def send_message_with_safe_check(operation, message, log_message=nil) - message_with_headers = add_message_headers(operation, message) - message_with_check = last_error_message - @logger.debug(" MONGODB #{log_message || message}") if @logger - @semaphore.synchronize do - send_message_on_socket(message_with_headers.append!(message_with_check).to_s) - docs, num_received, cursor_id = receive - if num_received == 1 && error = docs[0]['err'] - if docs[0]['err'] == 'not master' - raise ConnectionFailure - else - raise Mongo::OperationFailure, error - end - else - true - end - end - end - - # Send a message to the database and waits for the response. - def receive_message_with_operation(operation, message, log_message=nil) - message_with_headers = add_message_headers(operation, message).to_s - @logger.debug(" MONGODB #{log_message || message}") if @logger - @semaphore.synchronize do - send_message_on_socket(message_with_headers) - receive - end - end - + # Return +true+ if +doc+ contains an 'ok' field with the value 1. def ok?(doc) ok = doc['ok'] @@ -490,14 +335,14 @@ module Mongo # DB commands need to be ordered, so selector must be an OrderedHash # (or a Hash with only one element). What DB commands really need is # that the "command" key be first. - def db_command(selector, use_admin_db=false) + def command(selector, use_admin_db=false, sock=nil) if !selector.kind_of?(OrderedHash) if !selector.kind_of?(Hash) || selector.keys.length > 1 - raise "db_command must be given an OrderedHash when there is more than one key" + raise "command must be given an OrderedHash when there is more than one key" end end - cursor = Cursor.new(Collection.new(self, SYSTEM_COMMAND_COLLECTION), :admin => use_admin_db, :limit => -1, :selector => selector) + cursor = Cursor.new(Collection.new(self, SYSTEM_COMMAND_COLLECTION), :admin => use_admin_db, :limit => -1, :selector => selector, :socket => sock) cursor.next_object end @@ -514,14 +359,14 @@ module Mongo # # Note: DB commands must start with the "command" key. For this reason, # any selector containing more than one key must be an OrderedHash. - def command(selector, admin=false, check_response=false) + def command(selector, admin=false, check_response=false, sock=nil) raise MongoArgumentError, "command must be given a selector" unless selector.is_a?(Hash) && !selector.empty? if selector.class.eql?(Hash) && selector.keys.length > 1 raise MongoArgumentError, "DB#command requires an OrderedHash when hash contains multiple keys" end result = Cursor.new(system_command_collection, :admin => admin, - :limit => -1, :selector => selector).next_object + :limit => -1, :selector => selector, :socket => sock).next_object if check_response && !ok?(result) raise OperationFailure, "Database command '#{selector.keys.first}' failed." @@ -530,147 +375,18 @@ module Mongo end end + # DEPRECATED: please use DB#command instead. + def db_command(*args) + warn "DB#db_command has been DEPRECATED. Please use DB#command instead." + command(args[0], args[1]) + end + def full_collection_name(collection_name) "#{@name}.#{collection_name}" end private - def receive - receive_header - number_received, cursor_id = receive_response_header - read_documents(number_received, cursor_id) - end - - def receive_header - header = ByteBuffer.new - header.put_array(receive_data_on_socket(16).unpack("C*")) - unless header.size == STANDARD_HEADER_SIZE - raise "Short read for DB response header: " + - "expected #{STANDARD_HEADER_SIZE} bytes, saw #{header.size}" - end - header.rewind - size = header.get_int - request_id = header.get_int - response_to = header.get_int - op = header.get_int - end - - def receive_response_header - header_buf = ByteBuffer.new - header_buf.put_array(receive_data_on_socket(RESPONSE_HEADER_SIZE).unpack("C*")) - if header_buf.length != RESPONSE_HEADER_SIZE - raise "Short read for DB response header; " + - "expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}" - end - header_buf.rewind - result_flags = header_buf.get_int - cursor_id = header_buf.get_long - starting_from = header_buf.get_int - number_remaining = header_buf.get_int - [number_remaining, cursor_id] - end - - def read_documents(number_received, cursor_id) - docs = [] - number_remaining = number_received - while number_remaining > 0 do - buf = ByteBuffer.new - buf.put_array(receive_data_on_socket(4).unpack("C*")) - buf.rewind - size = buf.get_int - buf.put_array(receive_data_on_socket(size - 4).unpack("C*"), 4) - number_remaining -= 1 - buf.rewind - docs << BSON.new.deserialize(buf) - end - [docs, number_received, cursor_id] - end - - # Sending a message on socket. - def send_message_on_socket(packed_message) - connect_to_master if !connected? - begin - @socket.print(packed_message) - @socket.flush - rescue => ex - close - raise ConnectionFailure, "Operation failed with the following exception: #{ex}." - end - end - - # Receive data of specified length on socket. - def receive_data_on_socket(length) - connect_to_master if !connected? - message = "" - chunk = "" - while message.length < length do - begin - chunk = @socket.read(length - message.length) - raise ConnectionFailure, "connection closed" unless chunk && chunk.length > 0 - message += chunk - rescue => ex - raise ConnectionFailure, "Operation failed with the following exception: #{ex}" - end - end - message - end - - # Prepares a message for transmission to MongoDB by - # constructing a valid message header. - def add_message_headers(operation, message) - headers = ByteBuffer.new - - # Message size. - headers.put_int(16 + message.size) - - # Unique request id. - headers.put_int(get_request_id) - - # Response id. - headers.put_int(0) - - # Opcode. - headers.put_int(operation) - message.prepend!(headers) - end - - # Increments and then returns the next available request id. - # Note: this method should be called from within a lock. - def get_request_id - @@current_request_id += 1 - @@current_request_id - end - - # Creates a getlasterror message. - def last_error_message - generate_last_error_message - end - - def generate_last_error_message - message = ByteBuffer.new - message.put_int(0) - BSON.serialize_cstr(message, "#{@name}.$cmd") - message.put_int(0) - message.put_int(-1) - message.put_array(BSON_SERIALIZER.serialize({:getlasterror => 1}, false).unpack("C*")) - add_message_headers(Mongo::Constants::OP_QUERY, message) - end - - def reset_error_message - @@reset_error_message ||= generate_reset_error_message - end - - def generate_reset_error_message - message = ByteBuffer.new - message.put_int(0) - BSON.serialize_cstr(message, "#{@name}.$cmd") - message.put_int(0) - message.put_int(-1) - message.put_array(BSON_SERIALIZER.serialize({:reseterror => 1}, false).unpack("C*")) - add_message_headers(Mongo::Constants::OP_QUERY, message) - end - def hash_password(username, plaintext) Digest::MD5.hexdigest("#{username}:mongo:#{plaintext}") end @@ -678,5 +394,19 @@ module Mongo def system_command_collection Collection.new(self, SYSTEM_COMMAND_COLLECTION) end + + def validate_db_name(db_name) + unless [String, Symbol].include?(db_name.class) + raise TypeError, "db_name must be a string or symbol" + end + + [" ", ".", "$", "/", "\\"].each do |invalid_char| + if db_name.include? invalid_char + raise InvalidName, "database names cannot contain the character '#{invalid_char}'" + end + end + raise InvalidName, "database name cannot be the empty string" if db_name.empty? + db_name + end end end diff --git a/lib/mongo/errors.rb b/lib/mongo/errors.rb index 326f9d2..69cc9e5 100644 --- a/lib/mongo/errors.rb +++ b/lib/mongo/errors.rb @@ -27,6 +27,12 @@ module Mongo # Raised when invalid arguments are sent to Mongo Ruby methods. class MongoArgumentError < MongoRubyError; end + # Raised on failures in connection to the database server. + class ConnectionError < MongoRubyError; end + + # Raised on failures in connection to the database server. + class ConnectionTimeoutError < MongoRubyError; end + # Raised when a database operation fails. class OperationFailure < MongoDBError; end diff --git a/lib/mongo/gridfs/grid_store.rb b/lib/mongo/gridfs/grid_store.rb index 4ae53fa..126026a 100644 --- a/lib/mongo/gridfs/grid_store.rb +++ b/lib/mongo/gridfs/grid_store.rb @@ -442,7 +442,7 @@ module GridFS md5_command = OrderedHash.new md5_command['filemd5'] = @files_id md5_command['root'] = @root - h['md5'] = @db.db_command(md5_command)['md5'] + h['md5'] = @db.command(md5_command)['md5'] h end diff --git a/test/test_collection.rb b/test/test_collection.rb index 2ddf4cb..34f1efc 100644 --- a/test/test_collection.rb +++ b/test/test_collection.rb @@ -144,7 +144,7 @@ class TestCollection < Test::Unit::TestCase # Can't duplicate an index. assert_raise OperationFailure do - @@test.update({}, {"x" => 10}, :safe => true, :upsert => true) + @@test.update({}, {"x" => 10}, :safe => true) end end end @@ -154,7 +154,6 @@ class TestCollection < Test::Unit::TestCase @@test.save("hello" => "world") @@test.save("hello" => "world") - assert(@@db.error.include?("E11000")) assert_raise OperationFailure do @@test.save({"hello" => "world"}, :safe => true) diff --git a/test/test_connection.rb b/test/test_connection.rb index 42a5c18..499de84 100644 --- a/test/test_connection.rb +++ b/test/test_connection.rb @@ -79,8 +79,7 @@ class TestConnection < Test::Unit::TestCase logger = Logger.new(output) logger.level = Logger::DEBUG db = Connection.new(@host, @port, :logger => logger).db('ruby-mongo-test') - - assert output.string.include?("$cmd.find") + assert output.string.include?("admin.$cmd.find") end def test_connection_logger @@ -106,23 +105,23 @@ class TestConnection < Test::Unit::TestCase assert !@mongo.database_names.include?('ruby-mongo-will-be-deleted') end - def test_pair - db = Connection.new({:left => ['foo', 123]}) - pair = db.instance_variable_get('@pair') - assert_equal 2, pair.length - assert_equal ['foo', 123], pair[0] - assert_equal ['localhost', Connection::DEFAULT_PORT], pair[1] + def test_nodes + db = Connection.new({:left => ['foo', 123]}, nil, :connect => false) + nodes = db.nodes + assert_equal 2, db.nodes.length + assert_equal ['foo', 123], nodes[0] + assert_equal ['localhost', Connection::DEFAULT_PORT], nodes[1] - db = Connection.new({:right => 'bar'}) - pair = db.instance_variable_get('@pair') - assert_equal 2, pair.length - assert_equal ['localhost', Connection::DEFAULT_PORT], pair[0] - assert_equal ['bar', Connection::DEFAULT_PORT], pair[1] + db = Connection.new({:right => 'bar'}, nil, :connect => false) + nodes = db.nodes + assert_equal 2, nodes.length + assert_equal ['localhost', Connection::DEFAULT_PORT], nodes[0] + assert_equal ['bar', Connection::DEFAULT_PORT], nodes[1] - db = Connection.new({:right => ['foo', 123], :left => 'bar'}) - pair = db.instance_variable_get('@pair') - assert_equal 2, pair.length - assert_equal ['bar', Connection::DEFAULT_PORT], pair[0] - assert_equal ['foo', 123], pair[1] + db = Connection.new({:right => ['foo', 123], :left => 'bar'}, nil, :connect => false) + nodes = db.nodes + assert_equal 2, nodes.length + assert_equal ['bar', Connection::DEFAULT_PORT], nodes[0] + assert_equal ['foo', 123], nodes[1] end end diff --git a/test/test_cursor.rb b/test/test_cursor.rb index fda57ba..30da2f4 100644 --- a/test/test_cursor.rb +++ b/test/test_cursor.rb @@ -246,26 +246,26 @@ class CursorTest < Test::Unit::TestCase def test_kill_cursors @@coll.drop - client_cursors = @@db.db_command("cursorInfo" => 1)["clientCursors_size"] - by_location = @@db.db_command("cursorInfo" => 1)["byLocation_size"] + client_cursors = @@db.command("cursorInfo" => 1)["clientCursors_size"] + by_location = @@db.command("cursorInfo" => 1)["byLocation_size"] 10000.times do |i| @@coll.insert("i" => i) end assert_equal(client_cursors, - @@db.db_command("cursorInfo" => 1)["clientCursors_size"]) + @@db.command("cursorInfo" => 1)["clientCursors_size"]) assert_equal(by_location, - @@db.db_command("cursorInfo" => 1)["byLocation_size"]) + @@db.command("cursorInfo" => 1)["byLocation_size"]) 10.times do |i| @@coll.find_one() end assert_equal(client_cursors, - @@db.db_command("cursorInfo" => 1)["clientCursors_size"]) + @@db.command("cursorInfo" => 1)["clientCursors_size"]) assert_equal(by_location, - @@db.db_command("cursorInfo" => 1)["byLocation_size"]) + @@db.command("cursorInfo" => 1)["byLocation_size"]) 10.times do |i| a = @@coll.find() @@ -274,49 +274,49 @@ class CursorTest < Test::Unit::TestCase end assert_equal(client_cursors, - @@db.db_command("cursorInfo" => 1)["clientCursors_size"]) + @@db.command("cursorInfo" => 1)["clientCursors_size"]) assert_equal(by_location, - @@db.db_command("cursorInfo" => 1)["byLocation_size"]) + @@db.command("cursorInfo" => 1)["byLocation_size"]) a = @@coll.find() a.next_object() assert_not_equal(client_cursors, - @@db.db_command("cursorInfo" => 1)["clientCursors_size"]) + @@db.command("cursorInfo" => 1)["clientCursors_size"]) assert_not_equal(by_location, - @@db.db_command("cursorInfo" => 1)["byLocation_size"]) + @@db.command("cursorInfo" => 1)["byLocation_size"]) a.close() assert_equal(client_cursors, - @@db.db_command("cursorInfo" => 1)["clientCursors_size"]) + @@db.command("cursorInfo" => 1)["clientCursors_size"]) assert_equal(by_location, - @@db.db_command("cursorInfo" => 1)["byLocation_size"]) + @@db.command("cursorInfo" => 1)["byLocation_size"]) a = @@coll.find({}, :limit => 10).next_object() assert_equal(client_cursors, - @@db.db_command("cursorInfo" => 1)["clientCursors_size"]) + @@db.command("cursorInfo" => 1)["clientCursors_size"]) assert_equal(by_location, - @@db.db_command("cursorInfo" => 1)["byLocation_size"]) + @@db.command("cursorInfo" => 1)["byLocation_size"]) @@coll.find() do |cursor| cursor.next_object() end assert_equal(client_cursors, - @@db.db_command("cursorInfo" => 1)["clientCursors_size"]) + @@db.command("cursorInfo" => 1)["clientCursors_size"]) assert_equal(by_location, - @@db.db_command("cursorInfo" => 1)["byLocation_size"]) + @@db.command("cursorInfo" => 1)["byLocation_size"]) @@coll.find() { |cursor| cursor.next_object() } assert_equal(client_cursors, - @@db.db_command("cursorInfo" => 1)["clientCursors_size"]) + @@db.command("cursorInfo" => 1)["clientCursors_size"]) assert_equal(by_location, - @@db.db_command("cursorInfo" => 1)["byLocation_size"]) + @@db.command("cursorInfo" => 1)["byLocation_size"]) end def test_count_with_fields diff --git a/test/test_db.rb b/test/test_db.rb index fdf1e0b..0f1c574 100644 --- a/test/test_db.rb +++ b/test/test_db.rb @@ -17,9 +17,10 @@ class DBTest < Test::Unit::TestCase include Mongo - @@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost' - @@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT - @@db = Connection.new(@@host, @@port).db('ruby-mongo-test') + @@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost' + @@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT + @@conn = Connection.new(@@host, @@port) + @@db = @@conn.db('ruby-mongo-test') @@users = @@db.collection('system.users') def setup @@ -35,8 +36,8 @@ class DBTest < Test::Unit::TestCase end def test_close - @@db.close - assert !@@db.connected? + @@conn.close + assert !@@conn.connected? begin @@db.collection('test').insert('a' => 1) fail "expected 'NilClass' exception" @@ -52,10 +53,10 @@ class DBTest < Test::Unit::TestCase output = StringIO.new logger = Logger.new(output) logger.level = Logger::DEBUG - db = Connection.new(@host, @port, :logger => logger).db('ruby-mongo-test') - assert_equal logger, db.logger + conn = Connection.new(@host, @port, :logger => logger) + assert_equal logger, conn.logger - db.logger.debug 'testing' + conn.logger.debug 'testing' assert output.string.include?('testing') end @@ -89,12 +90,16 @@ class DBTest < Test::Unit::TestCase end def test_pair - @@db.close + @@conn.close @@users = nil - @@db = Connection.new({:left => "this-should-fail", :right => [@@host, @@port]}).db('ruby-mongo-test') - assert @@db.connected? + @@conn = Connection.new({:left => "this-should-fail", :right => [@@host, @@port]}) + @@db = @@conn['ruby-mongo-test'] + assert @@conn.connected? ensure - @@db = Connection.new(@@host, @@port).db('ruby-mongo-test') unless @@db.connected? + unless @@conn.connected? + @@conn = Connection.new(@@host, @@port) + @@db = @@conn.db('ruby-mongo-test') + end @@users = @@db.collection('system.users') end @@ -122,7 +127,8 @@ class DBTest < Test::Unit::TestCase end def test_pk_factory_reset - db = Connection.new(@@host, @@port).db('ruby-mongo-test') + conn = Connection.new(@@host, @@port) + db = conn.db('ruby-mongo-test') db.pk_factory = Object.new # first time begin db.pk_factory = Object.new @@ -130,7 +136,7 @@ class DBTest < Test::Unit::TestCase rescue => ex assert_match /can not change PK factory/, ex.to_s ensure - db.close + conn.close end end @@ -150,12 +156,12 @@ class DBTest < Test::Unit::TestCase assert !@@db.error? assert_nil @@db.previous_error - @@db.send(:db_command, :forceerror => 1) + @@db.send(:command, :forceerror => 1) assert @@db.error? assert_not_nil @@db.error assert_not_nil @@db.previous_error - @@db.send(:db_command, :forceerror => 1) + @@db.send(:command, :forceerror => 1) assert @@db.error? assert @@db.error prev_error = @@db.previous_error @@ -192,10 +198,10 @@ class DBTest < Test::Unit::TestCase assert !@@db.last_status()["updatedExisting"] end - def test_text_port_number - db = DB.new('ruby-mongo-test', [[@@host, @@port.to_s]]) - # If there is no error, all is well - db.collection('users').remove + def test_text_port_number_raises_no_errors + conn = Connection.new(@@host, @@port.to_s) + db = conn['ruby-mongo-test'] + assert db.collection('users').remove end end diff --git a/test/test_db_api.rb b/test/test_db_api.rb index 6b187ff..b291c04 100644 --- a/test/test_db_api.rb +++ b/test/test_db_api.rb @@ -6,11 +6,11 @@ require 'test/unit' class DBAPITest < Test::Unit::TestCase include Mongo - @@connection = Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost', + @@conn = Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost', ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT) - @@db = @@connection.db("ruby-mongo-test") + @@db = @@conn.db("ruby-mongo-test") @@coll = @@db.collection('test') - @@version = @@connection.server_version + @@version = @@conn.server_version def setup @@coll.remove @@ -95,7 +95,7 @@ class DBAPITest < Test::Unit::TestCase # Can't compare _id values because at insert, an _id was added to @r1 by # the database but we don't know what it is without re-reading the record # (which is what we are doing right now). -# assert_equal doc['_id'], @r1['_id'] +# assert_equal doc['_id'], @r1['_id'] assert_equal doc['a'], @r1['a'] end @@ -489,11 +489,11 @@ class DBAPITest < Test::Unit::TestCase end def test_ismaster - assert @@db.master? + assert @@conn.master? end def test_master - assert_equal "#{@@db.host}:#{@@db.port}", @@db.master + assert_equal "#{@@conn.host}:#{@@conn.port}", @@conn.master end def test_where diff --git a/test/test_slave_connection.rb b/test/test_slave_connection.rb index caf9af8..84b9340 100644 --- a/test/test_slave_connection.rb +++ b/test/test_slave_connection.rb @@ -9,8 +9,8 @@ class SlaveConnectionTest < Test::Unit::TestCase def self.connect_to_slave @@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost' @@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT - db = Connection.new(@@host, @@port, :slave_ok => true).db('ruby-mongo-demo') - !db.master? + conn = Connection.new(@@host, @@port, :slave_ok => true) + !conn.master? end if self.connect_to_slave @@ -30,8 +30,8 @@ class SlaveConnectionTest < Test::Unit::TestCase puts "Not connected to slave; skipping slave connection tests." def test_slave_ok_false_on_queries - @db = Connection.new(@@host, @@port).db('ruby-mongo-demo') - assert !@db.slave_ok? + @conn = Connection.new(@@host, @@port) + assert !@conn.slave_ok? end end end diff --git a/test/test_threading.rb b/test/test_threading.rb index fb2a78c..0e0c7c7 100644 --- a/test/test_threading.rb +++ b/test/test_threading.rb @@ -4,14 +4,14 @@ class TestThreading < Test::Unit::TestCase include Mongo - @@db = Connection.new.db('ruby-mongo-test') + @@db = Connection.new('localhost', 27017, :pool_size => 150, :timeout => 1).db('ruby-mongo-test') @@coll = @@db.collection('thread-test-collection') def set_up_safe_data @@db.drop_collection('duplicate') @@db.drop_collection('unique') @duplicate = @@db.collection('duplicate') - @unique = @@db.collection('unique') + @unique = @@db.collection('unique') @duplicate.insert("test" => "insert") @duplicate.insert("test" => "update") diff --git a/test/unit/collection_test.rb b/test/unit/collection_test.rb index 2d8c0e8..0289d39 100644 --- a/test/unit/collection_test.rb +++ b/test/unit/collection_test.rb @@ -1,12 +1,6 @@ require 'test/test_helper' -class CollectionTest < Test::Unit::TestCase - - class MockDB < DB - def connect_to_master - true - end - end +class ConnectionTest < Test::Unit::TestCase context "Basic operations: " do setup do @@ -14,36 +8,40 @@ class CollectionTest < Test::Unit::TestCase end should "send update message" do - @db = MockDB.new("testing", ['localhost', 27017], :logger => @logger) + @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false) + @db = @conn['testing'] @coll = @db.collection('books') - @db.expects(:send_message_with_operation).with do |op, msg, log| + @conn.expects(:send_message_with_operation).with do |op, msg, log| op == 2001 && log.include?("db.books.update") end @coll.update({}, {:title => 'Moby Dick'}) end should "send insert message" do - @db = MockDB.new("testing", ['localhost', 27017], :logger => @logger) + @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false) + @db = @conn['testing'] @coll = @db.collection('books') - @db.expects(:send_message_with_operation).with do |op, msg, log| + @conn.expects(:send_message_with_operation).with do |op, msg, log| op == 2002 && log.include?("db.books.insert") end @coll.insert({:title => 'Moby Dick'}) end should "send safe update message" do - @db = MockDB.new("testing", ['localhost', 27017], :logger => @logger) + @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false) + @db = @conn['testing'] @coll = @db.collection('books') - @db.expects(:send_message_with_safe_check).with do |op, msg, log| + @conn.expects(:send_message_with_safe_check).with do |op, msg, db_name, log| op == 2001 && log.include?("db.books.update") end @coll.update({}, {:title => 'Moby Dick'}, :safe => true) end should "send safe insert message" do - @db = MockDB.new("testing", ['localhost', 27017], :logger => @logger) + @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false) + @db = @conn['testing'] @coll = @db.collection('books') - @db.expects(:send_message_with_safe_check).with do |op, msg, log| + @conn.expects(:send_message_with_safe_check).with do |op, msg, db_name, log| op == 2001 && log.include?("db.books.update") end @coll.update({}, {:title => 'Moby Dick'}, :safe => true) diff --git a/test/unit/connection_test.rb b/test/unit/connection_test.rb new file mode 100644 index 0000000..a6690b8 --- /dev/null +++ b/test/unit/connection_test.rb @@ -0,0 +1,133 @@ +require 'test/test_helper' + +class ConnectionTest < Test::Unit::TestCase + + def new_mock_socket + socket = Object.new + socket.stubs(:setsockopt).with(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + socket + end + + def new_mock_db + db = Object.new + end + + context "Initialization: " do + + context "given a single node" do + setup do + @conn = Connection.new('localhost', 27107, :connect => false) + + admin_db = new_mock_db + admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}) + @conn.expects(:[]).with('admin').returns(admin_db) + @conn.connect_to_master + end + + should "set localhost and port to master" do + assert_equal 'localhost', @conn.host + assert_equal 27017, @conn.port + end + + should "set connection pool to 1" do + assert_equal 1, @conn.size + end + + should "default slave_ok to false" do + assert !@conn.slave_ok? + end + + should "default auto_reconnect to false" do + assert !@conn.auto_reconnect? + end + end + end + + context "Connection pooling: " do + setup do + @conn = Connection.new('localhost', 27107, :connect => false, + :pool_size => 3) + + admin_db = new_mock_db + admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}) + @conn.expects(:[]).with('admin').returns(admin_db) + @conn.connect_to_master + end + + should "check out a new connection" do + socket = @conn.checkout + assert @conn.reserved_connections.keys.include? Thread.current.object_id + end + + context "with multiple threads" do + setup do + @thread1 = Object.new + @thread2 = Object.new + @thread3 = Object.new + @thread4 = Object.new + + Thread.stubs(:current).returns(@thread1) + @socket1 = @conn.checkout + Thread.stubs(:current).returns(@thread2) + @socket2 = @conn.checkout + Thread.stubs(:current).returns(@thread3) + @socket3 = @conn.checkout + end + + should "add each thread to the reserved pool" do + assert @conn.reserved_connections.keys.include?(@thread1.object_id) + assert @conn.reserved_connections.keys.include?(@thread2.object_id) + assert @conn.reserved_connections.keys.include?(@thread3.object_id) + end + + should "only add one socket per thread" do + @conn.reserved_connections.values do |socket| + assert [@socket1, @socket2, @socket3].include?(socket) + end + end + + should "check out all sockets" do + assert_equal @conn.sockets.size, @conn.checked_out.size + @conn.sockets.each do |sock| + assert @conn.checked_out.include?(sock) + end + end + + should "raise an error if no more sockets can be checked out" do + # This can't be tested with mock threads. + # Will test in integration tests. + end + + context "when releasing dead threads" do + setup do + @thread1.expects(:alive?).returns(false) + @thread2.expects(:alive?).returns(true) + @thread3.expects(:alive?).returns(true) + Thread.expects(:list).returns([@thread1, @thread2, @thread3]) + @conn.clear_stale_cached_connections! + end + + should "return connections for dead threads" do + assert !@conn.checked_out.include?(@socket1) + assert_nil @conn.reserved_connections[@thread1.object_id] + end + + should "maintain connection for live threads" do + assert @conn.checked_out.include?(@socket2) + assert @conn.checked_out.include?(@socket3) + end + end + + context "when checking in a socket" do + setup do + @conn.checkin(@socket3) + end + + should "reduce the number checked out by one" do + assert_equal @conn.checked_out.size, (@conn.sockets.size - 1) + end + end + end + end +end + diff --git a/test/unit/cursor_test.rb b/test/unit/cursor_test.rb index 1fe011f..cc2e489 100644 --- a/test/unit/cursor_test.rb +++ b/test/unit/cursor_test.rb @@ -4,7 +4,8 @@ class TestCursor < Test::Unit::TestCase context "Cursor options" do setup do - @db = stub(:name => "testing", :slave_ok? => false) + @connection = stub(:class => Connection) + @db = stub(:name => "testing", :slave_ok? => false, :connection => @connection) @collection = stub(:db => @db, :name => "items") @cursor = Cursor.new(@collection) end @@ -64,39 +65,10 @@ class TestCursor < Test::Unit::TestCase end end - context "Query options" do - should "test timeout true and slave_ok false" do - @db = stub(:slave_ok? => false, :name => "testing") - @collection = stub(:db => @db, :name => "items") - @cursor = Cursor.new(@collection, :timeout => true) - assert_equal 0, @cursor.query_opts - end - - should "test timeout false and slave_ok false" do - @db = stub(:slave_ok? => false, :name => "testing") - @collection = stub(:db => @db, :name => "items") - @cursor = Cursor.new(@collection, :timeout => false) - assert_equal 16, @cursor.query_opts - end - - should "set timeout true and slave_ok true" do - @db = stub(:slave_ok? => true, :name => "testing") - @collection = stub(:db => @db, :name => "items") - @cursor = Cursor.new(@collection, :timeout => true) - assert_equal 4, @cursor.query_opts - end - - should "set timeout false and slave_ok true" do - @db = stub(:slave_ok? => true, :name => "testing") - @collection = stub(:db => @db, :name => "items") - @cursor = Cursor.new(@collection, :timeout => false) - assert_equal 20, @cursor.query_opts - end - end - context "Query fields" do setup do - @db = stub(:slave_ok? => true, :name => "testing") + @connection = stub(:class => Collection) + @db = stub(:slave_ok? => true, :name => "testing", :connection => @connection) @collection = stub(:db => @db, :name => "items") end diff --git a/test/unit/db_test.rb b/test/unit/db_test.rb index 2622cbf..7444778 100644 --- a/test/unit/db_test.rb +++ b/test/unit/db_test.rb @@ -2,16 +2,6 @@ require 'test/test_helper' class DBTest < Test::Unit::TestCase - class MockDB < DB - attr_accessor :socket - - def connect_to_master - true - end - - public :add_message_headers - end - def insert_message(db, documents) documents = [documents] unless documents.is_a?(Array) message = ByteBuffer.new @@ -23,7 +13,8 @@ class DBTest < Test::Unit::TestCase context "DB commands" do setup do - @db = MockDB.new("testing", ['localhost', 27017]) + @conn = stub() + @db = DB.new("testing", @conn) @collection = mock() @db.stubs(:system_command_collection).returns(@collection) end @@ -43,7 +34,7 @@ class DBTest < Test::Unit::TestCase should "create the proper cursor" do @cursor = mock(:next_object => {"ok" => 1}) Cursor.expects(:new).with(@collection, :admin => true, - :limit => -1, :selector => {:buildinfo => 1}).returns(@cursor) + :limit => -1, :selector => {:buildinfo => 1}, :socket => nil).returns(@cursor) command = {:buildinfo => 1} @db.command(command, true) end @@ -51,32 +42,13 @@ class DBTest < Test::Unit::TestCase should "raise an error when the command fails" do @cursor = mock(:next_object => {"ok" => 0}) Cursor.expects(:new).with(@collection, :admin => true, - :limit => -1, :selector => {:buildinfo => 1}).returns(@cursor) + :limit => -1, :selector => {:buildinfo => 1}, :socket => nil).returns(@cursor) assert_raise OperationFailure do command = {:buildinfo => 1} @db.command(command, true, true) end end end - - context "safe messages" do - setup do - @db = MockDB.new("testing", ['localhost', 27017]) - @collection = mock() - @db.stubs(:system_command_collection).returns(@collection) - end - - should "receive getlasterror message" do - @socket = mock() - @socket.stubs(:close) - @socket.expects(:flush) - @socket.expects(:print).with { |message| message.include?('getlasterror') } - @db.socket = @socket - @db.stubs(:receive) - message = insert_message(@db, {:a => 1}) - @db.send_message_with_safe_check(Mongo::Constants::OP_QUERY, message) - end - end end