Prune sockets above max sockets per pool,
and close sockets associated with dead threads.
This commit is contained in:
parent
f224df45aa
commit
dc4be1afc7
@ -20,6 +20,8 @@ module Mongo
|
||||
|
||||
# Instantiates and manages connections to a MongoDB replica set.
|
||||
class ReplSetConnection < Connection
|
||||
CLEANUP_INTERVAL = 300
|
||||
|
||||
attr_reader :replica_set_name, :seeds, :refresh_interval, :refresh_mode,
|
||||
:refresh_version
|
||||
|
||||
@ -128,6 +130,7 @@ module Mongo
|
||||
|
||||
# Maps
|
||||
@sockets_to_pools = {}
|
||||
@threads_to_sockets = Hash.new { |h, k| h[k] = Hash.new }
|
||||
@tag_map = nil
|
||||
|
||||
# Replica set name
|
||||
@ -269,6 +272,7 @@ module Mongo
|
||||
|
||||
@manager.close if @manager
|
||||
@sockets_to_pools.clear
|
||||
@threads_to_sockets.clear
|
||||
end
|
||||
|
||||
# If a ConnectionFailure is raised, this method will be called
|
||||
@ -309,16 +313,16 @@ module Mongo
|
||||
self.connections ||= {}
|
||||
self.connections[self.object_id] ||= {}
|
||||
socket = self.connections[self.object_id][:reader] ||= checkout_reader
|
||||
threads_to_sockets[Thread.current] ||= {}
|
||||
threads_to_sockets[Thread.current][:reader] = socket
|
||||
@threads_to_sockets[Thread.current][:reader] = socket
|
||||
socket
|
||||
end
|
||||
|
||||
def get_local_writer
|
||||
self.connections ||= {}
|
||||
self.connections[self.object_id] ||= {}
|
||||
self.connections[self.object_id][:writer] ||= checkout_writer
|
||||
threads_to_sockets[Thread.current] ||= {}
|
||||
threads_to_sockets[Thread.current][:reader] = socket
|
||||
socket = self.connections[self.object_id][:writer] ||= checkout_writer
|
||||
@threads_to_sockets[Thread.current][:writer] = socket
|
||||
socket
|
||||
end
|
||||
|
||||
# Used to close, check in, or refresh sockets held
|
||||
@ -333,12 +337,26 @@ module Mongo
|
||||
end
|
||||
|
||||
if self.connections[self.object_id][:writer] == socket
|
||||
if self.primary_pool && (self.primary_pool.sockets_low? ||
|
||||
self.primary_pool != @sockets_to_pools[socket])
|
||||
if self.primary_pool &&
|
||||
(self.primary_pool.sockets_low? ||
|
||||
self.primary_pool != @sockets_to_pools[socket])
|
||||
checkin(socket)
|
||||
self.connections[self.object_id][:writer] = nil
|
||||
end
|
||||
end
|
||||
|
||||
if (Time.now - @last_cleanup) > CLEANUP_INTERVAL &&
|
||||
@cleanup_lock.try_lock
|
||||
@threads_to_sockets.each do |thread, sockets|
|
||||
if !thread.alive?
|
||||
checkin(sockets[:reader])
|
||||
checkin(sockets[:writer])
|
||||
@threads_to_sockets.delete(thread)
|
||||
end
|
||||
end
|
||||
|
||||
@cleanup_lock.unlock
|
||||
end
|
||||
end
|
||||
|
||||
# Checkout a socket for reading (i.e., a secondary node).
|
||||
@ -506,6 +524,10 @@ module Mongo
|
||||
|
||||
@logger = opts[:logger] || nil
|
||||
|
||||
# Clean up connections to dead threads.
|
||||
@last_cleanup = Time.now
|
||||
@cleanup_lock = Mutex.new
|
||||
|
||||
if @logger
|
||||
@logger.debug("MongoDB logging. Please note that logging negatively impacts performance " +
|
||||
"and should be disabled for high-performance production apps.")
|
||||
|
@ -17,8 +17,9 @@
|
||||
|
||||
module Mongo
|
||||
class Pool
|
||||
PING_ATTEMPTS = 6
|
||||
MAX_PING_TIME = 1_000_000
|
||||
PRUNE_INTERVAL = 300
|
||||
PING_ATTEMPTS = 6
|
||||
MAX_PING_TIME = 1_000_000
|
||||
|
||||
attr_accessor :host, :port, :address,
|
||||
:size, :timeout, :safe, :checked_out, :connection,
|
||||
@ -57,6 +58,7 @@ module Mongo
|
||||
@ping_time = nil
|
||||
@last_ping = nil
|
||||
@closed = false
|
||||
@last_pruning = Time.now
|
||||
end
|
||||
|
||||
# Close this pool.
|
||||
@ -154,9 +156,7 @@ module Mongo
|
||||
# Return a socket to the pool.
|
||||
def checkin(socket)
|
||||
@connection_mutex.synchronize do
|
||||
puts "deleting #{socket}, size: #{@checked_out.size}"
|
||||
@checked_out.delete(socket)
|
||||
puts "size now: #{@checked_out.size}"
|
||||
@queue.signal
|
||||
end
|
||||
true
|
||||
@ -168,7 +168,6 @@ module Mongo
|
||||
# therefore, it runs within a mutex.
|
||||
def checkout_new_socket
|
||||
begin
|
||||
puts "Creating connection in pool to #{@host}:#{@port}"
|
||||
socket = self.connection.socket_class.new(@host, @port)
|
||||
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
||||
rescue => ex
|
||||
@ -235,6 +234,21 @@ module Mongo
|
||||
end
|
||||
end
|
||||
|
||||
# If we have more sockets than the soft limit specified
|
||||
# by the max pool size, then we should prune those
|
||||
# extraneous sockets.
|
||||
#
|
||||
# Note: this must be called from within a mutex.
|
||||
def prune
|
||||
surplus = @size - @sockets.size
|
||||
return if surplus <= 0
|
||||
idle_sockets = @sockets - @checked_out
|
||||
[surplus, idle_sockets.length].min.times do |n|
|
||||
idle_sockets[n].close
|
||||
@sockets.delete(idle_sockets[n])
|
||||
end
|
||||
end
|
||||
|
||||
# Check out an existing socket or create a new socket if the maximum
|
||||
# pool size has not been exceeded. Otherwise, wait for the next
|
||||
# available socket.
|
||||
@ -248,16 +262,19 @@ module Mongo
|
||||
"consider increasing the pool size or timeout."
|
||||
end
|
||||
|
||||
puts "CHECKING OUT"
|
||||
#@sockets_low = @checked_out.size > @size / 2
|
||||
@connection_mutex.synchronize do
|
||||
if @sockets.size > 0.7 * @size
|
||||
@sockets_low = true
|
||||
else
|
||||
@sockets_low = false
|
||||
end
|
||||
|
||||
if (Time.now - @last_pruning) > PRUNE_INTERVAL
|
||||
prune
|
||||
@last_pruning = Time.now
|
||||
end
|
||||
|
||||
socket = if @checked_out.size < @sockets.size
|
||||
p "checkout existing from size #{@sockets.size}"
|
||||
checkout_existing_socket
|
||||
else
|
||||
checkout_new_socket
|
||||
|
@ -26,7 +26,7 @@ class TestThreading < Test::Unit::TestCase
|
||||
threads = []
|
||||
100.times do |i|
|
||||
threads[i] = Thread.new do
|
||||
10.times do
|
||||
100.times do
|
||||
if i % 2 == 0
|
||||
assert_raise Mongo::OperationFailure do
|
||||
t1 = Time.now
|
||||
|
Loading…
Reference in New Issue
Block a user