Check in connections on operation exceptions RUBY-83
This commit is contained in:
parent
8c6e0a3591
commit
80afca2fe2
@ -225,10 +225,13 @@ module Mongo
|
|||||||
# @return [True]
|
# @return [True]
|
||||||
def send_message(operation, message, log_message=nil)
|
def send_message(operation, message, log_message=nil)
|
||||||
@logger.debug(" MONGODB #{log_message || message}") if @logger
|
@logger.debug(" MONGODB #{log_message || message}") if @logger
|
||||||
packed_message = add_message_headers(operation, message).to_s
|
begin
|
||||||
socket = checkout
|
packed_message = add_message_headers(operation, message).to_s
|
||||||
send_message_on_socket(packed_message, socket)
|
socket = checkout
|
||||||
checkin(socket)
|
send_message_on_socket(packed_message, socket)
|
||||||
|
ensure
|
||||||
|
checkin(socket)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Sends a message to the database, waits for a response, and raises
|
# Sends a message to the database, waits for a response, and raises
|
||||||
@ -246,14 +249,17 @@ module Mongo
|
|||||||
message_with_headers = add_message_headers(operation, message)
|
message_with_headers = add_message_headers(operation, message)
|
||||||
message_with_check = last_error_message(db_name)
|
message_with_check = last_error_message(db_name)
|
||||||
@logger.debug(" MONGODB #{log_message || message}") if @logger
|
@logger.debug(" MONGODB #{log_message || message}") if @logger
|
||||||
sock = checkout
|
begin
|
||||||
packed_message = message_with_headers.append!(message_with_check).to_s
|
sock = checkout
|
||||||
docs = num_received = cursor_id = ''
|
packed_message = message_with_headers.append!(message_with_check).to_s
|
||||||
@safe_mutex.synchronize do
|
docs = num_received = cursor_id = ''
|
||||||
send_message_on_socket(packed_message, sock)
|
@safe_mutex.synchronize do
|
||||||
docs, num_received, cursor_id = receive(sock)
|
send_message_on_socket(packed_message, sock)
|
||||||
|
docs, num_received, cursor_id = receive(sock)
|
||||||
|
end
|
||||||
|
ensure
|
||||||
|
checkin(sock)
|
||||||
end
|
end
|
||||||
checkin(sock)
|
|
||||||
if num_received == 1 && error = docs[0]['err']
|
if num_received == 1 && error = docs[0]['err']
|
||||||
raise Mongo::OperationFailure, error
|
raise Mongo::OperationFailure, error
|
||||||
end
|
end
|
||||||
@ -273,14 +279,17 @@ module Mongo
|
|||||||
def receive_message(operation, message, log_message=nil, socket=nil)
|
def receive_message(operation, message, log_message=nil, socket=nil)
|
||||||
packed_message = add_message_headers(operation, message).to_s
|
packed_message = add_message_headers(operation, message).to_s
|
||||||
@logger.debug(" MONGODB #{log_message || message}") if @logger
|
@logger.debug(" MONGODB #{log_message || message}") if @logger
|
||||||
sock = socket || checkout
|
begin
|
||||||
|
sock = socket || checkout
|
||||||
|
|
||||||
result = ''
|
result = ''
|
||||||
@safe_mutex.synchronize do
|
@safe_mutex.synchronize do
|
||||||
send_message_on_socket(packed_message, sock)
|
send_message_on_socket(packed_message, sock)
|
||||||
result = receive(sock)
|
result = receive(sock)
|
||||||
|
end
|
||||||
|
ensure
|
||||||
|
checkin(sock)
|
||||||
end
|
end
|
||||||
checkin(sock)
|
|
||||||
result
|
result
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -394,6 +403,9 @@ module Mongo
|
|||||||
return socket if socket
|
return socket if socket
|
||||||
|
|
||||||
# Otherwise, wait
|
# Otherwise, wait
|
||||||
|
if @logger
|
||||||
|
@logger.warn "Waiting for available connection; #{@checked_out.size} of #{@size} connections checked out."
|
||||||
|
end
|
||||||
@queue.wait(@connection_mutex)
|
@queue.wait(@connection_mutex)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
require 'test/test_helper'
|
require 'test/test_helper'
|
||||||
require 'logger'
|
require 'logger'
|
||||||
require 'stringio'
|
require 'stringio'
|
||||||
|
require 'thread'
|
||||||
|
|
||||||
# NOTE: assumes Mongo is running
|
# NOTE: assumes Mongo is running
|
||||||
class TestConnection < Test::Unit::TestCase
|
class TestConnection < Test::Unit::TestCase
|
||||||
@ -122,4 +123,38 @@ class TestConnection < Test::Unit::TestCase
|
|||||||
assert_equal ['bar', Connection::DEFAULT_PORT], nodes[0]
|
assert_equal ['bar', Connection::DEFAULT_PORT], nodes[0]
|
||||||
assert_equal ['foo', 123], nodes[1]
|
assert_equal ['foo', 123], nodes[1]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context "Connection exceptions" do
|
||||||
|
setup do
|
||||||
|
@conn = Mongo::Connection.new('localhost', 27017, :pool_size => 10, :timeout => 10)
|
||||||
|
@coll = @conn['mongo-ruby-test']['test-connection-exceptions']
|
||||||
|
end
|
||||||
|
|
||||||
|
should "release connection if an exception is raised on send_message" do
|
||||||
|
@conn.stubs(:send_message_on_socket).raises(ConnectionFailure)
|
||||||
|
assert_equal 0, @conn.checked_out.size
|
||||||
|
assert_raise ConnectionFailure do
|
||||||
|
@coll.insert({:test => "insert"})
|
||||||
|
end
|
||||||
|
assert_equal 0, @conn.checked_out.size
|
||||||
|
end
|
||||||
|
|
||||||
|
should "release connection if an exception is raised on send_with_safe_check" do
|
||||||
|
@conn.stubs(:receive).raises(ConnectionFailure)
|
||||||
|
assert_equal 0, @conn.checked_out.size
|
||||||
|
assert_raise ConnectionFailure do
|
||||||
|
@coll.insert({:test => "insert"}, :safe => true)
|
||||||
|
end
|
||||||
|
assert_equal 0, @conn.checked_out.size
|
||||||
|
end
|
||||||
|
|
||||||
|
should "release connection if an exception is raised on receive_message" do
|
||||||
|
@conn.stubs(:receive).raises(ConnectionFailure)
|
||||||
|
assert_equal 0, @conn.checked_out.size
|
||||||
|
assert_raise ConnectionFailure do
|
||||||
|
@coll.find.to_a
|
||||||
|
end
|
||||||
|
assert_equal 0, @conn.checked_out.size
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
Loading…
Reference in New Issue
Block a user