Initial decoupling of Connection, ReplSetConnection,
and networking code.
This commit is contained in:
parent
3655a94934
commit
a5373ddb83
|
@ -65,6 +65,7 @@ require 'mongo/util/ssl_socket'
|
||||||
require 'mongo/util/uri_parser'
|
require 'mongo/util/uri_parser'
|
||||||
|
|
||||||
require 'mongo/collection'
|
require 'mongo/collection'
|
||||||
|
require 'mongo/networking'
|
||||||
require 'mongo/connection'
|
require 'mongo/connection'
|
||||||
require 'mongo/repl_set_connection'
|
require 'mongo/repl_set_connection'
|
||||||
require 'mongo/cursor'
|
require 'mongo/cursor'
|
||||||
|
|
|
@ -25,6 +25,7 @@ module Mongo
|
||||||
# Instantiates and manages connections to MongoDB.
|
# Instantiates and manages connections to MongoDB.
|
||||||
class Connection
|
class Connection
|
||||||
include Mongo::Logging
|
include Mongo::Logging
|
||||||
|
include Mongo::Networking
|
||||||
|
|
||||||
TCPSocket = ::TCPSocket
|
TCPSocket = ::TCPSocket
|
||||||
Mutex = ::Mutex
|
Mutex = ::Mutex
|
||||||
|
@ -33,15 +34,11 @@ module Mongo
|
||||||
Thread.abort_on_exception = true
|
Thread.abort_on_exception = true
|
||||||
|
|
||||||
DEFAULT_PORT = 27017
|
DEFAULT_PORT = 27017
|
||||||
STANDARD_HEADER_SIZE = 16
|
|
||||||
RESPONSE_HEADER_SIZE = 20
|
|
||||||
|
|
||||||
attr_reader :logger, :size, :auths, :primary, :safe, :host_to_try,
|
attr_reader :logger, :size, :auths, :primary, :safe, :host_to_try,
|
||||||
:pool_size, :connect_timeout, :pool_timeout,
|
:pool_size, :connect_timeout, :pool_timeout,
|
||||||
:primary_pool, :socket_class
|
:primary_pool, :socket_class
|
||||||
|
|
||||||
# Counter for generating unique request ids.
|
|
||||||
@@current_request_id = 0
|
|
||||||
|
|
||||||
# Create a connection to single MongoDB instance.
|
# Create a connection to single MongoDB instance.
|
||||||
#
|
#
|
||||||
|
@ -382,126 +379,6 @@ module Mongo
|
||||||
@slave_ok
|
@slave_ok
|
||||||
end
|
end
|
||||||
|
|
||||||
# Send a message to MongoDB, adding the necessary headers.
|
|
||||||
#
|
|
||||||
# @param [Integer] operation a MongoDB opcode.
|
|
||||||
# @param [BSON::ByteBuffer] message a message to send to the database.
|
|
||||||
#
|
|
||||||
# @option opts [Symbol] :connection (:writer) The connection to which
|
|
||||||
# this message should be sent. Valid options are :writer and :reader.
|
|
||||||
#
|
|
||||||
# @return [Integer] number of bytes sent
|
|
||||||
def send_message(operation, message, opts={})
|
|
||||||
if opts.is_a?(String)
|
|
||||||
warn "Connection#send_message no longer takes a string log message. " +
|
|
||||||
"Logging is now handled within the Collection and Cursor classes."
|
|
||||||
opts = {}
|
|
||||||
end
|
|
||||||
|
|
||||||
connection = opts.fetch(:connection, :writer)
|
|
||||||
|
|
||||||
begin
|
|
||||||
add_message_headers(message, operation)
|
|
||||||
packed_message = message.to_s
|
|
||||||
|
|
||||||
if connection == :writer
|
|
||||||
socket = checkout_writer
|
|
||||||
else
|
|
||||||
socket = checkout_reader
|
|
||||||
end
|
|
||||||
|
|
||||||
send_message_on_socket(packed_message, socket)
|
|
||||||
ensure
|
|
||||||
checkin(socket)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
# Sends a message to the database, waits for a response, and raises
|
|
||||||
# an exception if the operation has failed.
|
|
||||||
#
|
|
||||||
# @param [Integer] operation a MongoDB opcode.
|
|
||||||
# @param [BSON::ByteBuffer] message a message to send to the database.
|
|
||||||
# @param [String] db_name the name of the database. used on call to get_last_error.
|
|
||||||
# @param [Hash] last_error_params parameters to be sent to getLastError. See DB#error for
|
|
||||||
# available options.
|
|
||||||
#
|
|
||||||
# @see DB#get_last_error for valid last error params.
|
|
||||||
#
|
|
||||||
# @return [Hash] The document returned by the call to getlasterror.
|
|
||||||
def send_message_with_safe_check(operation, message, db_name, log_message=nil, last_error_params=false)
|
|
||||||
docs = num_received = cursor_id = ''
|
|
||||||
add_message_headers(message, operation)
|
|
||||||
|
|
||||||
last_error_message = BSON::ByteBuffer.new
|
|
||||||
build_last_error_message(last_error_message, db_name, last_error_params)
|
|
||||||
last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY)
|
|
||||||
|
|
||||||
packed_message = message.append!(last_error_message).to_s
|
|
||||||
begin
|
|
||||||
sock = checkout_writer
|
|
||||||
send_message_on_socket(packed_message, sock)
|
|
||||||
docs, num_received, cursor_id = receive(sock, last_error_id)
|
|
||||||
ensure
|
|
||||||
checkin(sock)
|
|
||||||
end
|
|
||||||
|
|
||||||
if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg'])
|
|
||||||
close if error == "not master"
|
|
||||||
error = "wtimeout" if error == "timeout"
|
|
||||||
raise OperationFailure.new(docs[0]['code'].to_s + ': ' + error, docs[0]['code'], docs[0])
|
|
||||||
end
|
|
||||||
|
|
||||||
docs[0]
|
|
||||||
end
|
|
||||||
|
|
||||||
# Sends a message to the database and waits for the response.
|
|
||||||
#
|
|
||||||
# @param [Integer] operation a MongoDB opcode.
|
|
||||||
# @param [BSON::ByteBuffer] message a message to send to the database.
|
|
||||||
# @param [String] log_message this is currently a no-op and will be removed.
|
|
||||||
# @param [Socket] socket a socket to use in lieu of checking out a new one.
|
|
||||||
# @param [Boolean] command (false) indicate whether this is a command. If this is a command,
|
|
||||||
# the message will be sent to the primary node.
|
|
||||||
# @param [Boolean] command (false) indicate whether the cursor should be exhausted. Set
|
|
||||||
# this to true only when the OP_QUERY_EXHAUST flag is set.
|
|
||||||
#
|
|
||||||
# @return [Array]
|
|
||||||
# An array whose indexes include [0] documents returned, [1] number of document received,
|
|
||||||
# and [3] a cursor_id.
|
|
||||||
def receive_message(operation, message, log_message=nil, socket=nil, command=false,
|
|
||||||
read=:primary, exhaust=false)
|
|
||||||
request_id = add_message_headers(message, operation)
|
|
||||||
packed_message = message.to_s
|
|
||||||
begin
|
|
||||||
if socket
|
|
||||||
sock = socket
|
|
||||||
should_checkin = false
|
|
||||||
else
|
|
||||||
if command
|
|
||||||
sock = checkout_writer
|
|
||||||
elsif read == :primary
|
|
||||||
sock = checkout_writer
|
|
||||||
elsif read == :secondary
|
|
||||||
sock = checkout_reader
|
|
||||||
else
|
|
||||||
sock = checkout_tagged(read)
|
|
||||||
end
|
|
||||||
should_checkin = true
|
|
||||||
end
|
|
||||||
|
|
||||||
result = ''
|
|
||||||
@safe_mutexes[sock].synchronize do
|
|
||||||
send_message_on_socket(packed_message, sock)
|
|
||||||
result = receive(sock, request_id, exhaust)
|
|
||||||
end
|
|
||||||
ensure
|
|
||||||
if should_checkin
|
|
||||||
checkin(sock)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
result
|
|
||||||
end
|
|
||||||
|
|
||||||
def get_socket_from_thread_local
|
def get_socket_from_thread_local
|
||||||
Thread.current[:socket_map] ||= {}
|
Thread.current[:socket_map] ||= {}
|
||||||
Thread.current[:socket_map][self] ||= {}
|
Thread.current[:socket_map][self] ||= {}
|
||||||
|
@ -761,212 +638,5 @@ module Mongo
|
||||||
@primary = [host, port]
|
@primary = [host, port]
|
||||||
@primary_pool = Pool.new(self, host, port, :size => @pool_size, :timeout => @pool_timeout)
|
@primary_pool = Pool.new(self, host, port, :size => @pool_size, :timeout => @pool_timeout)
|
||||||
end
|
end
|
||||||
|
|
||||||
## Low-level connection methods.
|
|
||||||
|
|
||||||
def receive(sock, cursor_id, exhaust=false)
|
|
||||||
begin
|
|
||||||
if exhaust
|
|
||||||
docs = []
|
|
||||||
num_received = 0
|
|
||||||
|
|
||||||
while(cursor_id != 0) do
|
|
||||||
receive_header(sock, cursor_id, exhaust)
|
|
||||||
number_received, cursor_id = receive_response_header(sock)
|
|
||||||
new_docs, n = read_documents(number_received, sock)
|
|
||||||
docs += new_docs
|
|
||||||
num_received += n
|
|
||||||
end
|
|
||||||
|
|
||||||
return [docs, num_received, cursor_id]
|
|
||||||
else
|
|
||||||
receive_header(sock, cursor_id, exhaust)
|
|
||||||
number_received, cursor_id = receive_response_header(sock)
|
|
||||||
docs, num_received = read_documents(number_received, sock)
|
|
||||||
|
|
||||||
return [docs, num_received, cursor_id]
|
|
||||||
end
|
|
||||||
rescue Mongo::ConnectionFailure => ex
|
|
||||||
close
|
|
||||||
raise ex
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def receive_header(sock, expected_response, exhaust=false)
|
|
||||||
header = receive_message_on_socket(16, sock)
|
|
||||||
size, request_id, response_to = header.unpack('VVV')
|
|
||||||
if !exhaust && expected_response != response_to
|
|
||||||
raise Mongo::ConnectionFailure, "Expected response #{expected_response} but got #{response_to}"
|
|
||||||
end
|
|
||||||
|
|
||||||
unless header.size == STANDARD_HEADER_SIZE
|
|
||||||
raise "Short read for DB response header: " +
|
|
||||||
"expected #{STANDARD_HEADER_SIZE} bytes, saw #{header.size}"
|
|
||||||
end
|
|
||||||
nil
|
|
||||||
end
|
|
||||||
|
|
||||||
def receive_response_header(sock)
|
|
||||||
header_buf = receive_message_on_socket(RESPONSE_HEADER_SIZE, sock)
|
|
||||||
if header_buf.length != RESPONSE_HEADER_SIZE
|
|
||||||
raise "Short read for DB response header; " +
|
|
||||||
"expected #{RESPONSE_HEADER_SIZE} bytes, saw #{header_buf.length}"
|
|
||||||
end
|
|
||||||
flags, cursor_id_a, cursor_id_b, starting_from, number_remaining = header_buf.unpack('VVVVV')
|
|
||||||
check_response_flags(flags)
|
|
||||||
cursor_id = (cursor_id_b << 32) + cursor_id_a
|
|
||||||
[number_remaining, cursor_id]
|
|
||||||
end
|
|
||||||
|
|
||||||
def check_response_flags(flags)
|
|
||||||
if flags & Mongo::Constants::REPLY_CURSOR_NOT_FOUND != 0
|
|
||||||
raise Mongo::OperationFailure, "Query response returned CURSOR_NOT_FOUND. " +
|
|
||||||
"Either an invalid cursor was specified, or the cursor may have timed out on the server."
|
|
||||||
elsif flags & Mongo::Constants::REPLY_QUERY_FAILURE != 0
|
|
||||||
# Getting odd failures when a exception is raised here.
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def read_documents(number_received, sock)
|
|
||||||
docs = []
|
|
||||||
number_remaining = number_received
|
|
||||||
while number_remaining > 0 do
|
|
||||||
buf = receive_message_on_socket(4, sock)
|
|
||||||
size = buf.unpack('V')[0]
|
|
||||||
buf << receive_message_on_socket(size - 4, sock)
|
|
||||||
number_remaining -= 1
|
|
||||||
docs << BSON::BSON_CODER.deserialize(buf)
|
|
||||||
end
|
|
||||||
[docs, number_received]
|
|
||||||
end
|
|
||||||
|
|
||||||
# Constructs a getlasterror message. This method is used exclusively by
|
|
||||||
# Connection#send_message_with_safe_check.
|
|
||||||
#
|
|
||||||
# Because it modifies message by reference, we don't need to return it.
|
|
||||||
def build_last_error_message(message, db_name, opts)
|
|
||||||
message.put_int(0)
|
|
||||||
BSON::BSON_RUBY.serialize_cstr(message, "#{db_name}.$cmd")
|
|
||||||
message.put_int(0)
|
|
||||||
message.put_int(-1)
|
|
||||||
cmd = BSON::OrderedHash.new
|
|
||||||
cmd[:getlasterror] = 1
|
|
||||||
if opts.is_a?(Hash)
|
|
||||||
opts.assert_valid_keys(:w, :wtimeout, :fsync)
|
|
||||||
cmd.merge!(opts)
|
|
||||||
end
|
|
||||||
message.put_binary(BSON::BSON_CODER.serialize(cmd, false).to_s)
|
|
||||||
nil
|
|
||||||
end
|
|
||||||
|
|
||||||
# Prepares a message for transmission to MongoDB by
|
|
||||||
# constructing a valid message header.
|
|
||||||
#
|
|
||||||
# Note: this method modifies message by reference.
|
|
||||||
#
|
|
||||||
# @return [Integer] the request id used in the header
|
|
||||||
def add_message_headers(message, operation)
|
|
||||||
headers = [
|
|
||||||
# Message size.
|
|
||||||
16 + message.size,
|
|
||||||
|
|
||||||
# Unique request id.
|
|
||||||
request_id = get_request_id,
|
|
||||||
|
|
||||||
# Response id.
|
|
||||||
0,
|
|
||||||
|
|
||||||
# Opcode.
|
|
||||||
operation
|
|
||||||
].pack('VVVV')
|
|
||||||
|
|
||||||
message.prepend!(headers)
|
|
||||||
|
|
||||||
request_id
|
|
||||||
end
|
|
||||||
|
|
||||||
# Increment and return the next available request id.
|
|
||||||
#
|
|
||||||
# return [Integer]
|
|
||||||
def get_request_id
|
|
||||||
request_id = ''
|
|
||||||
@id_lock.synchronize do
|
|
||||||
request_id = @@current_request_id += 1
|
|
||||||
end
|
|
||||||
request_id
|
|
||||||
end
|
|
||||||
|
|
||||||
# Low-level method for sending a message on a socket.
|
|
||||||
# Requires a packed message and an available socket,
|
|
||||||
#
|
|
||||||
# @return [Integer] number of bytes sent
|
|
||||||
def send_message_on_socket(packed_message, socket)
|
|
||||||
begin
|
|
||||||
total_bytes_sent = socket.send(packed_message, 0)
|
|
||||||
if total_bytes_sent != packed_message.size
|
|
||||||
packed_message.slice!(0, total_bytes_sent)
|
|
||||||
while packed_message.size > 0
|
|
||||||
byte_sent = socket.send(packed_message, 0)
|
|
||||||
total_bytes_sent += byte_sent
|
|
||||||
packed_message.slice!(0, byte_sent)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
total_bytes_sent
|
|
||||||
rescue => ex
|
|
||||||
close
|
|
||||||
raise ConnectionFailure, "Operation failed with the following exception: #{ex}"
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
# Low-level method for receiving data from socket.
|
|
||||||
# Requires length and an available socket.
|
|
||||||
def receive_message_on_socket(length, socket)
|
|
||||||
begin
|
|
||||||
if @op_timeout
|
|
||||||
message = nil
|
|
||||||
Mongo::TimeoutHandler.timeout(@op_timeout, OperationTimeout) do
|
|
||||||
message = receive_data(length, socket)
|
|
||||||
end
|
|
||||||
else
|
|
||||||
message = receive_data(length, socket)
|
|
||||||
end
|
|
||||||
rescue => ex
|
|
||||||
close
|
|
||||||
|
|
||||||
if ex.class == OperationTimeout
|
|
||||||
raise OperationTimeout, "Timed out waiting on socket read."
|
|
||||||
else
|
|
||||||
raise ConnectionFailure, "Operation failed with the following exception: #{ex}"
|
|
||||||
end
|
|
||||||
end
|
|
||||||
message
|
|
||||||
end
|
|
||||||
|
|
||||||
def receive_data(length, socket)
|
|
||||||
message = new_binary_string
|
|
||||||
socket.read(length, message)
|
|
||||||
raise ConnectionFailure, "connection closed" unless message && message.length > 0
|
|
||||||
if message.length < length
|
|
||||||
chunk = new_binary_string
|
|
||||||
while message.length < length
|
|
||||||
socket.read(length - message.length, chunk)
|
|
||||||
raise ConnectionFailure, "connection closed" unless chunk.length > 0
|
|
||||||
message << chunk
|
|
||||||
end
|
|
||||||
end
|
|
||||||
message
|
|
||||||
end
|
|
||||||
|
|
||||||
if defined?(Encoding)
|
|
||||||
BINARY_ENCODING = Encoding.find("binary")
|
|
||||||
|
|
||||||
def new_binary_string
|
|
||||||
"".force_encoding(BINARY_ENCODING)
|
|
||||||
end
|
|
||||||
else
|
|
||||||
def new_binary_string
|
|
||||||
""
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -422,6 +422,67 @@ module Mongo
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
|
# Generic initialization code.
|
||||||
|
def setup(opts)
|
||||||
|
# Default maximum BSON object size
|
||||||
|
@max_bson_size = Mongo::DEFAULT_MAX_BSON_SIZE
|
||||||
|
|
||||||
|
@safe_mutex_lock = Mutex.new
|
||||||
|
@safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}
|
||||||
|
|
||||||
|
# Determine whether to use SSL.
|
||||||
|
@ssl = opts.fetch(:ssl, false)
|
||||||
|
if @ssl
|
||||||
|
@socket_class = Mongo::SSLSocket
|
||||||
|
else
|
||||||
|
@socket_class = ::TCPSocket
|
||||||
|
end
|
||||||
|
|
||||||
|
# Authentication objects
|
||||||
|
@auths = opts.fetch(:auths, [])
|
||||||
|
|
||||||
|
# Lock for request ids.
|
||||||
|
@id_lock = Mutex.new
|
||||||
|
|
||||||
|
# Pool size and timeout.
|
||||||
|
@pool_size = opts[:pool_size] || 1
|
||||||
|
if opts[:timeout]
|
||||||
|
warn "The :timeout option has been deprecated " +
|
||||||
|
"and will be removed in the 2.0 release. Use :pool_timeout instead."
|
||||||
|
end
|
||||||
|
@pool_timeout = opts[:pool_timeout] || opts[:timeout] || 5.0
|
||||||
|
|
||||||
|
# Timeout on socket read operation.
|
||||||
|
@op_timeout = opts[:op_timeout] || nil
|
||||||
|
|
||||||
|
# Timeout on socket connect.
|
||||||
|
@connect_timeout = opts[:connect_timeout] || nil
|
||||||
|
|
||||||
|
# Mutex for synchronizing pool access
|
||||||
|
# TODO: remove this.
|
||||||
|
@connection_mutex = Mutex.new
|
||||||
|
|
||||||
|
# Global safe option. This is false by default.
|
||||||
|
@safe = opts[:safe] || false
|
||||||
|
|
||||||
|
# Condition variable for signal and wait
|
||||||
|
@queue = ConditionVariable.new
|
||||||
|
|
||||||
|
# Connection pool for primay node
|
||||||
|
@primary = nil
|
||||||
|
@primary_pool = nil
|
||||||
|
|
||||||
|
@logger = opts[:logger] || nil
|
||||||
|
|
||||||
|
if @logger
|
||||||
|
@logger.debug("MongoDB logging. Please note that logging negatively impacts performance " +
|
||||||
|
"and should be disabled for high-performance production apps.")
|
||||||
|
end
|
||||||
|
|
||||||
|
should_connect = opts.fetch(:connect, true)
|
||||||
|
connect if should_connect
|
||||||
|
end
|
||||||
|
|
||||||
# Given a pool manager, update this connection's
|
# Given a pool manager, update this connection's
|
||||||
# view of the replica set.
|
# view of the replica set.
|
||||||
def update_config(new_manager)
|
def update_config(new_manager)
|
||||||
|
|
Loading…
Reference in New Issue