RUBY-416 thread affinity for Mongo::Pool
This commit is contained in:
parent
8e64c74d7d
commit
29cc4b20e2
@ -19,6 +19,7 @@ module Mongo
|
|||||||
class Pool
|
class Pool
|
||||||
PING_ATTEMPTS = 6
|
PING_ATTEMPTS = 6
|
||||||
MAX_PING_TIME = 1_000_000
|
MAX_PING_TIME = 1_000_000
|
||||||
|
PRUNE_INTERVAL = 1000
|
||||||
|
|
||||||
attr_accessor :host, :port, :address,
|
attr_accessor :host, :port, :address,
|
||||||
:size, :timeout, :safe, :checked_out, :connection
|
:size, :timeout, :safe, :checked_out, :connection
|
||||||
@ -36,8 +37,8 @@ module Mongo
|
|||||||
@address = "#{@host}:#{@port}"
|
@address = "#{@host}:#{@port}"
|
||||||
|
|
||||||
# Pool size and timeout.
|
# Pool size and timeout.
|
||||||
@size = opts[:size] || 10000
|
@size = opts.fetch(:size, 20)
|
||||||
@timeout = opts[:timeout] || 5.0
|
@timeout = opts.fetch(:timeout, 30)
|
||||||
|
|
||||||
# Mutex for synchronizing pool access
|
# Mutex for synchronizing pool access
|
||||||
@connection_mutex = Mutex.new
|
@connection_mutex = Mutex.new
|
||||||
@ -51,11 +52,11 @@ module Mongo
|
|||||||
@sockets = []
|
@sockets = []
|
||||||
@pids = {}
|
@pids = {}
|
||||||
@checked_out = []
|
@checked_out = []
|
||||||
@threads = {}
|
|
||||||
@ping_time = nil
|
@ping_time = nil
|
||||||
@last_ping = nil
|
@last_ping = nil
|
||||||
@closed = false
|
@closed = false
|
||||||
@last_pruning = Time.now
|
@threads_to_sockets = {}
|
||||||
|
@checkout_counter = 0
|
||||||
end
|
end
|
||||||
|
|
||||||
# Close this pool.
|
# Close this pool.
|
||||||
@ -179,7 +180,7 @@ module Mongo
|
|||||||
@sockets << socket
|
@sockets << socket
|
||||||
@pids[socket] = Process.pid
|
@pids[socket] = Process.pid
|
||||||
@checked_out << socket
|
@checked_out << socket
|
||||||
@threads[socket] = Thread.current.object_id
|
@threads_to_sockets[Thread.current] = socket
|
||||||
socket
|
socket
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -216,8 +217,11 @@ module Mongo
|
|||||||
#
|
#
|
||||||
# This method is called exclusively from #checkout;
|
# This method is called exclusively from #checkout;
|
||||||
# therefore, it runs within a mutex.
|
# therefore, it runs within a mutex.
|
||||||
def checkout_existing_socket
|
def checkout_existing_socket(socket=nil)
|
||||||
|
if !socket
|
||||||
socket = (@sockets - @checked_out).first
|
socket = (@sockets - @checked_out).first
|
||||||
|
end
|
||||||
|
|
||||||
if @pids[socket] != Process.pid
|
if @pids[socket] != Process.pid
|
||||||
@pids[socket] = nil
|
@pids[socket] = nil
|
||||||
@sockets.delete(socket)
|
@sockets.delete(socket)
|
||||||
@ -225,21 +229,21 @@ module Mongo
|
|||||||
checkout_new_socket
|
checkout_new_socket
|
||||||
else
|
else
|
||||||
@checked_out << socket
|
@checked_out << socket
|
||||||
@threads[socket] = Thread.current.object_id
|
@threads_to_sockets[Thread.current] = socket
|
||||||
socket
|
socket
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# If we have more sockets than the soft limit specified
|
def prune_thread_socket_hash
|
||||||
# by the max pool size, then we should prune those
|
map = {}
|
||||||
# extraneous sockets.
|
Thread.list.each do |t|
|
||||||
#
|
map[t] = 1
|
||||||
# Note: this must be called from within a mutex.
|
end
|
||||||
def prune
|
|
||||||
idle_sockets = @sockets - @checked_out
|
@threads_to_sockets.keys.each do |key|
|
||||||
idle_sockets.each do |socket|
|
if !map[key]
|
||||||
socket.close unless socket.closed?
|
@threads_to_sockets.delete(key)
|
||||||
@sockets.delete(socket)
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -257,14 +261,26 @@ module Mongo
|
|||||||
end
|
end
|
||||||
|
|
||||||
@connection_mutex.synchronize do
|
@connection_mutex.synchronize do
|
||||||
if @sockets.size > @size * 1.5
|
if @checkout_counter > PRUNE_INTERVAL
|
||||||
prune
|
@checkout_counter = 0
|
||||||
|
prune_thread_socket_hash
|
||||||
|
else
|
||||||
|
@checkout_counter += 1
|
||||||
end
|
end
|
||||||
|
|
||||||
socket = if @checked_out.size < @sockets.size
|
if socket_for_thread = @threads_to_sockets[Thread.current]
|
||||||
checkout_existing_socket
|
if !@checked_out.include?(socket_for_thread)
|
||||||
else
|
socket = checkout_existing_socket(socket_for_thread)
|
||||||
checkout_new_socket
|
end
|
||||||
|
else # First checkout for this thread
|
||||||
|
thread_length = @threads_to_sockets.keys.length
|
||||||
|
if (thread_length <= @sockets.size) && (@sockets.size < @size)
|
||||||
|
socket = checkout_new_socket
|
||||||
|
elsif @checked_out.size < @sockets.size
|
||||||
|
socket = checkout_existing_socket
|
||||||
|
elsif @sockets.size < @size
|
||||||
|
socket = checkout_new_socket
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
if socket
|
if socket
|
||||||
|
57
test/pool_test.rb
Normal file
57
test/pool_test.rb
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
require './test/test_helper'
|
||||||
|
require 'thread'
|
||||||
|
|
||||||
|
class PoolTest < Test::Unit::TestCase
|
||||||
|
include Mongo
|
||||||
|
|
||||||
|
def setup
|
||||||
|
@connection = standard_connection
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_pool_affinity
|
||||||
|
@pool = Pool.new(@connection, TEST_HOST, TEST_PORT, :size => 5)
|
||||||
|
|
||||||
|
@threads = []
|
||||||
|
@sockets = []
|
||||||
|
|
||||||
|
10.times do
|
||||||
|
@threads << Thread.new do
|
||||||
|
original_socket = @pool.checkout
|
||||||
|
@sockets << original_socket
|
||||||
|
@pool.checkin(original_socket)
|
||||||
|
5000.times do
|
||||||
|
socket = @pool.checkout
|
||||||
|
assert_equal original_socket, socket
|
||||||
|
@pool.checkin(socket)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@threads.each { |t| t.join }
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_pool_thread_pruning
|
||||||
|
@pool = Pool.new(@connection, TEST_HOST, TEST_PORT, :size => 5)
|
||||||
|
|
||||||
|
@threads = []
|
||||||
|
|
||||||
|
10.times do
|
||||||
|
@threads << Thread.new do
|
||||||
|
50.times do
|
||||||
|
socket = @pool.checkout
|
||||||
|
@pool.checkin(socket)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@threads.each { |t| t.join }
|
||||||
|
assert_equal 10, @pool.instance_variable_get(:@threads_to_sockets).size
|
||||||
|
|
||||||
|
# Thread-socket pool
|
||||||
|
1000.times do
|
||||||
|
@pool.checkin(@pool.checkout)
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_equal 1, @pool.instance_variable_get(:@threads_to_sockets).size
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in New Issue
Block a user