Cleanup for proxying to PoolManager.
This commit is contained in:
parent
f98c1099dc
commit
1f068ce127
@ -101,26 +101,12 @@ module Mongo
|
|||||||
# TODO: get rid of this
|
# TODO: get rid of this
|
||||||
@nodes = @seeds.dup
|
@nodes = @seeds.dup
|
||||||
|
|
||||||
# Connection pool for primary node
|
|
||||||
@primary = nil
|
|
||||||
@primary_pool = nil
|
|
||||||
|
|
||||||
# Connection pools for each secondary node
|
|
||||||
@secondaries = []
|
|
||||||
@secondary_pools = []
|
|
||||||
|
|
||||||
# The secondary pool to which we'll be sending reads.
|
|
||||||
# This may be identical to the primary pool.
|
|
||||||
@read_pool = nil
|
|
||||||
|
|
||||||
# A list of arbiter addresses (for client information only)
|
|
||||||
@arbiters = []
|
|
||||||
|
|
||||||
# Refresh
|
# Refresh
|
||||||
@refresh_mode = opts.fetch(:refresh_mode, false)
|
@refresh_mode = opts.fetch(:refresh_mode, false)
|
||||||
@refresh_interval = opts[:refresh_interval] || 90
|
@refresh_interval = opts[:refresh_interval] || 90
|
||||||
@last_refresh = Time.now
|
@last_refresh = Time.now
|
||||||
|
|
||||||
|
# No connection manager by default.
|
||||||
@manager = nil
|
@manager = nil
|
||||||
|
|
||||||
if ![:sync, :async, false].include?(@refresh_mode)
|
if ![:sync, :async, false].include?(@refresh_mode)
|
||||||
@ -179,9 +165,11 @@ module Mongo
|
|||||||
update_config(manager)
|
update_config(manager)
|
||||||
initiate_refresh_mode
|
initiate_refresh_mode
|
||||||
|
|
||||||
if @require_primary && @primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
if @require_primary && self.primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
||||||
|
close
|
||||||
raise ConnectionFailure, "Failed to connect to primary node."
|
raise ConnectionFailure, "Failed to connect to primary node."
|
||||||
elsif !@read_pool
|
elsif self.read_pool.nil?
|
||||||
|
close
|
||||||
raise ConnectionFailure, "Failed to connect to any node."
|
raise ConnectionFailure, "Failed to connect to any node."
|
||||||
else
|
else
|
||||||
@connected = true
|
@connected = true
|
||||||
@ -234,7 +222,7 @@ module Mongo
|
|||||||
end
|
end
|
||||||
|
|
||||||
def connected?
|
def connected?
|
||||||
!@primary_pool.nil? || !@read_pool.nil?
|
self.primary_pool || self.read_pool
|
||||||
end
|
end
|
||||||
|
|
||||||
# @deprecated
|
# @deprecated
|
||||||
@ -268,9 +256,7 @@ module Mongo
|
|||||||
#
|
#
|
||||||
# @return [Boolean]
|
# @return [Boolean]
|
||||||
def read_primary?
|
def read_primary?
|
||||||
sync_synchronize(:SH) do
|
self.read_pool == self.primary_pool
|
||||||
@read_pool == @primary_pool
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
alias :primary? :read_primary?
|
alias :primary? :read_primary?
|
||||||
|
|
||||||
@ -289,18 +275,7 @@ module Mongo
|
|||||||
@refresh_thread = nil
|
@refresh_thread = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
@read_pool = nil
|
@manager.close if @manager
|
||||||
|
|
||||||
if @secondary_pools
|
|
||||||
@secondary_pools.each do |pool|
|
|
||||||
pool.close
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
@secondaries = []
|
|
||||||
@secondary_pools = []
|
|
||||||
@arbiters = []
|
|
||||||
@tag_map = nil
|
|
||||||
@sockets_to_pools.clear
|
@sockets_to_pools.clear
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@ -327,14 +302,14 @@ module Mongo
|
|||||||
|
|
||||||
def authenticate_pools
|
def authenticate_pools
|
||||||
super
|
super
|
||||||
@secondary_pools.each do |pool|
|
self.secondary_pools.each do |pool|
|
||||||
pool.authenticate_existing
|
pool.authenticate_existing
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def logout_pools(db)
|
def logout_pools(db)
|
||||||
super
|
super
|
||||||
@secondary_pools.each do |pool|
|
self.secondary_pools.each do |pool|
|
||||||
pool.logout_existing(db)
|
pool.logout_existing(db)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
@ -344,11 +319,11 @@ module Mongo
|
|||||||
# if no read pool has been defined.
|
# if no read pool has been defined.
|
||||||
def checkout_reader
|
def checkout_reader
|
||||||
connect unless connected?
|
connect unless connected?
|
||||||
socket = get_socket_from_pool(@read_pool)
|
socket = get_socket_from_pool(self.read_pool)
|
||||||
|
|
||||||
if !socket
|
if !socket
|
||||||
refresh
|
refresh
|
||||||
socket = get_socket_from_pool(@primary_pool)
|
socket = get_socket_from_pool(self.primary_pool)
|
||||||
end
|
end
|
||||||
|
|
||||||
if socket
|
if socket
|
||||||
@ -361,11 +336,11 @@ module Mongo
|
|||||||
# Checkout a socket for writing (i.e., a primary node).
|
# Checkout a socket for writing (i.e., a primary node).
|
||||||
def checkout_writer
|
def checkout_writer
|
||||||
connect unless connected?
|
connect unless connected?
|
||||||
socket = get_socket_from_pool(@primary_pool)
|
socket = get_socket_from_pool(self.primary_pool)
|
||||||
|
|
||||||
if !socket
|
if !socket
|
||||||
refresh
|
refresh
|
||||||
socket = get_socket_from_pool(@primary_pool)
|
socket = get_socket_from_pool(self.primary_pool)
|
||||||
end
|
end
|
||||||
|
|
||||||
if socket
|
if socket
|
||||||
@ -414,19 +389,20 @@ module Mongo
|
|||||||
end
|
end
|
||||||
|
|
||||||
def arbiters
|
def arbiters
|
||||||
@manager.arbiters.nil? ? [] : manager.arbiters.dup
|
@manager.arbiters.nil? ? [] : @manager.arbiters
|
||||||
end
|
end
|
||||||
|
|
||||||
def primary
|
def primary
|
||||||
@manager.primary.nil? ? nil : manager.primary.dup
|
@manager.primary
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Note: might want to freeze these after connecting.
|
||||||
def secondaries
|
def secondaries
|
||||||
@manager.secondaries.dup
|
@manager.secondaries
|
||||||
end
|
end
|
||||||
|
|
||||||
def hosts
|
def hosts
|
||||||
@manager.hosts.dup
|
@manager.hosts
|
||||||
end
|
end
|
||||||
|
|
||||||
def primary_pool
|
def primary_pool
|
||||||
@ -464,6 +440,9 @@ module Mongo
|
|||||||
old_manager.close if old_manager
|
old_manager.close if old_manager
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# If we're using async refresh, start
|
||||||
|
# a background thread to run the refresh method
|
||||||
|
# every @refresh_interval seconds.
|
||||||
def initiate_refresh_mode
|
def initiate_refresh_mode
|
||||||
if @refresh_mode == :async
|
if @refresh_mode == :async
|
||||||
return if @refresh_thread && @refresh_thread.alive?
|
return if @refresh_thread && @refresh_thread.alive?
|
||||||
@ -486,7 +465,7 @@ module Mongo
|
|||||||
def checkout_tagged(tags)
|
def checkout_tagged(tags)
|
||||||
sync_synchronize(:SH) do
|
sync_synchronize(:SH) do
|
||||||
tags.each do |k, v|
|
tags.each do |k, v|
|
||||||
pool = @tag_map[{k.to_s => v}]
|
pool = self.tag_map[{k.to_s => v}]
|
||||||
if pool
|
if pool
|
||||||
socket = pool.checkout
|
socket = pool.checkout
|
||||||
@sockets_to_pools[socket] = pool
|
@sockets_to_pools[socket] = pool
|
||||||
|
Loading…
Reference in New Issue
Block a user