2008-12-17 16:49:06 +00:00
|
|
|
# --
|
2009-01-06 15:51:01 +00:00
|
|
|
# Copyright (C) 2008-2009 10gen Inc.
|
2008-11-22 01:00:51 +00:00
|
|
|
#
|
2009-02-15 13:24:14 +00:00
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
# You may obtain a copy of the License at
|
2008-11-22 01:00:51 +00:00
|
|
|
#
|
2009-02-15 13:24:14 +00:00
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
2008-11-22 01:00:51 +00:00
|
|
|
#
|
2009-02-15 13:24:14 +00:00
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
2008-12-17 16:49:06 +00:00
|
|
|
# ++
|
2008-11-22 01:00:51 +00:00
|
|
|
|
2009-11-23 20:20:05 +00:00
|
|
|
require 'set'
|
|
|
|
require 'socket'
|
2009-12-28 18:05:45 +00:00
|
|
|
require 'thread'
|
2009-11-23 20:20:05 +00:00
|
|
|
|
2009-08-20 14:50:48 +00:00
|
|
|
module Mongo
|
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Instantiates and manages connections to MongoDB.
|
2009-08-20 22:48:09 +00:00
|
|
|
class Connection
|
2009-08-20 14:50:48 +00:00
|
|
|
|
2010-01-05 22:42:52 +00:00
|
|
|
# Make an unhandled exception in any thread abort the whole process.
# Note that this assignment is a process-wide setting, not one scoped to
# this class.
|
2009-11-24 20:20:51 +00:00
|
|
|
Thread.abort_on_exception = true
|
|
|
|
|
2009-08-20 14:50:48 +00:00
|
|
|
# Port used when the caller does not supply one.
DEFAULT_PORT = 27017
|
2009-11-23 20:20:05 +00:00
|
|
|
# Number of bytes expected by #receive_header for the leading header of a
# database reply.
STANDARD_HEADER_SIZE = 16
|
|
|
|
# Number of bytes read by #receive_response_header for the reply-specific
# header that follows the standard header.
RESPONSE_HEADER_SIZE = 20
|
|
|
|
|
2009-12-18 22:29:44 +00:00
|
|
|
# Read-only accessors. +host+ and +port+ are nil until #connect_to_master
# has selected a node; +sockets+ and +checked_out+ expose the pool arrays.
attr_reader :logger, :size, :host, :port, :nodes, :sockets, :checked_out
|
2009-11-23 20:20:05 +00:00
|
|
|
|
|
|
|
|
|
|
# Counter for generating unique request ids. It is a class variable, so it
# is shared by every Connection instance.
|
|
|
|
@@current_request_id = 0
|
2009-08-20 14:50:48 +00:00
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Create a connection to MongoDB. Specify either one or a pair of servers,
|
2009-11-24 19:43:52 +00:00
|
|
|
# along with a maximum connection pool size and timeout.
|
2009-08-20 14:50:48 +00:00
|
|
|
#
|
2009-11-23 21:03:33 +00:00
|
|
|
# If connecting to just one server, you may specify whether connection to slave is permitted.
|
2010-01-07 17:37:53 +00:00
|
|
|
# In all cases, the default host is "localhost" and the default port is 27017.
|
2009-08-20 14:50:48 +00:00
|
|
|
#
|
2010-01-08 20:43:13 +00:00
|
|
|
# When specifying a pair, +pair_or_host+, is a hash with two keys: :left and :right. Each key maps to either
|
2009-11-23 21:03:33 +00:00
|
|
|
# * a server name, in which case port is 27017,
|
|
|
|
# * a port number, in which case the server is "localhost", or
|
|
|
|
# * an array containing [server_name, port_number]
|
2009-08-20 14:50:48 +00:00
|
|
|
#
|
2010-01-08 20:43:13 +00:00
|
|
|
# Note that there are a few issues when using connection pooling with Ruby 1.9 on Windows. These
|
|
|
|
# should be resolved in the next release.
|
|
|
|
#
|
2010-01-07 17:37:53 +00:00
|
|
|
# @param [String, Hash] pair_or_host See explanation above.
|
|
|
|
# @param [Integer] port specify a port number here if only one host is being specified. Leave nil if
|
|
|
|
# specifying a pair of servers in +pair_or_host+.
|
2009-08-20 14:50:48 +00:00
|
|
|
#
|
2010-01-07 17:37:53 +00:00
|
|
|
# @option options [Boolean] :slave_ok (false) Must be set to +true+ when connecting
|
|
|
|
# to a single, slave node.
|
|
|
|
# @option options [Logger, #debug] :logger (nil) Logger instance to receive driver operation log.
|
|
|
|
# @option options [Boolean] :auto_reconnect DEPRECATED. See http://www.mongodb.org/display/DOCS/Replica+Pairs+in+Ruby
|
|
|
|
# @option options [Integer] :pool_size (1) The maximum number of socket connections that can be opened to the database.
|
|
|
|
# @option options [Float] :timeout (5.0) When all of the connections to the pool are checked out,
|
|
|
|
# this is the number of seconds to wait for a new connection to be released before throwing an exception.
|
2009-11-23 21:03:33 +00:00
|
|
|
#
|
2009-12-29 22:32:10 +00:00
|
|
|
#
|
2010-01-07 17:37:53 +00:00
|
|
|
# @example localhost, 27017
|
|
|
|
# Connection.new
|
2009-12-16 19:03:15 +00:00
|
|
|
#
|
2010-01-07 17:37:53 +00:00
|
|
|
# @example localhost, 27017
|
|
|
|
# Connection.new("localhost")
|
2009-08-20 14:50:48 +00:00
|
|
|
#
|
2010-01-07 17:37:53 +00:00
|
|
|
# @example localhost, 3000, max 5 connections, with max 5 seconds of wait time.
|
|
|
|
# Connection.new("localhost", 3000, :pool_size => 5, :timeout => 5)
|
2009-08-20 14:50:48 +00:00
|
|
|
#
|
2010-01-07 17:37:53 +00:00
|
|
|
# @example localhost, 3000, where this node may be a slave
|
|
|
|
# Connection.new("localhost", 3000, :slave_ok => true)
|
2009-08-20 14:50:48 +00:00
|
|
|
#
|
2010-01-07 17:37:53 +00:00
|
|
|
# @example A pair of servers. The driver will always talk to master.
|
2010-01-08 20:43:13 +00:00
|
|
|
# # On connection errors, Mongo::ConnectionFailure will be raised.
|
2010-01-07 17:37:53 +00:00
|
|
|
# Connection.new({:left => ["db1.example.com", 27017],
|
2009-11-23 21:03:33 +00:00
|
|
|
# :right => ["db2.example.com", 27017]})
|
2009-11-24 19:43:52 +00:00
|
|
|
#
|
2010-01-07 17:37:53 +00:00
|
|
|
# @example A pair of servers with connection pooling enabled. Note the nil param placeholder for port.
|
|
|
|
# Connection.new({:left => ["db1.example.com", 27017],
|
|
|
|
# :right => ["db2.example.com", 27017]}, nil,
|
|
|
|
# :pool_size => 20, :timeout => 5)
|
2010-01-08 20:43:13 +00:00
|
|
|
#
|
|
|
|
# @see http://www.mongodb.org/display/DOCS/Replica+Pairs+in+Ruby Replica pairs in Ruby
|
2010-02-08 17:12:18 +00:00
|
|
|
#
|
|
|
|
# @core connections constructor_details
|
2009-08-20 14:50:48 +00:00
|
|
|
def initialize(pair_or_host=nil, port=nil, options={})
  # Normalize the server specification into a list of [host, port] pairs.
  @nodes = format_pair(pair_or_host, port)

  # Address of the current master; unknown until #connect_to_master runs.
  @host = nil
  @port = nil

  # Synchronization primitives:
  #   @id_lock          - guards request id generation
  #   @connection_mutex - guards the socket pool
  #   @safe_mutex       - serializes a send followed by its receive
  #   @queue            - condition variable #checkout waits on
  @id_lock          = Mutex.new
  @connection_mutex = Mutex.new
  @safe_mutex       = Mutex.new
  @queue            = ConditionVariable.new

  # Pool size and the number of seconds #checkout may wait for a socket.
  @size    = options[:pool_size] || 1
  @timeout = options[:timeout] || 5.0

  # All pooled sockets, and the subset currently handed out.
  @sockets     = []
  @checked_out = []

  warn(":auto_reconnect is deprecated. see http://www.mongodb.org/display/DOCS/Replica+Pairs+in+Ruby") if options[:auto_reconnect]

  # slave_ok can be true only if one node is specified.
  @slave_ok = options[:slave_ok] && @nodes.length == 1
  @logger   = options[:logger] || nil
  @options  = options

  # Connect immediately unless the caller passed a falsy, non-nil :connect.
  connect_to_master if options[:connect].nil? || options[:connect]
