More connection refactoring. Updates to repl_set_manager.

This commit is contained in:
Kyle Banker 2010-12-14 13:14:45 -05:00
parent 0a47b76fca
commit 236d4a821f
7 changed files with 241 additions and 98 deletions

View File

@ -38,9 +38,8 @@ end
desc "Test the MongoDB Ruby driver." desc "Test the MongoDB Ruby driver."
task :test do task :test do
puts "\nThis option has changed." puts "\nTo test the driver with the C-extensions:\nrake test:c\n\n"
puts "\nTo test the driver with the c-extensions:\nrake test:c\n" puts "To test the pure ruby driver: \nrake test:ruby\n\n"
puts "To test the pure ruby driver: \nrake test:ruby"
end end
namespace :test do namespace :test do
@ -48,22 +47,35 @@ namespace :test do
desc "Test the driver with the C extension enabled." desc "Test the driver with the C extension enabled."
task :c do task :c do
ENV['C_EXT'] = 'TRUE' ENV['C_EXT'] = 'TRUE'
Rake::Task['test:unit'].invoke if ENV['TEST']
Rake::Task['test:functional'].invoke Rake::Task['test:functional'].invoke
Rake::Task['test:bson'].invoke else
Rake::Task['test:pooled_threading'].invoke Rake::Task['test:unit'].invoke
Rake::Task['test:drop_databases'].invoke Rake::Task['test:functional'].invoke
Rake::Task['test:bson'].invoke
Rake::Task['test:pooled_threading'].invoke
Rake::Task['test:drop_databases'].invoke
end
ENV['C_EXT'] = nil ENV['C_EXT'] = nil
end end
desc "Test the driver using pure ruby (no C extension)" desc "Test the driver using pure ruby (no C extension)"
task :ruby do task :ruby do
ENV['C_EXT'] = nil ENV['C_EXT'] = nil
Rake::Task['test:unit'].invoke if ENV['TEST']
Rake::Task['test:functional'].invoke Rake::Task['test:functional'].invoke
Rake::Task['test:bson'].invoke else
Rake::Task['test:pooled_threading'].invoke Rake::Task['test:unit'].invoke
Rake::Task['test:drop_databases'].invoke Rake::Task['test:functional'].invoke
Rake::Task['test:bson'].invoke
Rake::Task['test:pooled_threading'].invoke
Rake::Task['test:drop_databases'].invoke
end
end
Rake::TestTask.new(:rs) do |t|
t.test_files = FileList['test/replica_sets/*_test.rb']
t.verbose = true
end end
Rake::TestTask.new(:unit) do |t| Rake::TestTask.new(:unit) do |t|

View File

@ -35,9 +35,6 @@ module Mongo
STANDARD_HEADER_SIZE = 16 STANDARD_HEADER_SIZE = 16
RESPONSE_HEADER_SIZE = 20 RESPONSE_HEADER_SIZE = 20
MONGODB_URI_MATCHER = /(([-_.\w\d]+):([-_\w\d]+)@)?([-.\w\d]+)(:([\w\d]+))?(\/([-\d\w]+))?/
MONGODB_URI_SPEC = "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/database]"
attr_reader :logger, :size, :nodes, :auths, :primary, :secondaries, :arbiters, attr_reader :logger, :size, :nodes, :auths, :primary, :secondaries, :arbiters,
:safe, :primary_pool, :read_pool, :secondary_pools, :host_to_try :safe, :primary_pool, :read_pool, :secondary_pools, :host_to_try
@ -103,7 +100,6 @@ module Mongo
setup(options) setup(options)
end end
# DEPRECATED # DEPRECATED
# #
# Initialize a connection to a MongoDB replica set using an array of seed nodes. # Initialize a connection to a MongoDB replica set using an array of seed nodes.
@ -464,7 +460,6 @@ module Mongo
@primary_pool = nil @primary_pool = nil
end end
# Checkout a socket for reading (i.e., a secondary node). # Checkout a socket for reading (i.e., a secondary node).
def checkout_reader def checkout_reader
connect unless connected? connect unless connected?
@ -597,19 +592,9 @@ module Mongo
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
config = self['admin'].command({:ismaster => 1}, :sock => socket) config = self['admin'].command({:ismaster => 1}, :sock => socket)
rescue OperationFailure, SocketError, SystemCallError, IOError => ex rescue OperationFailure, SocketError, SystemCallError, IOError => ex
close# unless connected? close
ensure ensure
# @nodes_tried << node
# if config
# update_node_list(config['hosts']) if config['hosts']
# if config['msg'] && @logger
# @logger.warn("MONGODB #{config['msg']}")
# end
# end
socket.close if socket socket.close if socket
end end

