Fix connection leak and refresh.
This commit is contained in:
parent
1001e59e44
commit
6a7e991689
@ -186,28 +186,25 @@ module Mongo
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Refresh the current replica set configuration.
|
# Determine whether a replica set refresh is
|
||||||
# This method will attempt to do a soft refresh,
|
# required. If so, run a hard refresh. You can
|
||||||
# updating only those parts of the replica set that
|
# force a hard refresh by running
|
||||||
# have changed. If that's not possible, the method
|
# ReplSetConnection#hard_refresh!
|
||||||
# will perform a hard refresh.
|
|
||||||
#
|
#
|
||||||
# @return [Boolean] +true+ if hard refresh
|
# @return [Boolean] +true+ unless a hard refresh
|
||||||
# occurred. +false+ is returned when unable
|
# is run and the refresh lock can't be acquired.
|
||||||
# to get the refresh lock.
|
|
||||||
def refresh(opts={})
|
def refresh(opts={})
|
||||||
if !connected?
|
if !connected?
|
||||||
log(:info, "Trying to refresh but not connected..." +
|
log(:info, "Trying to check replica set health but not " +
|
||||||
"skipping replica set health check.")
|
"connected...")
|
||||||
hard_refresh!
|
return hard_refresh!
|
||||||
return true
|
|
||||||
end
|
end
|
||||||
|
|
||||||
log(:info, "Checking replica set connection health...")
|
log(:info, "Checking replica set connection health...")
|
||||||
@manager.check_connection_health
|
@manager.check_connection_health
|
||||||
|
|
||||||
if @manager.refresh_required?
|
if @manager.refresh_required?
|
||||||
hard_refresh!
|
return hard_refresh!
|
||||||
end
|
end
|
||||||
|
|
||||||
return true
|
return true
|
||||||
@ -227,9 +224,12 @@ module Mongo
|
|||||||
@background_manager.connect
|
@background_manager.connect
|
||||||
|
|
||||||
sync_synchronize(:EX) do
|
sync_synchronize(:EX) do
|
||||||
|
@manager.close
|
||||||
update_config(@background_manager)
|
update_config(@background_manager)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
initiate_refresh_mode
|
||||||
|
|
||||||
return true
|
return true
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -360,6 +360,7 @@ module Mongo
|
|||||||
@manager = manager
|
@manager = manager
|
||||||
@nodes = manager.nodes
|
@nodes = manager.nodes
|
||||||
@max_bson_size = manager.max_bson_size
|
@max_bson_size = manager.max_bson_size
|
||||||
|
@sockets_to_pools.clear
|
||||||
end
|
end
|
||||||
|
|
||||||
def initiate_refresh_mode
|
def initiate_refresh_mode
|
||||||
|
@ -56,7 +56,7 @@ module Mongo
|
|||||||
|
|
||||||
def close
|
def close
|
||||||
@connection_mutex.synchronize do
|
@connection_mutex.synchronize do
|
||||||
@sockets.each do |sock|
|
(@sockets - @checked_out).each do |sock|
|
||||||
begin
|
begin
|
||||||
sock.close
|
sock.close
|
||||||
rescue IOError => ex
|
rescue IOError => ex
|
||||||
|
@ -47,6 +47,13 @@ module Mongo
|
|||||||
config = seed.set_config
|
config = seed.set_config
|
||||||
if !config
|
if !config
|
||||||
@refresh_required = true
|
@refresh_required = true
|
||||||
|
seed.close
|
||||||
|
return
|
||||||
|
end
|
||||||
|
|
||||||
|
if config['hosts'].length != @members.length
|
||||||
|
@refresh_required = true
|
||||||
|
seed.close
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -59,9 +66,12 @@ module Mongo
|
|||||||
next
|
next
|
||||||
else
|
else
|
||||||
@refresh_required = true
|
@refresh_required = true
|
||||||
|
seed.close
|
||||||
return false
|
return false
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
seed.close
|
||||||
end
|
end
|
||||||
|
|
||||||
# The replica set connection should initiate a full refresh.
|
# The replica set connection should initiate a full refresh.
|
||||||
|
@ -70,7 +70,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||||||
|
|
||||||
rescue_connection_failure do
|
rescue_connection_failure do
|
||||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :sync)
|
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||||
end
|
end
|
||||||
|
|
||||||
assert_equal [], @conn.secondaries
|
assert_equal [], @conn.secondaries
|
||||||
@ -78,8 +78,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||||||
assert_equal @conn.read_pool, @conn.primary_pool
|
assert_equal @conn.read_pool, @conn.primary_pool
|
||||||
|
|
||||||
RS.restart_killed_nodes
|
RS.restart_killed_nodes
|
||||||
|
sleep(4)
|
||||||
sleep(3)
|
|
||||||
|
|
||||||
assert @conn.read_pool != @conn.primary_pool, "Read pool and primary pool are identical."
|
assert @conn.read_pool != @conn.primary_pool, "Read pool and primary pool are identical."
|
||||||
assert @conn.secondaries.length > 0, "No secondaries have been added."
|
assert @conn.secondaries.length > 0, "No secondaries have been added."
|
||||||
@ -87,9 +86,9 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||||||
|
|
||||||
def test_automated_refresh_with_removed_node
|
def test_automated_refresh_with_removed_node
|
||||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :sync)
|
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||||
|
|
||||||
p @conn.secondary_pools
|
@conn.secondary_pools
|
||||||
assert_equal 2, @conn.secondary_pools.length
|
assert_equal 2, @conn.secondary_pools.length
|
||||||
assert_equal 2, @conn.secondaries.length
|
assert_equal 2, @conn.secondaries.length
|
||||||
|
|
||||||
@ -107,7 +106,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||||
|
|
||||||
RS.add_node
|
RS.add_node
|
||||||
sleep(5)
|
sleep(4)
|
||||||
|
|
||||||
@conn2 = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
@conn2 = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||||
@ -116,8 +115,12 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||||||
assert_equal 3, @conn.secondary_pools.length
|
assert_equal 3, @conn.secondary_pools.length
|
||||||
assert_equal 3, @conn.secondaries.length
|
assert_equal 3, @conn.secondaries.length
|
||||||
|
|
||||||
|
config = @conn['admin'].command({:ismaster => 1})
|
||||||
|
|
||||||
RS.remove_secondary_node
|
RS.remove_secondary_node
|
||||||
sleep(4)
|
sleep(4)
|
||||||
|
config = @conn['admin'].command({:ismaster => 1})
|
||||||
|
|
||||||
assert_equal 2, @conn.secondary_pools.length
|
assert_equal 2, @conn.secondary_pools.length
|
||||||
assert_equal 2, @conn.secondaries.length
|
assert_equal 2, @conn.secondaries.length
|
||||||
end
|
end
|
||||||
|
Loading…
Reference in New Issue
Block a user