end
|
2009-01-16 14:52:31 +00:00
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Return a hash with all database names
|
|
|
|
# and their respective sizes on disk.
|
|
|
|
#
|
|
|
|
# @return [Hash]
|
2009-08-20 14:50:48 +00:00
|
|
|
def database_info
  # Ask the admin database for the server's database listing.
  listing = self['admin'].command(:listDatabases => 1)

  # Build a name => size hash; sizes are coerced with #to_i.
  returning({}) do |sizes|
    listing['databases'].each do |entry|
      sizes[entry['name']] = entry['sizeOnDisk'].to_i
    end
  end
end
|
2008-11-22 01:00:51 +00:00
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Return an array of database names.
|
|
|
|
#
|
|
|
|
# @return [Array]
|
2009-08-20 14:50:48 +00:00
|
|
|
def database_names
  # The keys of the #database_info hash are the database names.
  info = database_info
  info.keys
end
|
2008-11-22 01:00:51 +00:00
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Return a database with the given name.
|
|
|
|
# See DB#new for valid options hash parameters.
|
|
|
|
#
|
|
|
|
# @param [String] db_name a valid database name.
|
|
|
|
#
|
|
|
|
# @return [Mongo::DB]
|
2010-02-08 17:12:18 +00:00
|
|
|
#
|
|
|
|
# @core databases db-instance_method
|
2009-11-23 20:20:05 +00:00
|
|
|
def db(db_name, options={})
  # The connection's own logger replaces any :logger given in +options+;
  # +options+ itself is not modified (Hash#merge returns a copy).
  db_options = options.merge(:logger => @logger)
  DB.new(db_name, self, db_options)
