Merged branch for initial connection pooling code

This commit is contained in:
Kyle Banker 2009-11-23 15:20:05 -05:00
parent 7890d6e146
commit 11a92349e9
20 changed files with 772 additions and 544 deletions

View File

@ -31,5 +31,5 @@ module Mongo
ASCENDING = 1 ASCENDING = 1
DESCENDING = -1 DESCENDING = -1
VERSION = "0.17.2" VERSION = "0.18"
end end

View File

@ -30,7 +30,7 @@ module Mongo
def profiling_level def profiling_level
oh = OrderedHash.new oh = OrderedHash.new
oh[:profile] = -1 oh[:profile] = -1
doc = @db.db_command(oh) doc = @db.command(oh)
raise "Error with profile command: #{doc.inspect}" unless @db.ok?(doc) && doc['was'].kind_of?(Numeric) raise "Error with profile command: #{doc.inspect}" unless @db.ok?(doc) && doc['was'].kind_of?(Numeric)
case doc['was'].to_i case doc['was'].to_i
when 0 when 0
@ -57,7 +57,7 @@ module Mongo
else else
raise "Error: illegal profiling level value #{level}" raise "Error: illegal profiling level value #{level}"
end end
doc = @db.db_command(oh) doc = @db.command(oh)
raise "Error with profile command: #{doc.inspect}" unless @db.ok?(doc) raise "Error with profile command: #{doc.inspect}" unless @db.ok?(doc)
end end
@ -71,7 +71,7 @@ module Mongo
# problem or returning an interesting hash (see especially the # problem or returning an interesting hash (see especially the
# 'result' string value) if all is well. # 'result' string value) if all is well.
def validate_collection(name) def validate_collection(name)
doc = @db.db_command(:validate => name) doc = @db.command(:validate => name)
raise "Error with validate command: #{doc.inspect}" unless @db.ok?(doc) raise "Error with validate command: #{doc.inspect}" unless @db.ok?(doc)
result = doc['result'] result = doc['result']
raise "Error with validation data: #{doc.inspect}" unless result.kind_of?(String) raise "Error with validation data: #{doc.inspect}" unless result.kind_of?(String)

View File

