diff --git a/lib/mongo/node.rb b/lib/mongo/node.rb index aa9a302..b13f5b5 100644 --- a/lib/mongo/node.rb +++ b/lib/mongo/node.rb @@ -86,7 +86,7 @@ module Mongo check_set_membership(config) check_set_name(config) - rescue ReplicaSetConnectionError, OperationFailure, SocketError, SystemCallError, IOError => ex + rescue ConnectionFailure, OperationFailure, SocketError, SystemCallError, IOError => ex self.connection.log(:warn, "Attempted connection to node #{host_string} raised " + "#{ex.class}: #{ex.message}") return nil @@ -158,7 +158,7 @@ module Mongo if !config['hosts'] message = "Will not connect to #{host_string} because it's not a member " + "of a replica set." - raise ReplicaSetConnectionError, message + raise ConnectionFailure, message end end diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index 32cc25c..d60b882 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -16,12 +16,14 @@ # limitations under the License. # ++ +require 'sync' + module Mongo # Instantiates and manages connections to a MongoDB replica set. class ReplSetConnection < Connection attr_reader :nodes, :secondaries, :arbiters, :secondary_pools, - :replica_set_name, :read_pool, :seeds, :tags_to_pools + :replica_set_name, :read_pool, :seeds, :tags_to_pools, :refresh_interval # Create a connection to a MongoDB replica set. # @@ -74,6 +76,8 @@ module Mongo # @raise [ReplicaSetConnectionError] This is raised if a replica set name is specified and the # driver fails to connect to a replica set with that name. def initialize(*args) + extend Sync_m + if args.last.is_a?(Hash) opts = args.pop else @@ -87,6 +91,7 @@ module Mongo # The list of seed nodes @seeds = args + # TODO: get rid of this @nodes = @seeds.dup # The members of the replica set, stored as instances of Mongo::Node. @@ -121,8 +126,6 @@ module Mongo @read = opts.fetch(:read, :primary) end - # Lock around changes to the global config - @connection_lock = Mutex.new @connected = false # Store the refresher thread @@ -146,7 +149,7 @@ module Mongo # Initiate a connection to the replica set. def connect - @connection_lock.synchronize do + sync_synchronize(:EX) do return if @connected manager = PoolManager.new(self, @seeds) manager.connect @@ -163,11 +166,12 @@ module Mongo end # Note: this method must be called from within - # a locked @connection_lock + # an exclusive lock. def update_config(manager) @arbiters = manager.arbiters.nil? ? [] : manager.arbiters.dup @primary = manager.primary.nil? ? nil : manager.primary.dup @secondaries = manager.secondaries.dup + @hosts = manager.hosts.dup @primary_pool = manager.primary_pool @read_pool = manager.read_pool @@ -175,7 +179,6 @@ module Mongo @tags_to_pools = manager.tags_to_pools @seeds = manager.seeds @manager = manager - @hosts = manager.hosts @nodes = manager.nodes @max_bson_size = manager.max_bson_size end @@ -193,16 +196,18 @@ module Mongo end background_manager = Thread.current[:background] - if update_struct = background_manager.update_required?(@hosts) - @connection_lock.synchronize do - background_manager.update(@manager, update_struct) + if background_manager.update_required?(@hosts) + sync_synchronize(:EX) do + background_manager.connect update_config(background_manager) end end end def connected? - @connected && !@connection_lock.locked? + sync_synchronize(:SH) do + @connected + end end # @deprecated @@ -234,7 +239,9 @@ module Mongo # # @return [Boolean] def read_primary? - @read_pool == @primary_pool + sync_synchronize(:SH) do + @read_pool == @primary_pool + end end alias :primary? :read_primary? @@ -243,10 +250,12 @@ module Mongo end # Close the connection to the database. + # TODO: we should get an exclusive lock here. def close + @connected = false + super - @connected = false if @refresh_thread @refresh_thread.kill @refresh_thread = nil @@ -314,8 +323,10 @@ module Mongo return unless @auto_refresh return if @refresh_thread && @refresh_thread.alive? @refresh_thread = Thread.new do - sleep(@refresh_interval) - refresh + while true do + sleep(@refresh_interval) + refresh + end end end @@ -325,21 +336,25 @@ module Mongo def checkout_reader connect unless connected? - socket = @read_pool.checkout - @sockets_to_pools[socket] = @read_pool - return socket + sync_synchronize(:SH) do + socket = @read_pool.checkout + @sockets_to_pools[socket] = @read_pool + socket + end end # Checkout a socket connected to a node with one of # the provided tags. If no such node exists, raise # an exception. def checkout_tagged(tags) - tags.each do |k, v| - pools = @tags_to_pools[{k => v}] - if !pools.empty? - socket = pools.first.checkout - @sockets_to_pools[socket] = pools.first - return socket + sync_synchronize(:SH) do + tags.each do |k, v| + pools = @tags_to_pools[{k => v}] + if !pools.empty? + socket = pools.first.checkout + @sockets_to_pools[socket] = pools.first + socket + end end end @@ -351,16 +366,13 @@ module Mongo def checkout_writer connect unless connected? - if @primary_pool - begin + sync_synchronize(:SH) do + if @primary_pool socket = @primary_pool.checkout @sockets_to_pools[socket] = @primary_pool - return socket - rescue NoMethodError + socket end end - - raise ConnectionFailure, "Failed to connect to primary node." end # Checkin a socket used for reading. diff --git a/lib/mongo/util/pool_manager.rb b/lib/mongo/util/pool_manager.rb index df5ae73..e12f3b2 100644 --- a/lib/mongo/util/pool_manager.rb +++ b/lib/mongo/util/pool_manager.rb @@ -29,7 +29,7 @@ module Mongo # If we're connected to nodes that are no longer part of the set, # remove these from our set of secondary pools. def update_required?(hosts) - if !@refresh_node || !@refresh_node.active? + if !@refresh_node || !@refresh_node.set_config begin @refresh_node = get_valid_seed_node rescue ConnectionFailure @@ -37,22 +37,13 @@ module Mongo return end end - node = @refresh_node - node_list = node.node_list - - unconnected_nodes = node_list - hosts - removed_nodes = hosts - node_list - - if unconnected_nodes.empty? && removed_nodes.empty? - return false - else - {:unconnected => unconnected_nodes, :removed => removed_nodes} - end + hosts != @refresh_node.node_list end def update(manager, node_struct) reference_manager_data(manager) + unconnected_nodes = node_struct[:unconnected] removed_nodes = node_struct[:removed] @@ -104,8 +95,8 @@ module Mongo @arbiters = [] @secondaries = [] @secondary_pools = [] - @hosts = [] - @members = [] + @hosts = Set.new + @members = Set.new @tags_to_pools = {} end diff --git a/test/replica_sets/connect_test.rb b/test/replica_sets/connect_test.rb index 3ce994e..95bd8b6 100644 --- a/test/replica_sets/connect_test.rb +++ b/test/replica_sets/connect_test.rb @@ -20,7 +20,7 @@ class ConnectTest < Test::Unit::TestCase def test_connect_bad_name assert_raise_error(ReplicaSetConnectionError, "-wrong") do @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], - [RS.host, RS.ports[2]], :rs_name => RS.name + "-wrong") + [RS.host, RS.ports[2]], :name => RS.name + "-wrong") end end diff --git a/test/replica_sets/reconfigure_test.rb b/test/replica_sets/reconfigure_test.rb deleted file mode 100644 index 5b0a2ac..0000000 --- a/test/replica_sets/reconfigure_test.rb +++ /dev/null @@ -1,34 +0,0 @@ -$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) -require './test/replica_sets/rs_test_helper' - -class ReplicaSetReconfigureTest < Test::Unit::TestCase - include Mongo - - def setup - @conn = ReplSetConnection.new([RS.host, RS.ports[0]]) - @db = @conn.db(MONGO_TEST_DB) - @db.drop_collection("test-sets") - @coll = @db.collection("test-sets") - end - - def teardown - RS.restart_killed_nodes - @conn.close if @conn - end - - def test_query - assert @coll.save({:a => 1}, :safe => {:w => 3}) - RS.add_node - assert_raise_error(Mongo::ConnectionFailure, "") do - @coll.save({:a => 1}, :safe => {:w => 3}) - end - assert @coll.save({:a => 1}, :safe => {:w => 3}) - end - - def benchmark_queries - t1 = Time.now - 10000.times { @coll.find_one } - Time.now - t1 - end - -end diff --git a/test/replica_sets/refresh_test.rb b/test/replica_sets/refresh_test.rb index fcf65c0..f35d04d 100644 --- a/test/replica_sets/refresh_test.rb +++ b/test/replica_sets/refresh_test.rb @@ -2,7 +2,6 @@ $:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) require './test/replica_sets/rs_test_helper' require 'benchmark' -# NOTE: This test expects a replica set of three nodes to be running on RS.host, # on ports TEST_PORT, RS.ports[1], and TEST + 2. class ReplicaSetRefreshTest < Test::Unit::TestCase include Mongo @@ -12,6 +11,27 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase @conn.close if @conn end + def test_connect_speed + Benchmark.bm do |x| + x.report("Connect") do + 10.times do + ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]], :auto_refresh => false) + end + end + + @con = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]], :auto_refresh => false) + + x.report("manager") do + man = Mongo::PoolManager.new(@con, @con.seeds) + 10.times do + man.connect + end + end + end + end + def test_connect_and_manual_refresh_with_secondaries_down RS.kill_all_secondaries @@ -65,14 +85,36 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], [RS.host, RS.ports[2]], :refresh_interval => 2, :auto_refresh => true) - assert_equal 2, @conn.secondaries.length assert_equal 2, @conn.secondary_pools.length + assert_equal 2, @conn.secondaries.length - RS.remove_secondary_node + n = RS.remove_secondary_node sleep(4) assert_equal 1, @conn.secondaries.length assert_equal 1, @conn.secondary_pools.length + + RS.add_node(n) end + def test_adding_and_removing_nodes + puts "ADDING AND REMOVING" + @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]], :refresh_interval => 2, :auto_refresh => true) + + RS.add_node + sleep(5) + + @conn2 = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]], :refresh_interval => 2, :auto_refresh => true) + + assert @conn2.secondaries == @conn.secondaries + assert_equal 3, @conn.secondary_pools.length + assert_equal 3, @conn.secondaries.length + + RS.remove_secondary_node + sleep(4) + assert_equal 2, @conn.secondary_pools.length + assert_equal 2, @conn.secondaries.length + end end diff --git a/test/tools/repl_set_manager.rb b/test/tools/repl_set_manager.rb index 48128ab..034ba30 100644 --- a/test/tools/repl_set_manager.rb +++ b/test/tools/repl_set_manager.rb @@ -132,25 +132,25 @@ class ReplSetManager config = con['local']['system.replset'].find_one secondary = get_node_with_state(2) host_port = "#{@host}:#{@mongods[secondary]['port']}" + kill(secondary) + @mongods.delete(secondary) @config['members'].reject! {|m| m['host'] == host_port} - @config['version'] = config['version'] + 1 - primary = get_node_with_state(1) - con = get_connection(primary) - begin con['admin'].command({'replSetReconfig' => @config}) rescue Mongo::ConnectionFailure end con.close + + return secondary end - def add_node + def add_node(n=nil) primary = get_node_with_state(1) con = get_connection(primary) - init_node(@mongods.length) + init_node(n || @mongods.length) config = con['local']['system.replset'].find_one @config['version'] = config['version'] + 1