diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 0372c95..fdf7041 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -31,7 +31,7 @@ module Mongo STANDARD_HEADER_SIZE = 16 RESPONSE_HEADER_SIZE = 20 - attr_reader :logger, :size, :host, :port, :nodes, :sockets, :checked_out, :reserved_connections + attr_reader :logger, :size, :host, :port, :nodes, :sockets, :checked_out def slave_ok? @slave_ok @@ -108,9 +108,6 @@ module Mongo @size = options[:pool_size] || 1 @timeout = options[:timeout] || 1.0 - # Cache of reserved sockets mapped to threads - @reserved_connections = {} - # Mutex for synchronizing pool access @connection_mutex = Monitor.new @@ -293,49 +290,24 @@ module Mongo @host = @port = nil @sockets.clear @checked_out.clear - @reserved_connections.clear end private # Get a socket from the pool, mapped to the current thread. def checkout - #return @socket ||= checkout_new_socket if @size == 1 - if sock = @reserved_connections[Thread.current.object_id] - sock - else - sock = obtain_socket - @reserved_connections[Thread.current.object_id] = sock - end - sock + obtain_socket end # Return a socket to the pool. def checkin(socket) @connection_mutex.synchronize do - @reserved_connections.delete Thread.current.object_id @checked_out.delete(socket) @queue.signal end true end - # Releases the connection for any dead threads. - # Called when the connection pool grows too large to free up more sockets. - def clear_stale_cached_connections! - keys = @reserved_connections.keys - - Thread.list.each do |thread| - keys.delete(thread.object_id) if thread.alive? - end - - keys.each do |key| - next unless @reserved_connections.has_key?(key) - checkin(@reserved_connections[key]) - @reserved_connections.delete(key) - end - end - # Adds a new socket to the pool and checks it out. # # This method is called exclusively from #obtain_socket; @@ -366,10 +338,16 @@ module Mongo # pool size has not been exceeded. Otherwise, wait for the next # available socket. def obtain_socket - @connection_mutex.synchronize do - connect_to_master if !connected? + connect_to_master if !connected? + start_time = Time.now + loop do + if (Time.now - start_time) > 30 + raise ConnectionTimeoutError, "could not obtain connection within " + + "#{@timeout} seconds. The max pool size is currently #{@size}; " + + "consider increasing it." + end - loop do + @connection_mutex.synchronize do socket = if @checked_out.size < @sockets.size checkout_existing_socket elsif @sockets.size < @size @@ -377,26 +355,9 @@ module Mongo end return socket if socket - - # Try to clear out any stale threads to free up some connections - clear_stale_cached_connections! - next if @checked_out.size < @sockets.size - - # Otherwise, wait. - if wait - next - else - - # Try to clear stale threads once more before failing. - clear_stale_cached_connections! - if @size == @sockets.size - raise ConnectionTimeoutError, "could not obtain connection within " + - "#{@timeout} seconds. The max pool size is currently #{@size}; " + - "consider increasing it." - end - end # if - end # loop - end # synchronize + wait + end + end end if RUBY_VERSION >= '1.9' diff --git a/test/threading/test_threading_large_pool.rb b/test/threading/test_threading_large_pool.rb index 0cfe6e5..037ebdc 100644 --- a/test/threading/test_threading_large_pool.rb +++ b/test/threading/test_threading_large_pool.rb @@ -6,7 +6,7 @@ class TestThreadingLargePool < Test::Unit::TestCase include Mongo - @@db = Connection.new('localhost', 27017, :pool_size => 50, :timeout => 15).db('ruby-mongo-test') + @@db = Connection.new('localhost', 27017, :pool_size => 50, :timeout => 1).db('ruby-mongo-test') @@coll = @@db.collection('thread-test-collection') def set_up_safe_data @@ -25,7 +25,7 @@ class TestThreadingLargePool < Test::Unit::TestCase def test_safe_update set_up_safe_data threads = [] - 100.times do |i| + 1000.times do |i| threads[i] = Thread.new do if i % 2 == 0 assert_raise Mongo::OperationFailure do @@ -37,7 +37,7 @@ class TestThreadingLargePool < Test::Unit::TestCase end end - 100.times do |i| + 1000.times do |i| threads[i].join end end @@ -45,7 +45,7 @@ class TestThreadingLargePool < Test::Unit::TestCase def test_safe_insert set_up_safe_data threads = [] - 100.times do |i| + 1000.times do |i| threads[i] = Thread.new do if i % 2 == 0 assert_raise Mongo::OperationFailure do @@ -57,7 +57,7 @@ class TestThreadingLargePool < Test::Unit::TestCase end end - 100.times do |i| + 1000.times do |i| threads[i].join end end diff --git a/test/unit/connection_test.rb b/test/unit/connection_test.rb index 9ba1dd5..cc5e8d7 100644 --- a/test/unit/connection_test.rb +++ b/test/unit/connection_test.rb @@ -12,11 +12,6 @@ class ConnectionTest < Test::Unit::TestCase db = Object.new end - # Make a few methods public for these tests. - class Mongo::Connection - public :checkin, :checkout, :clear_stale_cached_connections! - end - context "Initialization: " do context "given a single node" do @@ -60,79 +55,5 @@ class ConnectionTest < Test::Unit::TestCase assert_equal 2500, @conn.port end end - - context "Connection pooling: " do - setup do - TCPSocket.stubs(:new).returns(new_mock_socket) - @conn = Connection.new('localhost', 27107, :connect => false, - :pool_size => 3) - - admin_db = new_mock_db - admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}) - @conn.expects(:[]).with('admin').returns(admin_db) - @conn.connect_to_master - end - - should "check out a new connection" do - socket = @conn.checkout - assert @conn.reserved_connections.keys.include?(Thread.current.object_id) - end - - context "with multiple threads" do - setup do - @thread1 = Object.new - @thread2 = Object.new - @thread3 = Object.new - @thread4 = Object.new - - Thread.stubs(:current).returns(@thread1) - @socket1 = @conn.checkout - Thread.stubs(:current).returns(@thread2) - @socket2 = @conn.checkout - Thread.stubs(:current).returns(@thread3) - @socket3 = @conn.checkout - end - - should "add each thread to the reserved pool" do - assert @conn.reserved_connections.keys.include?(@thread1.object_id) - assert @conn.reserved_connections.keys.include?(@thread2.object_id) - assert @conn.reserved_connections.keys.include?(@thread3.object_id) - end - - should "only add one socket per thread" do - @conn.reserved_connections.values do |socket| - assert [@socket1, @socket2, @socket3].include?(socket) - end - end - - should "check out all sockets" do - assert_equal @conn.sockets.size, @conn.checked_out.size - @conn.sockets.each do |sock| - assert @conn.checked_out.include?(sock) - end - end - - should "raise an error if no more sockets can be checked out" do - # This can't be tested with mock threads. - # Will test in integration tests. - end - - context "when releasing dead threads" do - setup do - @thread1.expects(:alive?).returns(false) - @thread2.expects(:alive?).returns(true) - @thread3.expects(:alive?).returns(true) - Thread.expects(:list).returns([@thread1, @thread2, @thread3]) - @conn.clear_stale_cached_connections! - end - - should "return connections for dead threads" do - assert !@conn.checked_out.include?(@socket1) - assert_nil @conn.reserved_connections[@thread1.object_id] - end - end - - end - end end