end
|
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Shortcut for returning a database. Use DB#db to accept options.
|
|
|
|
#
|
|
|
|
# @param [String] db_name a valid database name.
|
|
|
|
#
|
|
|
|
# @return [Mongo::DB]
|
2010-02-08 17:12:18 +00:00
|
|
|
#
|
|
|
|
# @core databases []-instance_method
|
2009-11-23 20:20:05 +00:00
|
|
|
def [](db_name)
  # Same as #db without caller options: only the connection's logger is
  # passed along.
  default_options = {:logger => @logger}
  DB.new(db_name, self, default_options)
end
|
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Drop a database.
|
|
|
|
#
|
|
|
|
# @param [String] name name of an existing database.
|
2009-08-20 14:50:48 +00:00
|
|
|
def drop_database(name)
  # The dropDatabase command is issued against the database being dropped.
  target = self[name]
  target.command(:dropDatabase => 1)
end
|
2009-01-23 18:30:59 +00:00
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Copy the database +from+, located on +from_host+, to the database +to+ on the server this connection talks to.
|
2009-11-04 16:57:03 +00:00
|
|
|
# +from_host+ defaults to 'localhost' if no value is provided.
|
2010-01-07 17:37:53 +00:00
|
|
|
#
|
|
|
|
# @param [String] from name of the database to copy from.
|
|
|
|
# @param [String] to name of the database to copy to.
|
|
|
|
# @param [String] from_host host of the 'from' database.
|
|
|
|
def copy_database(from, to, from_host="localhost")
  # Build the copydb selector in a fixed key order
  # (:copydb, :fromhost, :fromdb, :todb) using an ordered hash.
  selector = OrderedHash.new
  [[:copydb, 1],
   [:fromhost, from_host],
   [:fromdb, from],
   [:todb, to]].each do |key, value|
    selector[key] = value
  end

  # The command is run against the admin database of this connection.
  self["admin"].command(selector)
