From c2070bb90a2fca6dd1a08552b3bf447de90690d3 Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Wed, 12 Oct 2011 17:13:48 -0400 Subject: [PATCH] minor: simply and refactor auto refresh --- lib/mongo/repl_set_connection.rb | 90 ++++++++++++------- lib/mongo/util/node.rb | 3 +- lib/mongo/util/pool.rb | 7 +- lib/mongo/util/pool_manager.rb | 104 +++++++++++++++++----- test/replica_sets/read_preference_test.rb | 6 +- 5 files changed, 154 insertions(+), 56 deletions(-) diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index 95e5741..10617fe 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -23,7 +23,7 @@ module Mongo # Instantiates and manages connections to a MongoDB replica set. class ReplSetConnection < Connection attr_reader :secondaries, :arbiters, :secondary_pools, - :replica_set_name, :read_pool, :seeds, :tags_to_pools, + :replica_set_name, :read_pool, :seeds, :primary_tag_pool, :refresh_interval, :refresh_mode # Create a connection to a MongoDB replica set. @@ -143,7 +143,7 @@ module Mongo # Maps @sockets_to_pools = {} - @tags_to_pools = {} + @primary_tag_pool = nil # Replica set name if opts[:rs_name] @@ -186,35 +186,44 @@ module Mongo end end - # Note: this method must be called from within - # an exclusive lock. - def update_config(manager) - @arbiters = manager.arbiters.nil? ? [] : manager.arbiters.dup - @primary = manager.primary.nil? ? nil : manager.primary.dup - @secondaries = manager.secondaries.dup - @hosts = manager.hosts.dup + # Refresh the current replica set configuration. + # This method will attempt to do a soft refresh, + # updating only those parts of the replica set that + # have changed. If that's not possible, the method + # will perform a hard refresh. + # + # @return [Boolean] +true+ if hard refresh + # occurred. +false+ is returned when unable + # to get the refresh lock. + def refresh(opts={}) + if !connected? + @logger.warn("Not connected") + end - @primary_pool = manager.primary_pool - @read_pool = manager.read_pool - @secondary_pools = manager.secondary_pools - @tags_to_pools = manager.tags_to_pools - @seeds = manager.seeds - @manager = manager - @nodes = manager.nodes - @max_bson_size = manager.max_bson_size + log(:info, "Checking replica set connection health...") + @manager.check_connection_health + + if @manager.refresh_required? + hard_refresh! + end + + return true end - # Refresh the current replica set configuration. - def refresh(opts={}) - return false if !connected? + # Force a hard refresh of this connection's view + # of the replica set. + # + # @return [Boolean] +true+ if hard refresh + # occurred. +false+ is returned when unable + # to get the refresh lock. + def hard_refresh! + return false if sync_exclusive? - # Return if another thread is already in the process of refreshing. - return if sync_exclusive? + log(:info, "Initiating hard refresh...") + @background_manager = PoolManager.new(self, @seeds) + @background_manager.connect sync_synchronize(:EX) do - log(:info, "Refreshing...") - @background_manager = PoolManager.new(self, @seeds) - @background_manager.connect update_config(@background_manager) end @@ -292,10 +301,10 @@ module Mongo end end - @secondaries = [] - @secondary_pools = [] - @arbiters = [] - @tags_to_pools.clear + @secondaries = [] + @secondary_pools = [] + @arbiters = [] + @primary_tag_pool = nil @sockets_to_pools.clear end end @@ -336,6 +345,27 @@ module Mongo private + # Given a pool manager, update this connection's + # view of the replica set. + # + # This method must be called within + # an exclusive lock. + def update_config(manager) + @arbiters = manager.arbiters.nil? ? [] : manager.arbiters.dup + @primary = manager.primary.nil? ? nil : manager.primary.dup + @secondaries = manager.secondaries.dup + @hosts = manager.hosts.dup + + @primary_pool = manager.primary_pool + @read_pool = manager.read_pool + @secondary_pools = manager.secondary_pools + @primary_tag_pool = manager.primary_tag_pool + @seeds = manager.seeds + @manager = manager + @nodes = manager.nodes + @max_bson_size = manager.max_bson_size + end + def initiate_refresh_mode if @refresh_mode == :async return if @refresh_thread && @refresh_thread.alive? @@ -377,7 +407,7 @@ module Mongo def checkout_tagged(tags) sync_synchronize(:SH) do tags.each do |k, v| - pool = @tags_to_pools[{k.to_s => v}] + pool = @primary_tag_pool[{k.to_s => v}] if pool socket = pool.checkout @sockets_to_pools[socket] = pool diff --git a/lib/mongo/util/node.rb b/lib/mongo/util/node.rb index 8402808..de639b9 100644 --- a/lib/mongo/util/node.rb +++ b/lib/mongo/util/node.rb @@ -1,7 +1,8 @@ module Mongo class Node - attr_accessor :host, :port, :address, :config, :connection, :socket + attr_accessor :host, :port, :address, :config, :connection, :socket, + :last_state def initialize(connection, data) @connection = connection diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index fdaa1b1..720b9d4 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -19,10 +19,10 @@ module Mongo class Pool PING_ATTEMPTS = 6 - attr_accessor :host, :port, :size, :timeout, :safe, :checked_out, :connection + attr_accessor :host, :port, :address, + :size, :timeout, :safe, :checked_out, :connection # Create a new pool of connections. - # def initialize(connection, host, port, opts={}) @connection = connection @@ -31,6 +31,9 @@ module Mongo # A Mongo::Node object. @node = opts[:node] + # The string address + @address = "#{@host}:#{@port}" + # Pool size and timeout. @size = opts[:size] || 1 @timeout = opts[:timeout] || 5.0 diff --git a/lib/mongo/util/pool_manager.rb b/lib/mongo/util/pool_manager.rb index 4b1e48c..a273d1c 100644 --- a/lib/mongo/util/pool_manager.rb +++ b/lib/mongo/util/pool_manager.rb @@ -3,7 +3,7 @@ module Mongo attr_reader :connection, :seeds, :arbiters, :primary, :secondaries, :primary_pool, :read_pool, :secondary_pools, :hosts, :nodes, :max_bson_size, - :tags_to_pools, :members + :tags_to_pools, :primary_tag_pool, :members def initialize(connection, seeds) @connection = connection @@ -24,13 +24,50 @@ module Mongo members = connect_to_members initialize_pools(members) update_seed_list(members) + set_read_pool + set_primary_tag_pools @members = members @previously_connected = true end - def healthy? + # We're healthy if all members are pingable and if the view + # of the replica set returned by isMaster is equivalent + # to our view. If any of these isn't the case, + # set @refresh_require to true, and return. + def check_connection_health + @refresh_required = false + begin + seed = get_valid_seed_node + rescue ConnectionFailure + @refresh_required = true + return + end + + config = seed.set_config + if !config + @refresh_required = true + return + end + + config['hosts'].each do |host| + member = @members.detect do |m| + m.address == host + end + + if member && validate_existing_member(member) + next + else + @refresh_required = true + return false + end + end + end + + # The replica set connection should initiate a full refresh. + def refresh_required? + @refresh_required end def close @@ -57,6 +94,24 @@ module Mongo private + def validate_existing_member(member) + config = member.set_config + if !config + return false + else + if member.primary? + if member.last_state == :primary + return true + else # This node is now primary, but didn't used to be. + return false + end + elsif member.last_state == :secondary && + member.secondary? + return true + end + end + end + def initialize_data @primary = nil @primary_pool = nil @@ -67,6 +122,7 @@ module Mongo @hosts = Set.new @members = Set.new @tags_to_pools = {} + @primary_tag_pool = {} end # Connect to each member of the replica set @@ -105,40 +161,46 @@ module Mongo @hosts << member.host_string if member.primary? - @primary = member.host_port - @primary_pool = Pool.new(self.connection, member.host, member.port, - :size => self.connection.pool_size, - :timeout => self.connection.connect_timeout, - :node => member) - associate_tags_with_pool(member.tags, @primary_pool) + assign_primary(member) elsif member.secondary? && !@secondaries.include?(member.host_port) - @secondaries << member.host_port - pool = Pool.new(self.connection, member.host, member.port, - :size => self.connection.pool_size, - :timeout => self.connection.connect_timeout, - :node => member) - @secondary_pools << pool - associate_tags_with_pool(member.tags, pool) + assign_secondary(member) end end - @max_bson_size = members.first.config['maxBsonObjectSize'] || Mongo::DEFAULT_MAX_BSON_SIZE @arbiters = members.first.arbiters + end - set_read_pool - set_primary_tag_pools + def assign_primary(member) + member.last_state = :primary + @primary = member.host_port + @primary_pool = Pool.new(self.connection, member.host, member.port, + :size => self.connection.pool_size, + :timeout => self.connection.connect_timeout, + :node => member) + associate_tags_with_pool(member.tags, @primary_pool) + end + + def assign_secondary(member) + member.last_state = :secondary + @secondaries << member.host_port + pool = Pool.new(self.connection, member.host, member.port, + :size => self.connection.pool_size, + :timeout => self.connection.connect_timeout, + :node => member) + @secondary_pools << pool + associate_tags_with_pool(member.tags, pool) end # If there's more than one pool associated with # a given tag, choose a close one using the bucket method. def set_primary_tag_pools - @tags_to_pools.each do |k, pool_list| + @tags_to_pools.each do |key, pool_list| if pool_list.length == 1 - @tags_to_pools[k] = pool_list.first + @primary_tag_pool[key] = pool_list.first else - @tags_to_pools[k] = nearby_pool_from_set(pool_list) + @primary_tag_pool[key] = nearby_pool_from_set(pool_list) end end end diff --git a/test/replica_sets/read_preference_test.rb b/test/replica_sets/read_preference_test.rb index bd8801e..9057e40 100644 --- a/test/replica_sets/read_preference_test.rb +++ b/test/replica_sets/read_preference_test.rb @@ -1,18 +1,20 @@ $:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) require './test/replica_sets/rs_test_helper' +require 'logger' # TODO: enable this once we enable reads from tags. class ReadPreferenceTest < Test::Unit::TestCase include Mongo def test_long_write_with_async_refresh + log = Logger.new("test.log") conn = ReplSetConnection.new([RS.host, RS.ports[0], RS.host, RS.ports[1]], :read => :secondary, :pool_size => 50, - :refresh_mode => :async, :refresh_interval => 5) + :refresh_mode => :sync, :refresh_interval => 5, :logger => log) db = conn.db(MONGO_TEST_DB) db.drop_collection("test-sets") - col = @db['mongo-test'] + col = db['mongo-test'] 100000.times do |n| col.insert({:n => n, :str => "0000000000"})