From 20562e96ab3ae0bd48cd785b3e5c0049d93dbcb4 Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Tue, 24 Nov 2009 13:47:37 -0500 Subject: [PATCH] Tests passing for connection pooling. --- lib/mongo/connection.rb | 34 ++++++------ test/test_threading.rb | 2 +- test/test_threading_large_pool.rb | 90 +++++++++++++++++++++++++++++++ test/unit/connection_test.rb | 14 ----- 4 files changed, 107 insertions(+), 33 deletions(-) create mode 100644 test/test_threading_large_pool.rb diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 258a46a..a41cdc1 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -211,6 +211,7 @@ module Mongo packed_message = pack_message(operation, message) socket = checkout send_message_on_socket(packed_message, socket) + checkin(socket) end # Sends a message to MongoDB and returns the response. @@ -243,9 +244,8 @@ module Mongo @logger.debug(" MONGODB #{log_message || message}") if @logger packed_message = pack_message(operation, message) socket = checkout - response = send_message_on_socket(packed_message, socket) + send_message_on_socket(packed_message, socket) checkin(socket) - response end # Sends a message to the database, waits for a response, and raises @@ -258,10 +258,10 @@ module Mongo msg = message_with_headers.append!(message_with_check).to_s send_message_on_socket(msg, sock) docs, num_received, cursor_id = receive(sock) + checkin(sock) if num_received == 1 && error = docs[0]['err'] raise Mongo::OperationFailure, error end - checkin(sock) [docs, num_received, cursor_id] end @@ -272,7 +272,9 @@ module Mongo sock = socket || checkout send_message_on_socket(message_with_headers, sock) - receive(sock) + result = receive(sock) + checkin(sock) + result end # Creates a new socket and tries to connect to master. @@ -346,6 +348,7 @@ module Mongo # Get a socket from the pool, mapped to the current thread. def checkout + #return @socket ||= checkout_new_socket if @size == 1 if sock = @reserved_connections[Thread.current.object_id] sock else @@ -358,16 +361,17 @@ module Mongo # Return a socket to the pool. def checkin(socket) @connection_mutex.synchronize do - @checked_out.delete(socket) @reserved_connections.delete Thread.current.object_id + @checked_out.delete(socket) @queue.signal end + true end # Releases the connection for any dead threads. # Called when the connection pool grows too large to free up more sockets. def clear_stale_cached_connections! - keys = Set.new(@reserved_connections.keys) + keys = @reserved_connections.keys Thread.list.each do |thread| keys.delete(thread.object_id) if thread.alive? @@ -418,20 +422,22 @@ module Mongo end return socket if socket - # No connections available; wait. + # Try to clear out any stale threads to free up some connections + clear_stale_cached_connections! + next if @checked_out.size < @sockets.size + # Otherwise, wait. if @queue.wait(@timeout) next else - # Try to clear out any stale threads to free up some connections clear_stale_cached_connections! if @size == @sockets.size raise ConnectionTimeoutError, "could not obtain connection within " + "#{@timeout} seconds. The max pool size is currently #{@size}; " + "consider increasing it." end - end # if + end # if end # loop - end #sync + end # synchronize end def receive(sock) @@ -518,15 +524,7 @@ module Mongo # Low-level method for sending a message on a socket. # Requires a packed message and an available socket, def send_message_on_socket(packed_message, socket) - #socket will be connected to master when we receive it - #begin socket.send(packed_message, 0) - #rescue => ex - # close - # need to find a way to release the socket here - # checkin(socket) - # raise ex - #end end # Low-level method for receiving data from socket. diff --git a/test/test_threading.rb b/test/test_threading.rb index 0e0c7c7..cc398e8 100644 --- a/test/test_threading.rb +++ b/test/test_threading.rb @@ -4,7 +4,7 @@ class TestThreading < Test::Unit::TestCase include Mongo - @@db = Connection.new('localhost', 27017, :pool_size => 150, :timeout => 1).db('ruby-mongo-test') + @@db = Connection.new('localhost', 27017, :pool_size => 1, :timeout => 3).db('ruby-mongo-test') @@coll = @@db.collection('thread-test-collection') def set_up_safe_data diff --git a/test/test_threading_large_pool.rb b/test/test_threading_large_pool.rb new file mode 100644 index 0000000..8a3d177 --- /dev/null +++ b/test/test_threading_large_pool.rb @@ -0,0 +1,90 @@ +require 'test/test_helper' + +# Essentialy the same as test_threading.rb but with an expanded pool for +# testing multiple connections. +class TestThreadingLargePool < Test::Unit::TestCase + + include Mongo + + @@db = Connection.new('localhost', 27017, :pool_size => 50, :timeout => 15).db('ruby-mongo-test') + @@coll = @@db.collection('thread-test-collection') + + def set_up_safe_data + @@db.drop_collection('duplicate') + @@db.drop_collection('unique') + @duplicate = @@db.collection('duplicate') + @unique = @@db.collection('unique') + + @duplicate.insert("test" => "insert") + @duplicate.insert("test" => "update") + @unique.insert("test" => "insert") + @unique.insert("test" => "update") + @unique.create_index("test", true) + end + + def test_safe_update + set_up_safe_data + threads = [] + 100.times do |i| + threads[i] = Thread.new do + if i % 2 == 0 + assert_raise Mongo::OperationFailure do + @unique.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true) + end + else + @duplicate.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true) + end + end + end + + 100.times do |i| + threads[i].join + end + end + + def test_safe_insert + set_up_safe_data + threads = [] + 100.times do |i| + threads[i] = Thread.new do + if i % 2 == 0 + assert_raise Mongo::OperationFailure do + @unique.insert({"test" => "insert"}, :safe => true) + end + else + @duplicate.insert({"test" => "insert"}, :safe => true) + end + end + end + + 100.times do |i| + threads[i].join + end + end + + def test_threading + @@coll.drop + @@coll = @@db.collection('thread-test-collection') + + 1000.times do |i| + @@coll.insert("x" => i) + end + + threads = [] + + 10.times do |i| + threads[i] = Thread.new do + sum = 0 + @@coll.find().each do |document| + sum += document["x"] + end + assert_equal 499500, sum + end + end + + 10.times do |i| + threads[i].join + end + end + +end diff --git a/test/unit/connection_test.rb b/test/unit/connection_test.rb index 3e7cdc5..4177ba3 100644 --- a/test/unit/connection_test.rb +++ b/test/unit/connection_test.rb @@ -109,22 +109,8 @@ class ConnectionTest < Test::Unit::TestCase assert !@conn.checked_out.include?(@socket1) assert_nil @conn.reserved_connections[@thread1.object_id] end - - should "maintain connection for live threads" do - #assert @conn.checked_out.include?(@socket2) - #assert @conn.checked_out.include?(@socket3) - end end - context "when checking in a socket" do - setup do - @conn.checkin(@socket3) - end - - should "reduce the number checked out by one" do - #assert_equal @conn.checked_out.size, (@conn.sockets.size - 1) - end - end end end end