end
|
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Increment and return the next available request id.
|
|
|
|
#
|
|
|
|
# @return [Integer]
|
2009-11-23 20:20:05 +00:00
|
|
|
def get_request_id
  # Increment the class-wide counter under @id_lock and return the new
  # value (Mutex#synchronize returns the value of its block).
  #
  # NOTE(review): @id_lock is created per instance in #initialize while
  # @@current_request_id is shared by all instances, so the lock only
  # serializes callers of the same Connection -- confirm whether ids must
  # be unique across connections.
  @id_lock.synchronize { @@current_request_id += 1 }
end
|
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Get the build information for the current connection.
|
|
|
|
#
|
|
|
|
# @return [Hash]
|
2009-10-26 18:54:33 +00:00
|
|
|
def server_info
  # Run the buildinfo command against the admin database.
  selector     = {:buildinfo => 1}
  command_opts = {:admin => true, :check_response => true}
  db("admin").command(selector, command_opts)
end
|
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Get the build version of the current server.
|
|
|
|
#
|
|
|
|
# @return [Mongo::ServerVersion]
|
|
|
|
# object allowing easy comparability of version.
|
2009-10-26 18:54:33 +00:00
|
|
|
def server_version
  # Wrap the "version" entry of the build info in a ServerVersion.
  version_string = server_info["version"]
  ServerVersion.new(version_string)
end
|
|
|
|
|
2010-01-08 21:18:07 +00:00
|
|
|
# Is it okay to connect to a slave?
|
|
|
|
#
|
|
|
|
# @return [Boolean]
|
|
|
|
def slave_ok?
  # Value computed in #initialize: the :slave_ok option, honoured only when
  # exactly one node was specified. May be nil/false rather than a strict
  # boolean.
  @slave_ok
end
|
|
|
|
|
2009-08-20 14:50:48 +00:00
|
|
|
|
2009-11-23 20:20:05 +00:00
|
|
|
## Connections and pooling ##
|
2009-12-16 19:03:15 +00:00
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Send a message to MongoDB, adding the necessary headers.
|
|
|
|
#
|
|
|
|
# @param [Integer] operation a MongoDB opcode.
|
|
|
|
# @param [ByteBuffer] message a message to send to the database.
|
|
|
|
# @param [String] log_message text version of +message+ for logging.
|
2009-11-23 20:20:05 +00:00
|
|
|
#
|
2010-01-07 17:37:53 +00:00
|
|
|
# @return [True]
|
2009-11-24 19:23:43 +00:00
|
|
|
def send_message(operation, message, log_message=nil)
  # Fire-and-forget send: no reply is read. Returns whatever
  # #send_message_on_socket returns.
  @logger.debug(" MONGODB #{log_message || message}") if @logger

  # Build the message and obtain the socket *before* entering the
  # begin/ensure block. If either step raises there is no socket to return,
  # so #checkin must not run (previously it was called with nil, which
  # also signalled a waiting thread for no reason).
  packed_message = add_message_headers(operation, message).to_s
  socket = checkout

  begin
    send_message_on_socket(packed_message, socket)
  ensure
    # Always hand the socket back, even when the send fails.
    checkin(socket)
  end
end
|
|
|
|
|
|
|
|
# Sends a message to the database, waits for a response, and raises
|
2009-12-23 17:12:46 +00:00
|
|
|
# an exception if the operation has failed.
|
2009-11-24 19:23:43 +00:00
|
|
|
#
|
2010-01-07 17:37:53 +00:00
|
|
|
# @param [Integer] operation a MongoDB opcode.
|
|
|
|
# @param [ByteBuffer] message a message to send to the database.
|
|
|
|
# @param [String] db_name the name of the database. used on call to get_last_error.
|
|
|
|
# @param [String] log_message text version of +message+ for logging.
|
|
|
|
#
|
|
|
|
# @return [Array]
|
|
|
|
# An array whose indexes include [0] documents returned, [1] number of document received,
|
|
|
|
# and [2] a cursor_id.
|
2009-11-23 20:20:05 +00:00
|
|
|
def send_message_with_safe_check(operation, message, db_name, log_message=nil)
  # Sends +message+ immediately followed by a getlasterror query for
  # +db_name+ on the same socket, then reads the getlasterror reply.
  # Raises Mongo::OperationFailure when that reply carries an 'err' value.
  message_with_headers = add_message_headers(operation, message)
  message_with_check   = last_error_message(db_name)
  @logger.debug(" MONGODB #{log_message || message}") if @logger

  # Check out before the begin/ensure so that a failed checkout does not
  # lead to checkin(nil) and a spurious wake-up of a waiting thread.
  sock = checkout

  begin
    packed_message = message_with_headers.append!(message_with_check).to_s

    # Hold @safe_mutex across the send and the receive so the two happen
    # as one uninterrupted round trip.
    docs, num_received, cursor_id = @safe_mutex.synchronize do
      send_message_on_socket(packed_message, sock)
      receive(sock)
    end
  ensure
    checkin(sock)
  end

  # Exactly one document with a truthy 'err' field means the write failed.
  if num_received == 1 && (error = docs[0]['err'])
    raise Mongo::OperationFailure, error
  end

  [docs, num_received, cursor_id]
