diff --git a/lib/mongo.rb b/lib/mongo.rb index d305178..e0e44fa 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -57,6 +57,7 @@ require 'mongo/util/conversions' require 'mongo/util/support' require 'mongo/util/core_ext' require 'mongo/util/pool' +require 'mongo/util/pool_manager' require 'mongo/util/server_version' require 'mongo/util/uri_parser' diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 33bbd7d..f2d6bcd 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -35,8 +35,8 @@ module Mongo STANDARD_HEADER_SIZE = 16 RESPONSE_HEADER_SIZE = 20 - attr_reader :logger, :size, :auths, :primary, :safe, :primary_pool, - :host_to_try, :pool_size, :connect_timeout + attr_reader :logger, :size, :auths, :primary, :safe, :host_to_try, + :pool_size, :connect_timeout, :primary_pool # Counter for generating unique request ids. @@current_request_id = 0 diff --git a/lib/mongo/node.rb b/lib/mongo/node.rb index 81432f0..f25d24c 100644 --- a/lib/mongo/node.rb +++ b/lib/mongo/node.rb @@ -18,6 +18,10 @@ module Mongo end alias :== :eql? + def host_string + "#{@host}:#{@port}" + end + # Create a connection to the provided node, # and, if successful, return the socket. Otherwise, # return nil. @@ -55,6 +59,15 @@ module Mongo self.socket != nil end + def active? + begin + result = self.connection['admin'].command({:ping => 1}, :socket => self.socket) + return result['ok'] == 1 + rescue OperationFailure, SocketError, SystemCallError, IOError => ex + return nil + end + end + # Get the configuration for the provided node as returned by the # ismaster command. Additionally, check that the replica set name # matches with the name provided. @@ -78,7 +91,9 @@ module Mongo # Note: this excludes arbiters. def node_list connect unless connected? - return [] unless self.config + set_config + + return [] unless config nodes = [] nodes += config['hosts'] if config['hosts'] @@ -88,6 +103,7 @@ module Mongo def arbiters connect unless connected? + return [] unless config['arbiters'] config['arbiters'].map do |arbiter| split_nodes(arbiter) diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index ce07bc7..96ea90f 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -20,8 +20,8 @@ module Mongo # Instantiates and manages connections to a MongoDB replica set. class ReplSetConnection < Connection - attr_reader :nodes, :secondaries, :arbiters, :read_pool, :secondary_pools, - :replica_set_name, :ping_ranges + attr_reader :nodes, :secondaries, :arbiters, :secondary_pools, + :replica_set_name, :read_pool # Create a connection to a MongoDB replica set. # @@ -51,6 +51,12 @@ module Mongo # Disabled by default. # @option opts [Float] :connect_timeout (nil) The number of seconds to wait before timing out a # connection attempt. + # @option opts [Boolean] :auto_refresh (false) Set this to true to enable a background thread that + # periodically updates the state of the connection. If, for example, you initially connect while a secondary + # is down, :auto_refresh will reconnect to that secondary behind the scenes to + # prevent you from having to reconnect manually. + # @option opts [Integer] :refresh_interval (90) If :auto_refresh is enabled, this is the number of seconds + # that the background thread will sleep between calls to check the replica set's state. # # @example Connect to a replica set and provide two seed nodes. Note that the number of seed nodes does # not have to be equal to the number of replica set members. The purpose of seed nodes is to permit @@ -75,16 +81,16 @@ module Mongo end unless args.length > 0 - raise MongoArgumentError, "A ReplSetConnection requires at least one node." + raise MongoArgumentError, "A ReplSetConnection requires at least one seed node." end - # Get the list of seed nodes + # The list of seed nodes @seeds = args # The members of the replica set, stored as instances of Mongo::Node. @nodes = [] - # Connection pool for primay node + # Connection pool for primary node @primary = nil @primary_pool = nil @@ -93,17 +99,26 @@ module Mongo @secondary_pools = [] # The secondary pool to which we'll be sending reads. + # This may be identical to the primary pool. @read_pool = nil - # A list of arbiter address (for client information only) + # A list of arbiter addresses (for client information only) @arbiters = [] - # An array mapping secondaries by proximity - @ping_ranges = Array.new(3) { |i| Array.new } + # Refresh + @auto_refresh = opts.fetch(:auto_refresh, true) + @refresh_interval = opts[:refresh_interval] || 90 # Are we allowing reads from secondaries? @read_secondary = opts.fetch(:read_secondary, false) + # Lock around changes to the global config + @connection_lock = Mutex.new + @connected = false + + # Store the refresher thread + @refresh_thread = nil + # Replica set name if opts[:rs_name] warn ":rs_name option has been deprecated and will be removed in 2.0. " + @@ -116,29 +131,58 @@ module Mongo setup(opts) end - # Use the provided seed nodes to initiate a connection - # to the replica set. + # Initiate a connection to the replica set. def connect - connect_to_members - initialize_pools + @connection_lock.synchronize do + return if @connected + manager = PoolManager.new(self, @seeds) + manager.connect - if connected? - choose_node_for_reads - update_seed_list - BSON::BSON_CODER.update_max_bson_size(self) - else - close + update_config(manager) + #BSON::BSON_CODER.update_max_bson_size(self) + initiate_auto_refresh - if @primary.nil? + if @primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect. raise ConnectionFailure, "Failed to connect to primary node." else - raise ConnectionFailure, "Failed to connect to any given member." + @connected = true + end + end + end + + # Note: this method must be called from within + # a locked @connection_lock + def update_config(manager) + @arbiters = manager.arbiters.nil? ? [] : manager.arbiters.dup + @primary = manager.primary.nil? ? nil : manager.primary.dup + @secondaries = manager.secondaries.dup + + @primary_pool = manager.primary_pool + @read_pool = manager.read_pool + @secondary_pools = manager.secondary_pools + @seeds = manager.seeds + @manager = manager + @hosts = manager.hosts + end + + # If ismaster doesn't match our current view + # then create a new PoolManager, passing in our + # existing view. It should be able to do the diff. + # Then take out the connection lock and replace + # our current values. + def refresh + background_manager = PoolManager.new(self, @seeds) + + if update_struct = background_manager.update_required?(@hosts) + @connection_lock.synchronize do + background_manager.update(@manager, update_struct) + update_config(background_manager) end end end def connected? - @primary_pool || (!@secondary_pools.empty? && @read_secondary) + @connected && !@connection_lock.locked? end # @deprecated @@ -165,21 +209,31 @@ module Mongo # # @return [Boolean] def read_primary? - !@read_pool + @read_pool == @primary_pool end alias :primary? :read_primary? # Close the connection to the database. def close super + + @connected = false + if @refresh_thread + @refresh_thread.kill + @refresh_thread = nil + end + @nodes.each do |member| member.disconnect end + @nodes = [] @read_pool = nil + @secondary_pools.each do |pool| pool.close end + @secondaries = [] @secondary_pools = [] @arbiters = [] @@ -217,102 +271,34 @@ module Mongo private - # Iterate through the list of provided seed - # nodes until we've gotten a response from the - # replica set we're trying to connect to. - # - # If we don't get a response, raise an exception. - def get_valid_seed_node - @seeds.each do |seed| - node = Mongo::Node.new(self, seed) - if node.connect && node.set_config - return node - end + def initiate_auto_refresh + return if @refresh_thread && @refresh_thread.alive? + @refresh_thread = Thread.new do + sleep(@refresh_interval) + refresh end - - raise ConnectionFailure, "Cannot connect to a replica set using seeds " + - "#{@seeds.map {|s| "#{s[0]}:#{s[1]}" }.join(', ')}" - end - - # Connect to each member of the replica set - # as reported by the given seed node, and cache - # those connections in the @nodes array. - def connect_to_members - seed = get_valid_seed_node - - seed.node_list.each do |host| - node = Mongo::Node.new(self, host) - if node.connect && node.set_config - @nodes << node - end - end - end - - # Initialize the connection pools to the primary and secondary nodes. - def initialize_pools - if @nodes.empty? - raise ConnectionFailure, "Failed to connect to any given member." - end - - @arbiters = @nodes.first.arbiters - - @nodes.each do |member| - if member.primary? - @primary = member.host_port - @primary_pool = Pool.new(self, member.host, member.port, - :size => @pool_size, - :timeout => @timeout, - :node => member) - elsif member.secondary? && !@secondaries.include?(member.host_port) - @secondaries << member.host_port - @secondary_pools << Pool.new(self, member.host, member.port, - :size => @pool_size, - :timeout => @timeout, - :node => member) - end - end - end - - # Pick a node from the set of possible secondaries. - # If more than one node is available, use the ping - # time to figure out which nodes to choose from. - def choose_node_for_reads - return if @secondary_pools.empty? - - if @secondary_pools.size == 1 - @read_pool = @secondary_pools.first - else - @secondary_pools.each do |pool| - case pool.ping_time - when 0..150 - @ping_ranges[0] << pool - when 150..1000 - @ping_ranges[1] << pool - else - @ping_ranges[2] << pool - end - end - - for list in @ping_ranges do - break if !list.empty? - end - - @read_pool = list[rand(list.length)] - end - end - - def update_seed_list - @seeds = @nodes.map { |n| n.host_port } end # Checkout a socket for reading (i.e., a secondary node). + # Note that @read_pool might point to the primary pool + # if no read pool has been defined. That's okay; we don't + # want to have to check for the existence of the @read_pool + # because that introduces concurrency issues. def checkout_reader connect unless connected? if @read_secondary && @read_pool - @read_pool.checkout - else - checkout_writer + begin + return @read_pool.checkout + rescue NoMethodError + warn "Read pool was not available." + end + end + + begin + return @primary_pool.checkout + rescue NoMethodError + raise ConnectionFailure, "Not connected to any nodes." end end @@ -320,12 +306,19 @@ module Mongo def checkout_writer connect unless connected? - @primary_pool.checkout + if @primary_pool + begin + return @primary_pool.checkout + rescue NoMethodError + end + end + + raise ConnectionFailure, "Failed to connect to primary node." end # Checkin a socket used for reading. def checkin_reader(socket) - if @read_secondary && @read_pool + if @read_secondary @read_pool.checkin(socket) else checkin_writer(socket) diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index 8745473..9c2aeb3 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -63,19 +63,23 @@ module Mongo @checked_out.clear end + def host_string + "#{@host}:#{@port}" + end + # Return the time it takes on average # to do a round-trip against this node. def ping_time - trials = [] - begin - PING_ATTEMPTS.times do - t1 = Time.now - self.connection['admin'].command({:ping => 1}, :socket => @node.socket) - trials << (Time.now - t1) * 1000 - end - rescue OperationFailure, SocketError, SystemCallError, IOError => ex - return nil - end + trials = [] + begin + PING_ATTEMPTS.times do + t1 = Time.now + self.connection['admin'].command({:ping => 1}, :socket => @node.socket) + trials << (Time.now - t1) * 1000 + end + rescue OperationFailure, SocketError, SystemCallError, IOError => ex + return nil + end trials.sort! trials.delete_at(trials.length-1) diff --git a/lib/mongo/util/pool_manager.rb b/lib/mongo/util/pool_manager.rb new file mode 100644 index 0000000..1d0a421 --- /dev/null +++ b/lib/mongo/util/pool_manager.rb @@ -0,0 +1,219 @@ +module Mongo + class PoolManager + + attr_reader :connection, :seeds, :arbiters, :primary, :secondaries, + :primary_pool, :read_pool, :secondary_pools, :hosts + + def initialize(connection, seeds) + @connection = connection + @seeds = seeds + end + + def connect + initialize_data + nodes = connect_to_members + initialize_pools(nodes) + update_seed_list(nodes) + end + + # Ensure that the view of the replica set is current by + # running the ismaster command and checking to see whether + # we've connected to all known nodes. If not, automatically + # connect to these unconnected nodes. This is handy when we've + # connected to a replica set with no primary or when a secondary + # node comes up after we've connected. + # + # If we're connected to nodes that are no longer part of the set, + # remove these from our set of secondary pools. + def update_required?(hosts) + node = Thread.current[:refresher_node] + if !node || !node.active? + begin + node = get_valid_seed_node + Thread.current[:refresher_node] = node + rescue ConnectionFailure + warn "Could not refresh config because no valid seed node was unavailable." + end + end + + node_list = node.node_list + + unconnected_nodes = node_list - hosts + removed_nodes = hosts - node_list + + if unconnected_nodes.empty? && removed_nodes.empty? + return false + else + {:unconnected => unconnected_nodes, :removed => removed_nodes} + end + end + + def update(manager, node_struct) + reference_manager_data(manager) + unconnected_nodes = node_struct[:unconnected] + removed_nodes = node_struct[:removed] + + if !removed_nodes.empty? + removed_nodes.each do |node| + if @primary_pool && @primary_pool.host_string == node + @primary = nil + @primary_pool.close + @primary_pool = nil + elsif rejected_pool = @secondary_pools.reject! {|pool| pool.host_string == node} + @secondaries.reject! do |secondary| + secondary.port == rejected_pool.port && secondary.host == rejected_pool.host + end + end + end + end + + if !unconnected_nodes.empty? + nodes = [] + unconnected_nodes.each do |host_port| + node = Mongo::Node.new(self.connection, host_port) + if node.connect && node.set_config + nodes << node + end + end + + if !nodes.empty? + initialize_pools(nodes) + end + end + end + + private + + # Note that @arbiters and @read_pool will be + # assigned automatically. + def reference_manager_data(manager) + @primary = manager.primary + @primary_pool = manager.primary_pool + @secondaries = manager.secondaries + @secondary_pools = manager.secondary_pools + @read_pool = manager.read_pool + @arbiters = manager.arbiters + @hosts = manager.hosts + end + + def initialize_data + @primary = nil + @primary_pool = nil + @read_pool = nil + @arbiters = [] + @secondaries = [] + @secondary_pools = [] + @hosts = [] + end + + def connected_nodes + nodes = [] + if @primary_pool + nodes << "#{@primary_pool.host}:#{@primary_pool.port}" + end + + @secondary_pools.each do |pool| + nodes << "#{pool.host}:#{pool.port}" + end + + nodes + end + + # Connect to each member of the replica set + # as reported by the given seed node, and return + # as a list of Mongo::Node objects. + def connect_to_members + nodes = [] + + seed = get_valid_seed_node + + seed.node_list.each do |host| + node = Mongo::Node.new(self.connection, host) + if node.connect && node.set_config + nodes << node + end + end + + if nodes.empty? + raise ConnectionFailure, "Failed to connect to any given member." + end + + nodes + end + + # Initialize the connection pools for the primary and secondary nodes. + def initialize_pools(nodes) + nodes.each do |member| + @hosts << member.host_string + + if member.primary? + @primary = member.host_port + @primary_pool = Pool.new(self.connection, member.host, member.port, + :size => self.connection.pool_size, + :timeout => self.connection.connect_timeout, + :node => member) + elsif member.secondary? && !@secondaries.include?(member.host_port) + @secondaries << member.host_port + @secondary_pools << Pool.new(self.connection, member.host, member.port, + :size => self.connection.pool_size, + :timeout => self.connection.connect_timeout, + :node => member) + end + end + + @arbiters = nodes.first.arbiters + choose_read_pool + end + + # Pick a node from the set of possible secondaries. + # If more than one node is available, use the ping + # time to figure out which nodes to choose from. + def choose_read_pool + if @secondary_pools.empty? + @read_pool = @primary_pool + elsif @secondary_pools.size == 1 + @read_pool = @secondary_pools[0] + else + ping_ranges = Array.new(3) { |i| Array.new } + @secondary_pools.each do |pool| + case pool.ping_time + when 0..150 + ping_ranges[0] << pool + when 150..1000 + ping_ranges[1] << pool + else + ping_ranges[2] << pool + end + end + + for list in ping_ranges do + break if !list.empty? + end + + @read_pool = list[rand(list.length)] + end + end + + # Iterate through the list of provided seed + # nodes until we've gotten a response from the + # replica set we're trying to connect to. + # + # If we don't get a response, raise an exception. + def get_valid_seed_node + @seeds.each do |seed| + node = Mongo::Node.new(self.connection, seed) + if node.connect && node.set_config + return node + end + end + + raise ConnectionFailure, "Cannot connect to a replica set using seeds " + + "#{@seeds.map {|s| "#{s[0]}:#{s[1]}" }.join(', ')}" + end + + def update_seed_list(nodes) + @seeds = nodes.map { |n| n.host_port } + end + + end +end diff --git a/test/replica_sets/refresh_test.rb b/test/replica_sets/refresh_test.rb new file mode 100644 index 0000000..1f9c790 --- /dev/null +++ b/test/replica_sets/refresh_test.rb @@ -0,0 +1,70 @@ +$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) +require './test/replica_sets/rs_test_helper' +require 'benchmark' + +# NOTE: This test expects a replica set of three nodes to be running on RS.host, +# on ports TEST_PORT, RS.ports[1], and TEST + 2. +class ReplicaSetRefreshTest < Test::Unit::TestCase + include Mongo + + def setup + #RS.restart_killed_nodes + end + + def teardown + RS.restart_killed_nodes + end + + def test_connect_and_manual_refresh_with_secondaries_down + RS.kill_all_secondaries + + rescue_connection_failure do + @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]], :auto_refresh => false) + end + + assert_equal [], @conn.secondaries + assert @conn.connected? + assert_equal @conn.read_pool, @conn.primary_pool + + # Refresh with no change to set + @conn.refresh + assert_equal [], @conn.secondaries + assert @conn.connected? + assert_equal @conn.read_pool, @conn.primary_pool + + RS.restart_killed_nodes + assert_equal [], @conn.secondaries + assert @conn.connected? + assert_equal @conn.read_pool, @conn.primary_pool + + # Refresh with everything up + @conn.refresh + assert @conn.read_pool + assert @conn.secondaries.length > 0 + end + + def test_automated_refresh_with_secondaries_down + RS.kill_all_secondaries + + rescue_connection_failure do + @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]], :refresh_interval => 2, :auto_refresh => true) + end + + assert_equal [], @conn.secondaries + assert @conn.connected? + assert_equal @conn.read_pool, @conn.primary_pool + + RS.restart_killed_nodes + + sleep(3) + + assert @conn.read_pool != @conn.primary_pool, "Read pool and primary pool are identical." + assert @conn.secondaries.length > 0, "No secondaries have been added." + end + + def test_automated_refresh_with_removed_node + + end +end diff --git a/test/replica_sets/rs_test_helper.rb b/test/replica_sets/rs_test_helper.rb index b517b83..0864cf0 100644 --- a/test/replica_sets/rs_test_helper.rb +++ b/test/replica_sets/rs_test_helper.rb @@ -17,7 +17,6 @@ class Test::Unit::TestCase yield rescue Mongo::ConnectionFailure => ex puts "Rescue attempt #{retries}: from #{ex}" - puts ex.backtrace retries += 1 raise ex if retries > max_retries sleep(1) diff --git a/test/tools/repl_set_manager.rb b/test/tools/repl_set_manager.rb index 1367bcd..3d72a64 100644 --- a/test/tools/repl_set_manager.rb +++ b/test/tools/repl_set_manager.rb @@ -154,6 +154,15 @@ class ReplSetManager return node end + def kill_all_secondaries + nodes = get_all_nodes_with_state(2) + if nodes + nodes.each do |n| + kill(n) + end + end + end + def restart_killed_nodes nodes = @mongods.keys.select do |key| @mongods[key]['up'] == false @@ -228,13 +237,25 @@ class ReplSetManager end end + def get_all_nodes_with_state(state) + status = ensure_up + nodes = status['members'].select {|m| m['state'] == state} + nodes = nodes.map do |node| + host_port = node['name'].split(':') + port = host_port[1] ? host_port[1].to_i : 27017 + @mongods.keys.detect {|key| @mongods[key]['port'] == port} + end + + nodes == [] ? false : nodes + end + def get_node_with_state(state) status = ensure_up node = status['members'].detect {|m| m['state'] == state} if node host_port = node['name'].split(':') port = host_port[1] ? host_port[1].to_i : 27017 - key = @mongods.keys.detect {|key| @mongods[key]['port'] == port} + key = @mongods.keys.detect {|n| @mongods[n]['port'] == port} return key else return false