From 236d4a821fb49c3804d1da866fe6676093efb1ee Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Tue, 14 Dec 2010 13:14:45 -0500 Subject: [PATCH] More connection refactoring. Updates to repl_set_manager. --- Rakefile | 38 ++++--- lib/mongo/connection.rb | 17 +-- lib/mongo/repl_set_connection.rb | 14 ++- test/replica_sets/connect_test.rb | 74 ++++++++----- test/replica_sets/count_test.rb | 6 +- test/replica_sets/rs_test_helper.rb | 29 +++++ test/tools/repl_set_manager.rb | 161 +++++++++++++++++++++------- 7 files changed, 241 insertions(+), 98 deletions(-) create mode 100644 test/replica_sets/rs_test_helper.rb diff --git a/Rakefile b/Rakefile index c95e73d..7ec359d 100644 --- a/Rakefile +++ b/Rakefile @@ -38,9 +38,8 @@ end desc "Test the MongoDB Ruby driver." task :test do - puts "\nThis option has changed." - puts "\nTo test the driver with the c-extensions:\nrake test:c\n" - puts "To test the pure ruby driver: \nrake test:ruby" + puts "\nTo test the driver with the C-extensions:\nrake test:c\n\n" + puts "To test the pure ruby driver: \nrake test:ruby\n\n" end namespace :test do @@ -48,22 +47,35 @@ namespace :test do desc "Test the driver with the C extension enabled." 
task :c do ENV['C_EXT'] = 'TRUE' - Rake::Task['test:unit'].invoke - Rake::Task['test:functional'].invoke - Rake::Task['test:bson'].invoke - Rake::Task['test:pooled_threading'].invoke - Rake::Task['test:drop_databases'].invoke + if ENV['TEST'] + Rake::Task['test:functional'].invoke + else + Rake::Task['test:unit'].invoke + Rake::Task['test:functional'].invoke + Rake::Task['test:bson'].invoke + Rake::Task['test:pooled_threading'].invoke + Rake::Task['test:drop_databases'].invoke + end ENV['C_EXT'] = nil end desc "Test the driver using pure ruby (no C extension)" task :ruby do ENV['C_EXT'] = nil - Rake::Task['test:unit'].invoke - Rake::Task['test:functional'].invoke - Rake::Task['test:bson'].invoke - Rake::Task['test:pooled_threading'].invoke - Rake::Task['test:drop_databases'].invoke + if ENV['TEST'] + Rake::Task['test:functional'].invoke + else + Rake::Task['test:unit'].invoke + Rake::Task['test:functional'].invoke + Rake::Task['test:bson'].invoke + Rake::Task['test:pooled_threading'].invoke + Rake::Task['test:drop_databases'].invoke + end + end + + Rake::TestTask.new(:rs) do |t| + t.test_files = FileList['test/replica_sets/*_test.rb'] + t.verbose = true end Rake::TestTask.new(:unit) do |t| diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 26c755e..7701185 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -35,9 +35,6 @@ module Mongo STANDARD_HEADER_SIZE = 16 RESPONSE_HEADER_SIZE = 20 - MONGODB_URI_MATCHER = /(([-_.\w\d]+):([-_\w\d]+)@)?([-.\w\d]+)(:([\w\d]+))?(\/([-\d\w]+))?/ - MONGODB_URI_SPEC = "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/database]" - attr_reader :logger, :size, :nodes, :auths, :primary, :secondaries, :arbiters, :safe, :primary_pool, :read_pool, :secondary_pools, :host_to_try @@ -103,7 +100,6 @@ module Mongo setup(options) end - # DEPRECATED # # Initialize a connection to a MongoDB replica set using an array of seed nodes. 
@@ -464,7 +460,6 @@ module Mongo @primary_pool = nil end - # Checkout a socket for reading (i.e., a secondary node). def checkout_reader connect unless connected? @@ -597,19 +592,9 @@ module Mongo socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) config = self['admin'].command({:ismaster => 1}, :sock => socket) - rescue OperationFailure, SocketError, SystemCallError, IOError => ex - close# unless connected? + close ensure - # @nodes_tried << node - # if config - # update_node_list(config['hosts']) if config['hosts'] - - # if config['msg'] && @logger - # @logger.warn("MONGODB #{config['msg']}") - # end - # end - socket.close if socket end diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index 5b8ca35..d443c45 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -77,7 +77,13 @@ module Mongo pick_secondary_for_read if @read_secondary - raise ConnectionFailure, "failed to connect to any given host:port" unless connected? + if !connected? + if @secondary_pools.empty? + raise ConnectionFailure, "Failed to connect any given host:port" + else + raise ConnectionFailure, "Failed to connect to primary node." + end + end end def connecting? 
@@ -121,7 +127,11 @@ module Mongo ensure @nodes_tried << node if config - update_node_list(config['hosts']) if config['hosts'] + nodes = [] + nodes += config['hosts'] if config['hosts'] + nodes += config['arbiters'] if config['arbiters'] + nodes += config['passives'] if config['passives'] + update_node_list(nodes) if config['msg'] && @logger @logger.warn("MONGODB #{config['msg']}") diff --git a/test/replica_sets/connect_test.rb b/test/replica_sets/connect_test.rb index 8b7cf3c..66a87fc 100644 --- a/test/replica_sets/connect_test.rb +++ b/test/replica_sets/connect_test.rb @@ -1,50 +1,74 @@ $:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) -require 'mongo' -require 'test/unit' -require './test/test_helper' +require './test/replica_sets/rs_test_helper' -# NOTE: This test expects a replica set of three nodes to be running on TEST_HOST, -# on ports TEST_PORT, TEST_PORT + 1, and TEST + 2. +# NOTE: This test expects a replica set of three nodes to be running on RS.host, +# on ports RS.ports[0], RS.ports[1], and RS.ports[2]. class ConnectTest < Test::Unit::TestCase include Mongo + def setup + RS.restart_killed_nodes + end + def test_connect_bad_name - assert_raise_error(ReplicaSetReplSetConnectionError, "expected 'wrong-repl-set-name'") do - ReplSetConnection.multi([TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], - [TEST_HOST, TEST_PORT + 2], :rs_name => "wrong-repl-set-name") + assert_raise_error(ReplicaSetConnectionError, "-wrong") do + ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]], :rs_name => RS.name + "-wrong") end end def test_connect - @conn = ReplSetConnection.multi([TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], - [TEST_HOST, TEST_PORT + 2], :name => "foo") + @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]], :name => RS.name) + assert @conn.connected? 
+ + assert_equal RS.primary, @conn.primary + assert_equal RS.secondaries, @conn.secondaries + assert_equal RS.arbiters, @conn.arbiters + end + + def test_connect_with_primary_node_killed + node = RS.kill_primary + + # Because we kill the primary and try to connect right away, + # the first connection attempt is expected to fail. + assert_raise_error(ConnectionFailure, "Failed to connect to primary node") do + @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]]) + end + + # This allows the secondary to come up as a primary + rescue_connection_failure do + @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]]) + end assert @conn.connected? end - def test_connect_with_first_node_down - puts "Please kill the node at #{TEST_PORT}." - gets + def test_connect_with_secondary_node_killed + node = RS.kill_secondary - @conn = ReplSetConnection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], - [TEST_HOST, TEST_PORT + 2]]) + @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]]) assert @conn.connected? end - def test_connect_with_second_node_down - puts "Please kill the node at #{TEST_PORT + 1}." - gets + def test_connect_with_third_node_killed + RS.kill(RS.get_node_from_port(RS.ports[2])) - @conn = ReplSetConnection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], - [TEST_HOST, TEST_PORT + 2]]) + @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]]) assert @conn.connected? end - def test_connect_with_third_node_down - puts "Please kill the node at #{TEST_PORT + 2}." 
- gets + def test_connect_with_primary_stepped_down + RS.step_down_primary - @conn = ReplSetConnection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], - [TEST_HOST, TEST_PORT + 2]]) + rescue_connection_failure do + @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], + [RS.host, RS.ports[2]]) + end assert @conn.connected? end + end diff --git a/test/replica_sets/count_test.rb b/test/replica_sets/count_test.rb index 7ad2e41..1476a2f 100644 --- a/test/replica_sets/count_test.rb +++ b/test/replica_sets/count_test.rb @@ -1,6 +1,4 @@ $:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) -require 'mongo' -require 'test/unit' require './test/replica_sets/rs_test_helper' # NOTE: This test expects a replica set of three nodes to be running @@ -20,10 +18,10 @@ class ReplicaSetCountTest < Test::Unit::TestCase end def test_correct_count_after_insertion_reconnect - @coll.insert({:a => 20})#, :safe => {:w => 3, :wtimeout => 10000}) + @coll.insert({:a => 20}, :safe => {:w => 2, :wtimeout => 10000}) assert_equal 1, @coll.count - # Disconnecting the current master node + # Kill the current master node @node = RS.kill_primary rescue_connection_failure do diff --git a/test/replica_sets/rs_test_helper.rb b/test/replica_sets/rs_test_helper.rb new file mode 100644 index 0000000..8fd02db --- /dev/null +++ b/test/replica_sets/rs_test_helper.rb @@ -0,0 +1,29 @@ +$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) +require './test/test_helper' +require './test/tools/repl_set_manager' + +unless defined? RS + RS = ReplSetManager.new + RS.start_set +end + +class Test::Unit::TestCase + + # Generic code for rescuing connection failures and retrying operations. + # This could be combined with some timeout functionality. 
+ def rescue_connection_failure(max_retries=60) + success = false + tries = 0 + while !success && tries < max_retries + begin + yield + success = true + rescue Mongo::ConnectionFailure + puts "Rescuing attempt #{tries}" + tries += 1 + sleep(1) + end + end + end + +end diff --git a/test/tools/repl_set_manager.rb b/test/tools/repl_set_manager.rb index ed7599f..cdb7a59 100644 --- a/test/tools/repl_set_manager.rb +++ b/test/tools/repl_set_manager.rb @@ -8,18 +8,27 @@ end class ReplSetManager - attr_accessor :host, :start_port, :ports + attr_accessor :host, :start_port, :ports, :name, :mongods def initialize(opts={}) @start_port = opts[:start_port] || 30000 @ports = [] @name = opts[:name] || 'replica-set-foo' - @count = opts[:count] || 3 @host = opts[:host] || 'localhost' @retries = opts[:retries] || 60 @config = {"_id" => @name, "members" => []} @path = File.join(File.expand_path(File.dirname(__FILE__)), "data") + @passive_count = opts[:secondary_count] || 1 + @arbiter_count = opts[:arbiter_count] || 1 + @secondary_count = opts[:passive_count] || 1 + @primary_count = 1 + + @count = @primary_count + @passive_count + @arbiter_count + @secondary_count + if @count > 7 + raise StandardError, "Cannot create a replica set with #{node_count} nodes. 7 is the max." 
+ end + @mongods = {} end @@ -28,34 +37,57 @@ class ReplSetManager system("killall mongod") - @count.times do |n| - @mongods[n] ||= {} - port = @start_port + n - @ports << port - @mongods[n]['port'] = port - @mongods[n]['db_path'] = get_path("rs-#{port}") - @mongods[n]['log_path'] = get_path("log-#{port}") - system("rm -rf #{@mongods[n]['db_path']}") - system("mkdir -p #{@mongods[n]['db_path']}") - - @mongods[n]['start'] = "mongod --replSet #{@name} --logpath '#{@mongods[n]['log_path']}' " + - " --dbpath #{@mongods[n]['db_path']} --port #{@mongods[n]['port']} --fork" - - start(n) - - member = {'_id' => n, 'host' => "#{@host}:#{@mongods[n]['port']}"} - if n == @count-1 - @mongods[n]['arbiter'] = true - member['arbiterOnly'] = true - end - - @config['members'] << member + n = 0 + (@primary_count + @secondary_count).times do |n| + init_node(n) + n += 1 end - init + @passive_count.times do + init_node(n) do |attrs| + attrs['priority'] = 0 + end + n += 1 + end + + @arbiter_count.times do + init_node(n) do |attrs| + attrs['arbiterOnly'] = true + end + n += 1 + end + + initiate ensure_up end + def init_node(n) + @mongods[n] ||= {} + port = @start_port + n + @ports << port + @mongods[n]['port'] = port + @mongods[n]['db_path'] = get_path("rs-#{port}") + @mongods[n]['log_path'] = get_path("log-#{port}") + system("rm -rf #{@mongods[n]['db_path']}") + system("mkdir -p #{@mongods[n]['db_path']}") + + @mongods[n]['start'] = "mongod --replSet #{@name} --logpath '#{@mongods[n]['log_path']}' " + + " --dbpath #{@mongods[n]['db_path']} --port #{@mongods[n]['port']} --fork" + + start(n) + + member = {'_id' => n, 'host' => "#{@host}:#{@mongods[n]['port']}"} + + if block_given? 
+ custom_attrs = {} + yield custom_attrs + member.merge!(custom_attrs) + @mongods[n].merge!(custom_attrs) + end + + @config['members'] << member + end + def kill(node) system("kill -2 #{@mongods[node]['pid']}") @mongods[node]['up'] = false @@ -68,12 +100,40 @@ class ReplSetManager return node end + # Note that we have to rescue a connection failure + # when we run the StepDown command because that + # command will close the connection. + def step_down_primary + primary = get_node_with_state(1) + con = get_connection(primary) + begin + con['admin'].command({'replSetStepDown' => 90}) + rescue Mongo::ConnectionFailure + end + end + def kill_secondary node = get_node_with_state(2) kill(node) return node end + def restart_killed_nodes + nodes = @mongods.keys.select do |key| + @mongods[key]['up'] == false + end + + nodes.each do |node| + start(node) + end + + ensure_up + end + + def get_node_from_port(port) + @mongods.keys.detect { |key| @mongods[key]['port'] == port } + end + def start(node) system(@mongods[node]['start']) @mongods[node]['up'] = true @@ -84,12 +144,13 @@ class ReplSetManager def ensure_up print "Ensuring members are up..." - @con = get_connection attempt(Mongo::OperationFailure) do - status = @con['admin'].command({'replSetGetStatus' => 1}) + con = get_connection + status = con['admin'].command({'replSetGetStatus' => 1}) print "." - if status['members'].all? { |m| [1, 2, 7].include?(m['state']) } + if status['members'].all? { |m| [1, 2, 7].include?(m['state']) } && + status['members'].any? { |m| m['state'] == 1 } puts "All members up!" return status else @@ -98,13 +159,26 @@ class ReplSetManager end end + def primary + nodes = get_all_host_pairs_with_state(1) + nodes.empty? ? 
nil : nodes[0] + end + + def secondaries + get_all_host_pairs_with_state(2) + end + + def arbiters + get_all_host_pairs_with_state(7) + end + private - def init - get_connection + def initiate + con = get_connection attempt(Mongo::OperationFailure) do - @con['admin'].command({'replSetInitiate' => @config}) + con['admin'].command({'replSetInitiate' => @config}) end end @@ -121,13 +195,25 @@ class ReplSetManager end end - def get_connection - attempt(Mongo::ConnectionFailure) do - node = @mongods.keys.detect {|key| !@mongods[key]['arbiter'] && @mongods[key]['up'] } - @con = Mongo::Connection.new(@host, @mongods[node]['port'], :slave_ok => true) + def get_all_host_pairs_with_state(state) + status = ensure_up + nodes = status['members'].select {|m| m['state'] == state} + nodes.map do |node| + host_port = node['name'].split(':') + port = host_port[1] ? host_port[1].to_i : 27017 + [host, port] + end + end + + def get_connection(node=nil) + con = attempt(Mongo::ConnectionFailure) do + if !node + node = @mongods.keys.detect {|key| !@mongods[key]['arbiterOnly'] && @mongods[key]['up'] } + end + con = Mongo::Connection.new(@host, @mongods[node]['port'], :slave_ok => true) end - return @con + return con end def get_path(name) @@ -140,8 +226,7 @@ class ReplSetManager while count < @retries do begin - yield - return + return yield rescue exception sleep(1) count += 1