@ -41,6 +41,7 @@ module Mongo
end end
@db, @name = db, name @db, @name = db, name
@connection = @db.connection
@pk_factory = pk_factory || ObjectID @pk_factory = pk_factory || ObjectID
@hint = nil @hint = nil
end end
@ -222,7 +223,7 @@ module Mongo
BSON.serialize_cstr(message, "#{@db.name}.#{@name}") BSON.serialize_cstr(message, "#{@db.name}.#{@name}")
message.put_int(0) message.put_int(0)
message.put_array(BSON_SERIALIZER.serialize(selector, false).unpack("C*")) message.put_array(BSON_SERIALIZER.serialize(selector, false).unpack("C*"))
@db.send_message_with_operation(Mongo::Constants::OP_DELETE, message, @connection.send_message_with_operation(Mongo::Constants::OP_DELETE, message,
"db.#{@db.name}.remove(#{selector.inspect})") "db.#{@db.name}.remove(#{selector.inspect})")
end end
@ -261,10 +262,10 @@ module Mongo
message.put_array(BSON_SERIALIZER.serialize(selector, false).unpack("C*")) message.put_array(BSON_SERIALIZER.serialize(selector, false).unpack("C*"))
message.put_array(BSON_SERIALIZER.serialize(document, false).unpack("C*")) message.put_array(BSON_SERIALIZER.serialize(document, false).unpack("C*"))
if options[:safe] if options[:safe]
@db.send_message_with_safe_check(Mongo::Constants::OP_UPDATE, message, @connection.send_message_with_safe_check(Mongo::Constants::OP_UPDATE, message, @db.name,
"db.#{@name}.update(#{selector.inspect}, #{document.inspect})") "db.#{@name}.update(#{selector.inspect}, #{document.inspect})")
else else
@db.send_message_with_operation(Mongo::Constants::OP_UPDATE, message, @connection.send_message_with_operation(Mongo::Constants::OP_UPDATE, message,
"db.#{@name}.update(#{selector.inspect}, #{document.inspect})") "db.#{@name}.update(#{selector.inspect}, #{document.inspect})")
end end
end end
@ -333,7 +334,7 @@ module Mongo
reduce = Code.new(reduce) reduce = Code.new(reduce)
end end
result = @db.db_command({"group" => result = @db.command({"group" =>
{ {
"ns" => @name, "ns" => @name,
"$reduce" => reduce, "$reduce" => reduce,
@ -406,7 +407,7 @@ EOS
command[:distinct] = @name command[:distinct] = @name
command[:key] = key.to_s command[:key] = key.to_s
@db.db_command(command)["values"] @db.command(command)["values"]
end end
# Rename this collection. # Rename this collection.
@ -488,10 +489,10 @@ EOS
BSON.serialize_cstr(message, "#{@db.name}.#{collection_name}") BSON.serialize_cstr(message, "#{@db.name}.#{collection_name}")
documents.each { |doc| message.put_array(BSON_SERIALIZER.serialize(doc, check_keys).unpack("C*")) } documents.each { |doc| message.put_array(BSON_SERIALIZER.serialize(doc, check_keys).unpack("C*")) }
if safe if safe
@db.send_message_with_safe_check(Mongo::Constants::OP_INSERT, message, @connection.send_message_with_safe_check(Mongo::Constants::OP_INSERT, message, @db.name,
"db.#{collection_name}.insert(#{documents.inspect})") "db.#{collection_name}.insert(#{documents.inspect})")
else else
@db.send_message_with_operation(Mongo::Constants::OP_INSERT, message, @connection.send_message_with_operation(Mongo::Constants::OP_INSERT, message,
"db.#{collection_name}.insert(#{documents.inspect})") "db.#{collection_name}.insert(#{documents.inspect})")
end end
documents.collect { |o| o[:_id] || o['_id'] } documents.collect { |o| o[:_id] || o['_id'] }

View File

@ -14,12 +14,27 @@
# limitations under the License. # limitations under the License.
# ++ # ++
require 'set'
require 'socket'
require 'monitor'
module Mongo module Mongo
# A connection to MongoDB. # A connection to MongoDB.
class Connection class Connection
DEFAULT_PORT = 27017 DEFAULT_PORT = 27017
STANDARD_HEADER_SIZE = 16
RESPONSE_HEADER_SIZE = 20
attr_reader :logger, :size, :host, :port, :nodes, :sockets, :checked_out, :reserved_connections
def slave_ok?; @slave_ok; end
def auto_reconnect?; @auto_reconnect; end
# Counter for generating unique request ids.
@@current_request_id = 0
# Create a Mongo database server instance. You specify either one or a # Create a Mongo database server instance. You specify either one or a
# pair of servers. If one, you also say if connecting to a slave is # pair of servers. If one, you also say if connecting to a slave is
@ -69,41 +84,50 @@ module Mongo
# When a DB object first connects to a pair, it will find the master # When a DB object first connects to a pair, it will find the master
# instance and connect to that one. # instance and connect to that one.
def initialize(pair_or_host=nil, port=nil, options={}) def initialize(pair_or_host=nil, port=nil, options={})
@pair = case pair_or_host @nodes = format_pair(pair_or_host)
when String
[[pair_or_host, port ? port.to_i : DEFAULT_PORT]]
when Hash
connections = []
connections << pair_val_to_connection(pair_or_host[:left])
connections << pair_val_to_connection(pair_or_host[:right])
connections
when nil
[['localhost', DEFAULT_PORT]]
end
# Host and port of current master.
@host = @port = nil
# Lock for request ids.
@id_lock = Mutex.new
# Lock for checking master.
@master_lock = Mutex.new
# Pool size and timeout.
@size = options[:pool_size] || 1
@timeout = options[:timeout] || 1.0
# Cache of reserved sockets mapped to threads
@reserved_connections = {}
# Mutex for synchronizing pool access
@connection_mutex = Monitor.new
# Condition variable for signal and wait
@queue = @connection_mutex.new_cond
@sockets = []
@checked_out = []
# Slave ok can be true only if one node is specified
@auto_reconnect = options[:auto_reconnect]
@slave_ok = options[:slave_ok] && @nodes.length == 1
@logger = options[:logger] || nil
@options = options @options = options
should_connect = options[:connect].nil? ? true : options[:connect]
connect_to_master if should_connect
end end
# Return the Mongo::DB named +db_name+. The slave_ok and # Returns a hash with all database names and their respective sizes on
# auto_reconnect options passed in via #new may be overridden here. # disk.
# See DB#new for other options you can pass in.
def db(db_name, options={})
DB.new(db_name, @pair, @options.merge(options))
end
def logger
@options[:logger]
end
# Returns a hash containing database names as keys and disk space for
# each as values.
def database_info def database_info
doc = single_db_command('admin', :listDatabases => 1) doc = self['admin'].command(:listDatabases => 1)
h = {} returning({}) do |info|
doc['databases'].each { |db| doc['databases'].each { |db| info[db['name']] = db['sizeOnDisk'].to_i }
h[db['name']] = db['sizeOnDisk'].to_i end
}
h
end end
# Returns an array of database names. # Returns an array of database names.
@ -111,9 +135,21 @@ module Mongo
database_info.keys database_info.keys
end end
# Return the database named +db_name+. The slave_ok and
# auto_reconnect options passed in via #new may be overridden here.
# See DB#new for other options you can pass in.
def db(db_name, options={})
DB.new(db_name, self, options.merge(:logger => @logger))
end
# Return the database named +db_name+.
def [](db_name)
DB.new(db_name, self, :logger => @logger)
end
# Drops the database +name+. # Drops the database +name+.
def drop_database(name) def drop_database(name)
single_db_command(name, :dropDatabase => 1) self[name].command(:dropDatabase => 1)
end end
# Copies the database +from+ on the local server to +to+ on the specified +host+. # Copies the database +from+ on the local server to +to+ on the specified +host+.
@ -124,7 +160,35 @@ module Mongo
oh[:fromhost] = host oh[:fromhost] = host
oh[:fromdb] = from oh[:fromdb] = from
oh[:todb] = to oh[:todb] = to
single_db_command('admin', oh) self["admin"].command(oh)
end
# Increments and returns the next available request id.
def get_request_id
request_id = ''
@id_lock.synchronize do
request_id = @@current_request_id += 1
end
request_id
end
# Prepares a message for transmission to MongoDB by
# constructing a valid message header.
def add_message_headers(operation, message)
headers = ByteBuffer.new
# Message size.
headers.put_int(16 + message.size)
# Unique request id.
headers.put_int(get_request_id)
# Response id.
headers.put_int(0)
# Opcode.
headers.put_int(operation)
message.prepend!(headers)
end end
# Return the build information for the current connection. # Return the build information for the current connection.
@ -132,13 +196,374 @@ module Mongo
db("admin").command({:buildinfo => 1}, {:admin => true, :check_response => true}) db("admin").command({:buildinfo => 1}, {:admin => true, :check_response => true})
end end
# Returns the build version of the current server, using # Get the build version of the current server.
# a ServerVersion object for comparability. # Returns a ServerVersion object for comparability.
def server_version def server_version
ServerVersion.new(server_info["version"]) ServerVersion.new(server_info["version"])
end end
protected
## Connections and pooling ##
# Sends a message to MongoDB.
#
# Takes a MongoDB opcode, +operation+, a message of class ByteBuffer,
# +message+, and an optional formatted +log_message+.
def send_message(operation, message, log_message=nil)
@logger.debug(" MONGODB #{log_message || message}") if @logger
packed_message = pack_message(operation, message)
socket = checkout
send_message_on_socket(packed_message, socket)
end
# Sends a message to MongoDB and returns the response.
#
# Takes a MongoDB opcode, +operation+, a message of class ByteBuffer,
# +message+, an optional formatted +log_message+, and an optional
# socket.
def receive_message(operation, message, log_msg=nil, sock=nil)
@logger.debug(" MONGODB #{log_message || message}") if @logger
packed_message = pack_message(operation, message)
# This code is used only if we're checking for master.
if sock
@master_lock.synchronize do
response = send_and_receive(packed_message, sock)
end
else
socket = checkout
response = send_and_receive(packed_message, socket)
end
response
end
# Sends a message to MongoDB.
#
# Takes a MongoDB opcode, +operation+, a message of class ByteBuffer,
# +message+, and an optional formatted +log_message+.
# Sends the message to the databse, adding the necessary headers.
def send_message_with_operation(operation, message, log_message=nil)
@logger.debug(" MONGODB #{log_message || message}") if @logger
packed_message = pack_message(operation, message)
socket = checkout
response = send_message_on_socket(packed_message, socket)
checkin(socket)
response
end
# Sends a message to the database, waits for a response, and raises
# and exception if the operation has failed.
def send_message_with_safe_check(operation, message, db_name, log_message=nil)
message_with_headers = add_message_headers(operation, message)
message_with_check = last_error_message(db_name)
@logger.debug(" MONGODB #{log_message || message}") if @logger
sock = checkout
msg = message_with_headers.append!(message_with_check).to_s
send_message_on_socket(msg, sock)
docs, num_received, cursor_id = receive(sock)
if num_received == 1 && error = docs[0]['err']
raise Mongo::OperationFailure, error
end
checkin(sock)
[docs, num_received, cursor_id]
end
# Send a message to the database and waits for the response.
def receive_message_with_operation(operation, message, log_message=nil, socket=nil)
message_with_headers = add_message_headers(operation, message).to_s
@logger.debug(" MONGODB #{log_message || message}") if @logger
sock = socket || checkout
send_message_on_socket(message_with_headers, sock)
receive(sock)
end
# Creates a new socket and tries to connect to master.
# If successful, sets @host and @port to master and returns the socket.
def connect_to_master
@host = @port = nil
for node_pair in @nodes
host, port = *node_pair
begin
socket = TCPSocket.new(host, port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
result = self['admin'].command({:ismaster => 1}, false, false, socket)
if result['ok'] == 1 && ((is_master = result['ismaster'] == 1) || @slave_ok)
@host, @port = host, port
end
# Note: slave_ok can be true only when connecting to a single node.
if !@slave_ok && !is_master
raise ConfigurationError, "Trying to connect directly to slave; " +
"if this is what you want, specify :slave_ok => true."
end
break if is_master || @slave_ok
rescue SocketError, SystemCallError, IOError => ex
socket.close if socket
false
end
end
raise ConnectionError, "failed to connect to any given host:port" unless socket
end
def master?
doc = self['admin'].command(:ismaster => 1)
doc['ok'] == 1 && doc['ismaster'] == 1
end
# Returns a string of the form "host:port" that points to the master
# database. Works even if this is the master database.
def master
doc = self['admin'].command(:ismaster => 1)
if doc['ok'] == 1 && doc['ismaster'] == 1
"#@host:#@port"
elsif doc['remote']
doc['remote']
else
raise "Error retrieving master database: #{doc.inspect}"
end
end
def connected?
@sockets.detect do |sock|
sock.is_a? Socket
end || (@host && @port)
end
# Close the connection to the database.
def close
@sockets.each do |sock|
sock.close
end
@host = @port = nil
@sockets.clear
@checked_out.clear
@reserved_connections.clear
end
# Get a socket from the pool, mapped to the current thread.
def checkout
if sock = @reserved_connections[Thread.current.object_id]
sock
else
sock = obtain_socket
@reserved_connections[Thread.current.object_id] = sock
end
sock
end
# Return a socket to the pool.
def checkin(socket)
@connection_mutex.synchronize do
@checked_out.delete(socket)
@reserved_connections.delete Thread.current.object_id
@queue.signal
end
end
# Releases connection for any dead threads.
# Called when the connection pool grows too large
# and we need additional sockets.
def clear_stale_cached_connections!
keys = Set.new(@reserved_connections.keys)
Thread.list.each do |thread|
keys.delete(thread.object_id) if thread.alive?
end
keys.each do |key|
next unless @reserved_connections.has_key?(key)
checkin(@reserved_connections[key])
@reserved_connections.delete(key)
end
end
# Adds a new socket to the pool and checks it out.
#
# This method is called exclusively from #obtain_socket;
# therefore, it runs within a mutex, as it must.
def checkout_new_socket
socket = TCPSocket.new(@host, @port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
@sockets << socket
@checked_out << socket
socket
end
# Checks out the first available socket from the pool.
#
# This method is called exclusively from #obtain_socket;
# therefore, it runs within a mutex, as it must.
def checkout_existing_socket
socket = (@sockets - @checked_out).first
@checked_out << socket
socket
end
# Check out an existing socket or create a new socket if the maximum
# pool size has not been exceeded. Otherwise, wait for the next
# available socket.
def obtain_socket
@connection_mutex.synchronize do
# NOTE: Not certain that this is the best place for reconnect
connect_to_master if !connected? && @auto_reconnect
loop do
socket = if @checked_out.size < @sockets.size
checkout_existing_socket
elsif @sockets.size < @size
checkout_new_socket
end
return socket if socket
# No connections available; wait.
if @queue.wait(@timeout)
next
else
# Try to clear out any stale threads to free up some connections
clear_stale_cached_connections!
if @size == @sockets.size
raise ConnectionTimeoutError, "could not obtain connection within " +
"#{@timeout} seconds. The max pool size is currently #{@size}; " +
"consider increasing it."
end
end # if
end # loop
end #sync
end
def receive(sock)
receive_header(sock)
number_received, cursor_id = receive_response_header(sock)
read_documents(number_received, cursor_id, sock)
end
def receive_header(sock)
header = ByteBuffer.new
header.put_array(receive_message_on_socket(16, sock).unpack("C*"))
unless header.size == STANDARD_HEADER_SIZE
raise "Short read for DB response header: " +
"expected #{STANDARD_HEADER_SIZE} bytes, saw #{header.size}"
end
header.rewind
size = header.get_int
request_id = header.get_int
response_to = header.get_int
op = header.get_int
end
def receive_response_header(sock)
header_buf = ByteBuffer.new
header_buf.put_array(receive_message_on_socket(RESPONSE_HEADER_SIZE, sock).unpack("C*"))
if header_buf.length != RESPONSE_HEADER_SIZE
raise "Short read for DB response header; " +
"expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}"
end
header_buf.rewind
result_flags = header_buf.get_int
cursor_id = header_buf.get_long
starting_from = header_buf.get_int
number_remaining = header_buf.get_int
[number_remaining, cursor_id]
end
def read_documents(number_received, cursor_id, sock)
docs = []
number_remaining = number_received
while number_remaining > 0 do
buf = ByteBuffer.new
buf.put_array(receive_message_on_socket(4, sock).unpack("C*"))
buf.rewind
size = buf.get_int
buf.put_array(receive_message_on_socket(size - 4, sock).unpack("C*"), 4)
number_remaining -= 1
buf.rewind
docs << BSON.new.deserialize(buf)
end
[docs, number_received, cursor_id]
end
def last_error_message(db_name)
message = ByteBuffer.new
message.put_int(0)
BSON.serialize_cstr(message, "#{db_name}.$cmd")
message.put_int(0)
message.put_int(-1)
message.put_array(BSON_SERIALIZER.serialize({:getlasterror => 1}, false).unpack("C*"))
add_message_headers(Mongo::Constants::OP_QUERY, message)
end
# Prepares a message for transmission to MongoDB by
# constructing a message header with a new request id.
def pack_message(operation, message)
headers = ByteBuffer.new
# Message size.
headers.put_int(16 + message.size)
# Unique request id.
headers.put_int(get_request_id)
# Response id.
headers.put_int(0)
# Opcode.
headers.put_int(operation)
message.prepend!(headers)
message.to_s
end
#def send_and_receive
# send_message_on_socket(packed_message, socket)
# receive_message_on_socket()
#end
# Low-level method for sending a message on a socket.
# Requires a packed message and an available socket,
def send_message_on_socket(packed_message, socket)
#socket will be connected to master when we receive it
#begin
socket.send(packed_message, 0)
#rescue => ex
# close
# need to find a way to release the socket here
# checkin(socket)
# raise ex
#end
end
# Low-level method for receiving data from socket.
# Requires length and an available socket.
def receive_message_on_socket(length, socket)
message = ""
while message.length < length do
chunk = socket.recv(length - message.length)
raise "connection closed" unless chunk.length > 0
message += chunk
end
message
end
## Private helper methods
# Returns an array of host-port pairs.
def format_pair(pair_or_host)
case pair_or_host
when String
[[pair_or_host, port ? port.to_i : DEFAULT_PORT]]
when Hash
connections = []
connections << pair_val_to_connection(pair_or_host[:left])
connections << pair_val_to_connection(pair_or_host[:right])
connections
when nil
[['localhost', DEFAULT_PORT]]
end
end
# Turns an array containing a host name string and a # Turns an array containing a host name string and a
# port number integer into a [host, port] pair array. # port number integer into a [host, port] pair array.
@ -154,20 +579,5 @@ module Mongo
a a
end end
end end
# Send cmd (a hash, possibly ordered) to the admin database and return
# the answer. Raises an error unless the return is "ok" (DB#ok?
# returns +true+).
def single_db_command(db_name, cmd)
db = nil
begin
db = db(db_name)
doc = db.db_command(cmd)
raise "error retrieving database info: #{doc.inspect}" unless db.ok?(doc)
doc
ensure
db.close if db
end
end
end end
end end

View File

@ -32,6 +32,7 @@ module Mongo
def initialize(collection, options={}) def initialize(collection, options={})
@db = collection.db @db = collection.db
@collection = collection @collection = collection
@connection = @db.connection
@selector = convert_selector_for_query(options[:selector]) @selector = convert_selector_for_query(options[:selector])
@fields = convert_fields_for_query(options[:fields]) @fields = convert_fields_for_query(options[:fields])
@ -43,6 +44,7 @@ module Mongo
@snapshot = options[:snapshot] @snapshot = options[:snapshot]
@timeout = options[:timeout] || false @timeout = options[:timeout] || false
@explain = options[:explain] @explain = options[:explain]
@socket = options[:socket]
@full_collection_name = "#{@collection.db.name}.#{@collection.name}" @full_collection_name = "#{@collection.db.name}.#{@collection.name}"
@cache = [] @cache = []
@ -83,7 +85,7 @@ module Mongo
command = OrderedHash["count", @collection.name, command = OrderedHash["count", @collection.name,
"query", @selector, "query", @selector,
"fields", @fields] "fields", @fields]
response = @db.db_command(command) response = @db.command(command)
return response['n'].to_i if response['ok'] == 1 return response['n'].to_i if response['ok'] == 1
return 0 if response['errmsg'] == "ns missing" return 0 if response['errmsg'] == "ns missing"
raise OperationFailure, "Count failed: #{response['errmsg']}" raise OperationFailure, "Count failed: #{response['errmsg']}"
@ -199,7 +201,7 @@ module Mongo
message.put_int(0) message.put_int(0)
message.put_int(1) message.put_int(1)
message.put_long(@cursor_id) message.put_long(@cursor_id)
@db.send_message_with_operation(Mongo::Constants::OP_KILL_CURSORS, message, "cursor.close()") @connection.send_message_with_operation(Mongo::Constants::OP_KILL_CURSORS, message, "cursor.close()")
end end
@cursor_id = 0 @cursor_id = 0
@closed = true @closed = true
@ -212,7 +214,7 @@ module Mongo
# See http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY # See http://www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY
def query_opts def query_opts
timeout = @timeout ? 0 : Mongo::Constants::OP_QUERY_NO_CURSOR_TIMEOUT timeout = @timeout ? 0 : Mongo::Constants::OP_QUERY_NO_CURSOR_TIMEOUT
slave_ok = @db.slave_ok? ? Mongo::Constants::OP_QUERY_SLAVE_OK : 0 slave_ok = @connection.slave_ok? ? Mongo::Constants::OP_QUERY_SLAVE_OK : 0
slave_ok + timeout slave_ok + timeout
end end
@ -294,7 +296,7 @@ module Mongo
# Cursor id. # Cursor id.
message.put_long(@cursor_id) message.put_long(@cursor_id)
results, @n_received, @cursor_id = @db.receive_message_with_operation(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()") results, @n_received, @cursor_id = @connection.receive_message_with_operation(Mongo::Constants::OP_GET_MORE, message, "cursor.get_more()", @socket)
@cache += results @cache += results
close_cursor_if_query_complete close_cursor_if_query_complete
end end
@ -305,8 +307,8 @@ module Mongo
false false
else else
message = construct_query_message message = construct_query_message
results, @n_received, @cursor_id = @db.receive_message_with_operation(Mongo::Constants::OP_QUERY, message, results, @n_received, @cursor_id = @connection.receive_message_with_operation(Mongo::Constants::OP_QUERY, message,
(query_log_message if @db.logger)) (query_log_message if @connection.logger), @socket)
@cache += results @cache += results
@query_run = true @query_run = true
close_cursor_if_query_complete close_cursor_if_query_complete

View File

@ -27,8 +27,6 @@ module Mongo
# A Mongo database. # A Mongo database.
class DB class DB
STANDARD_HEADER_SIZE = 16
RESPONSE_HEADER_SIZE = 20
SYSTEM_NAMESPACE_COLLECTION = "system.namespaces" SYSTEM_NAMESPACE_COLLECTION = "system.namespaces"
SYSTEM_INDEX_COLLECTION = "system.indexes" SYSTEM_INDEX_COLLECTION = "system.indexes"
SYSTEM_PROFILE_COLLECTION = "system.profile" SYSTEM_PROFILE_COLLECTION = "system.profile"
@ -39,11 +37,10 @@ module Mongo
@@current_request_id = 0 @@current_request_id = 0
# Strict mode enforces collection existence checks. When +true+, # Strict mode enforces collection existence checks. When +true+,
# asking for a collection that does not exist or trying to create a # asking for a collection that does not exist, or trying to create a
# collection that already exists raises an error. # collection that already exists, raises an error.
# #
# Strict mode is off (+false+) by default. Its value can be changed at # Strict mode is disabled by default, but enabled (+true+) at any time.
# any time.
attr_writer :strict attr_writer :strict
# Returns the value of the +strict+ flag. # Returns the value of the +strict+ flag.
@ -52,26 +49,16 @@ module Mongo
# The name of the database. # The name of the database.
attr_reader :name attr_reader :name
# The Mongo::Connection instance connecting to the MongoDB server.
attr_reader :connection attr_reader :connection
# Host to which we are currently connected.
attr_reader :host
# Port to which we are currently connected.
attr_reader :port
# An array of [host, port] pairs. # An array of [host, port] pairs.
attr_reader :nodes attr_reader :nodes
# The database's socket. For internal (and Cursor) use only. # The logger instance if :logger is passed to initialize.
attr_reader :socket
# The logger instance if :logger is passed to initialize
attr_reader :logger attr_reader :logger
def slave_ok?; @slave_ok; end # The primary key factory object (or +nil+).
# A primary key factory object (or +nil+). See the README.doc file or
# DB#new for details.
attr_reader :pk_factory attr_reader :pk_factory
def pk_factory=(pk_factory) def pk_factory=(pk_factory)
@ -91,8 +78,7 @@ module Mongo
# Options: # Options:
# #
# :strict :: If true, collections must exist to be accessed and must # :strict :: If true, collections must exist to be accessed and must
# not exist to be created. See #collection and # not exist to be created. See #collection and #create_collection.
# #create_collection.
# #
# :pk :: A primary key factory object that must respond to :create_pk, # :pk :: A primary key factory object that must respond to :create_pk,
# which should take a hash and return a hash which merges the # which should take a hash and return a hash which merges the
@ -119,72 +105,18 @@ module Mongo
# When a DB object first connects to a pair, it will find the master # When a DB object first connects to a pair, it will find the master
# instance and connect to that one. On socket error or if we recieve a # instance and connect to that one. On socket error or if we recieve a
# "not master" error, we again find the master of the pair. # "not master" error, we again find the master of the pair.
def initialize(db_name, nodes, options={}) def initialize(db_name, connection, options={})
case db_name @name = validate_db_name(db_name)
when Symbol, String @connection = connection
else
raise TypeError, "db_name must be a string or symbol"
end
[" ", ".", "$", "/", "\\"].each do |invalid_char|
if db_name.include? invalid_char
raise InvalidName, "database names cannot contain the character '#{invalid_char}'"
end
end
if db_name.empty?
raise InvalidName, "database name cannot be the empty string"
end
@connection = options[:connection]
@name, @nodes = db_name, nodes
@strict = options[:strict] @strict = options[:strict]
@pk_factory = options[:pk] @pk_factory = options[:pk]
@slave_ok = options[:slave_ok] && @nodes.length == 1 # only OK if one node
if options[:auto_reconnect]
warn(":auto_reconnect is deprecated. henceforth, any time an operation fails, " +
"the driver will attempt to reconnect master on subsequent operations.")
end
@semaphore = Mutex.new
@socket = nil
@logger = options[:logger]
@network_timeout = 20
connect_to_master
end
def connect_to_master
close if @socket
@host = @port = nil
@nodes.detect { |hp|
@host, @port = *hp
begin
@socket = TCPSocket.new(@host, @port)
@socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
# Check for master. Can't call master? because it uses mutex,
# which may already be in use during this call.
semaphore_is_locked = @semaphore.locked?
@semaphore.unlock if semaphore_is_locked
is_master = master?
@semaphore.lock if semaphore_is_locked
if @nodes.length == 1 && !is_master && !@slave_ok
raise ConfigurationError, "Trying to connect directly to slave; if this is what you want, specify :slave_ok => true."
end
is_master || @slave_ok
rescue SocketError, SystemCallError, IOError => ex
close if @socket
false
end
}
raise ConnectionFailure, "error: failed to connect to any given host:port" unless @socket
end end
# Returns true if +username+ has +password+ in # Returns true if +username+ has +password+ in
# +SYSTEM_USER_COLLECTION+. +name+ is username, +password+ is # +SYSTEM_USER_COLLECTION+. +name+ is username, +password+ is
# plaintext password. # plaintext password.
def authenticate(username, password) def authenticate(username, password)
doc = db_command(:getnonce => 1) doc = command(:getnonce => 1)
raise "error retrieving nonce: #{doc}" unless ok?(doc) raise "error retrieving nonce: #{doc}" unless ok?(doc)
nonce = doc['nonce'] nonce = doc['nonce']
@ -193,12 +125,12 @@ module Mongo
auth['user'] = username auth['user'] = username
auth['nonce'] = nonce auth['nonce'] = nonce
auth['key'] = Digest::MD5.hexdigest("#{nonce}#{username}#{hash_password(username, password)}") auth['key'] = Digest::MD5.hexdigest("#{nonce}#{username}#{hash_password(username, password)}")
ok?(db_command(auth)) ok?(command(auth))
end end
# Deauthorizes use for this database for this connection. # Deauthorizes use for this database for this connection.
def logout def logout
doc = db_command(:logout => 1) doc = command(:logout => 1)
raise "error logging out: #{doc.inspect}" unless ok?(doc) raise "error logging out: #{doc.inspect}" unless ok?(doc)
end end
@ -252,7 +184,7 @@ module Mongo
# Create new collection # Create new collection
oh = OrderedHash.new oh = OrderedHash.new
oh[:create] = name oh[:create] = name
doc = db_command(oh.merge(options || {})) doc = command(oh.merge(options || {}))
ok = doc['ok'] ok = doc['ok']
return Collection.new(self, name, @pk_factory) if ok.kind_of?(Numeric) && (ok.to_i == 1 || ok.to_i == 0) return Collection.new(self, name, @pk_factory) if ok.kind_of?(Numeric) && (ok.to_i == 1 || ok.to_i == 0)
raise "Error creating collection: #{doc.inspect}" raise "Error creating collection: #{doc.inspect}"
@ -276,20 +208,20 @@ module Mongo
def drop_collection(name) def drop_collection(name)
return true unless collection_names.include?(name) return true unless collection_names.include?(name)
ok?(db_command(:drop => name)) ok?(command(:drop => name))
end end
# Returns the error message from the most recently executed database # Returns the error message from the most recently executed database
# operation for this connection, or +nil+ if there was no error. # operation for this connection, or +nil+ if there was no error.
def error def error
doc = db_command(:getlasterror => 1) doc = command(:getlasterror => 1)
raise "error retrieving last error: #{doc}" unless ok?(doc) raise "error retrieving last error: #{doc}" unless ok?(doc)
doc['err'] doc['err']
end end
# Get status information from the last operation on this connection. # Get status information from the last operation on this connection.
def last_status def last_status
db_command(:getlasterror => 1) command(:getlasterror => 1)
end end
# Returns +true+ if an error was caused by the most recently executed # Returns +true+ if an error was caused by the most recently executed
@ -303,7 +235,7 @@ module Mongo
# Only returns errors that have occured since the last call to # Only returns errors that have occured since the last call to
# DB#reset_error_history - returns +nil+ if there is no such error. # DB#reset_error_history - returns +nil+ if there is no such error.
def previous_error def previous_error
error = db_command(:getpreverror => 1) error = command(:getpreverror => 1)
if error["err"] if error["err"]
error error
else else
@ -316,42 +248,7 @@ module Mongo
# Calls to DB#previous_error will only return errors that have occurred # Calls to DB#previous_error will only return errors that have occurred
# since the most recent call to this method. # since the most recent call to this method.
def reset_error_history def reset_error_history
db_command(:reseterror => 1) command(:reseterror => 1)
end
# Returns true if this database is a master (or is not paired with any
# other database), false if it is a slave.
def master?
doc = db_command(:ismaster => 1)
is_master = doc['ismaster']
ok?(doc) && is_master.kind_of?(Numeric) && is_master.to_i == 1
end
# Returns a string of the form "host:port" that points to the master
# database. Works even if this is the master database.
def master
doc = db_command(:ismaster => 1)
is_master = doc['ismaster']
raise "Error retrieving master database: #{doc.inspect}" unless ok?(doc) && is_master.kind_of?(Numeric)
case is_master.to_i
when 1
"#@host:#@port"
else
doc['remote']
end
end
# Close the connection to the database.
def close
if @socket
s = @socket
@socket = nil
s.close
end
end
def connected?
@socket != nil
end end
# Returns a Cursor over the query results. # Returns a Cursor over the query results.
@ -380,7 +277,7 @@ module Mongo
oh = OrderedHash.new oh = OrderedHash.new
oh[:$eval] = code oh[:$eval] = code
oh[:args] = args oh[:args] = args
doc = db_command(oh) doc = command(oh)
return doc['retval'] if ok?(doc) return doc['retval'] if ok?(doc)
raise OperationFailure, "eval failed: #{doc['errmsg']}" raise OperationFailure, "eval failed: #{doc['errmsg']}"
end end
@ -391,7 +288,7 @@ module Mongo
oh = OrderedHash.new oh = OrderedHash.new
oh[:renameCollection] = "#{@name}.#{from}" oh[:renameCollection] = "#{@name}.#{from}"
oh[:to] = "#{@name}.#{to}" oh[:to] = "#{@name}.#{to}"
doc = db_command(oh, true) doc = command(oh, true)
raise "Error renaming collection: #{doc.inspect}" unless ok?(doc) raise "Error renaming collection: #{doc.inspect}" unless ok?(doc)
end end
@ -401,7 +298,7 @@ module Mongo
oh = OrderedHash.new oh = OrderedHash.new
oh[:deleteIndexes] = collection_name oh[:deleteIndexes] = collection_name
oh[:index] = name oh[:index] = name
doc = db_command(oh) doc = command(oh)
raise "Error with drop_index command: #{doc.inspect}" unless ok?(doc) raise "Error with drop_index command: #{doc.inspect}" unless ok?(doc)
end end
@ -429,58 +326,6 @@ module Mongo
self.collection(collection_name).create_index(field_or_spec, unique) self.collection(collection_name).create_index(field_or_spec, unique)
end end
# Sends a message to MongoDB.
#
# Takes a MongoDB opcode, +operation+, a message of class ByteBuffer,
# +message+, and an optional formatted +log_message+.
# Sends the message to the databse, adding the necessary headers.
def send_message_with_operation(operation, message, log_message=nil)
message_with_headers = add_message_headers(operation, message).to_s
@logger.debug(" MONGODB #{log_message || message}") if @logger
@semaphore.synchronize do
send_message_on_socket(message_with_headers)
end
end
def send_message_with_operation_raw(operation, message, log_message=nil)
message_with_headers = add_message_headers_raw(operation, message)
@logger.debug(" MONGODB #{log_message || message}") if @logger
@semaphore.synchronize do
send_message_on_socket(message_with_headers)
end
end
# Sends a message to the database, waits for a response, and raises
# and exception if the operation has failed.
def send_message_with_safe_check(operation, message, log_message=nil)
message_with_headers = add_message_headers(operation, message)
message_with_check = last_error_message
@logger.debug(" MONGODB #{log_message || message}") if @logger
@semaphore.synchronize do
send_message_on_socket(message_with_headers.append!(message_with_check).to_s)
docs, num_received, cursor_id = receive
if num_received == 1 && error = docs[0]['err']
if docs[0]['err'] == 'not master'
raise ConnectionFailure
else
raise Mongo::OperationFailure, error
end
else
true
end
end
end
# Send a message to the database and waits for the response.
def receive_message_with_operation(operation, message, log_message=nil)
message_with_headers = add_message_headers(operation, message).to_s
@logger.debug(" MONGODB #{log_message || message}") if @logger
@semaphore.synchronize do
send_message_on_socket(message_with_headers)
receive
end
end
# Return +true+ if +doc+ contains an 'ok' field with the value 1. # Return +true+ if +doc+ contains an 'ok' field with the value 1.
def ok?(doc) def ok?(doc)
ok = doc['ok'] ok = doc['ok']
@ -490,14 +335,14 @@ module Mongo
# DB commands need to be ordered, so selector must be an OrderedHash # DB commands need to be ordered, so selector must be an OrderedHash
# (or a Hash with only one element). What DB commands really need is # (or a Hash with only one element). What DB commands really need is
# that the "command" key be first. # that the "command" key be first.
def db_command(selector, use_admin_db=false) def command(selector, use_admin_db=false, sock=nil)
if !selector.kind_of?(OrderedHash) if !selector.kind_of?(OrderedHash)
if !selector.kind_of?(Hash) || selector.keys.length > 1 if !selector.kind_of?(Hash) || selector.keys.length > 1
raise "db_command must be given an OrderedHash when there is more than one key" raise "command must be given an OrderedHash when there is more than one key"
end end
end end
cursor = Cursor.new(Collection.new(self, SYSTEM_COMMAND_COLLECTION), :admin => use_admin_db, :limit => -1, :selector => selector) cursor = Cursor.new(Collection.new(self, SYSTEM_COMMAND_COLLECTION), :admin => use_admin_db, :limit => -1, :selector => selector, :socket => sock)
cursor.next_object cursor.next_object
end end
@ -514,14 +359,14 @@ module Mongo
# #
# Note: DB commands must start with the "command" key. For this reason, # Note: DB commands must start with the "command" key. For this reason,
# any selector containing more than one key must be an OrderedHash. # any selector containing more than one key must be an OrderedHash.
def command(selector, admin=false, check_response=false) def command(selector, admin=false, check_response=false, sock=nil)
raise MongoArgumentError, "command must be given a selector" unless selector.is_a?(Hash) && !selector.empty? raise MongoArgumentError, "command must be given a selector" unless selector.is_a?(Hash) && !selector.empty?
if selector.class.eql?(Hash) && selector.keys.length > 1 if selector.class.eql?(Hash) && selector.keys.length > 1
raise MongoArgumentError, "DB#command requires an OrderedHash when hash contains multiple keys" raise MongoArgumentError, "DB#command requires an OrderedHash when hash contains multiple keys"
end end
result = Cursor.new(system_command_collection, :admin => admin, result = Cursor.new(system_command_collection, :admin => admin,
:limit => -1, :selector => selector).next_object :limit => -1, :selector => selector, :socket => sock).next_object
if check_response && !ok?(result) if check_response && !ok?(result)
raise OperationFailure, "Database command '#{selector.keys.first}' failed." raise OperationFailure, "Database command '#{selector.keys.first}' failed."
@ -530,147 +375,18 @@ module Mongo
end end
end end
# DEPRECATED: please use DB#command instead.
def db_command(*args)
warn "DB#db_command has been DEPRECATED. Please use DB#command instead."
command(args[0], args[1])
end
def full_collection_name(collection_name) def full_collection_name(collection_name)
"#{@name}.#{collection_name}" "#{@name}.#{collection_name}"
end end
private private
def receive
receive_header
number_received, cursor_id = receive_response_header
read_documents(number_received, cursor_id)
end
def receive_header
header = ByteBuffer.new
header.put_array(receive_data_on_socket(16).unpack("C*"))
unless header.size == STANDARD_HEADER_SIZE
raise "Short read for DB response header: " +
"expected #{STANDARD_HEADER_SIZE} bytes, saw #{header.size}"
end
header.rewind
size = header.get_int
request_id = header.get_int
response_to = header.get_int
op = header.get_int
end
def receive_response_header
header_buf = ByteBuffer.new
header_buf.put_array(receive_data_on_socket(RESPONSE_HEADER_SIZE).unpack("C*"))
if header_buf.length != RESPONSE_HEADER_SIZE
raise "Short read for DB response header; " +
"expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}"
end
header_buf.rewind
result_flags = header_buf.get_int
cursor_id = header_buf.get_long
starting_from = header_buf.get_int
number_remaining = header_buf.get_int
[number_remaining, cursor_id]
end
def read_documents(number_received, cursor_id)
docs = []
number_remaining = number_received
while number_remaining > 0 do
buf = ByteBuffer.new
buf.put_array(receive_data_on_socket(4).unpack("C*"))
buf.rewind
size = buf.get_int
buf.put_array(receive_data_on_socket(size - 4).unpack("C*"), 4)
number_remaining -= 1
buf.rewind
docs << BSON.new.deserialize(buf)
end
[docs, number_received, cursor_id]
end
# Sending a message on socket.
def send_message_on_socket(packed_message)
connect_to_master if !connected?
begin
@socket.print(packed_message)
@socket.flush
rescue => ex
close
raise ConnectionFailure, "Operation failed with the following exception: #{ex}."
end
end
# Receive data of specified length on socket.
def receive_data_on_socket(length)
connect_to_master if !connected?
message = ""
chunk = ""
while message.length < length do
begin
chunk = @socket.read(length - message.length)
raise ConnectionFailure, "connection closed" unless chunk && chunk.length > 0
message += chunk
rescue => ex
raise ConnectionFailure, "Operation failed with the following exception: #{ex}"
end
end
message
end
# Prepares a message for transmission to MongoDB by
# constructing a valid message header.
def add_message_headers(operation, message)
headers = ByteBuffer.new
# Message size.
headers.put_int(16 + message.size)
# Unique request id.
headers.put_int(get_request_id)
# Response id.
headers.put_int(0)
# Opcode.
headers.put_int(operation)
message.prepend!(headers)
end
# Increments and then returns the next available request id.
# Note: this method should be called from within a lock.
def get_request_id
@@current_request_id += 1
@@current_request_id
end
# Creates a getlasterror message.
def last_error_message
generate_last_error_message
end
def generate_last_error_message
message = ByteBuffer.new
message.put_int(0)
BSON.serialize_cstr(message, "#{@name}.$cmd")
message.put_int(0)
message.put_int(-1)
message.put_array(BSON_SERIALIZER.serialize({:getlasterror => 1}, false).unpack("C*"))
add_message_headers(Mongo::Constants::OP_QUERY, message)
end
def reset_error_message
@@reset_error_message ||= generate_reset_error_message
end
def generate_reset_error_message
message = ByteBuffer.new
message.put_int(0)
BSON.serialize_cstr(message, "#{@name}.$cmd")
message.put_int(0)
message.put_int(-1)
message.put_array(BSON_SERIALIZER.serialize({:reseterror => 1}, false).unpack("C*"))
add_message_headers(Mongo::Constants::OP_QUERY, message)
end
def hash_password(username, plaintext) def hash_password(username, plaintext)
Digest::MD5.hexdigest("#{username}:mongo:#{plaintext}") Digest::MD5.hexdigest("#{username}:mongo:#{plaintext}")
end end
@ -678,5 +394,19 @@ module Mongo
def system_command_collection def system_command_collection
Collection.new(self, SYSTEM_COMMAND_COLLECTION) Collection.new(self, SYSTEM_COMMAND_COLLECTION)
end end
def validate_db_name(db_name)
unless [String, Symbol].include?(db_name.class)
raise TypeError, "db_name must be a string or symbol"
end
[" ", ".", "$", "/", "\\"].each do |invalid_char|
if db_name.include? invalid_char
raise InvalidName, "database names cannot contain the character '#{invalid_char}'"
end
end
raise InvalidName, "database name cannot be the empty string" if db_name.empty?
db_name
end
end end
end end

View File

@ -27,6 +27,12 @@ module Mongo
# Raised when invalid arguments are sent to Mongo Ruby methods. # Raised when invalid arguments are sent to Mongo Ruby methods.
class MongoArgumentError < MongoRubyError; end class MongoArgumentError < MongoRubyError; end
# Raised on failures in connection to the database server.
class ConnectionError < MongoRubyError; end
# Raised on failures in connection to the database server.
class ConnectionTimeoutError < MongoRubyError; end
# Raised when a database operation fails. # Raised when a database operation fails.
class OperationFailure < MongoDBError; end class OperationFailure < MongoDBError; end

View File

@ -442,7 +442,7 @@ module GridFS
md5_command = OrderedHash.new md5_command = OrderedHash.new
md5_command['filemd5'] = @files_id md5_command['filemd5'] = @files_id
md5_command['root'] = @root md5_command['root'] = @root
h['md5'] = @db.db_command(md5_command)['md5'] h['md5'] = @db.command(md5_command)['md5']
h h
end end

View File

@ -144,7 +144,7 @@ class TestCollection < Test::Unit::TestCase
# Can't duplicate an index. # Can't duplicate an index.
assert_raise OperationFailure do assert_raise OperationFailure do
@@test.update({}, {"x" => 10}, :safe => true, :upsert => true) @@test.update({}, {"x" => 10}, :safe => true)
end end
end end
end end
@ -154,7 +154,6 @@ class TestCollection < Test::Unit::TestCase
@@test.save("hello" => "world") @@test.save("hello" => "world")
@@test.save("hello" => "world") @@test.save("hello" => "world")
assert(@@db.error.include?("E11000"))
assert_raise OperationFailure do assert_raise OperationFailure do
@@test.save({"hello" => "world"}, :safe => true) @@test.save({"hello" => "world"}, :safe => true)

View File

@ -79,8 +79,7 @@ class TestConnection < Test::Unit::TestCase
logger = Logger.new(output) logger = Logger.new(output)
logger.level = Logger::DEBUG logger.level = Logger::DEBUG
db = Connection.new(@host, @port, :logger => logger).db('ruby-mongo-test') db = Connection.new(@host, @port, :logger => logger).db('ruby-mongo-test')
assert output.string.include?("admin.$cmd.find")
assert output.string.include?("$cmd.find")
end end
def test_connection_logger def test_connection_logger
@ -106,23 +105,23 @@ class TestConnection < Test::Unit::TestCase
assert !@mongo.database_names.include?('ruby-mongo-will-be-deleted') assert !@mongo.database_names.include?('ruby-mongo-will-be-deleted')
end end
def test_pair def test_nodes
db = Connection.new({:left => ['foo', 123]}) db = Connection.new({:left => ['foo', 123]}, nil, :connect => false)
pair = db.instance_variable_get('@pair') nodes = db.nodes
assert_equal 2, pair.length assert_equal 2, db.nodes.length
assert_equal ['foo', 123], pair[0] assert_equal ['foo', 123], nodes[0]
assert_equal ['localhost', Connection::DEFAULT_PORT], pair[1] assert_equal ['localhost', Connection::DEFAULT_PORT], nodes[1]
db = Connection.new({:right => 'bar'}) db = Connection.new({:right => 'bar'}, nil, :connect => false)
pair = db.instance_variable_get('@pair') nodes = db.nodes
assert_equal 2, pair.length assert_equal 2, nodes.length
assert_equal ['localhost', Connection::DEFAULT_PORT], pair[0] assert_equal ['localhost', Connection::DEFAULT_PORT], nodes[0]
assert_equal ['bar', Connection::DEFAULT_PORT], pair[1] assert_equal ['bar', Connection::DEFAULT_PORT], nodes[1]
db = Connection.new({:right => ['foo', 123], :left => 'bar'}) db = Connection.new({:right => ['foo', 123], :left => 'bar'}, nil, :connect => false)
pair = db.instance_variable_get('@pair') nodes = db.nodes
assert_equal 2, pair.length assert_equal 2, nodes.length
assert_equal ['bar', Connection::DEFAULT_PORT], pair[0] assert_equal ['bar', Connection::DEFAULT_PORT], nodes[0]
assert_equal ['foo', 123], pair[1] assert_equal ['foo', 123], nodes[1]
end end
end end

View File

@ -246,26 +246,26 @@ class CursorTest < Test::Unit::TestCase
def test_kill_cursors def test_kill_cursors
@@coll.drop @@coll.drop
client_cursors = @@db.db_command("cursorInfo" => 1)["clientCursors_size"] client_cursors = @@db.command("cursorInfo" => 1)["clientCursors_size"]
by_location = @@db.db_command("cursorInfo" => 1)["byLocation_size"] by_location = @@db.command("cursorInfo" => 1)["byLocation_size"]
10000.times do |i| 10000.times do |i|
@@coll.insert("i" => i) @@coll.insert("i" => i)
end end
assert_equal(client_cursors, assert_equal(client_cursors,
@@db.db_command("cursorInfo" => 1)["clientCursors_size"]) @@db.command("cursorInfo" => 1)["clientCursors_size"])
assert_equal(by_location, assert_equal(by_location,
@@db.db_command("cursorInfo" => 1)["byLocation_size"]) @@db.command("cursorInfo" => 1)["byLocation_size"])
10.times do |i| 10.times do |i|
@@coll.find_one() @@coll.find_one()
end end
assert_equal(client_cursors, assert_equal(client_cursors,
@@db.db_command("cursorInfo" => 1)["clientCursors_size"]) @@db.command("cursorInfo" => 1)["clientCursors_size"])
assert_equal(by_location, assert_equal(by_location,
@@db.db_command("cursorInfo" => 1)["byLocation_size"]) @@db.command("cursorInfo" => 1)["byLocation_size"])
10.times do |i| 10.times do |i|
a = @@coll.find() a = @@coll.find()
@ -274,49 +274,49 @@ class CursorTest < Test::Unit::TestCase
end end
assert_equal(client_cursors, assert_equal(client_cursors,
@@db.db_command("cursorInfo" => 1)["clientCursors_size"]) @@db.command("cursorInfo" => 1)["clientCursors_size"])
assert_equal(by_location, assert_equal(by_location,
@@db.db_command("cursorInfo" => 1)["byLocation_size"]) @@db.command("cursorInfo" => 1)["byLocation_size"])
a = @@coll.find() a = @@coll.find()
a.next_object() a.next_object()
assert_not_equal(client_cursors, assert_not_equal(client_cursors,
@@db.db_command("cursorInfo" => 1)["clientCursors_size"]) @@db.command("cursorInfo" => 1)["clientCursors_size"])
assert_not_equal(by_location, assert_not_equal(by_location,
@@db.db_command("cursorInfo" => 1)["byLocation_size"]) @@db.command("cursorInfo" => 1)["byLocation_size"])
a.close() a.close()
assert_equal(client_cursors, assert_equal(client_cursors,
@@db.db_command("cursorInfo" => 1)["clientCursors_size"]) @@db.command("cursorInfo" => 1)["clientCursors_size"])
assert_equal(by_location, assert_equal(by_location,
@@db.db_command("cursorInfo" => 1)["byLocation_size"]) @@db.command("cursorInfo" => 1)["byLocation_size"])
a = @@coll.find({}, :limit => 10).next_object() a = @@coll.find({}, :limit => 10).next_object()
assert_equal(client_cursors, assert_equal(client_cursors,
@@db.db_command("cursorInfo" => 1)["clientCursors_size"]) @@db.command("cursorInfo" => 1)["clientCursors_size"])
assert_equal(by_location, assert_equal(by_location,
@@db.db_command("cursorInfo" => 1)["byLocation_size"]) @@db.command("cursorInfo" => 1)["byLocation_size"])
@@coll.find() do |cursor| @@coll.find() do |cursor|
cursor.next_object() cursor.next_object()
end end
assert_equal(client_cursors, assert_equal(client_cursors,
@@db.db_command("cursorInfo" => 1)["clientCursors_size"]) @@db.command("cursorInfo" => 1)["clientCursors_size"])
assert_equal(by_location, assert_equal(by_location,
@@db.db_command("cursorInfo" => 1)["byLocation_size"]) @@db.command("cursorInfo" => 1)["byLocation_size"])
@@coll.find() { |cursor| @@coll.find() { |cursor|
cursor.next_object() cursor.next_object()
} }
assert_equal(client_cursors, assert_equal(client_cursors,
@@db.db_command("cursorInfo" => 1)["clientCursors_size"]) @@db.command("cursorInfo" => 1)["clientCursors_size"])
assert_equal(by_location, assert_equal(by_location,
@@db.db_command("cursorInfo" => 1)["byLocation_size"]) @@db.command("cursorInfo" => 1)["byLocation_size"])
end end
def test_count_with_fields def test_count_with_fields

View File

@ -19,7 +19,8 @@ class DBTest < Test::Unit::TestCase
@@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost' @@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
@@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT @@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT
@@db = Connection.new(@@host, @@port).db('ruby-mongo-test') @@conn = Connection.new(@@host, @@port)
@@db = @@conn.db('ruby-mongo-test')
@@users = @@db.collection('system.users') @@users = @@db.collection('system.users')
def setup def setup
@ -35,8 +36,8 @@ class DBTest < Test::Unit::TestCase
end end
def test_close def test_close
@@db.close @@conn.close
assert !@@db.connected? assert !@@conn.connected?
begin begin
@@db.collection('test').insert('a' => 1) @@db.collection('test').insert('a' => 1)
fail "expected 'NilClass' exception" fail "expected 'NilClass' exception"
@ -52,10 +53,10 @@ class DBTest < Test::Unit::TestCase
output = StringIO.new output = StringIO.new
logger = Logger.new(output) logger = Logger.new(output)
logger.level = Logger::DEBUG logger.level = Logger::DEBUG
db = Connection.new(@host, @port, :logger => logger).db('ruby-mongo-test') conn = Connection.new(@host, @port, :logger => logger)
assert_equal logger, db.logger assert_equal logger, conn.logger
db.logger.debug 'testing' conn.logger.debug 'testing'
assert output.string.include?('testing') assert output.string.include?('testing')
end end
@ -89,12 +90,16 @@ class DBTest < Test::Unit::TestCase
end end
def test_pair def test_pair
@@db.close @@conn.close
@@users = nil @@users = nil
@@db = Connection.new({:left => "this-should-fail", :right => [@@host, @@port]}).db('ruby-mongo-test') @@conn = Connection.new({:left => "this-should-fail", :right => [@@host, @@port]})
assert @@db.connected? @@db = @@conn['ruby-mongo-test']
assert @@conn.connected?
ensure ensure
@@db = Connection.new(@@host, @@port).db('ruby-mongo-test') unless @@db.connected? unless @@conn.connected?
@@conn = Connection.new(@@host, @@port)
@@db = @@conn.db('ruby-mongo-test')
end
@@users = @@db.collection('system.users') @@users = @@db.collection('system.users')
end end
@ -122,7 +127,8 @@ class DBTest < Test::Unit::TestCase
end end
def test_pk_factory_reset def test_pk_factory_reset
db = Connection.new(@@host, @@port).db('ruby-mongo-test') conn = Connection.new(@@host, @@port)
db = conn.db('ruby-mongo-test')
db.pk_factory = Object.new # first time db.pk_factory = Object.new # first time
begin begin
db.pk_factory = Object.new db.pk_factory = Object.new
@ -130,7 +136,7 @@ class DBTest < Test::Unit::TestCase
rescue => ex rescue => ex
assert_match /can not change PK factory/, ex.to_s assert_match /can not change PK factory/, ex.to_s
ensure ensure
db.close conn.close
end end
end end
@ -150,12 +156,12 @@ class DBTest < Test::Unit::TestCase
assert !@@db.error? assert !@@db.error?
assert_nil @@db.previous_error assert_nil @@db.previous_error
@@db.send(:db_command, :forceerror => 1) @@db.send(:command, :forceerror => 1)
assert @@db.error? assert @@db.error?
assert_not_nil @@db.error assert_not_nil @@db.error
assert_not_nil @@db.previous_error assert_not_nil @@db.previous_error
@@db.send(:db_command, :forceerror => 1) @@db.send(:command, :forceerror => 1)
assert @@db.error? assert @@db.error?
assert @@db.error assert @@db.error
prev_error = @@db.previous_error prev_error = @@db.previous_error
@ -192,10 +198,10 @@ class DBTest < Test::Unit::TestCase
assert !@@db.last_status()["updatedExisting"] assert !@@db.last_status()["updatedExisting"]
end end
def test_text_port_number def test_text_port_number_raises_no_errors
db = DB.new('ruby-mongo-test', [[@@host, @@port.to_s]]) conn = Connection.new(@@host, @@port.to_s)
# If there is no error, all is well db = conn['ruby-mongo-test']
db.collection('users').remove assert db.collection('users').remove
end end
end end

View File

@ -6,11 +6,11 @@ require 'test/unit'
class DBAPITest < Test::Unit::TestCase class DBAPITest < Test::Unit::TestCase
include Mongo include Mongo
@@connection = Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost', @@conn = Connection.new(ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost',
ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT) ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT)
@@db = @@connection.db("ruby-mongo-test") @@db = @@conn.db("ruby-mongo-test")
@@coll = @@db.collection('test') @@coll = @@db.collection('test')
@@version = @@connection.server_version @@version = @@conn.server_version
def setup def setup
@@coll.remove @@coll.remove
@ -489,11 +489,11 @@ class DBAPITest < Test::Unit::TestCase
end end
def test_ismaster def test_ismaster
assert @@db.master? assert @@conn.master?
end end
def test_master def test_master
assert_equal "#{@@db.host}:#{@@db.port}", @@db.master assert_equal "#{@@conn.host}:#{@@conn.port}", @@conn.master
end end
def test_where def test_where

View File

@ -9,8 +9,8 @@ class SlaveConnectionTest < Test::Unit::TestCase
def self.connect_to_slave def self.connect_to_slave
@@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost' @@host = ENV['MONGO_RUBY_DRIVER_HOST'] || 'localhost'
@@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT @@port = ENV['MONGO_RUBY_DRIVER_PORT'] || Connection::DEFAULT_PORT
db = Connection.new(@@host, @@port, :slave_ok => true).db('ruby-mongo-demo') conn = Connection.new(@@host, @@port, :slave_ok => true)
!db.master? !conn.master?
end end
if self.connect_to_slave if self.connect_to_slave
@ -30,8 +30,8 @@ class SlaveConnectionTest < Test::Unit::TestCase
puts "Not connected to slave; skipping slave connection tests." puts "Not connected to slave; skipping slave connection tests."
def test_slave_ok_false_on_queries def test_slave_ok_false_on_queries
@db = Connection.new(@@host, @@port).db('ruby-mongo-demo') @conn = Connection.new(@@host, @@port)
assert !@db.slave_ok? assert !@conn.slave_ok?
end end
end end
end end

View File

@ -4,7 +4,7 @@ class TestThreading < Test::Unit::TestCase
include Mongo include Mongo
@@db = Connection.new.db('ruby-mongo-test') @@db = Connection.new('localhost', 27017, :pool_size => 150, :timeout => 1).db('ruby-mongo-test')
@@coll = @@db.collection('thread-test-collection') @@coll = @@db.collection('thread-test-collection')
def set_up_safe_data def set_up_safe_data

View File

@ -1,12 +1,6 @@
require 'test/test_helper' require 'test/test_helper'
class CollectionTest < Test::Unit::TestCase class ConnectionTest < Test::Unit::TestCase
class MockDB < DB
def connect_to_master
true
end
end
context "Basic operations: " do context "Basic operations: " do
setup do setup do
@ -14,36 +8,40 @@ class CollectionTest < Test::Unit::TestCase
end end
should "send update message" do should "send update message" do
@db = MockDB.new("testing", ['localhost', 27017], :logger => @logger) @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false)
@db = @conn['testing']
@coll = @db.collection('books') @coll = @db.collection('books')
@db.expects(:send_message_with_operation).with do |op, msg, log| @conn.expects(:send_message_with_operation).with do |op, msg, log|
op == 2001 && log.include?("db.books.update") op == 2001 && log.include?("db.books.update")
end end
@coll.update({}, {:title => 'Moby Dick'}) @coll.update({}, {:title => 'Moby Dick'})
end end
should "send insert message" do should "send insert message" do
@db = MockDB.new("testing", ['localhost', 27017], :logger => @logger) @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false)
@db = @conn['testing']
@coll = @db.collection('books') @coll = @db.collection('books')
@db.expects(:send_message_with_operation).with do |op, msg, log| @conn.expects(:send_message_with_operation).with do |op, msg, log|
op == 2002 && log.include?("db.books.insert") op == 2002 && log.include?("db.books.insert")
end end
@coll.insert({:title => 'Moby Dick'}) @coll.insert({:title => 'Moby Dick'})
end end
should "send safe update message" do should "send safe update message" do
@db = MockDB.new("testing", ['localhost', 27017], :logger => @logger) @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false)
@db = @conn['testing']
@coll = @db.collection('books') @coll = @db.collection('books')
@db.expects(:send_message_with_safe_check).with do |op, msg, log| @conn.expects(:send_message_with_safe_check).with do |op, msg, db_name, log|
op == 2001 && log.include?("db.books.update") op == 2001 && log.include?("db.books.update")
end end
@coll.update({}, {:title => 'Moby Dick'}, :safe => true) @coll.update({}, {:title => 'Moby Dick'}, :safe => true)
end end
should "send safe insert message" do should "send safe insert message" do
@db = MockDB.new("testing", ['localhost', 27017], :logger => @logger) @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false)
@db = @conn['testing']
@coll = @db.collection('books') @coll = @db.collection('books')
@db.expects(:send_message_with_safe_check).with do |op, msg, log| @conn.expects(:send_message_with_safe_check).with do |op, msg, db_name, log|
op == 2001 && log.include?("db.books.update") op == 2001 && log.include?("db.books.update")
end end
@coll.update({}, {:title => 'Moby Dick'}, :safe => true) @coll.update({}, {:title => 'Moby Dick'}, :safe => true)

View File

@ -0,0 +1,133 @@
require 'test/test_helper'
class ConnectionTest < Test::Unit::TestCase
def new_mock_socket
socket = Object.new
socket.stubs(:setsockopt).with(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
socket
end
def new_mock_db
db = Object.new
end
context "Initialization: " do
context "given a single node" do
setup do
@conn = Connection.new('localhost', 27107, :connect => false)
admin_db = new_mock_db
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1})
@conn.expects(:[]).with('admin').returns(admin_db)
@conn.connect_to_master
end
should "set localhost and port to master" do
assert_equal 'localhost', @conn.host
assert_equal 27017, @conn.port
end
should "set connection pool to 1" do
assert_equal 1, @conn.size
end
should "default slave_ok to false" do
assert !@conn.slave_ok?
end
should "default auto_reconnect to false" do
assert !@conn.auto_reconnect?
end
end
end
context "Connection pooling: " do
setup do
@conn = Connection.new('localhost', 27107, :connect => false,
:pool_size => 3)
admin_db = new_mock_db
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1})
@conn.expects(:[]).with('admin').returns(admin_db)
@conn.connect_to_master
end
should "check out a new connection" do
socket = @conn.checkout
assert @conn.reserved_connections.keys.include? Thread.current.object_id
end
context "with multiple threads" do
setup do
@thread1 = Object.new
@thread2 = Object.new
@thread3 = Object.new
@thread4 = Object.new
Thread.stubs(:current).returns(@thread1)
@socket1 = @conn.checkout
Thread.stubs(:current).returns(@thread2)
@socket2 = @conn.checkout
Thread.stubs(:current).returns(@thread3)
@socket3 = @conn.checkout
end
should "add each thread to the reserved pool" do
assert @conn.reserved_connections.keys.include?(@thread1.object_id)
assert @conn.reserved_connections.keys.include?(@thread2.object_id)
assert @conn.reserved_connections.keys.include?(@thread3.object_id)
end
should "only add one socket per thread" do
@conn.reserved_connections.values do |socket|
assert [@socket1, @socket2, @socket3].include?(socket)
end
end
should "check out all sockets" do
assert_equal @conn.sockets.size, @conn.checked_out.size
@conn.sockets.each do |sock|
assert @conn.checked_out.include?(sock)
end
end
should "raise an error if no more sockets can be checked out" do
# This can't be tested with mock threads.
# Will test in integration tests.
end
context "when releasing dead threads" do
setup do
@thread1.expects(:alive?).returns(false)
@thread2.expects(:alive?).returns(true)
@thread3.expects(:alive?).returns(true)
Thread.expects(:list).returns([@thread1, @thread2, @thread3])
@conn.clear_stale_cached_connections!
end
should "return connections for dead threads" do
assert !@conn.checked_out.include?(@socket1)
assert_nil @conn.reserved_connections[@thread1.object_id]
end
should "maintain connection for live threads" do
assert @conn.checked_out.include?(@socket2)
assert @conn.checked_out.include?(@socket3)
end
end
context "when checking in a socket" do
setup do
@conn.checkin(@socket3)
end
should "reduce the number checked out by one" do
assert_equal @conn.checked_out.size, (@conn.sockets.size - 1)
end
end
end
end
end