end
|
|
|
|
|
2009-11-24 19:23:43 +00:00
|
|
|
# Sends a message to the database and waits for the response.
|
|
|
|
#
|
2010-01-07 17:37:53 +00:00
|
|
|
# @param [Integer] operation a MongoDB opcode.
|
|
|
|
# @param [ByteBuffer] message a message to send to the database.
|
|
|
|
# @param [String] log_message text version of +message+ for logging.
|
|
|
|
# @param [Socket] socket a socket to use in lieu of checking out a new one.
|
|
|
|
#
|
|
|
|
# @return [Array]
|
|
|
|
# An array whose indexes include [0] documents returned, [1] number of document received,
|
|
|
|
# and [2] a cursor_id.
|
2009-11-24 19:23:43 +00:00
|
|
|
def receive_message(operation, message, log_message=nil, socket=nil)
  # Sends +message+ and returns the reply as produced by #receive.
  packed_message = add_message_headers(operation, message).to_s
  @logger.debug(" MONGODB #{log_message || message}") if @logger

  # Use the caller's socket when given, otherwise take one from the pool.
  # This happens before the begin/ensure so that a failed checkout does not
  # result in checkin(nil).
  sock = socket || checkout

  begin
    # Hold @safe_mutex across the send and the receive so the two happen
    # as one uninterrupted round trip.
    @safe_mutex.synchronize do
      send_message_on_socket(packed_message, sock)
      receive(sock)
    end
  ensure
    # As before, #checkin is also called for a caller-supplied socket.
    checkin(sock)
  end
end
|
|
|
|
|
2010-01-07 17:37:53 +00:00
|
|
|
# Create a new socket and attempt to connect to master.
|
2010-01-05 22:42:52 +00:00
|
|
|
# If successful, sets host and port to master and returns the socket.
|
2010-01-07 17:37:53 +00:00
|
|
|
#
|
|
|
|
# @raise [ConnectionFailure] if unable to connect to any host or port.
|
2009-11-23 20:20:05 +00:00
|
|
|
def connect_to_master
  # Probes each configured node in order with an ismaster command and
  # records the first acceptable node in @host/@port.
  #
  # Raises ConfigurationError when the only configured node is not master
  # and :slave_ok was not given, and ConnectionFailure when no TCP
  # connection could be opened to any node.
  close
  @host = @port = nil

  # True once any node accepted a TCP connection.
  reached_any = false

  @nodes.each do |host, port|
    # Per-node probe socket. It is used only for the ismaster check and is
    # never added to the pool, so it is always closed in the ensure below
    # (previously it was left open on success and on ConfigurationError).
    socket = nil

    begin
      socket = TCPSocket.new(host, port)
      reached_any = true
      socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

      # If we're connected to master, set the @host and @port.
      result = self['admin'].command({:ismaster => 1}, false, false, socket)
      is_master = result['ok'] == 1 && result['ismaster'] == 1
      if result['ok'] == 1 && (is_master || @slave_ok)
        @host, @port = host, port
      end

      # Note: slave_ok can be true only when connecting to a single node.
      if @nodes.length == 1 && !is_master && !@slave_ok
        raise ConfigurationError, "Trying to connect directly to slave; " +
          "if this is what you want, specify :slave_ok => true."
      end

      break if is_master || @slave_ok
    rescue SocketError, SystemCallError, IOError
      # This node is unusable; reset pool state and try the next one.
      close
    ensure
      socket.close if socket
    end
  end

  raise ConnectionFailure, "failed to connect to any given host:port" unless reached_any
