minor: simplify and refactor auto refresh
commit c2070bb90a
parent 566d1a844f
@@ -23,7 +23,7 @@ module Mongo
  # Instantiates and manages connections to a MongoDB replica set.
  class ReplSetConnection < Connection
    attr_reader :secondaries, :arbiters, :secondary_pools,
      :replica_set_name, :read_pool, :seeds, :tags_to_pools,
      :replica_set_name, :read_pool, :seeds, :primary_tag_pool,
      :refresh_interval, :refresh_mode

    # Create a connection to a MongoDB replica set.
@@ -143,7 +143,7 @@ module Mongo
      # Maps
      @sockets_to_pools = {}
      @tags_to_pools = {}
      @primary_tag_pool = nil

      # Replica set name
      if opts[:rs_name]
@@ -186,35 +186,44 @@ module Mongo
        end
      end

      # Note: this method must be called from within
      # an exclusive lock.
      def update_config(manager)
        @arbiters = manager.arbiters.nil? ? [] : manager.arbiters.dup
        @primary = manager.primary.nil? ? nil : manager.primary.dup
        @secondaries = manager.secondaries.dup
        @hosts = manager.hosts.dup

        @primary_pool = manager.primary_pool
        @read_pool = manager.read_pool
        @secondary_pools = manager.secondary_pools
        @tags_to_pools = manager.tags_to_pools
        @seeds = manager.seeds
        @manager = manager
        @nodes = manager.nodes
        @max_bson_size = manager.max_bson_size

      # Refresh the current replica set configuration.
      # This method will attempt to do a soft refresh,
      # updating only those parts of the replica set that
      # have changed. If that's not possible, the method
      # will perform a hard refresh.
      #
      # @return [Boolean] +true+ if hard refresh
      # occurred. +false+ is returned when unable
      # to get the refresh lock.
      def refresh(opts={})
        if !connected?
          @logger.warn("Not connected")
        end

      # Refresh the current replica set configuration.
      def refresh(opts={})
        return false if !connected?
        log(:info, "Checking replica set connection health...")
        @manager.check_connection_health

        # Return if another thread is already in the process of refreshing.
        return if sync_exclusive?
        if @manager.refresh_required?
          hard_refresh!
        end

        sync_synchronize(:EX) do
          log(:info, "Refreshing...")
        return true
      end

      # Force a hard refresh of this connection's view
      # of the replica set.
      #
      # @return [Boolean] +true+ if hard refresh
      # occurred. +false+ is returned when unable
      # to get the refresh lock.
      def hard_refresh!
        return false if sync_exclusive?

        log(:info, "Initiating hard refresh...")
        @background_manager = PoolManager.new(self, @seeds)
        @background_manager.connect

        sync_synchronize(:EX) do
          update_config(@background_manager)
        end
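To make the refactored control flow concrete, here is a small usage sketch that is not part of the commit; the seed hosts, ports, and option values are placeholders.

    require 'mongo'

    # Placeholder seeds and options; adjust for a real replica set.
    conn = Mongo::ReplSetConnection.new(['localhost', 30000, 'localhost', 30001],
      :refresh_mode => :sync, :refresh_interval => 30)

    # refresh is now the cheap "soft" path: it asks the pool manager to check
    # connection health, and only calls hard_refresh! (a new PoolManager built
    # in the background, then swapped in under the exclusive sync lock) when
    # the manager reports refresh_required?.
    conn.refresh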
@@ -295,7 +304,7 @@ module Mongo
        @secondaries = []
        @secondary_pools = []
        @arbiters = []
        @tags_to_pools.clear
        @primary_tag_pool = nil
        @sockets_to_pools.clear
      end
    end
@@ -336,6 +345,27 @@ module Mongo

      private

      # Given a pool manager, update this connection's
      # view of the replica set.
      #
      # This method must be called within
      # an exclusive lock.
      def update_config(manager)
        @arbiters = manager.arbiters.nil? ? [] : manager.arbiters.dup
        @primary = manager.primary.nil? ? nil : manager.primary.dup
        @secondaries = manager.secondaries.dup
        @hosts = manager.hosts.dup

        @primary_pool = manager.primary_pool
        @read_pool = manager.read_pool
        @secondary_pools = manager.secondary_pools
        @primary_tag_pool = manager.primary_tag_pool
        @seeds = manager.seeds
        @manager = manager
        @nodes = manager.nodes
        @max_bson_size = manager.max_bson_size
      end

      def initiate_refresh_mode
        if @refresh_mode == :async
          return if @refresh_thread && @refresh_thread.alive?
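The hunk above cuts off right after the :async check; the loop below is only a sketch of what a background refresher of this shape typically does, not the commit's code.

    # Assumed shape of the async refresher (illustrative, not from this commit):
    # one long-lived thread per connection that sleeps for :refresh_interval
    # seconds and then runs the soft refresh shown earlier.
    @refresh_thread = Thread.new do
      loop do
        sleep(@refresh_interval)
        refresh
      end
    end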
@@ -377,7 +407,7 @@ module Mongo
      def checkout_tagged(tags)
        sync_synchronize(:SH) do
          tags.each do |k, v|
            pool = @tags_to_pools[{k.to_s => v}]
            pool = @primary_tag_pool[{k.to_s => v}]
            if pool
              socket = pool.checkout
              @sockets_to_pools[socket] = pool
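The lookup that checkout_tagged performs can be shown with a toy hash; the tag names, values, and pool stand-ins below are made up.

    # Toy illustration of the @primary_tag_pool lookup (pools are stand-in symbols).
    primary_tag_pool = {
      { "dc" => "east" } => :east_pool,
      { "dc" => "west" } => :west_pool
    }

    tags = { :dc => "east" }
    tags.each do |k, v|
      pool = primary_tag_pool[{ k.to_s => v }] # symbol keys are stringified first
      puts "would check a socket out of #{pool.inspect}" if pool
    end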
@@ -1,7 +1,8 @@
module Mongo
  class Node

    attr_accessor :host, :port, :address, :config, :connection, :socket
    attr_accessor :host, :port, :address, :config, :connection, :socket,
      :last_state

    def initialize(connection, data)
      @connection = connection
@@ -19,10 +19,10 @@ module Mongo
  class Pool
    PING_ATTEMPTS = 6

    attr_accessor :host, :port, :size, :timeout, :safe, :checked_out, :connection
    attr_accessor :host, :port, :address,
      :size, :timeout, :safe, :checked_out, :connection

    # Create a new pool of connections.
    #
    def initialize(connection, host, port, opts={})
      @connection = connection
@@ -31,6 +31,9 @@ module Mongo
      # A Mongo::Node object.
      @node = opts[:node]

      # The string address
      @address = "#{@host}:#{@port}"

      # Pool size and timeout.
      @size = opts[:size] || 1
      @timeout = opts[:timeout] || 5.0
@@ -3,7 +3,7 @@ module Mongo

    attr_reader :connection, :seeds, :arbiters, :primary, :secondaries,
      :primary_pool, :read_pool, :secondary_pools, :hosts, :nodes, :max_bson_size,
      :tags_to_pools, :members
      :tags_to_pools, :primary_tag_pool, :members

    def initialize(connection, seeds)
      @connection = connection
@@ -24,13 +24,50 @@ module Mongo
      members = connect_to_members
      initialize_pools(members)
      update_seed_list(members)
      set_read_pool
      set_primary_tag_pools

      @members = members
      @previously_connected = true
    end

    def healthy?
    # We're healthy if all members are pingable and if the view
    # of the replica set returned by isMaster is equivalent
    # to our view. If any of these isn't the case,
    # set @refresh_required to true, and return.
    def check_connection_health
      @refresh_required = false

      begin
        seed = get_valid_seed_node
      rescue ConnectionFailure
        @refresh_required = true
        return
      end

      config = seed.set_config
      if !config
        @refresh_required = true
        return
      end

      config['hosts'].each do |host|
        member = @members.detect do |m|
          m.address == host
        end

        if member && validate_existing_member(member)
          next
        else
          @refresh_required = true
          return false
        end
      end
    end

    # The replica set connection should initiate a full refresh.
    def refresh_required?
      @refresh_required
    end

    def close
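For reference, a sketch of the kind of isMaster document the health check walks; the host names are invented, and only the 'hosts' field matters to the code above.

    # Illustrative isMaster-style config; real field values will differ.
    config = {
      "ismaster" => true,
      "setName"  => "rs0",
      "hosts"    => ["db1.example.com:27017", "db2.example.com:27017", "db3.example.com:27017"]
    }

    # The check above requires every reported host to map to a known, still-valid
    # member; here db3 is unknown to our cached view, so a refresh would be flagged.
    known = ["db1.example.com:27017", "db2.example.com:27017"]
    refresh_required = config["hosts"].any? { |host| !known.include?(host) }
    puts refresh_required # => true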
@@ -57,6 +94,24 @@ module Mongo

    private

    def validate_existing_member(member)
      config = member.set_config
      if !config
        return false
      else
        if member.primary?
          if member.last_state == :primary
            return true
          else # This node is now primary, but didn't used to be.
            return false
          end
        elsif member.last_state == :secondary &&
          member.secondary?
          return true
        end
      end
    end

    def initialize_data
      @primary = nil
      @primary_pool = nil
@@ -67,6 +122,7 @@ module Mongo
      @hosts = Set.new
      @members = Set.new
      @tags_to_pools = {}
      @primary_tag_pool = {}
    end

    # Connect to each member of the replica set
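The shapes of the two tag structures, as implied by this refactor, look roughly like the sketch below; the tag keys and pool stand-ins are made up.

    # @tags_to_pools keeps every pool advertising a tag; @primary_tag_pool keeps
    # the one pool actually used for reads with that tag (pools shown as symbols).
    tags_to_pools = {
      { "dc" => "east" } => [:pool_a, :pool_b],
      { "dc" => "west" } => [:pool_c]
    }
    primary_tag_pool = {
      { "dc" => "east" } => :pool_a, # chosen by set_primary_tag_pools
      { "dc" => "west" } => :pool_c
    }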
@@ -105,13 +161,29 @@ module Mongo
        @hosts << member.host_string

        if member.primary?
          assign_primary(member)
        elsif member.secondary? && !@secondaries.include?(member.host_port)
          assign_secondary(member)
        end
      end

      @max_bson_size = members.first.config['maxBsonObjectSize'] ||
        Mongo::DEFAULT_MAX_BSON_SIZE
      @arbiters = members.first.arbiters
    end

    def assign_primary(member)
      member.last_state = :primary
      @primary = member.host_port
      @primary_pool = Pool.new(self.connection, member.host, member.port,
        :size => self.connection.pool_size,
        :timeout => self.connection.connect_timeout,
        :node => member)
      associate_tags_with_pool(member.tags, @primary_pool)
    end

    def assign_secondary(member)
      member.last_state = :secondary
      @secondaries << member.host_port
      pool = Pool.new(self.connection, member.host, member.port,
        :size => self.connection.pool_size,
@@ -120,25 +192,15 @@ module Mongo
        @secondary_pools << pool
        associate_tags_with_pool(member.tags, pool)
      end
    end

      @max_bson_size = members.first.config['maxBsonObjectSize'] ||
        Mongo::DEFAULT_MAX_BSON_SIZE
      @arbiters = members.first.arbiters

      set_read_pool
      set_primary_tag_pools
    end

    # If there's more than one pool associated with
    # a given tag, choose a close one using the bucket method.
    def set_primary_tag_pools
      @tags_to_pools.each do |k, pool_list|
      @tags_to_pools.each do |key, pool_list|
        if pool_list.length == 1
          @tags_to_pools[k] = pool_list.first
          @primary_tag_pool[key] = pool_list.first
        else
          @tags_to_pools[k] = nearby_pool_from_set(pool_list)
          @primary_tag_pool[key] = nearby_pool_from_set(pool_list)
        end
      end
    end
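nearby_pool_from_set is not shown in this diff; the version below is only a guess at a ping-time "bucket" pick, and it assumes each pool exposes a ping_time reading.

    # Sketch (assumption, not the driver's implementation): treat every pool whose
    # ping time is within a window of the fastest as equally near, then pick one
    # of those at random.
    MAX_PING_WINDOW = 15 # milliseconds; illustrative threshold

    def nearby_pool_from_set(pool_list)
      best = pool_list.map { |pool| pool.ping_time }.min
      near = pool_list.select { |pool| pool.ping_time - best <= MAX_PING_WINDOW }
      near[rand(near.length)]
    end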
@@ -1,18 +1,20 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require './test/replica_sets/rs_test_helper'
require 'logger'

# TODO: enable this once we enable reads from tags.
class ReadPreferenceTest < Test::Unit::TestCase
  include Mongo

  def test_long_write_with_async_refresh
    log = Logger.new("test.log")
    conn = ReplSetConnection.new([RS.host, RS.ports[0], RS.host, RS.ports[1]],
      :read => :secondary, :pool_size => 50,
      :refresh_mode => :async, :refresh_interval => 5)
      :refresh_mode => :sync, :refresh_interval => 5, :logger => log)

    db = conn.db(MONGO_TEST_DB)
    db.drop_collection("test-sets")
    col = @db['mongo-test']
    col = db['mongo-test']

    100000.times do |n|
      col.insert({:n => n, :str => "0000000000"})