View File

@ -4,7 +4,8 @@ class TestCursor < Test::Unit::TestCase
context "Cursor options" do context "Cursor options" do
setup do setup do
@db = stub(:name => "testing", :slave_ok? => false) @connection = stub(:class => Connection)
@db = stub(:name => "testing", :slave_ok? => false, :connection => @connection)
@collection = stub(:db => @db, :name => "items") @collection = stub(:db => @db, :name => "items")
@cursor = Cursor.new(@collection) @cursor = Cursor.new(@collection)
end end
@ -64,39 +65,10 @@ class TestCursor < Test::Unit::TestCase
end end
end end
context "Query options" do
should "test timeout true and slave_ok false" do
@db = stub(:slave_ok? => false, :name => "testing")
@collection = stub(:db => @db, :name => "items")
@cursor = Cursor.new(@collection, :timeout => true)
assert_equal 0, @cursor.query_opts
end
should "test timeout false and slave_ok false" do
@db = stub(:slave_ok? => false, :name => "testing")
@collection = stub(:db => @db, :name => "items")
@cursor = Cursor.new(@collection, :timeout => false)
assert_equal 16, @cursor.query_opts
end
should "set timeout true and slave_ok true" do
@db = stub(:slave_ok? => true, :name => "testing")
@collection = stub(:db => @db, :name => "items")
@cursor = Cursor.new(@collection, :timeout => true)
assert_equal 4, @cursor.query_opts
end
should "set timeout false and slave_ok true" do
@db = stub(:slave_ok? => true, :name => "testing")
@collection = stub(:db => @db, :name => "items")
@cursor = Cursor.new(@collection, :timeout => false)
assert_equal 20, @cursor.query_opts
end
end
context "Query fields" do context "Query fields" do
setup do setup do
@db = stub(:slave_ok? => true, :name => "testing") @connection = stub(:class => Collection)
@db = stub(:slave_ok? => true, :name => "testing", :connection => @connection)
@collection = stub(:db => @db, :name => "items") @collection = stub(:db => @db, :name => "items")
end end