end
|
|
|
|
|
2009-11-23 21:03:33 +00:00
|
|
|
# Are we connected to MongoDB? This is determined by checking whether
|
2010-01-05 22:42:52 +00:00
|
|
|
# host and port have values, since they're set to nil on calls to #close.
|
2009-11-23 20:20:05 +00:00
|
|
|
def connected?
  # Truthy only when both @host and @port are set. Like the `&&`
  # expression it replaces, this returns @host when that is nil/false and
  # @port otherwise, not a strict boolean.
  return @host unless @host
  @port
end
|
|
|
|
|
|
|
|
# Close the connection to the database.
|
|
|
|
def close
  # Closes every pooled socket and resets the connection state.
  #
  # Closing is best-effort: an IOError or SystemCallError from one socket
  # (for example a stream that is already closed or broken) is ignored so
  # that the remaining sockets are still closed and the pool is always
  # emptied. This also keeps #close from masking the ConnectionFailure
  # that callers raise right after invoking it from a rescue clause.
  @sockets.each do |sock|
    begin
      sock.close
    rescue IOError, SystemCallError
      # Nothing more can be done for this socket; carry on with the rest.
    end
  end

  # A nil host/port is what #connected? treats as "not connected".
  @host = @port = nil
  @sockets.clear
  @checked_out.clear
end
|
|
|
|
|
2009-11-24 19:23:43 +00:00
|
|
|
private
|
|
|
|
|
2009-11-23 20:20:05 +00:00
|
|
|
# Return a socket to the pool.
|
|
|
|
def checkin(socket)
  # Under the pool lock: mark +socket+ as no longer checked out and wake
  # one thread waiting in #checkout. Removing a socket that is not in
  # @checked_out is a no-op. Always returns true.
  @connection_mutex.synchronize {
    @checked_out.delete(socket)
    @queue.signal
  }
  true
end
|
|
|
|
|
|
|
|
# Adds a new socket to the pool and checks it out.
|
|
|
|
#
|
2009-12-29 17:22:01 +00:00
|
|
|
# This method is called exclusively from #checkout;
|
|
|
|
# therefore, it runs within a mutex.
|
2009-11-23 20:20:05 +00:00
|
|
|
def checkout_new_socket
  # Opens a socket to the current master (@host/@port) with TCP_NODELAY
  # set. Any StandardError raised while doing so is re-raised as
  # ConnectionFailure.
  socket = begin
    fresh = TCPSocket.new(@host, @port)
    fresh.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
    fresh
  rescue => ex
    raise ConnectionFailure, "Failed to connect socket: #{ex}"
  end

  # Register the socket in the pool and mark it as in use.
  @sockets << socket
  @checked_out << socket
  socket
end
|
|
|
|
|
|
|
|
# Checks out the first available socket from the pool.
|
|
|
|
#
|
2009-12-29 17:22:01 +00:00
|
|
|
# This method is called exclusively from #checkout;
|
|
|
|
# therefore, it runs within a mutex.
|
2009-11-23 20:20:05 +00:00
|
|
|
def checkout_existing_socket
  # Pooled sockets that are not currently handed out.
  idle_sockets = @sockets - @checked_out

  # Take the first idle one and mark it as in use. If there is no idle
  # socket this appends and returns nil; #checkout only calls this method
  # when fewer sockets are checked out than exist.
  socket = idle_sockets.first
  @checked_out << socket
  socket