View File

@ -77,7 +77,13 @@ module Mongo
pick_secondary_for_read if @read_secondary pick_secondary_for_read if @read_secondary
raise ConnectionFailure, "failed to connect to any given host:port" unless connected? if !connected?
if @secondary_pools.empty?
raise ConnectionFailure, "Failed to connect any given host:port"
else
raise ConnectionFailure, "Failed to connect to primary node."
end
end
end end
def connecting? def connecting?
@ -121,7 +127,11 @@ module Mongo
ensure ensure
@nodes_tried << node @nodes_tried << node
if config if config
update_node_list(config['hosts']) if config['hosts'] nodes = []
nodes += config['hosts'] if config['hosts']
nodes += config['arbiters'] if config['arbiters']
nodes += config['passives'] if config['passives']
update_node_list(nodes)
if config['msg'] && @logger if config['msg'] && @logger
@logger.warn("MONGODB #{config['msg']}") @logger.warn("MONGODB #{config['msg']}")

View File

@ -1,50 +1,74 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) $:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require 'mongo' require './test/replica_sets/rs_test_helper'
require 'test/unit'
require './test/test_helper'
# NOTE: This test expects a replica set of three nodes to be running on TEST_HOST, # NOTE: This test expects a replica set of three nodes to be running on RS.host,
# on ports TEST_PORT, TEST_PORT + 1, and TEST + 2. # on ports TEST_PORT, RS.ports[1], and TEST + 2.
class ConnectTest < Test::Unit::TestCase class ConnectTest < Test::Unit::TestCase
include Mongo include Mongo
def setup
RS.restart_killed_nodes
end
def test_connect_bad_name def test_connect_bad_name
assert_raise_error(ReplicaSetReplSetConnectionError, "expected 'wrong-repl-set-name'") do assert_raise_error(ReplicaSetConnectionError, "-wrong") do
ReplSetConnection.multi([TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[TEST_HOST, TEST_PORT + 2], :rs_name => "wrong-repl-set-name") [RS.host, RS.ports[2]], :rs_name => RS.name + "-wrong")
end end
end end
def test_connect def test_connect
@conn = ReplSetConnection.multi([TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[TEST_HOST, TEST_PORT + 2], :name => "foo") [RS.host, RS.ports[2]], :name => RS.name)
assert @conn.connected?
assert_equal RS.primary, @conn.primary
assert_equal RS.secondaries, @conn.secondaries
assert_equal RS.arbiters, @conn.arbiters
end
def test_connect_with_primary_node_killed
node = RS.kill_primary
# Becuase we're killing the primary and trying to connect right away,
# this is going to fail right away.
assert_raise_error(ConnectionFailure, "Failed to connect to primary node") do
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[RS.host, RS.ports[2]])
end
# This allows the secondary to come up as a primary
rescue_connection_failure do
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[RS.host, RS.ports[2]])
end
assert @conn.connected? assert @conn.connected?
end end
def test_connect_with_first_node_down def test_connect_with_secondary_node_killed
puts "Please kill the node at #{TEST_PORT}." node = RS.kill_secondary
gets
@conn = ReplSetConnection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[TEST_HOST, TEST_PORT + 2]]) [RS.host, RS.ports[2]])
assert @conn.connected? assert @conn.connected?
end end
def test_connect_with_second_node_down def test_connect_with_third_node_killed
puts "Please kill the node at #{TEST_PORT + 1}." RS.kill(RS.get_node_from_port(RS.ports[2]))
gets
@conn = ReplSetConnection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[TEST_HOST, TEST_PORT + 2]]) [RS.host, RS.ports[2]])
assert @conn.connected? assert @conn.connected?
end end
def test_connect_with_third_node_down def test_connect_with_primary_stepped_down
puts "Please kill the node at #{TEST_PORT + 2}." RS.step_down_primary
gets
@conn = ReplSetConnection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], rescue_connection_failure do
[TEST_HOST, TEST_PORT + 2]]) @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[RS.host, RS.ports[2]])
end
assert @conn.connected? assert @conn.connected?
end end
end end

