Tests passing for connection pooling.

This commit is contained in:
Kyle Banker 2009-11-24 13:47:37 -05:00
parent 515594ebf3
commit 20562e96ab
4 changed files with 107 additions and 33 deletions

View File

@ -211,6 +211,7 @@ module Mongo
packed_message = pack_message(operation, message) packed_message = pack_message(operation, message)
socket = checkout socket = checkout
send_message_on_socket(packed_message, socket) send_message_on_socket(packed_message, socket)
checkin(socket)
end end
# Sends a message to MongoDB and returns the response. # Sends a message to MongoDB and returns the response.
@ -243,9 +244,8 @@ module Mongo
@logger.debug(" MONGODB #{log_message || message}") if @logger @logger.debug(" MONGODB #{log_message || message}") if @logger
packed_message = pack_message(operation, message) packed_message = pack_message(operation, message)
socket = checkout socket = checkout
response = send_message_on_socket(packed_message, socket) send_message_on_socket(packed_message, socket)
checkin(socket) checkin(socket)
response
end end
# Sends a message to the database, waits for a response, and raises # Sends a message to the database, waits for a response, and raises
@ -258,10 +258,10 @@ module Mongo
msg = message_with_headers.append!(message_with_check).to_s msg = message_with_headers.append!(message_with_check).to_s
send_message_on_socket(msg, sock) send_message_on_socket(msg, sock)
docs, num_received, cursor_id = receive(sock) docs, num_received, cursor_id = receive(sock)
checkin(sock)
if num_received == 1 && error = docs[0]['err'] if num_received == 1 && error = docs[0]['err']
raise Mongo::OperationFailure, error raise Mongo::OperationFailure, error
end end
checkin(sock)
[docs, num_received, cursor_id] [docs, num_received, cursor_id]
end end
@ -272,7 +272,9 @@ module Mongo
sock = socket || checkout sock = socket || checkout
send_message_on_socket(message_with_headers, sock) send_message_on_socket(message_with_headers, sock)
receive(sock) result = receive(sock)
checkin(sock)
result
end end
# Creates a new socket and tries to connect to master. # Creates a new socket and tries to connect to master.
@ -346,6 +348,7 @@ module Mongo
# Get a socket from the pool, mapped to the current thread. # Get a socket from the pool, mapped to the current thread.
def checkout def checkout
#return @socket ||= checkout_new_socket if @size == 1
if sock = @reserved_connections[Thread.current.object_id] if sock = @reserved_connections[Thread.current.object_id]
sock sock
else else
@ -358,16 +361,17 @@ module Mongo
# Return a socket to the pool. # Return a socket to the pool.
def checkin(socket) def checkin(socket)
@connection_mutex.synchronize do @connection_mutex.synchronize do
@checked_out.delete(socket)
@reserved_connections.delete Thread.current.object_id @reserved_connections.delete Thread.current.object_id
@checked_out.delete(socket)
@queue.signal @queue.signal
end end
true
end end
# Releases the connection for any dead threads. # Releases the connection for any dead threads.
# Called when the connection pool grows too large to free up more sockets. # Called when the connection pool grows too large to free up more sockets.
def clear_stale_cached_connections! def clear_stale_cached_connections!
keys = Set.new(@reserved_connections.keys) keys = @reserved_connections.keys
Thread.list.each do |thread| Thread.list.each do |thread|
keys.delete(thread.object_id) if thread.alive? keys.delete(thread.object_id) if thread.alive?
@ -418,20 +422,22 @@ module Mongo
end end
return socket if socket return socket if socket
# No connections available; wait. # Try to clear out any stale threads to free up some connections
clear_stale_cached_connections!
next if @checked_out.size < @sockets.size
# Otherwise, wait.
if @queue.wait(@timeout) if @queue.wait(@timeout)
next next
else else
# Try to clear out any stale threads to free up some connections
clear_stale_cached_connections! clear_stale_cached_connections!
if @size == @sockets.size if @size == @sockets.size
raise ConnectionTimeoutError, "could not obtain connection within " + raise ConnectionTimeoutError, "could not obtain connection within " +
"#{@timeout} seconds. The max pool size is currently #{@size}; " + "#{@timeout} seconds. The max pool size is currently #{@size}; " +
"consider increasing it." "consider increasing it."
end end
end # if end # if
end # loop end # loop
end #sync end # synchronize
end end
def receive(sock) def receive(sock)
@ -518,15 +524,7 @@ module Mongo
# Low-level method for sending a message on a socket. # Low-level method for sending a message on a socket.
# Requires a packed message and an available socket, # Requires a packed message and an available socket,
def send_message_on_socket(packed_message, socket) def send_message_on_socket(packed_message, socket)
#socket will be connected to master when we receive it
#begin
socket.send(packed_message, 0) socket.send(packed_message, 0)
#rescue => ex
# close
# need to find a way to release the socket here
# checkin(socket)
# raise ex
#end
end end
# Low-level method for receiving data from socket. # Low-level method for receiving data from socket.