end
|
|
|
|
|
|
|
|
# Check out an existing socket or create a new socket if the maximum
|
|
|
|
# pool size has not been exceeded. Otherwise, wait for the next
|
|
|
|
# available socket.
|
2009-12-21 14:06:28 +00:00
|
|
|
def checkout
  # Returns a socket for exclusive use by the caller: an idle pooled
  # socket if there is one, otherwise a new socket while the pool holds
  # fewer than @size sockets, otherwise it waits for a #checkin.
  #
  # Raises ConnectionTimeoutError when no socket could be obtained within
  # @timeout seconds.
  connect_to_master if !connected?
  start_time = Time.now

  loop do
    if (Time.now - start_time) > @timeout
      raise ConnectionTimeoutError, "could not obtain connection within " +
        "#{@timeout} seconds. The max pool size is currently #{@size}; " +
        "consider increasing the pool size or timeout."
    end

    @connection_mutex.synchronize do
      socket = if @checked_out.size < @sockets.size
                 checkout_existing_socket
               elsif @sockets.size < @size
                 checkout_new_socket
               end

      return socket if socket

      # Otherwise, wait.
      if @logger
        @logger.warn "Waiting for available connection; #{@checked_out.size} of #{@size} connections checked out."
      end

      # Wait at most for the time that is left. An unbounded wait would
      # block forever if no other thread ever checks a socket in, and the
      # timeout test at the top of the loop would never be reached.
      # (Relies on ConditionVariable#wait accepting a timeout argument.)
      remaining = @timeout - (Time.now - start_time)
      @queue.wait(@connection_mutex, remaining) if remaining > 0
    end
  end
end
|
|
|
|
|
2009-11-23 20:20:05 +00:00
|
|
|
def receive(sock)
  # Read one complete reply from +sock+ in three steps: the standard
  # header (its return value is not used here), the response header, and
  # then the documents. Returns what #read_documents returns.
  receive_header(sock)
  doc_count, cursor = receive_response_header(sock)
  read_documents(doc_count, cursor, sock)
end
|
|
|
|
|
|
|
|
def receive_header(sock)
  # Read the fixed-size standard header that starts every reply.
  header = ByteBuffer.new
  raw = receive_message_on_socket(STANDARD_HEADER_SIZE, sock)
  header.put_array(raw.unpack("C*"))

  if header.size != STANDARD_HEADER_SIZE
    raise "Short read for DB response header: " +
      "expected #{STANDARD_HEADER_SIZE} bytes, saw #{header.size}"
  end

  header.rewind
  # The header holds four ints: size, request id, response-to and op.
  # The first three are read only to advance the cursor; the op is the
  # return value.
  3.times { header.get_int }
  header.get_int
end
|
|
|
|
|
|
|
|
def receive_response_header(sock)
  # Read the reply-specific header that follows the standard header.
  header_buf = ByteBuffer.new
  raw = receive_message_on_socket(RESPONSE_HEADER_SIZE, sock)
  header_buf.put_array(raw.unpack("C*"))

  unless header_buf.length == RESPONSE_HEADER_SIZE
    raise "Short read for DB response header; " +
      "expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}"
  end

  header_buf.rewind
  header_buf.get_int                     # result flags (not used)
  cursor_id = header_buf.get_long
  header_buf.get_int                     # starting-from offset (not used)
  document_count = header_buf.get_int    # passed to #read_documents by #receive

  [document_count, cursor_id]
end
|
|
|
|
|
|
|
|
def read_documents(number_received, cursor_id, sock)
  # Read +number_received+ documents from +sock+ and deserialize each one.
  # A count of zero or less yields an empty list.
  docs = (1..number_received).map do
    buf = ByteBuffer.new

    # Each document starts with a 4-byte int giving its total size, which
    # includes those 4 bytes.
    buf.put_array(receive_message_on_socket(4, sock).unpack("C*"))
    buf.rewind
    size = buf.get_int

    # Read the rest of the document and place it after the size prefix.
    buf.put_array(receive_message_on_socket(size - 4, sock).unpack("C*"), 4)
    buf.rewind
    BSON.deserialize(buf)
  end

  [docs, number_received, cursor_id]
end
|
|
|
|
|
|
|
|
def last_error_message(db_name)
  # Builds a complete OP_QUERY message that runs {:getlasterror => 1}
  # against "<db_name>.$cmd". Field meanings below are inferred from the
  # order of the writes -- TODO confirm against the wire protocol docs.
  query = ByteBuffer.new
  query.put_int(0)                                       # presumably query options
  BSON_RUBY.serialize_cstr(query, "#{db_name}.$cmd")     # collection name
  query.put_int(0)                                       # presumably number to skip
  query.put_int(-1)                                      # presumably number to return

  selector_bytes = BSON.serialize({:getlasterror => 1}, false).unpack("C*")
  query.put_array(selector_bytes)

  add_message_headers(Mongo::Constants::OP_QUERY, query)
