Store sockets in thread-local variables when possible.
Allow connection pools to grow if needed. All this minimizes the number of locks required and reduces the waiting time for these locks.
This commit is contained in:
parent
0b33a48dd9
commit
f668678fd1
@ -19,10 +19,9 @@
|
||||
require 'set'
|
||||
require 'socket'
|
||||
require 'thread'
|
||||
|
||||
module Mongo
|
||||
|
||||
# Instantiates and manages connections to MongoDB.
|
||||
# Instantiates and manages self.connections to MongoDB.
|
||||
class Connection
|
||||
include Mongo::Logging
|
||||
include Mongo::Networking
|
||||
@ -35,11 +34,12 @@ module Mongo
|
||||
|
||||
DEFAULT_PORT = 27017
|
||||
|
||||
mongo_thread_local_accessor :connections
|
||||
|
||||
attr_reader :logger, :size, :auths, :primary, :safe, :host_to_try,
|
||||
:pool_size, :connect_timeout, :pool_timeout,
|
||||
:primary_pool, :socket_class
|
||||
|
||||
|
||||
# Create a connection to single MongoDB instance.
|
||||
#
|
||||
# You may specify whether connection to slave is permitted.
|
||||
@ -63,9 +63,9 @@ module Mongo
|
||||
# to a single, slave node.
|
||||
# @option opts [Logger, #debug] :logger (nil) A Logger instance for debugging driver ops. Note that
|
||||
# logging negatively impacts performance; therefore, it should not be used for high-performance apps.
|
||||
# @option opts [Integer] :pool_size (1) The maximum number of socket connections allowed per
|
||||
# @option opts [Integer] :pool_size (1) The maximum number of socket self.connections allowed per
|
||||
# connection pool. Note: this setting is relevant only for multi-threaded applications.
|
||||
# @option opts [Float] :pool_timeout (5.0) When all of the connections a pool are checked out,
|
||||
# @option opts [Float] :pool_timeout (5.0) When all of the self.connections a pool are checked out,
|
||||
# this is the number of seconds to wait for a new connection to be released before throwing an exception.
|
||||
# Note: this setting is relevant only for multi-threaded applications (which in Ruby are rare).
|
||||
# @option opts [Float] :op_timeout (nil) The number of seconds to wait for a read operation to time out.
|
||||
@ -80,7 +80,7 @@ module Mongo
|
||||
# @example localhost, 27017
|
||||
# Connection.new("localhost")
|
||||
#
|
||||
# @example localhost, 3000, max 5 connections, with max 5 seconds of wait time.
|
||||
# @example localhost, 3000, max 5 self.connections, with max 5 seconds of wait time.
|
||||
# Connection.new("localhost", 3000, :pool_size => 5, :timeout => 5)
|
||||
#
|
||||
# @example localhost, 3000, where this node may be a slave
|
||||
@ -91,7 +91,7 @@ module Mongo
|
||||
# @raise [ReplicaSetConnectionError] This is raised if a replica set name is specified and the
|
||||
# driver fails to connect to a replica set with that name.
|
||||
#
|
||||
# @core connections
|
||||
# @core self.connections
|
||||
def initialize(host=nil, port=nil, opts={})
|
||||
@host_to_try = format_pair(host, port)
|
||||
|
||||
@ -481,6 +481,55 @@ module Mongo
|
||||
@max_bson_size
|
||||
end
|
||||
|
||||
|
||||
def get_local_reader
|
||||
self.connections ||= {}
|
||||
self.connections[self.object_id] ||= {}
|
||||
self.connections[self.object_id][:reader] ||= checkout_reader
|
||||
|
||||
# Thread.current[:connections] ||= {}
|
||||
# Thread.current[:connections][self.object_id] ||= {}
|
||||
# Thread.current[:connections][self.object_id][:reader] ||= checkout_reader
|
||||
end
|
||||
|
||||
def get_local_writer
|
||||
self.connections ||= {}
|
||||
self.connections[self.object_id] ||= {}
|
||||
self.connections[self.object_id][:writer] ||= checkout_writer
|
||||
# Thread.current[:connections] ||= {}
|
||||
# Thread.current[:connections][self.object_id] ||= {}
|
||||
# Thread.current[:connections][self.object_id][:writer] ||= checkout_writer
|
||||
end
|
||||
|
||||
# Used to close, check in, or refresh sockets held
|
||||
# in thread-local variables.
|
||||
def local_socket_done(socket)
|
||||
#checkin(socket)
|
||||
puts "Done. Threads: #{Thread.list.size}, pool_size: #{self.pool_size}"
|
||||
if self.connections[self.object_id][:reader] == socket
|
||||
if self.read_pool.sockets_low?
|
||||
puts "***SOCKETS ARE LOW! READER****"
|
||||
checkin(socket)
|
||||
self.connections[self.object_id][:reader] = nil
|
||||
end
|
||||
end
|
||||
|
||||
if self.connections[self.object_id][:writer] == socket
|
||||
if self.primary_pool && self.primary_pool.sockets_low?
|
||||
puts "***SOCKETS ARE LOW! WRITER****"
|
||||
checkin(socket)
|
||||
self.connections[self.object_id][:writer] = nil
|
||||
end
|
||||
end
|
||||
|
||||
# if Thread.current[:connections][self.object_id][:reader] == socket
|
||||
# Thread.current[:connections][self.object_id][:reader] = nil
|
||||
# end
|
||||
# if Thread.current[:connections][self.object_id][:writer] == socket
|
||||
# Thread.current[:connections][self.object_id][:writer] = nil
|
||||
# end
|
||||
end
|
||||
|
||||
# Checkout a socket for reading (i.e., a secondary node).
|
||||
# Note: this is overridden in ReplSetConnection.
|
||||
def checkout_reader
|
||||
@ -525,9 +574,6 @@ module Mongo
|
||||
# Default maximum BSON object size
|
||||
@max_bson_size = Mongo::DEFAULT_MAX_BSON_SIZE
|
||||
|
||||
@safe_mutex_lock = Mutex.new
|
||||
@safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}
|
||||
|
||||
# Determine whether to use SSL.
|
||||
@ssl = opts.fetch(:ssl, false)
|
||||
if @ssl
|
||||
|
@ -470,9 +470,11 @@ module Mongo
|
||||
results, @n_received, @cursor_id = @connection.receive_message(
|
||||
Mongo::Constants::OP_QUERY, message, nil, sock, @command,
|
||||
nil, @options & OP_QUERY_EXHAUST != 0)
|
||||
ensure
|
||||
checkin_socket(sock) unless @socket
|
||||
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
|
||||
force_checkin_socket(sock)
|
||||
raise ex
|
||||
end
|
||||
checkin_socket(sock) unless @socket
|
||||
@returned += @n_received
|
||||
@cache += results
|
||||
@query_run = true
|
||||
@ -505,9 +507,11 @@ module Mongo
|
||||
begin
|
||||
results, @n_received, @cursor_id = @connection.receive_message(
|
||||
Mongo::Constants::OP_GET_MORE, message, nil, sock, @command, nil)
|
||||
ensure
|
||||
checkin_socket(sock) unless @socket
|
||||
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
|
||||
force_checkin_socket(sock)
|
||||
raise ex
|
||||
end
|
||||
checkin_socket(sock) unless @socket
|
||||
@returned += @n_received
|
||||
@cache += results
|
||||
close_cursor_if_query_complete
|
||||
@ -516,10 +520,10 @@ module Mongo
|
||||
def checkout_socket_from_connection
|
||||
@checkin_connection = true
|
||||
if @command || @read_preference == :primary
|
||||
@connection.checkout_writer
|
||||
@connection.get_local_writer
|
||||
else
|
||||
@read_pool = @connection.read_pool
|
||||
@connection.checkout_reader
|
||||
@connection.get_local_reader
|
||||
end
|
||||
end
|
||||
|
||||
@ -552,6 +556,16 @@ module Mongo
|
||||
@read_pool.checkin(sock)
|
||||
@checkin_read_pool = false
|
||||
elsif @checkin_connection
|
||||
@connection.local_socket_done(sock)
|
||||
@checkin_connection = false
|
||||
end
|
||||
end
|
||||
|
||||
def force_checkin_socket(sock)
|
||||
if @checkin_read_pool
|
||||
@read_pool.checkin(sock)
|
||||
@checkin_read_pool = false
|
||||
else
|
||||
@connection.checkin(sock)
|
||||
@checkin_connection = false
|
||||
end
|
||||
|
@ -30,14 +30,16 @@ module Mongo
|
||||
packed_message = message.to_s
|
||||
|
||||
if connection == :writer
|
||||
socket = checkout_writer
|
||||
sock = get_local_writer
|
||||
else
|
||||
socket = checkout_reader
|
||||
sock = get_local_reader
|
||||
end
|
||||
|
||||
send_message_on_socket(packed_message, socket)
|
||||
ensure
|
||||
checkin(socket)
|
||||
send_message_on_socket(packed_message, sock)
|
||||
local_socket_done(sock)
|
||||
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
|
||||
checkin(sock)
|
||||
raise ex
|
||||
end
|
||||
end
|
||||
|
||||
@ -63,11 +65,13 @@ module Mongo
|
||||
|
||||
packed_message = message.append!(last_error_message).to_s
|
||||
begin
|
||||
sock = checkout_writer
|
||||
sock = get_local_writer
|
||||
send_message_on_socket(packed_message, sock)
|
||||
docs, num_received, cursor_id = receive(sock, last_error_id)
|
||||
ensure
|
||||
local_socket_done(sock)
|
||||
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
|
||||
checkin(sock)
|
||||
raise ex
|
||||
end
|
||||
|
||||
if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg'])
|
||||
@ -103,11 +107,11 @@ module Mongo
|
||||
should_checkin = false
|
||||
else
|
||||
if command
|
||||
sock = checkout_writer
|
||||
sock = get_local_writer
|
||||
elsif read == :primary
|
||||
sock = checkout_writer
|
||||
sock = get_local_writer
|
||||
elsif read == :secondary
|
||||
sock = checkout_reader
|
||||
sock = get_local_reader
|
||||
else
|
||||
sock = checkout_tagged(read)
|
||||
end
|
||||
@ -115,14 +119,12 @@ module Mongo
|
||||
end
|
||||
|
||||
result = ''
|
||||
@safe_mutexes[sock].synchronize do
|
||||
send_message_on_socket(packed_message, sock)
|
||||
result = receive(sock, request_id, exhaust)
|
||||
end
|
||||
ensure
|
||||
if should_checkin
|
||||
checkin(sock)
|
||||
end
|
||||
send_message_on_socket(packed_message, sock)
|
||||
result = receive(sock, request_id, exhaust)
|
||||
local_socket_done(sock) if should_checkin
|
||||
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
|
||||
checkin(sock) if should_checkin
|
||||
raise ex
|
||||
end
|
||||
result
|
||||
end
|
||||
|
@ -20,7 +20,8 @@ module Mongo
|
||||
|
||||
# Instantiates and manages connections to a MongoDB replica set.
|
||||
class ReplSetConnection < Connection
|
||||
attr_reader :replica_set_name, :seeds, :refresh_interval, :refresh_mode
|
||||
attr_reader :replica_set_name, :seeds, :refresh_interval, :refresh_mode,
|
||||
:refresh_version
|
||||
|
||||
# Create a connection to a MongoDB replica set.
|
||||
#
|
||||
@ -122,7 +123,8 @@ module Mongo
|
||||
@connected = false
|
||||
|
||||
# Store the refresher thread
|
||||
@refresh_thread = nil
|
||||
@refresh_thread = nil
|
||||
@refresh_version = 0
|
||||
|
||||
# Maps
|
||||
@sockets_to_pools = {}
|
||||
@ -301,6 +303,36 @@ module Mongo
|
||||
end
|
||||
end
|
||||
|
||||
def get_local_reader
|
||||
Thread.current[:connections] ||= {}
|
||||
Thread.current[:connections][self.object_id] ||= {}
|
||||
Thread.current[:connections][self.object_id][:version] ||= self.refresh_version
|
||||
Thread.current[:connections][self.object_id][:reader] ||= checkout_reader
|
||||
end
|
||||
|
||||
def get_local_writer
|
||||
Thread.current[:connections] ||= {}
|
||||
Thread.current[:connections][self.object_id] ||= {}
|
||||
Thread.current[:connections][self.object_id][:version] ||= self.refresh_version
|
||||
Thread.current[:connections][self.object_id][:writer] ||= checkout_writer
|
||||
end
|
||||
|
||||
# Used to close, check in, or refresh sockets held
|
||||
# in thread-local variables.
|
||||
def local_socket_done
|
||||
if Thread.current[:connections][self.object_id][:version] != self.refresh_version
|
||||
checkin(Thread.current[:connections][self.object_id][:reader])
|
||||
Thread.current[:connections][self.object_id][:reader] ||= checkout_reader
|
||||
end
|
||||
end
|
||||
|
||||
def checkin_sockets
|
||||
checkin(Thread.current[:connections][self.object_id][:reader])
|
||||
checkin(Thread.current[:connections][self.object_id][:writer])
|
||||
Thread.current[:connections][self.object_id][:writer] = nil
|
||||
Thread.current[:connections][self.object_id][:reader] = nil
|
||||
end
|
||||
|
||||
# Checkout a socket for reading (i.e., a secondary node).
|
||||
# Note that @read_pool might point to the primary pool
|
||||
# if no read pool has been defined.
|
||||
@ -480,6 +512,7 @@ module Mongo
|
||||
@manager = new_manager
|
||||
@seeds = @manager.seeds.dup
|
||||
@sockets_to_pools.clear
|
||||
@refresh_version += 1
|
||||
old_manager.close if old_manager
|
||||
end
|
||||
|
||||
|
@ -58,3 +58,28 @@ class String
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
#:nodoc:
|
||||
class Class
|
||||
def mongo_thread_local_accessor name, options = {}
|
||||
m = Module.new
|
||||
m.module_eval do
|
||||
class_variable_set :"@@#{name}", Hash.new {|h,k| h[k] = options[:default] }
|
||||
end
|
||||
m.module_eval %{
|
||||
|
||||
def #{name}
|
||||
@@#{name}[Thread.current.object_id]
|
||||
end
|
||||
|
||||
def #{name}=(val)
|
||||
@@#{name}[Thread.current.object_id] = val
|
||||
end
|
||||
}
|
||||
|
||||
class_eval do
|
||||
include m
|
||||
extend m
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -21,7 +21,8 @@ module Mongo
|
||||
MAX_PING_TIME = 1_000_000
|
||||
|
||||
attr_accessor :host, :port, :address,
|
||||
:size, :timeout, :safe, :checked_out, :connection
|
||||
:size, :timeout, :safe, :checked_out, :connection,
|
||||
:sockets_low
|
||||
|
||||
# Create a new pool of connections.
|
||||
def initialize(connection, host, port, opts={})
|
||||
@ -36,7 +37,7 @@ module Mongo
|
||||
@address = "#{@host}:#{@port}"
|
||||
|
||||
# Pool size and timeout.
|
||||
@size = opts[:size] || 10
|
||||
@size = opts[:size] || 10000
|
||||
@timeout = opts[:timeout] || 5.0
|
||||
|
||||
# Mutex for synchronizing pool access
|
||||
@ -48,9 +49,11 @@ module Mongo
|
||||
# Operations to perform on a socket
|
||||
@socket_ops = Hash.new { |h, k| h[k] = [] }
|
||||
|
||||
@sockets_low = true
|
||||
@sockets = []
|
||||
@pids = {}
|
||||
@checked_out = []
|
||||
@threads = {}
|
||||
@ping_time = nil
|
||||
@last_ping = nil
|
||||
@closed = false
|
||||
@ -76,6 +79,10 @@ module Mongo
|
||||
@closed
|
||||
end
|
||||
|
||||
def sockets_low?
|
||||
@sockets_low
|
||||
end
|
||||
|
||||
def inspect
|
||||
"#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} " +
|
||||
"@ping_time=#{@ping_time} #{@checked_out.size}/#{@size} sockets available.>"
|
||||
@ -138,7 +145,9 @@ module Mongo
|
||||
# Return a socket to the pool.
|
||||
def checkin(socket)
|
||||
@connection_mutex.synchronize do
|
||||
puts "deleting #{socket}, size: #{@checked_out.size}"
|
||||
@checked_out.delete(socket)
|
||||
puts "size now: #{@checked_out.size}"
|
||||
@queue.signal
|
||||
end
|
||||
true
|
||||
@ -166,6 +175,7 @@ module Mongo
|
||||
@sockets << socket
|
||||
@pids[socket] = Process.pid
|
||||
@checked_out << socket
|
||||
@threads[socket] = Thread.current.object_id
|
||||
socket
|
||||
end
|
||||
|
||||
@ -211,10 +221,36 @@ module Mongo
|
||||
checkout_new_socket
|
||||
else
|
||||
@checked_out << socket
|
||||
@threads[socket] = Thread.current.object_id
|
||||
socket
|
||||
end
|
||||
end
|
||||
|
||||
def cleanup
|
||||
return unless @sockets.size > @size
|
||||
puts "-----CLEANUP*****"
|
||||
alive = {}
|
||||
Thread.list.each do |t|
|
||||
if t.alive?
|
||||
alive[t.object_id] = true
|
||||
end
|
||||
end
|
||||
|
||||
@checked_out.each do |socket|
|
||||
if !alive[@threads[socket]]
|
||||
@checked_out.delete(socket)
|
||||
if @sockets.size > @size
|
||||
puts "CLEANING: #{socket}"
|
||||
socket.close
|
||||
@sockets.delete(socket)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
|
||||
end
|
||||
|
||||
# Check out an existing socket or create a new socket if the maximum
|
||||
# pool size has not been exceeded. Otherwise, wait for the next
|
||||
# available socket.
|
||||
@ -228,21 +264,28 @@ module Mongo
|
||||
"consider increasing the pool size or timeout."
|
||||
end
|
||||
|
||||
puts "CHECKING OUT"
|
||||
#@sockets_low = @checked_out.size > @size / 2
|
||||
@connection_mutex.synchronize do
|
||||
if @sockets.size > 0.7 * @size
|
||||
@sockets_low = true
|
||||
else
|
||||
@sockets_low = false
|
||||
end
|
||||
socket = if @checked_out.size < @sockets.size
|
||||
p "checkout existing from size #{@sockets.size}"
|
||||
checkout_existing_socket
|
||||
elsif @sockets.size < @size
|
||||
else
|
||||
checkout_new_socket
|
||||
end
|
||||
|
||||
if socket
|
||||
|
||||
# This calls all procs, in order, scoped to existing sockets.
|
||||
# At the moment, we use this to lazily authenticate and
|
||||
# logout existing socket connections.
|
||||
@socket_ops[socket].reject! do |op|
|
||||
op.call
|
||||
end
|
||||
# This calls all procs, in order, scoped to existing sockets.
|
||||
# At the moment, we use this to lazily authenticate and
|
||||
# logout existing socket connections.
|
||||
@socket_ops[socket].reject! do |op|
|
||||
op.call
|
||||
end
|
||||
|
||||
return socket
|
||||
else
|
||||
|
@ -4,7 +4,7 @@ class TestThreading < Test::Unit::TestCase
|
||||
|
||||
include Mongo
|
||||
|
||||
@@db = standard_connection(:pool_size => 1, :timeout => 30).db(MONGO_TEST_DB)
|
||||
@@db = standard_connection(:pool_size => 10, :timeout => 30).db(MONGO_TEST_DB)
|
||||
@@coll = @@db.collection('thread-test-collection')
|
||||
|
||||
def set_up_safe_data
|
||||
@ -21,16 +21,23 @@ class TestThreading < Test::Unit::TestCase
|
||||
end
|
||||
|
||||
def test_safe_update
|
||||
times = []
|
||||
set_up_safe_data
|
||||
threads = []
|
||||
100.times do |i|
|
||||
threads[i] = Thread.new do
|
||||
if i % 2 == 0
|
||||
assert_raise Mongo::OperationFailure do
|
||||
@unique.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true)
|
||||
10.times do
|
||||
if i % 2 == 0
|
||||
assert_raise Mongo::OperationFailure do
|
||||
t1 = Time.now
|
||||
@unique.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true)
|
||||
times << Time.now - t1
|
||||
end
|
||||
else
|
||||
t1 = Time.now
|
||||
@duplicate.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true)
|
||||
times << Time.now - t1
|
||||
end
|
||||
else
|
||||
@duplicate.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user