View File

@ -4,7 +4,7 @@ class TestThreading < Test::Unit::TestCase
include Mongo include Mongo
@@db = Connection.new('localhost', 27017, :pool_size => 150, :timeout => 1).db('ruby-mongo-test') @@db = Connection.new('localhost', 27017, :pool_size => 1, :timeout => 3).db('ruby-mongo-test')
@@coll = @@db.collection('thread-test-collection') @@coll = @@db.collection('thread-test-collection')
def set_up_safe_data def set_up_safe_data

View File

@ -0,0 +1,90 @@
require 'test/test_helper'
# Essentialy the same as test_threading.rb but with an expanded pool for
# testing multiple connections.
class TestThreadingLargePool < Test::Unit::TestCase
include Mongo
@@db = Connection.new('localhost', 27017, :pool_size => 50, :timeout => 15).db('ruby-mongo-test')
@@coll = @@db.collection('thread-test-collection')
def set_up_safe_data
@@db.drop_collection('duplicate')
@@db.drop_collection('unique')
@duplicate = @@db.collection('duplicate')
@unique = @@db.collection('unique')
@duplicate.insert("test" => "insert")
@duplicate.insert("test" => "update")
@unique.insert("test" => "insert")
@unique.insert("test" => "update")
@unique.create_index("test", true)
end
def test_safe_update
set_up_safe_data
threads = []
100.times do |i|
threads[i] = Thread.new do
if i % 2 == 0
assert_raise Mongo::OperationFailure do
@unique.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true)
end
else
@duplicate.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true)
end
end
end
100.times do |i|
threads[i].join
end
end
def test_safe_insert
set_up_safe_data
threads = []
100.times do |i|
threads[i] = Thread.new do
if i % 2 == 0
assert_raise Mongo::OperationFailure do
@unique.insert({"test" => "insert"}, :safe => true)
end
else
@duplicate.insert({"test" => "insert"}, :safe => true)
end
end
end
100.times do |i|
threads[i].join
end
end
def test_threading
@@coll.drop
@@coll = @@db.collection('thread-test-collection')
1000.times do |i|
@@coll.insert("x" => i)
end
threads = []
10.times do |i|
threads[i] = Thread.new do
sum = 0
@@coll.find().each do |document|
sum += document["x"]
end
assert_equal 499500, sum
end
end
10.times do |i|
threads[i].join
end
end
end

View File

@ -109,22 +109,8 @@ class ConnectionTest < Test::Unit::TestCase
assert !@conn.checked_out.include?(@socket1) assert !@conn.checked_out.include?(@socket1)
assert_nil @conn.reserved_connections[@thread1.object_id] assert_nil @conn.reserved_connections[@thread1.object_id]
end end
should "maintain connection for live threads" do
#assert @conn.checked_out.include?(@socket2)
#assert @conn.checked_out.include?(@socket3)
end
end end
context "when checking in a socket" do
setup do
@conn.checkin(@socket3)
end
should "reduce the number checked out by one" do
#assert_equal @conn.checked_out.size, (@conn.sockets.size - 1)
end
end
end end
end end
end end