end
|
2009-12-16 19:03:15 +00:00
|
|
|
|
2009-11-23 20:20:05 +00:00
|
|
|
# Prepares a message for transmission to MongoDB by
|
2009-11-24 19:43:52 +00:00
|
|
|
# constructing a valid message header.
|
|
|
|
def add_message_headers(operation, message)
  headers = ByteBuffer.new

  # The header is four ints, written in this order:
  #   total message size (header plus +message+),
  #   a unique request id,
  #   the response id (always 0 here),
  #   the opcode.
  [
    STANDARD_HEADER_SIZE + message.size,
    get_request_id,
    0,
    operation
  ].each { |field| headers.put_int(field) }

  # Prepends the header to +message+ in place; the result of #prepend! is
  # the return value.
  message.prepend!(headers)
end
|
|
|
|
|
|
|
|
# Low-level method for sending a message on a socket.
|
2009-12-16 19:03:15 +00:00
|
|
|
# Requires a packed message and an available socket.
|
2009-11-23 20:20:05 +00:00
|
|
|
def send_message_on_socket(packed_message, socket)
  # Returns the result of Socket#send.
  socket.send(packed_message, 0)
rescue => ex
  # On any StandardError, tear down the whole connection (see #close) and
  # report the problem as a ConnectionFailure.
  close
  raise ConnectionFailure, "Operation failed with the following exception: #{ex}"
end
|
|
|
|
|
|
|
|
# Low-level method for receiving data from socket.
|
|
|
|
# Requires length and an available socket.
|
|
|
|
def receive_message_on_socket(length, socket)
  received = ""

  # A single recv may return fewer bytes than requested, so keep reading
  # until +length+ bytes have been collected.
  until received.length >= length
    chunk = socket.recv(length - received.length)
    # An empty chunk is treated as the peer having closed the connection.
    raise ConnectionFailure, "connection closed" if chunk.length <= 0
    received += chunk
  end

  received
rescue => ex
  # Any StandardError -- including the ConnectionFailure raised above --
  # tears down the connection and is re-raised wrapped in a
  # ConnectionFailure.
  close
  raise ConnectionFailure, "Operation failed with the following exception: #{ex}"
end
|
|
|
|
|
|
|
|
|
|
|
|
## Private helper methods
|
|
|
|
|
|
|
|
# Returns an array of host-port pairs.
|
2009-12-02 21:24:36 +00:00
|
|
|
def format_pair(pair_or_host, port)
  # Normalizes the constructor's server specification into an array of
  # [host, port] pairs:
  #   String -> one pair; +port+ is coerced with #to_i, or DEFAULT_PORT
  #             when +port+ is nil/false
  #   Hash   -> two pairs built from the :left and :right values
  #   nil    -> one pair, ['localhost', DEFAULT_PORT] (+port+ is not used)
  #
  # Raises ArgumentError for any other type. Previously such input made
  # this method return nil, which only surfaced later as a NoMethodError
  # on nil when the node list was first used.
  case pair_or_host
  when String
    [[pair_or_host, port ? port.to_i : DEFAULT_PORT]]
  when Hash
    [:left, :right].map { |side| pair_val_to_connection(pair_or_host[side]) }
  when nil
    [['localhost', DEFAULT_PORT]]
  else
    raise ArgumentError, "expected a host name String, a Hash with :left and :right, " +
      "or nil, but got #{pair_or_host.inspect}"
  end
end
|
2009-12-16 19:03:15 +00:00
|
|
|
|
2009-08-20 14:50:48 +00:00
|
|
|
# Turns an array containing a host name string and a
|
|
|
|
# port number integer into a [host, port] pair array.
|
|
|
|
def pair_val_to_connection(a)
  # Expands one side of a pair specification into a [host, port] array,
  # filling in 'localhost' and DEFAULT_PORT for whichever part is missing.
  # An Array is returned unchanged; any other type falls through the case
  # and yields nil.
  case a
  when nil     then ['localhost', DEFAULT_PORT]
  when String  then [a, DEFAULT_PORT]
  when Integer then ['localhost', a]
  when Array   then a
  end
end
|
2009-11-24 19:23:43 +00:00
|
|
|
|
2009-08-20 22:48:09 +00:00
|
|
|
end
|
2008-11-22 01:00:51 +00:00
|
|
|
end
|