View File

@ -1,6 +1,4 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) $:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require 'mongo'
require 'test/unit'
require './test/replica_sets/rs_test_helper' require './test/replica_sets/rs_test_helper'
# NOTE: This test expects a replica set of three nodes to be running # NOTE: This test expects a replica set of three nodes to be running
@ -20,10 +18,10 @@ class ReplicaSetCountTest < Test::Unit::TestCase
end end
def test_correct_count_after_insertion_reconnect def test_correct_count_after_insertion_reconnect
@coll.insert({:a => 20})#, :safe => {:w => 3, :wtimeout => 10000}) @coll.insert({:a => 20}, :safe => {:w => 2, :wtimeout => 10000})
assert_equal 1, @coll.count assert_equal 1, @coll.count
# Disconnecting the current master node # Kill the current master node
@node = RS.kill_primary @node = RS.kill_primary
rescue_connection_failure do rescue_connection_failure do

View File

@ -0,0 +1,29 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require './test/test_helper'
require './test/tools/repl_set_manager'
unless defined? RS
RS = ReplSetManager.new
RS.start_set
end
class Test::Unit::TestCase
# Generic code for rescuing connection failures and retrying operations.
# This could be combined with some timeout functionality.
def rescue_connection_failure(max_retries=60)
success = false
tries = 0
while !success && tries < max_retries
begin
yield
success = true
rescue Mongo::ConnectionFailure
puts "Rescuing attempt #{tries}"
tries += 1
sleep(1)
end
end
end
end

View File

