From 29cc4b20e2ecaebcf8265384691dd52ce222edeb Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Mon, 27 Feb 2012 16:16:05 -0500 Subject: [PATCH] RUBY-416 thread affinity for Mongo::Pool --- lib/mongo/util/pool.rb | 66 ++++++++++++++++++++++++++---------------- test/pool_test.rb | 57 ++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 25 deletions(-) create mode 100644 test/pool_test.rb diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index 72d6551..52782a8 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -19,6 +19,7 @@ module Mongo class Pool PING_ATTEMPTS = 6 MAX_PING_TIME = 1_000_000 + PRUNE_INTERVAL = 1000 attr_accessor :host, :port, :address, :size, :timeout, :safe, :checked_out, :connection @@ -36,8 +37,8 @@ module Mongo @address = "#{@host}:#{@port}" # Pool size and timeout. - @size = opts[:size] || 10000 - @timeout = opts[:timeout] || 5.0 + @size = opts.fetch(:size, 20) + @timeout = opts.fetch(:timeout, 30) # Mutex for synchronizing pool access @connection_mutex = Mutex.new @@ -51,11 +52,11 @@ module Mongo @sockets = [] @pids = {} @checked_out = [] - @threads = {} @ping_time = nil @last_ping = nil @closed = false - @last_pruning = Time.now + @threads_to_sockets = {} + @checkout_counter = 0 end # Close this pool. @@ -179,7 +180,7 @@ module Mongo @sockets << socket @pids[socket] = Process.pid @checked_out << socket - @threads[socket] = Thread.current.object_id + @threads_to_sockets[Thread.current] = socket socket end @@ -216,8 +217,11 @@ module Mongo # # This method is called exclusively from #checkout; # therefore, it runs within a mutex. - def checkout_existing_socket - socket = (@sockets - @checked_out).first + def checkout_existing_socket(socket=nil) + if !socket + socket = (@sockets - @checked_out).first + end + if @pids[socket] != Process.pid @pids[socket] = nil @sockets.delete(socket) @@ -225,21 +229,21 @@ module Mongo checkout_new_socket else @checked_out << socket - @threads[socket] = Thread.current.object_id + @threads_to_sockets[Thread.current] = socket socket end end - # If we have more sockets than the soft limit specified - # by the max pool size, then we should prune those - # extraneous sockets. - # - # Note: this must be called from within a mutex. - def prune - idle_sockets = @sockets - @checked_out - idle_sockets.each do |socket| - socket.close unless socket.closed? - @sockets.delete(socket) + def prune_thread_socket_hash + map = {} + Thread.list.each do |t| + map[t] = 1 + end + + @threads_to_sockets.keys.each do |key| + if !map[key] + @threads_to_sockets.delete(key) + end end end @@ -257,15 +261,27 @@ module Mongo end @connection_mutex.synchronize do - if @sockets.size > @size * 1.5 - prune + if @checkout_counter > PRUNE_INTERVAL + @checkout_counter = 0 + prune_thread_socket_hash + else + @checkout_counter += 1 end - socket = if @checked_out.size < @sockets.size - checkout_existing_socket - else - checkout_new_socket - end + if socket_for_thread = @threads_to_sockets[Thread.current] + if !@checked_out.include?(socket_for_thread) + socket = checkout_existing_socket(socket_for_thread) + end + else # First checkout for this thread + thread_length = @threads_to_sockets.keys.length + if (thread_length <= @sockets.size) && (@sockets.size < @size) + socket = checkout_new_socket + elsif @checked_out.size < @sockets.size + socket = checkout_existing_socket + elsif @sockets.size < @size + socket = checkout_new_socket + end + end if socket # This calls all procs, in order, scoped to existing sockets. diff --git a/test/pool_test.rb b/test/pool_test.rb new file mode 100644 index 0000000..3428517 --- /dev/null +++ b/test/pool_test.rb @@ -0,0 +1,57 @@ +require './test/test_helper' +require 'thread' + +class PoolTest < Test::Unit::TestCase + include Mongo + + def setup + @connection = standard_connection + end + + def test_pool_affinity + @pool = Pool.new(@connection, TEST_HOST, TEST_PORT, :size => 5) + + @threads = [] + @sockets = [] + + 10.times do + @threads << Thread.new do + original_socket = @pool.checkout + @sockets << original_socket + @pool.checkin(original_socket) + 5000.times do + socket = @pool.checkout + assert_equal original_socket, socket + @pool.checkin(socket) + end + end + end + + @threads.each { |t| t.join } + end + + def test_pool_thread_pruning + @pool = Pool.new(@connection, TEST_HOST, TEST_PORT, :size => 5) + + @threads = [] + + 10.times do + @threads << Thread.new do + 50.times do + socket = @pool.checkout + @pool.checkin(socket) + end + end + end + + @threads.each { |t| t.join } + assert_equal 10, @pool.instance_variable_get(:@threads_to_sockets).size + + # Thread-socket pool + 1000.times do + @pool.checkin(@pool.checkout) + end + + assert_equal 1, @pool.instance_variable_get(:@threads_to_sockets).size + end +end