Add synchronous refresh; fix connection leak.
This commit is contained in:
parent
83eaa4d51b
commit
b910e3e635
@ -200,28 +200,21 @@ module Mongo
|
|||||||
@max_bson_size = manager.max_bson_size
|
@max_bson_size = manager.max_bson_size
|
||||||
end
|
end
|
||||||
|
|
||||||
# If ismaster doesn't match our current view
|
# Refresh the current replica set configuration.
|
||||||
# then create a new PoolManager, passing in our
|
def refresh(opts={})
|
||||||
# existing view. It should be able to do the diff.
|
return false if !connected?
|
||||||
# Then take out the connection lock and replace
|
|
||||||
# our current values.
|
|
||||||
def refresh
|
|
||||||
return if !connected?
|
|
||||||
|
|
||||||
if !Thread.current[:background]
|
|
||||||
Thread.current[:background] = PoolManager.new(self, @seeds)
|
|
||||||
end
|
|
||||||
|
|
||||||
background_manager = Thread.current[:background]
|
|
||||||
|
|
||||||
# Return if another thread is already in the process of refreshing.
|
# Return if another thread is already in the process of refreshing.
|
||||||
return if sync_exclusive?
|
return if sync_exclusive?
|
||||||
|
|
||||||
sync_synchronize(:EX) do
|
sync_synchronize(:EX) do
|
||||||
log(:debug, "Refreshing...")
|
log(:debug, "Refreshing...")
|
||||||
background_manager.connect
|
@background_manager ||= PoolManager.new(self, @seeds)
|
||||||
update_config(background_manager)
|
@background_manager.connect
|
||||||
|
update_config(@background_manager)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
return true
|
||||||
end
|
end
|
||||||
|
|
||||||
def connected?
|
def connected?
|
||||||
@ -281,7 +274,7 @@ module Mongo
|
|||||||
|
|
||||||
if @nodes
|
if @nodes
|
||||||
@nodes.each do |member|
|
@nodes.each do |member|
|
||||||
member.disconnect
|
member.close
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -339,7 +332,7 @@ module Mongo
|
|||||||
private
|
private
|
||||||
|
|
||||||
def initiate_auto_refresh
|
def initiate_auto_refresh
|
||||||
return unless @auto_refresh
|
if @auto_refresh
|
||||||
return if @refresh_thread && @refresh_thread.alive?
|
return if @refresh_thread && @refresh_thread.alive?
|
||||||
@refresh_thread = Thread.new do
|
@refresh_thread = Thread.new do
|
||||||
while true do
|
while true do
|
||||||
@ -347,6 +340,9 @@ module Mongo
|
|||||||
refresh
|
refresh
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
else
|
||||||
|
@last_refresh = Time.now
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Checkout a socket for reading (i.e., a secondary node).
|
# Checkout a socket for reading (i.e., a secondary node).
|
||||||
@ -444,6 +440,11 @@ module Mongo
|
|||||||
socket.close
|
socket.close
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
if !@auto_refresh &&
|
||||||
|
((Time.now - @last_refresh) > @refresh_interval)
|
||||||
|
refresh
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -20,10 +20,6 @@ module Mongo
|
|||||||
end
|
end
|
||||||
alias :== :eql?
|
alias :== :eql?
|
||||||
|
|
||||||
def close
|
|
||||||
self.socket.close if self.socket
|
|
||||||
end
|
|
||||||
|
|
||||||
def host_string
|
def host_string
|
||||||
address
|
address
|
||||||
end
|
end
|
||||||
@ -60,7 +56,7 @@ module Mongo
|
|||||||
self.socket = socket
|
self.socket = socket
|
||||||
end
|
end
|
||||||
|
|
||||||
def disconnect
|
def close
|
||||||
if self.socket
|
if self.socket
|
||||||
self.socket.close
|
self.socket.close
|
||||||
self.socket = nil
|
self.socket = nil
|
||||||
|
@ -53,7 +53,7 @@ module Mongo
|
|||||||
|
|
||||||
def close
|
def close
|
||||||
@connection_mutex.synchronize do
|
@connection_mutex.synchronize do
|
||||||
(@sockets - @checked_out).each do |sock|
|
@sockets.each do |sock|
|
||||||
begin
|
begin
|
||||||
sock.close
|
sock.close
|
||||||
rescue IOError => ex
|
rescue IOError => ex
|
||||||
@ -140,6 +140,7 @@ module Mongo
|
|||||||
rescue => ex
|
rescue => ex
|
||||||
socket.close if socket
|
socket.close if socket
|
||||||
raise ConnectionFailure, "Failed to connect to host #{@host} and port #{@port}: #{ex}"
|
raise ConnectionFailure, "Failed to connect to host #{@host} and port #{@port}: #{ex}"
|
||||||
|
@node.close if @node
|
||||||
end
|
end
|
||||||
|
|
||||||
# If any saved authentications exist, we want to apply those
|
# If any saved authentications exist, we want to apply those
|
||||||
|
@ -9,6 +9,7 @@ module Mongo
|
|||||||
@connection = connection
|
@connection = connection
|
||||||
@seeds = seeds
|
@seeds = seeds
|
||||||
@refresh_node = nil
|
@refresh_node = nil
|
||||||
|
@previously_connected = false
|
||||||
end
|
end
|
||||||
|
|
||||||
def inspect
|
def inspect
|
||||||
@ -16,16 +17,28 @@ module Mongo
|
|||||||
end
|
end
|
||||||
|
|
||||||
def connect
|
def connect
|
||||||
|
if @previously_connected
|
||||||
|
close
|
||||||
|
end
|
||||||
|
|
||||||
initialize_data
|
initialize_data
|
||||||
members = connect_to_members
|
members = connect_to_members
|
||||||
initialize_pools(members)
|
initialize_pools(members)
|
||||||
update_seed_list(members)
|
update_seed_list(members)
|
||||||
|
|
||||||
@members = members
|
@members = members
|
||||||
|
@previously_connected = true
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
def healthy?
|
||||||
|
if !@refresh_node || !refresh_node.set_config
|
||||||
|
return false
|
||||||
|
end
|
||||||
|
|
||||||
def initialize_data
|
#if refresh_node.node_list
|
||||||
|
end
|
||||||
|
|
||||||
|
def close
|
||||||
begin
|
begin
|
||||||
if @primary_pool
|
if @primary_pool
|
||||||
@primary_pool.close
|
@primary_pool.close
|
||||||
@ -45,7 +58,11 @@ module Mongo
|
|||||||
|
|
||||||
rescue ConnectionFailure
|
rescue ConnectionFailure
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def initialize_data
|
||||||
@primary = nil
|
@primary = nil
|
||||||
@primary_pool = nil
|
@primary_pool = nil
|
||||||
@read_pool = nil
|
@read_pool = nil
|
||||||
@ -71,6 +88,7 @@ module Mongo
|
|||||||
members << node
|
members << node
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
seed.close
|
||||||
|
|
||||||
if members.empty?
|
if members.empty?
|
||||||
raise ConnectionFailure, "Failed to connect to any given member."
|
raise ConnectionFailure, "Failed to connect to any given member."
|
||||||
@ -174,7 +192,7 @@ module Mongo
|
|||||||
if node.connect && node.set_config
|
if node.connect && node.set_config
|
||||||
return node
|
return node
|
||||||
else
|
else
|
||||||
node.disconnect
|
node.close
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user