@ -8,18 +8,27 @@ end
class ReplSetManager class ReplSetManager
attr_accessor :host, :start_port, :ports attr_accessor :host, :start_port, :ports, :name, :mongods
def initialize(opts={}) def initialize(opts={})
@start_port = opts[:start_port] || 30000 @start_port = opts[:start_port] || 30000
@ports = [] @ports = []
@name = opts[:name] || 'replica-set-foo' @name = opts[:name] || 'replica-set-foo'
@count = opts[:count] || 3
@host = opts[:host] || 'localhost' @host = opts[:host] || 'localhost'
@retries = opts[:retries] || 60 @retries = opts[:retries] || 60
@config = {"_id" => @name, "members" => []} @config = {"_id" => @name, "members" => []}
@path = File.join(File.expand_path(File.dirname(__FILE__)), "data") @path = File.join(File.expand_path(File.dirname(__FILE__)), "data")
@passive_count = opts[:secondary_count] || 1
@arbiter_count = opts[:arbiter_count] || 1
@secondary_count = opts[:passive_count] || 1
@primary_count = 1
@count = @primary_count + @passive_count + @arbiter_count + @secondary_count
if @count > 7
raise StandardError, "Cannot create a replica set with #{node_count} nodes. 7 is the max."
end
@mongods = {} @mongods = {}
end end
@ -28,34 +37,57 @@ class ReplSetManager
system("killall mongod") system("killall mongod")
@count.times do |n| n = 0
@mongods[n] ||= {} (@primary_count + @secondary_count).times do |n|
port = @start_port + n init_node(n)
@ports << port n += 1
@mongods[n]['port'] = port
@mongods[n]['db_path'] = get_path("rs-#{port}")
@mongods[n]['log_path'] = get_path("log-#{port}")
system("rm -rf #{@mongods[n]['db_path']}")
system("mkdir -p #{@mongods[n]['db_path']}")
@mongods[n]['start'] = "mongod --replSet #{@name} --logpath '#{@mongods[n]['log_path']}' " +
" --dbpath #{@mongods[n]['db_path']} --port #{@mongods[n]['port']} --fork"
start(n)
member = {'_id' => n, 'host' => "#{@host}:#{@mongods[n]['port']}"}
if n == @count-1
@mongods[n]['arbiter'] = true
member['arbiterOnly'] = true
end
@config['members'] << member
end end
init @passive_count.times do
init_node(n) do |attrs|
attrs['priority'] = 0
end
n += 1
end
@arbiter_count.times do
init_node(n) do |attrs|
attrs['arbiterOnly'] = true
end
n += 1
end
initiate
ensure_up ensure_up
end end
def init_node(n)
@mongods[n] ||= {}
port = @start_port + n
@ports << port
@mongods[n]['port'] = port
@mongods[n]['db_path'] = get_path("rs-#{port}")
@mongods[n]['log_path'] = get_path("log-#{port}")
system("rm -rf #{@mongods[n]['db_path']}")
system("mkdir -p #{@mongods[n]['db_path']}")
@mongods[n]['start'] = "mongod --replSet #{@name} --logpath '#{@mongods[n]['log_path']}' " +
" --dbpath #{@mongods[n]['db_path']} --port #{@mongods[n]['port']} --fork"
start(n)
member = {'_id' => n, 'host' => "#{@host}:#{@mongods[n]['port']}"}
if block_given?
custom_attrs = {}
yield custom_attrs
member.merge!(custom_attrs)
@mongods[n].merge!(custom_attrs)
end
@config['members'] << member
end
def kill(node) def kill(node)
system("kill -2 #{@mongods[node]['pid']}") system("kill -2 #{@mongods[node]['pid']}")
@mongods[node]['up'] = false @mongods[node]['up'] = false
@ -68,12 +100,40 @@ class ReplSetManager
return node return node
end end
# Note that we have to rescue a connection failure
# when we run the StepDown command because that
# command will close the connection.
def step_down_primary
primary = get_node_with_state(1)
con = get_connection(primary)
begin
con['admin'].command({'replSetStepDown' => 90})
rescue Mongo::ConnectionFailure
end
end
def kill_secondary def kill_secondary
node = get_node_with_state(2) node = get_node_with_state(2)
kill(node) kill(node)
return node return node
end end
def restart_killed_nodes
nodes = @mongods.keys.select do |key|
@mongods[key]['up'] == false
end
nodes.each do |node|
start(node)
end
ensure_up
end
def get_node_from_port(port)
@mongods.keys.detect { |key| @mongods[key]['port'] == port }
end
def start(node) def start(node)
system(@mongods[node]['start']) system(@mongods[node]['start'])
@mongods[node]['up'] = true @mongods[node]['up'] = true
@ -84,12 +144,13 @@ class ReplSetManager
def ensure_up def ensure_up
print "Ensuring members are up..." print "Ensuring members are up..."
@con = get_connection
attempt(Mongo::OperationFailure) do attempt(Mongo::OperationFailure) do
status = @con['admin'].command({'replSetGetStatus' => 1}) con = get_connection
status = con['admin'].command({'replSetGetStatus' => 1})
print "." print "."
if status['members'].all? { |m| [1, 2, 7].include?(m['state']) } if status['members'].all? { |m| [1, 2, 7].include?(m['state']) } &&
status['members'].any? { |m| m['state'] == 1 }
puts "All members up!" puts "All members up!"
return status return status
else else
@ -98,13 +159,26 @@ class ReplSetManager
end end
end end
def primary
nodes = get_all_host_pairs_with_state(1)
nodes.empty? ? nil : nodes[0]
end
def secondaries
get_all_host_pairs_with_state(2)
end
def arbiters
get_all_host_pairs_with_state(7)
end
private private
def init def initiate
get_connection con = get_connection
attempt(Mongo::OperationFailure) do attempt(Mongo::OperationFailure) do
@con['admin'].command({'replSetInitiate' => @config}) con['admin'].command({'replSetInitiate' => @config})
end end
end end
@ -121,13 +195,25 @@ class ReplSetManager
end end
end end
def get_connection def get_all_host_pairs_with_state(state)
attempt(Mongo::ConnectionFailure) do status = ensure_up
node = @mongods.keys.detect {|key| !@mongods[key]['arbiter'] && @mongods[key]['up'] } nodes = status['members'].select {|m| m['state'] == state}
@con = Mongo::Connection.new(@host, @mongods[node]['port'], :slave_ok => true) nodes.map do |node|
host_port = node['name'].split(':')
port = host_port[1] ? host_port[1].to_i : 27017
[host, port]
end
end
def get_connection(node=nil)
con = attempt(Mongo::ConnectionFailure) do
if !node
node = @mongods.keys.detect {|key| !@mongods[key]['arbiterOnly'] && @mongods[key]['up'] }
end
con = Mongo::Connection.new(@host, @mongods[node]['port'], :slave_ok => true)
end end
return @con return con
end end
def get_path(name) def get_path(name)
@ -140,8 +226,7 @@ class ReplSetManager
while count < @retries do while count < @retries do
begin begin
yield return yield
return
rescue exception rescue exception
sleep(1) sleep(1)
count += 1 count += 1