Fixed threaded replica set reconnection.
This commit is contained in:
parent
6301a41254
commit
ebfe279784
|
@ -123,6 +123,8 @@ module Mongo
|
|||
@safe_mutex_lock = Mutex.new
|
||||
@safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}
|
||||
|
||||
@connect_mutex = Mutex.new
|
||||
|
||||
check_opts(opts)
|
||||
setup(opts)
|
||||
end
|
||||
|
@ -139,22 +141,24 @@ module Mongo
|
|||
# Initiate a connection to the replica set.
|
||||
def connect
|
||||
log(:info, "Connecting...")
|
||||
return if @connected
|
||||
@connect_mutex.synchronize do
|
||||
return if @connected
|
||||
|
||||
discovered_seeds = @manager ? @manager.seeds : []
|
||||
@manager = PoolManager.new(self, discovered_seeds)
|
||||
discovered_seeds = @manager ? @manager.seeds : []
|
||||
@manager = PoolManager.new(self, discovered_seeds)
|
||||
|
||||
@manager.connect
|
||||
@refresh_version += 1
|
||||
@manager.connect
|
||||
@refresh_version += 1
|
||||
|
||||
if @require_primary && self.primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
||||
close
|
||||
raise ConnectionFailure, "Failed to connect to primary node."
|
||||
elsif self.read_pool.nil?
|
||||
close
|
||||
raise ConnectionFailure, "Failed to connect to any node."
|
||||
else
|
||||
@connected = true
|
||||
if @require_primary && self.primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
||||
close
|
||||
raise ConnectionFailure, "Failed to connect to primary node."
|
||||
elsif self.read_pool.nil?
|
||||
close
|
||||
raise ConnectionFailure, "Failed to connect to any node."
|
||||
else
|
||||
@connected = true
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -312,10 +316,11 @@ module Mongo
|
|||
if socket
|
||||
socket
|
||||
else
|
||||
@connected = false
|
||||
raise ConnectionFailure.new("Could not connect to a node for reading.")
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def checkout_secondary
|
||||
if connected?
|
||||
sync_refresh
|
||||
|
@ -332,6 +337,7 @@ module Mongo
|
|||
if socket
|
||||
socket
|
||||
else
|
||||
@connected = false
|
||||
raise ConnectionFailure.new("Could not connect to a secondary for reading.")
|
||||
end
|
||||
end
|
||||
|
@ -358,6 +364,7 @@ module Mongo
|
|||
if socket
|
||||
socket
|
||||
else
|
||||
@connected = false
|
||||
raise ConnectionFailure.new("Could not connect to primary node.")
|
||||
end
|
||||
end
|
||||
|
@ -423,7 +430,7 @@ module Mongo
|
|||
def read_pool
|
||||
@manager ? @manager.read_pool : nil
|
||||
end
|
||||
|
||||
|
||||
def secondary_pool
|
||||
@manager ? @manager.secondary_pool : nil
|
||||
end
|
||||
|
@ -482,9 +489,9 @@ module Mongo
|
|||
else
|
||||
@replica_set_name = opts[:name]
|
||||
end
|
||||
|
||||
|
||||
opts[:connect_timeout] = opts[:connect_timeout] || 30
|
||||
|
||||
|
||||
super opts
|
||||
end
|
||||
|
||||
|
@ -507,10 +514,12 @@ module Mongo
|
|||
end
|
||||
|
||||
def sync_refresh
|
||||
if @refresh_mode == :sync &&
|
||||
((Time.now - @last_refresh) > @refresh_interval)
|
||||
@last_refresh = Time.now
|
||||
refresh
|
||||
@connect_mutex.synchronize do
|
||||
if @refresh_mode == :sync &&
|
||||
((Time.now - @last_refresh) > @refresh_interval)
|
||||
@last_refresh = Time.now
|
||||
refresh
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -7,7 +7,7 @@ class ReplicaSetPooledInsertTest < Test::Unit::TestCase
|
|||
|
||||
def setup
|
||||
ensure_rs
|
||||
@conn = ReplSetConnection.new(build_seeds(3), :pool_size => 5, :timeout => 5, :refresh_mode => false)
|
||||
@conn = ReplSetConnection.new(build_seeds(3), :pool_size => 10, :timeout => 5, :refresh_mode => false)
|
||||
@db = @conn.db(MONGO_TEST_DB)
|
||||
@db.drop_collection("test-sets")
|
||||
@coll = @db.collection("test-sets")
|
||||
|
|
Loading…
Reference in New Issue