RUBY-374 Close connection on SystemStackError, NoMemoryError, and SystemCallError
This commit is contained in:
parent
d3c9637268
commit
03303b8409
@ -518,13 +518,21 @@ module Mongo
|
|||||||
end
|
end
|
||||||
|
|
||||||
def checkout_socket_from_connection
|
def checkout_socket_from_connection
|
||||||
|
socket = nil
|
||||||
|
begin
|
||||||
@checkin_connection = true
|
@checkin_connection = true
|
||||||
if @command || @read_preference == :primary
|
if @command || @read_preference == :primary
|
||||||
@connection.checkout_writer
|
socket = @connection.checkout_writer
|
||||||
else
|
else
|
||||||
@read_pool = @connection.read_pool
|
@read_pool = @connection.read_pool
|
||||||
@connection.checkout_reader
|
socket = @connection.checkout_reader
|
||||||
end
|
end
|
||||||
|
rescue SystemStackError, NoMemoryError, SystemCallError => ex
|
||||||
|
@connection.close
|
||||||
|
raise ex
|
||||||
|
end
|
||||||
|
|
||||||
|
socket
|
||||||
end
|
end
|
||||||
|
|
||||||
def checkout_socket_for_op_get_more
|
def checkout_socket_for_op_get_more
|
||||||
@ -540,9 +548,15 @@ module Mongo
|
|||||||
pool.host == @read_pool.host && pool.port == @read_pool.port
|
pool.host == @read_pool.host && pool.port == @read_pool.port
|
||||||
end
|
end
|
||||||
if new_pool
|
if new_pool
|
||||||
|
sock = nil
|
||||||
|
begin
|
||||||
@read_pool = new_pool
|
@read_pool = new_pool
|
||||||
sock = new_pool.checkout
|
sock = new_pool.checkout
|
||||||
@checkin_read_pool = true
|
@checkin_read_pool = true
|
||||||
|
rescue SystemStackError, NoMemoryError, SystemCallError => ex
|
||||||
|
@connection.close
|
||||||
|
raise ex
|
||||||
|
end
|
||||||
return sock
|
return sock
|
||||||
else
|
else
|
||||||
raise Mongo::OperationFailure, "Failure to continue iterating " +
|
raise Mongo::OperationFailure, "Failure to continue iterating " +
|
||||||
|
@ -28,15 +28,20 @@ module Mongo
|
|||||||
add_message_headers(message, operation)
|
add_message_headers(message, operation)
|
||||||
packed_message = message.to_s
|
packed_message = message.to_s
|
||||||
|
|
||||||
|
sock = nil
|
||||||
|
begin
|
||||||
if connection == :writer
|
if connection == :writer
|
||||||
sock = checkout_writer
|
sock = checkout_writer
|
||||||
else
|
else
|
||||||
sock = checkout_reader
|
sock = checkout_reader
|
||||||
end
|
end
|
||||||
|
|
||||||
begin
|
|
||||||
send_message_on_socket(packed_message, sock)
|
send_message_on_socket(packed_message, sock)
|
||||||
|
rescue SystemStackError, NoMemoryError, SystemCallError => ex
|
||||||
|
close
|
||||||
|
raise ex
|
||||||
ensure
|
ensure
|
||||||
|
if sock
|
||||||
if connection == :writer
|
if connection == :writer
|
||||||
checkin_writer(sock)
|
checkin_writer(sock)
|
||||||
else
|
else
|
||||||
@ -44,6 +49,7 @@ module Mongo
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
|
||||||
# Sends a message to the database, waits for a response, and raises
|
# Sends a message to the database, waits for a response, and raises
|
||||||
# an exception if the operation has failed.
|
# an exception if the operation has failed.
|
||||||
@ -66,14 +72,18 @@ module Mongo
|
|||||||
last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY)
|
last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY)
|
||||||
|
|
||||||
packed_message = message.append!(last_error_message).to_s
|
packed_message = message.append!(last_error_message).to_s
|
||||||
sock = checkout_writer
|
sock = nil
|
||||||
begin
|
begin
|
||||||
|
sock = checkout_writer
|
||||||
send_message_on_socket(packed_message, sock)
|
send_message_on_socket(packed_message, sock)
|
||||||
docs, num_received, cursor_id = receive(sock, last_error_id)
|
docs, num_received, cursor_id = receive(sock, last_error_id)
|
||||||
checkin_writer(sock)
|
checkin_writer(sock)
|
||||||
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
|
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
|
||||||
checkin_writer(sock)
|
checkin_writer(sock)
|
||||||
raise ex
|
raise ex
|
||||||
|
rescue SystemStackError, NoMemoryError, SystemCallError => ex
|
||||||
|
close
|
||||||
|
raise ex
|
||||||
end
|
end
|
||||||
|
|
||||||
if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg'])
|
if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg'])
|
||||||
@ -103,6 +113,10 @@ module Mongo
|
|||||||
read=:primary, exhaust=false)
|
read=:primary, exhaust=false)
|
||||||
request_id = add_message_headers(message, operation)
|
request_id = add_message_headers(message, operation)
|
||||||
packed_message = message.to_s
|
packed_message = message.to_s
|
||||||
|
|
||||||
|
result = ''
|
||||||
|
sock = nil
|
||||||
|
begin
|
||||||
if socket
|
if socket
|
||||||
sock = socket
|
sock = socket
|
||||||
should_checkin = false
|
should_checkin = false
|
||||||
@ -117,10 +131,11 @@ module Mongo
|
|||||||
should_checkin = true
|
should_checkin = true
|
||||||
end
|
end
|
||||||
|
|
||||||
result = ''
|
|
||||||
begin
|
|
||||||
send_message_on_socket(packed_message, sock)
|
send_message_on_socket(packed_message, sock)
|
||||||
result = receive(sock, request_id, exhaust)
|
result = receive(sock, request_id, exhaust)
|
||||||
|
rescue SystemStackError, NoMemoryError, SystemCallError => ex
|
||||||
|
close
|
||||||
|
raise ex
|
||||||
ensure
|
ensure
|
||||||
if should_checkin
|
if should_checkin
|
||||||
if command || read == :primary
|
if command || read == :primary
|
||||||
|
@ -300,12 +300,17 @@ module Mongo
|
|||||||
# if no read pool has been defined.
|
# if no read pool has been defined.
|
||||||
def checkout_reader
|
def checkout_reader
|
||||||
connect unless connected?
|
connect unless connected?
|
||||||
|
begin
|
||||||
socket = get_socket_from_pool(self.read_pool)
|
socket = get_socket_from_pool(self.read_pool)
|
||||||
|
|
||||||
if !socket
|
if !socket
|
||||||
connect
|
connect
|
||||||
socket = get_socket_from_pool(self.primary_pool)
|
socket = get_socket_from_pool(self.primary_pool)
|
||||||
end
|
end
|
||||||
|
rescue => ex
|
||||||
|
checkin(socket) if socket
|
||||||
|
raise ex
|
||||||
|
end
|
||||||
|
|
||||||
if socket
|
if socket
|
||||||
socket
|
socket
|
||||||
@ -317,12 +322,17 @@ module Mongo
|
|||||||
# Checkout a socket for writing (i.e., a primary node).
|
# Checkout a socket for writing (i.e., a primary node).
|
||||||
def checkout_writer
|
def checkout_writer
|
||||||
connect unless connected?
|
connect unless connected?
|
||||||
|
begin
|
||||||
socket = get_socket_from_pool(self.primary_pool)
|
socket = get_socket_from_pool(self.primary_pool)
|
||||||
|
|
||||||
if !socket
|
if !socket
|
||||||
connect
|
connect
|
||||||
socket = get_socket_from_pool(self.primary_pool)
|
socket = get_socket_from_pool(self.primary_pool)
|
||||||
end
|
end
|
||||||
|
rescue => ex
|
||||||
|
checkin(socket)
|
||||||
|
raise ex
|
||||||
|
end
|
||||||
|
|
||||||
if socket
|
if socket
|
||||||
socket
|
socket
|
||||||
|
@ -278,6 +278,58 @@ class TestConnection < Test::Unit::TestCase
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context "Socket pools" do
|
||||||
|
context "checking out writers" do
|
||||||
|
setup do
|
||||||
|
@con = standard_connection(:pool_size => 10, :timeout => 10)
|
||||||
|
@coll = @con[MONGO_TEST_DB]['test-connection-exceptions']
|
||||||
|
end
|
||||||
|
|
||||||
|
should "close the connection on send_message for major exceptions" do
|
||||||
|
@con.expects(:checkout_writer).raises(SystemStackError)
|
||||||
|
@con.expects(:close)
|
||||||
|
begin
|
||||||
|
@coll.insert({:foo => "bar"})
|
||||||
|
rescue SystemStackError
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
should "close the connection on send_message_with_safe_check for major exceptions" do
|
||||||
|
@con.expects(:checkout_writer).raises(SystemStackError)
|
||||||
|
@con.expects(:close)
|
||||||
|
begin
|
||||||
|
@coll.insert({:foo => "bar"}, :safe => true)
|
||||||
|
rescue SystemStackError
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
should "close the connection on receive_message for major exceptions" do
|
||||||
|
@con.expects(:checkout_writer).raises(SystemStackError)
|
||||||
|
@con.expects(:close)
|
||||||
|
begin
|
||||||
|
@coll.find.next
|
||||||
|
rescue SystemStackError
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "checking out readers" do
|
||||||
|
setup do
|
||||||
|
@con = standard_connection(:pool_size => 10, :timeout => 10, :slave_ok => true)
|
||||||
|
@coll = @con[MONGO_TEST_DB]['test-connection-exceptions']
|
||||||
|
end
|
||||||
|
|
||||||
|
should "close the connection on receive_message for major exceptions" do
|
||||||
|
@con.expects(:checkout_reader).raises(SystemStackError)
|
||||||
|
@con.expects(:close)
|
||||||
|
begin
|
||||||
|
@coll.find.next
|
||||||
|
rescue SystemStackError
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
context "Connection exceptions" do
|
context "Connection exceptions" do
|
||||||
setup do
|
setup do
|
||||||
@con = standard_connection(:pool_size => 10, :timeout => 10)
|
@con = standard_connection(:pool_size => 10, :timeout => 10)
|
||||||
|
@ -43,4 +43,52 @@ class BasicTest < Test::Unit::TestCase
|
|||||||
assert_equal 90, @conn.refresh_interval
|
assert_equal 90, @conn.refresh_interval
|
||||||
assert_equal @conn.refresh_mode, false
|
assert_equal @conn.refresh_mode, false
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context "Socket pools" do
|
||||||
|
context "checking out writers" do
|
||||||
|
setup do
|
||||||
|
seeds = [[self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||||
|
[self.rs.host, self.rs.ports[2]]]
|
||||||
|
args = seeds << {:name => self.rs.name}
|
||||||
|
@con = ReplSetConnection.new(*args)
|
||||||
|
@coll = @con[MONGO_TEST_DB]['test-connection-exceptions']
|
||||||
|
end
|
||||||
|
|
||||||
|
should "close the connection on send_message for major exceptions" do
|
||||||
|
@con.expects(:checkout_writer).raises(SystemStackError)
|
||||||
|
@con.expects(:close)
|
||||||
|
begin
|
||||||
|
@coll.insert({:foo => "bar"})
|
||||||
|
rescue SystemStackError
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
should "close the connection on send_message_with_safe_check for major exceptions" do
|
||||||
|
@con.expects(:checkout_writer).raises(SystemStackError)
|
||||||
|
@con.expects(:close)
|
||||||
|
begin
|
||||||
|
@coll.insert({:foo => "bar"}, :safe => true)
|
||||||
|
rescue SystemStackError
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
should "close the connection on receive_message for major exceptions" do
|
||||||
|
@con.expects(:checkout_writer).raises(SystemStackError)
|
||||||
|
@con.expects(:close)
|
||||||
|
begin
|
||||||
|
@coll.find.next
|
||||||
|
rescue SystemStackError
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
should "close the connection on receive_message for major exceptions" do
|
||||||
|
@con.expects(:checkout_reader).raises(SystemStackError)
|
||||||
|
@con.expects(:close)
|
||||||
|
begin
|
||||||
|
@coll.find({}, :read => :secondary).next
|
||||||
|
rescue SystemStackError
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
Loading…
Reference in New Issue
Block a user