View File

@ -2,16 +2,6 @@ require 'test/test_helper'
class DBTest < Test::Unit::TestCase class DBTest < Test::Unit::TestCase
class MockDB < DB
attr_accessor :socket
def connect_to_master
true
end
public :add_message_headers
end
def insert_message(db, documents) def insert_message(db, documents)
documents = [documents] unless documents.is_a?(Array) documents = [documents] unless documents.is_a?(Array)
message = ByteBuffer.new message = ByteBuffer.new
@ -23,7 +13,8 @@ class DBTest < Test::Unit::TestCase
context "DB commands" do context "DB commands" do
setup do setup do
@db = MockDB.new("testing", ['localhost', 27017]) @conn = stub()
@db = DB.new("testing", @conn)
@collection = mock() @collection = mock()
@db.stubs(:system_command_collection).returns(@collection) @db.stubs(:system_command_collection).returns(@collection)
end end
@ -43,7 +34,7 @@ class DBTest < Test::Unit::TestCase
should "create the proper cursor" do should "create the proper cursor" do
@cursor = mock(:next_object => {"ok" => 1}) @cursor = mock(:next_object => {"ok" => 1})
Cursor.expects(:new).with(@collection, :admin => true, Cursor.expects(:new).with(@collection, :admin => true,
:limit => -1, :selector => {:buildinfo => 1}).returns(@cursor) :limit => -1, :selector => {:buildinfo => 1}, :socket => nil).returns(@cursor)
command = {:buildinfo => 1} command = {:buildinfo => 1}
@db.command(command, true) @db.command(command, true)
end end
@ -51,32 +42,13 @@ class DBTest < Test::Unit::TestCase
should "raise an error when the command fails" do should "raise an error when the command fails" do
@cursor = mock(:next_object => {"ok" => 0}) @cursor = mock(:next_object => {"ok" => 0})
Cursor.expects(:new).with(@collection, :admin => true, Cursor.expects(:new).with(@collection, :admin => true,
:limit => -1, :selector => {:buildinfo => 1}).returns(@cursor) :limit => -1, :selector => {:buildinfo => 1}, :socket => nil).returns(@cursor)
assert_raise OperationFailure do assert_raise OperationFailure do
command = {:buildinfo => 1} command = {:buildinfo => 1}
@db.command(command, true, true) @db.command(command, true, true)
end end
end end
end end
context "safe messages" do
setup do
@db = MockDB.new("testing", ['localhost', 27017])
@collection = mock()
@db.stubs(:system_command_collection).returns(@collection)
end
should "receive getlasterror message" do
@socket = mock()
@socket.stubs(:close)
@socket.expects(:flush)
@socket.expects(:print).with { |message| message.include?('getlasterror') }
@db.socket = @socket
@db.stubs(:receive)
message = insert_message(@db, {:a => 1})
@db.send_message_with_safe_check(Mongo::Constants::OP_QUERY, message)
end
end
end end