simplified connection pooling
This commit is contained in:
parent
c15e8c2d7d
commit
5c1b3aed0f
|
@ -31,7 +31,7 @@ module Mongo
|
||||||
STANDARD_HEADER_SIZE = 16
|
STANDARD_HEADER_SIZE = 16
|
||||||
RESPONSE_HEADER_SIZE = 20
|
RESPONSE_HEADER_SIZE = 20
|
||||||
|
|
||||||
attr_reader :logger, :size, :host, :port, :nodes, :sockets, :checked_out, :reserved_connections
|
attr_reader :logger, :size, :host, :port, :nodes, :sockets, :checked_out
|
||||||
|
|
||||||
def slave_ok?
|
def slave_ok?
|
||||||
@slave_ok
|
@slave_ok
|
||||||
|
@ -108,9 +108,6 @@ module Mongo
|
||||||
@size = options[:pool_size] || 1
|
@size = options[:pool_size] || 1
|
||||||
@timeout = options[:timeout] || 1.0
|
@timeout = options[:timeout] || 1.0
|
||||||
|
|
||||||
# Cache of reserved sockets mapped to threads
|
|
||||||
@reserved_connections = {}
|
|
||||||
|
|
||||||
# Mutex for synchronizing pool access
|
# Mutex for synchronizing pool access
|
||||||
@connection_mutex = Monitor.new
|
@connection_mutex = Monitor.new
|
||||||
|
|
||||||
|
@ -293,49 +290,24 @@ module Mongo
|
||||||
@host = @port = nil
|
@host = @port = nil
|
||||||
@sockets.clear
|
@sockets.clear
|
||||||
@checked_out.clear
|
@checked_out.clear
|
||||||
@reserved_connections.clear
|
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
# Get a socket from the pool, mapped to the current thread.
|
# Get a socket from the pool, mapped to the current thread.
|
||||||
def checkout
|
def checkout
|
||||||
#return @socket ||= checkout_new_socket if @size == 1
|
obtain_socket
|
||||||
if sock = @reserved_connections[Thread.current.object_id]
|
|
||||||
sock
|
|
||||||
else
|
|
||||||
sock = obtain_socket
|
|
||||||
@reserved_connections[Thread.current.object_id] = sock
|
|
||||||
end
|
|
||||||
sock
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# Return a socket to the pool.
|
# Return a socket to the pool.
|
||||||
def checkin(socket)
|
def checkin(socket)
|
||||||
@connection_mutex.synchronize do
|
@connection_mutex.synchronize do
|
||||||
@reserved_connections.delete Thread.current.object_id
|
|
||||||
@checked_out.delete(socket)
|
@checked_out.delete(socket)
|
||||||
@queue.signal
|
@queue.signal
|
||||||
end
|
end
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
|
|
||||||
# Releases the connection for any dead threads.
|
|
||||||
# Called when the connection pool grows too large to free up more sockets.
|
|
||||||
def clear_stale_cached_connections!
|
|
||||||
keys = @reserved_connections.keys
|
|
||||||
|
|
||||||
Thread.list.each do |thread|
|
|
||||||
keys.delete(thread.object_id) if thread.alive?
|
|
||||||
end
|
|
||||||
|
|
||||||
keys.each do |key|
|
|
||||||
next unless @reserved_connections.has_key?(key)
|
|
||||||
checkin(@reserved_connections[key])
|
|
||||||
@reserved_connections.delete(key)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
# Adds a new socket to the pool and checks it out.
|
# Adds a new socket to the pool and checks it out.
|
||||||
#
|
#
|
||||||
# This method is called exclusively from #obtain_socket;
|
# This method is called exclusively from #obtain_socket;
|
||||||
|
@ -366,10 +338,16 @@ module Mongo
|
||||||
# pool size has not been exceeded. Otherwise, wait for the next
|
# pool size has not been exceeded. Otherwise, wait for the next
|
||||||
# available socket.
|
# available socket.
|
||||||
def obtain_socket
|
def obtain_socket
|
||||||
@connection_mutex.synchronize do
|
connect_to_master if !connected?
|
||||||
connect_to_master if !connected?
|
start_time = Time.now
|
||||||
|
loop do
|
||||||
|
if (Time.now - start_time) > 30
|
||||||
|
raise ConnectionTimeoutError, "could not obtain connection within " +
|
||||||
|
"#{@timeout} seconds. The max pool size is currently #{@size}; " +
|
||||||
|
"consider increasing it."
|
||||||
|
end
|
||||||
|
|
||||||
loop do
|
@connection_mutex.synchronize do
|
||||||
socket = if @checked_out.size < @sockets.size
|
socket = if @checked_out.size < @sockets.size
|
||||||
checkout_existing_socket
|
checkout_existing_socket
|
||||||
elsif @sockets.size < @size
|
elsif @sockets.size < @size
|
||||||
|
@ -377,26 +355,9 @@ module Mongo
|
||||||
end
|
end
|
||||||
|
|
||||||
return socket if socket
|
return socket if socket
|
||||||
|
wait
|
||||||
# Try to clear out any stale threads to free up some connections
|
end
|
||||||
clear_stale_cached_connections!
|
end
|
||||||
next if @checked_out.size < @sockets.size
|
|
||||||
|
|
||||||
# Otherwise, wait.
|
|
||||||
if wait
|
|
||||||
next
|
|
||||||
else
|
|
||||||
|
|
||||||
# Try to clear stale threads once more before failing.
|
|
||||||
clear_stale_cached_connections!
|
|
||||||
if @size == @sockets.size
|
|
||||||
raise ConnectionTimeoutError, "could not obtain connection within " +
|
|
||||||
"#{@timeout} seconds. The max pool size is currently #{@size}; " +
|
|
||||||
"consider increasing it."
|
|
||||||
end
|
|
||||||
end # if
|
|
||||||
end # loop
|
|
||||||
end # synchronize
|
|
||||||
end
|
end
|
||||||
|
|
||||||
if RUBY_VERSION >= '1.9'
|
if RUBY_VERSION >= '1.9'
|
||||||
|
|
|
@ -6,7 +6,7 @@ class TestThreadingLargePool < Test::Unit::TestCase
|
||||||
|
|
||||||
include Mongo
|
include Mongo
|
||||||
|
|
||||||
@@db = Connection.new('localhost', 27017, :pool_size => 50, :timeout => 15).db('ruby-mongo-test')
|
@@db = Connection.new('localhost', 27017, :pool_size => 50, :timeout => 1).db('ruby-mongo-test')
|
||||||
@@coll = @@db.collection('thread-test-collection')
|
@@coll = @@db.collection('thread-test-collection')
|
||||||
|
|
||||||
def set_up_safe_data
|
def set_up_safe_data
|
||||||
|
@ -25,7 +25,7 @@ class TestThreadingLargePool < Test::Unit::TestCase
|
||||||
def test_safe_update
|
def test_safe_update
|
||||||
set_up_safe_data
|
set_up_safe_data
|
||||||
threads = []
|
threads = []
|
||||||
100.times do |i|
|
1000.times do |i|
|
||||||
threads[i] = Thread.new do
|
threads[i] = Thread.new do
|
||||||
if i % 2 == 0
|
if i % 2 == 0
|
||||||
assert_raise Mongo::OperationFailure do
|
assert_raise Mongo::OperationFailure do
|
||||||
|
@ -37,7 +37,7 @@ class TestThreadingLargePool < Test::Unit::TestCase
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
100.times do |i|
|
1000.times do |i|
|
||||||
threads[i].join
|
threads[i].join
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -45,7 +45,7 @@ class TestThreadingLargePool < Test::Unit::TestCase
|
||||||
def test_safe_insert
|
def test_safe_insert
|
||||||
set_up_safe_data
|
set_up_safe_data
|
||||||
threads = []
|
threads = []
|
||||||
100.times do |i|
|
1000.times do |i|
|
||||||
threads[i] = Thread.new do
|
threads[i] = Thread.new do
|
||||||
if i % 2 == 0
|
if i % 2 == 0
|
||||||
assert_raise Mongo::OperationFailure do
|
assert_raise Mongo::OperationFailure do
|
||||||
|
@ -57,7 +57,7 @@ class TestThreadingLargePool < Test::Unit::TestCase
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
100.times do |i|
|
1000.times do |i|
|
||||||
threads[i].join
|
threads[i].join
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -12,11 +12,6 @@ class ConnectionTest < Test::Unit::TestCase
|
||||||
db = Object.new
|
db = Object.new
|
||||||
end
|
end
|
||||||
|
|
||||||
# Make a few methods public for these tests.
|
|
||||||
class Mongo::Connection
|
|
||||||
public :checkin, :checkout, :clear_stale_cached_connections!
|
|
||||||
end
|
|
||||||
|
|
||||||
context "Initialization: " do
|
context "Initialization: " do
|
||||||
|
|
||||||
context "given a single node" do
|
context "given a single node" do
|
||||||
|
@ -60,79 +55,5 @@ class ConnectionTest < Test::Unit::TestCase
|
||||||
assert_equal 2500, @conn.port
|
assert_equal 2500, @conn.port
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context "Connection pooling: " do
|
|
||||||
setup do
|
|
||||||
TCPSocket.stubs(:new).returns(new_mock_socket)
|
|
||||||
@conn = Connection.new('localhost', 27107, :connect => false,
|
|
||||||
:pool_size => 3)
|
|
||||||
|
|
||||||
admin_db = new_mock_db
|
|
||||||
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1})
|
|
||||||
@conn.expects(:[]).with('admin').returns(admin_db)
|
|
||||||
@conn.connect_to_master
|
|
||||||
end
|
|
||||||
|
|
||||||
should "check out a new connection" do
|
|
||||||
socket = @conn.checkout
|
|
||||||
assert @conn.reserved_connections.keys.include?(Thread.current.object_id)
|
|
||||||
end
|
|
||||||
|
|
||||||
context "with multiple threads" do
|
|
||||||
setup do
|
|
||||||
@thread1 = Object.new
|
|
||||||
@thread2 = Object.new
|
|
||||||
@thread3 = Object.new
|
|
||||||
@thread4 = Object.new
|
|
||||||
|
|
||||||
Thread.stubs(:current).returns(@thread1)
|
|
||||||
@socket1 = @conn.checkout
|
|
||||||
Thread.stubs(:current).returns(@thread2)
|
|
||||||
@socket2 = @conn.checkout
|
|
||||||
Thread.stubs(:current).returns(@thread3)
|
|
||||||
@socket3 = @conn.checkout
|
|
||||||
end
|
|
||||||
|
|
||||||
should "add each thread to the reserved pool" do
|
|
||||||
assert @conn.reserved_connections.keys.include?(@thread1.object_id)
|
|
||||||
assert @conn.reserved_connections.keys.include?(@thread2.object_id)
|
|
||||||
assert @conn.reserved_connections.keys.include?(@thread3.object_id)
|
|
||||||
end
|
|
||||||
|
|
||||||
should "only add one socket per thread" do
|
|
||||||
@conn.reserved_connections.values do |socket|
|
|
||||||
assert [@socket1, @socket2, @socket3].include?(socket)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
should "check out all sockets" do
|
|
||||||
assert_equal @conn.sockets.size, @conn.checked_out.size
|
|
||||||
@conn.sockets.each do |sock|
|
|
||||||
assert @conn.checked_out.include?(sock)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
should "raise an error if no more sockets can be checked out" do
|
|
||||||
# This can't be tested with mock threads.
|
|
||||||
# Will test in integration tests.
|
|
||||||
end
|
|
||||||
|
|
||||||
context "when releasing dead threads" do
|
|
||||||
setup do
|
|
||||||
@thread1.expects(:alive?).returns(false)
|
|
||||||
@thread2.expects(:alive?).returns(true)
|
|
||||||
@thread3.expects(:alive?).returns(true)
|
|
||||||
Thread.expects(:list).returns([@thread1, @thread2, @thread3])
|
|
||||||
@conn.clear_stale_cached_connections!
|
|
||||||
end
|
|
||||||
|
|
||||||
should "return connections for dead threads" do
|
|
||||||
assert !@conn.checked_out.include?(@socket1)
|
|
||||||
assert_nil @conn.reserved_connections[@thread1.object_id]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue