Replica Set test harness refactoring.
This commit is contained in:
parent
3e7c28e1ef
commit
68627cef3d
|
@ -154,12 +154,19 @@ module Mongo
|
|||
def connect
|
||||
log(:info, "Connecting...")
|
||||
return if @connected
|
||||
p @seeds
|
||||
manager = PoolManager.new(self, @seeds)
|
||||
manager.connect
|
||||
|
||||
update_config(manager)
|
||||
initiate_refresh_mode
|
||||
|
||||
puts "Primary: #{@manager.primary}"
|
||||
puts "Secondaries: #{@manager.secondaries}"
|
||||
if @manager.read_pool
|
||||
c = Connection.new(@manager.read_pool.host, @manager.read_pool.port, :slave_ok => true)
|
||||
p c['admin'].command({:replSetGetStatus => 1})
|
||||
end
|
||||
if @require_primary && self.primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
||||
close
|
||||
raise ConnectionFailure, "Failed to connect to primary node."
|
||||
|
@ -311,7 +318,7 @@ module Mongo
|
|||
socket = get_socket_from_pool(self.read_pool)
|
||||
|
||||
if !socket
|
||||
refresh
|
||||
connect
|
||||
socket = get_socket_from_pool(self.primary_pool)
|
||||
end
|
||||
|
||||
|
@ -328,7 +335,7 @@ module Mongo
|
|||
socket = get_socket_from_pool(self.primary_pool)
|
||||
|
||||
if !socket
|
||||
refresh
|
||||
connect
|
||||
socket = get_socket_from_pool(self.primary_pool)
|
||||
end
|
||||
|
||||
|
@ -412,10 +419,6 @@ module Mongo
|
|||
@manager.tag_map
|
||||
end
|
||||
|
||||
def seeds
|
||||
@manager.seeds
|
||||
end
|
||||
|
||||
def max_bson_size
|
||||
@manager.max_bson_size
|
||||
end
|
||||
|
@ -468,10 +471,6 @@ module Mongo
|
|||
# Condition variable for signal and wait
|
||||
@queue = ConditionVariable.new
|
||||
|
||||
# Connection pool for primay node
|
||||
@primary = nil
|
||||
@primary_pool = nil
|
||||
|
||||
@logger = opts[:logger] || nil
|
||||
|
||||
if @logger
|
||||
|
@ -488,6 +487,7 @@ module Mongo
|
|||
def update_config(new_manager)
|
||||
old_manager = @manager
|
||||
@manager = new_manager
|
||||
@seeds = @manager.seeds.dup
|
||||
@sockets_to_pools.clear
|
||||
old_manager.close if old_manager
|
||||
end
|
||||
|
|
|
@ -1,48 +1,46 @@
|
|||
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
||||
require './test/replica_sets/rs_test_helper'
|
||||
|
||||
class ConnectTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
class BasicTest < Test::Unit::TestCase
|
||||
include ReplicaSetTest
|
||||
|
||||
def teardown
|
||||
RS.restart_killed_nodes
|
||||
self.rs.restart_killed_nodes
|
||||
@conn.close if defined?(@conn) && @conn
|
||||
end
|
||||
|
||||
def test_connect
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[1]], [RS.host, RS.ports[0]],
|
||||
[RS.host, RS.ports[2]], :name => RS.name)
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[1]], [self.rs.host, self.rs.ports[0]],
|
||||
[self.rs.host, self.rs.ports[2]], :name => self.rs.name)
|
||||
assert @conn.connected?
|
||||
|
||||
assert_equal RS.primary, @conn.primary
|
||||
assert_equal RS.secondaries.sort, @conn.secondaries.sort
|
||||
assert_equal RS.arbiters.sort, @conn.arbiters.sort
|
||||
assert_equal self.rs.primary, @conn.primary
|
||||
assert_equal self.rs.secondaries.sort, @conn.secondaries.sort
|
||||
assert_equal self.rs.arbiters.sort, @conn.arbiters.sort
|
||||
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[1]], [RS.host, RS.ports[0]],
|
||||
:name => RS.name)
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[1]], [self.rs.host, self.rs.ports[0]],
|
||||
:name => self.rs.name)
|
||||
assert @conn.connected?
|
||||
end
|
||||
|
||||
def test_accessors
|
||||
seeds = [RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]]
|
||||
args = seeds << {:name => RS.name}
|
||||
seeds = [[self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]]]
|
||||
args = seeds << {:name => self.rs.name}
|
||||
@conn = ReplSetConnection.new(*args)
|
||||
|
||||
assert_equal @conn.host, RS.primary[0]
|
||||
assert_equal @conn.port, RS.primary[1]
|
||||
assert_equal @conn.host, self.rs.primary[0]
|
||||
assert_equal @conn.port, self.rs.primary[1]
|
||||
assert_equal @conn.host, @conn.primary_pool.host
|
||||
assert_equal @conn.port, @conn.primary_pool.port
|
||||
assert_equal @conn.nodes, @conn.seeds
|
||||
assert_equal @conn.nodes.sort, @conn.seeds.sort
|
||||
assert_equal 2, @conn.secondaries.length
|
||||
assert_equal 2, @conn.arbiters.length
|
||||
assert_equal 0, @conn.arbiters.length
|
||||
assert_equal 2, @conn.secondary_pools.length
|
||||
assert_equal RS.name, @conn.replica_set_name
|
||||
assert_equal self.rs.name, @conn.replica_set_name
|
||||
assert @conn.secondary_pools.include?(@conn.read_pool)
|
||||
assert_equal seeds.sort {|a,b| a[1] <=> b[1]},
|
||||
@conn.seeds.sort {|a,b| a[1] <=> b[1]}
|
||||
assert_equal 5, @conn.tag_map.keys.length
|
||||
assert_equal 90, @conn.refresh_interval
|
||||
assert_equal @conn.refresh_mode, :sync
|
||||
assert_equal @conn.refresh_mode, false
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,76 +1,88 @@
|
|||
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
||||
require './test/replica_sets/rs_test_helper'
|
||||
|
||||
# NOTE: This test expects a replica set of three nodes to be running on RS.host,
|
||||
# on ports TEST_PORT, RS.ports[1], and TEST + 2.
|
||||
class ConnectTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
include ReplicaSetTest
|
||||
|
||||
def teardown
|
||||
RS.restart_killed_nodes
|
||||
self.rs.restart_killed_nodes
|
||||
@conn.close if defined?(@conn) && @conn
|
||||
end
|
||||
|
||||
# TODO: test connect timeout.
|
||||
|
||||
def test_connect_with_deprecated_multi
|
||||
@conn = Connection.multi([[RS.host, RS.ports[0]], [RS.host, RS.ports[1]]], :name => RS.name)
|
||||
@conn = Connection.multi([[self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]]], :name => self.rs.name)
|
||||
assert @conn.is_a?(ReplSetConnection)
|
||||
assert @conn.connected?
|
||||
end
|
||||
|
||||
def test_connect_bad_name
|
||||
assert_raise_error(ReplicaSetConnectionError, "-wrong") do
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]], :name => RS.name + "-wrong")
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]], :name => self.rs.name + "-wrong")
|
||||
end
|
||||
end
|
||||
|
||||
def test_connect_with_primary_node_killed
|
||||
node = RS.kill_primary
|
||||
node = self.rs.kill_primary
|
||||
|
||||
# Becuase we're killing the primary and trying to connect right away,
|
||||
# this is going to fail right away.
|
||||
assert_raise_error(ConnectionFailure, "Failed to connect to primary node") do
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]])
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]])
|
||||
end
|
||||
|
||||
# This allows the secondary to come up as a primary
|
||||
rescue_connection_failure do
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]])
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]])
|
||||
end
|
||||
end
|
||||
|
||||
def test_connect_with_secondary_node_killed
|
||||
node = RS.kill_secondary
|
||||
node = self.rs.kill_secondary
|
||||
|
||||
rescue_connection_failure do
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]])
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]])
|
||||
end
|
||||
assert @conn.connected?
|
||||
end
|
||||
|
||||
def test_connect_with_third_node_killed
|
||||
RS.kill(RS.get_node_from_port(RS.ports[2]))
|
||||
self.rs.kill(self.rs.get_node_from_port(self.rs.ports[2]))
|
||||
|
||||
rescue_connection_failure do
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]])
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]])
|
||||
end
|
||||
assert @conn.connected?
|
||||
end
|
||||
|
||||
def test_connect_with_primary_stepped_down
|
||||
RS.step_down_primary
|
||||
self.rs.step_down_primary
|
||||
|
||||
rescue_connection_failure do
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]])
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]])
|
||||
end
|
||||
assert @conn.connected?
|
||||
end
|
||||
|
||||
def test_connect_with_connection_string
|
||||
@conn = Connection.from_uri("mongodb://#{self.rs.host}:#{self.rs.ports[0]},#{self.rs.host}:#{self.rs.ports[1]}?replicaset=#{self.rs.name}")
|
||||
assert @conn.is_a?(ReplSetConnection)
|
||||
assert @conn.connected?
|
||||
end
|
||||
|
||||
def test_connect_with_full_connection_string
|
||||
@conn = Connection.from_uri("mongodb://#{self.rs.host}:#{self.rs.ports[0]},#{self.rs.host}:#{self.rs.ports[1]}?replicaset=#{self.rs.name};safe=true;w=2;fsync=true;slaveok=true")
|
||||
assert @conn.is_a?(ReplSetConnection)
|
||||
assert @conn.connected?
|
||||
assert_equal 2, @conn.safe[:w]
|
||||
assert @conn.safe[:fsync]
|
||||
assert @conn.read_pool
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,29 +0,0 @@
|
|||
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
||||
require './test/replica_sets/rs_test_helper'
|
||||
|
||||
# NOTE: This test expects a replica set of three nodes to be running on RS.host,
|
||||
# on ports TEST_PORT, RS.ports[1], and TEST + 2.
|
||||
class ConnectionStringTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
|
||||
def teardown
|
||||
RS.restart_killed_nodes
|
||||
@conn.close if @conn
|
||||
end
|
||||
|
||||
def test_connect_with_connection_string
|
||||
@conn = Connection.from_uri("mongodb://#{RS.host}:#{RS.ports[0]},#{RS.host}:#{RS.ports[1]}?replicaset=#{RS.name}")
|
||||
assert @conn.is_a?(ReplSetConnection)
|
||||
assert @conn.connected?
|
||||
end
|
||||
|
||||
def test_connect_with_full_connection_string
|
||||
@conn = Connection.from_uri("mongodb://#{RS.host}:#{RS.ports[0]},#{RS.host}:#{RS.ports[1]}?replicaset=#{RS.name};safe=true;w=2;fsync=true;slaveok=true")
|
||||
assert @conn.is_a?(ReplSetConnection)
|
||||
assert @conn.connected?
|
||||
assert_equal 2, @conn.safe[:w]
|
||||
assert @conn.safe[:fsync]
|
||||
assert @conn.read_pool
|
||||
end
|
||||
|
||||
end
|
|
@ -1,13 +1,12 @@
|
|||
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
||||
require './test/replica_sets/rs_test_helper'
|
||||
|
||||
# NOTE: This test expects a replica set of three nodes to be running
|
||||
# on the local host.
|
||||
class ReplicaSetCountTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
include ReplicaSetTest
|
||||
|
||||
def setup
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], [RS.host, RS.ports[2]],
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]],
|
||||
[self.rs.host, self.rs.ports[1]], [self.rs.host, self.rs.ports[2]],
|
||||
:read => :secondary)
|
||||
assert @conn.primary_pool
|
||||
@primary = Connection.new(@conn.primary_pool.host, @conn.primary_pool.port)
|
||||
|
@ -17,7 +16,7 @@ class ReplicaSetCountTest < Test::Unit::TestCase
|
|||
end
|
||||
|
||||
def teardown
|
||||
RS.restart_killed_nodes
|
||||
self.rs.restart_killed_nodes
|
||||
@conn.close if @conn
|
||||
end
|
||||
|
||||
|
@ -26,7 +25,7 @@ class ReplicaSetCountTest < Test::Unit::TestCase
|
|||
assert_equal 1, @coll.count
|
||||
|
||||
# Kill the current master node
|
||||
@node = RS.kill_primary
|
||||
@node = self.rs.kill_primary
|
||||
|
||||
rescue_connection_failure do
|
||||
@coll.insert({:a => 30}, :safe => true)
|
||||
|
|
|
@ -1,27 +1,26 @@
|
|||
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
||||
require './test/replica_sets/rs_test_helper'
|
||||
|
||||
# NOTE: This test expects a replica set of three nodes to be running
|
||||
# on the local host.
|
||||
class ReplicaSetInsertTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
include ReplicaSetTest
|
||||
|
||||
def setup
|
||||
@conn = ReplSetConnection.new([TEST_HOST, RS.ports[0]], [TEST_HOST, RS.ports[1]], [TEST_HOST, RS.ports[2]])
|
||||
@conn = ReplSetConnection.new([TEST_HOST, self.rs.ports[0]],
|
||||
[TEST_HOST, self.rs.ports[1]], [TEST_HOST, self.rs.ports[2]])
|
||||
@db = @conn.db(MONGO_TEST_DB)
|
||||
@db.drop_collection("test-sets")
|
||||
@coll = @db.collection("test-sets")
|
||||
end
|
||||
|
||||
def teardown
|
||||
RS.restart_killed_nodes
|
||||
self.rs.restart_killed_nodes
|
||||
@conn.close if @conn
|
||||
end
|
||||
|
||||
def test_insert
|
||||
@coll.save({:a => 20}, :safe => true)
|
||||
|
||||
RS.kill_primary
|
||||
self.rs.kill_primary
|
||||
|
||||
rescue_connection_failure do
|
||||
@coll.save({:a => 30}, :safe => true)
|
||||
|
@ -33,7 +32,7 @@ class ReplicaSetInsertTest < Test::Unit::TestCase
|
|||
@coll.save({:a => 70}, :safe => true)
|
||||
|
||||
# Restart the old master and wait for sync
|
||||
RS.restart_killed_nodes
|
||||
self.rs.restart_killed_nodes
|
||||
sleep(1)
|
||||
results = []
|
||||
|
||||
|
|
|
@ -1,114 +0,0 @@
|
|||
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
||||
require './test/replica_sets/rs_test_helper'
|
||||
|
||||
# NOTE: This test expects a replica set of three nodes to be running
|
||||
# on the local host.
|
||||
class ReplicaSetQuerySecondariesTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
|
||||
def setup
|
||||
RS.restart_killed_nodes
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], :read => :secondary)
|
||||
@secondary = Connection.new(@conn.read_pool.host, @conn.read_pool.port, :slave_ok => true)
|
||||
@db = @conn.db(MONGO_TEST_DB)
|
||||
@db.drop_collection("test-sets")
|
||||
end
|
||||
|
||||
def teardown
|
||||
RS.restart_killed_nodes
|
||||
@conn.close if @conn
|
||||
end
|
||||
|
||||
def test_read_primary
|
||||
rescue_connection_failure do
|
||||
assert !@conn.read_primary?
|
||||
assert !@conn.primary?
|
||||
end
|
||||
end
|
||||
|
||||
def test_con
|
||||
assert @conn.primary_pool, "No primary pool!"
|
||||
assert @conn.read_pool, "No read pool!"
|
||||
assert @conn.primary_pool.port != @conn.read_pool.port,
|
||||
"Primary port and read port at the same!"
|
||||
end
|
||||
|
||||
def test_query_secondaries
|
||||
@coll = @db.collection("test-sets", :safe => {:w => 3, :wtimeout => 20000})
|
||||
@coll.save({:a => 20})
|
||||
@coll.save({:a => 30})
|
||||
@coll.save({:a => 40})
|
||||
results = []
|
||||
queries_before = @secondary['admin'].command({:serverStatus => 1})['opcounters']['query']
|
||||
@coll.find.each {|r| results << r["a"]}
|
||||
queries_after = @secondary['admin'].command({:serverStatus => 1})['opcounters']['query']
|
||||
assert_equal 1, queries_after - queries_before
|
||||
assert results.include?(20)
|
||||
assert results.include?(30)
|
||||
assert results.include?(40)
|
||||
|
||||
RS.kill_primary
|
||||
|
||||
results = []
|
||||
rescue_connection_failure do
|
||||
@coll.find.each {|r| results << r}
|
||||
[20, 30, 40].each do |a|
|
||||
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def test_kill_primary
|
||||
@coll = @db.collection("test-sets", :safe => {:w => 3, :wtimeout => 10000})
|
||||
@coll.save({:a => 20})
|
||||
@coll.save({:a => 30})
|
||||
assert_equal 2, @coll.find.to_a.length
|
||||
|
||||
# Should still be able to read immediately after killing master node
|
||||
RS.kill_primary
|
||||
assert_equal 2, @coll.find.to_a.length
|
||||
rescue_connection_failure do
|
||||
@coll.save({:a => 50}, :safe => {:w => 2, :wtimeout => 10000})
|
||||
end
|
||||
RS.restart_killed_nodes
|
||||
@coll.save({:a => 50}, :safe => {:w => 2, :wtimeout => 10000})
|
||||
assert_equal 4, @coll.find.to_a.length
|
||||
end
|
||||
|
||||
def test_kill_secondary
|
||||
@coll = @db.collection("test-sets", {:safe => {:w => 3, :wtimeout => 20000}})
|
||||
@coll.save({:a => 20})
|
||||
@coll.save({:a => 30})
|
||||
assert_equal 2, @coll.find.to_a.length
|
||||
|
||||
read_node = RS.get_node_from_port(@conn.read_pool.port)
|
||||
RS.kill(read_node)
|
||||
|
||||
# Should fail immediately on next read
|
||||
old_read_pool_port = @conn.read_pool.port
|
||||
assert_raise ConnectionFailure do
|
||||
@coll.find.to_a.length
|
||||
end
|
||||
|
||||
# Should eventually reconnect and be able to read
|
||||
rescue_connection_failure do
|
||||
length = @coll.find.to_a.length
|
||||
assert_equal 2, length
|
||||
end
|
||||
new_read_pool_port = @conn.read_pool.port
|
||||
assert old_read_pool_port != new_read_pool_port
|
||||
end
|
||||
|
||||
def test_write_lots_of_data
|
||||
@coll = @db.collection("test-sets", {:safe => {:w => 2}})
|
||||
|
||||
6000.times do |n|
|
||||
@coll.save({:a => n})
|
||||
end
|
||||
|
||||
cursor = @coll.find()
|
||||
cursor.next
|
||||
cursor.close
|
||||
end
|
||||
|
||||
end
|
|
@ -1,20 +1,18 @@
|
|||
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
||||
require './test/replica_sets/rs_test_helper'
|
||||
|
||||
# NOTE: This test expects a replica set of three nodes to be running
|
||||
# on the local host.
|
||||
class ReplicaSetQueryTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
include ReplicaSetTest
|
||||
|
||||
def setup
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0], RS.ports[1]])
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0], self.rs.ports[1]])
|
||||
@db = @conn.db(MONGO_TEST_DB)
|
||||
@db.drop_collection("test-sets")
|
||||
@coll = @db.collection("test-sets")
|
||||
end
|
||||
|
||||
def teardown
|
||||
RS.restart_killed_nodes
|
||||
self.rs.restart_killed_nodes
|
||||
@conn.close if @conn
|
||||
end
|
||||
|
||||
|
@ -30,7 +28,7 @@ class ReplicaSetQueryTest < Test::Unit::TestCase
|
|||
|
||||
puts "Benchmark before failover: #{benchmark_queries}"
|
||||
|
||||
RS.kill_primary
|
||||
self.rs.kill_primary
|
||||
|
||||
results = []
|
||||
rescue_connection_failure do
|
||||
|
|
|
@ -2,26 +2,117 @@ $:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
|||
require './test/replica_sets/rs_test_helper'
|
||||
require 'logger'
|
||||
|
||||
# TODO: enable this once we enable reads from tags.
|
||||
class ReadPreferenceTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
include ReplicaSetTest
|
||||
|
||||
def test_long_write_with_async_refresh
|
||||
def setup
|
||||
log = Logger.new("test.log")
|
||||
conn = ReplSetConnection.new([RS.host, RS.ports[0], RS.host, RS.ports[1]],
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]],
|
||||
[self.rs.host, self.rs.ports[1]],
|
||||
:read => :secondary, :pool_size => 50,
|
||||
:refresh_mode => :sync, :refresh_interval => 5, :logger => log)
|
||||
:refresh_mode => false, :refresh_interval => 5, :logger => log)
|
||||
@db = @conn.db(MONGO_TEST_DB)
|
||||
@db.drop_collection("test-sets")
|
||||
col = @db['mongo-test']
|
||||
end
|
||||
|
||||
db = conn.db(MONGO_TEST_DB)
|
||||
db.drop_collection("test-sets")
|
||||
col = db['mongo-test']
|
||||
def teardown
|
||||
self.rs.restart_killed_nodes
|
||||
end
|
||||
|
||||
100000.times do |n|
|
||||
col.insert({:n => n, :str => "0000000000"})
|
||||
def test_read_primary
|
||||
rescue_connection_failure do
|
||||
assert !@conn.read_primary?
|
||||
assert !@conn.primary?
|
||||
end
|
||||
end
|
||||
|
||||
def test_con
|
||||
assert @conn.primary_pool, "No primary pool!"
|
||||
assert @conn.read_pool, "No read pool!"
|
||||
assert @conn.primary_pool.port != @conn.read_pool.port,
|
||||
"Primary port and read port at the same!"
|
||||
end
|
||||
|
||||
def test_query_secondaries
|
||||
@secondary = Connection.new(self.rs.host, @conn.read_pool.port, :slave_ok => true)
|
||||
@coll = @db.collection("test-sets", :safe => {:w => 3, :wtimeout => 20000})
|
||||
@coll.save({:a => 20})
|
||||
@coll.save({:a => 30})
|
||||
@coll.save({:a => 40})
|
||||
results = []
|
||||
queries_before = @secondary['admin'].command({:serverStatus => 1})['opcounters']['query']
|
||||
@coll.find.each {|r| results << r["a"]}
|
||||
queries_after = @secondary['admin'].command({:serverStatus => 1})['opcounters']['query']
|
||||
assert_equal 1, queries_after - queries_before
|
||||
assert results.include?(20)
|
||||
assert results.include?(30)
|
||||
assert results.include?(40)
|
||||
|
||||
self.rs.kill_primary
|
||||
|
||||
results = []
|
||||
rescue_connection_failure do
|
||||
puts "@coll.find().each"
|
||||
@coll.find.each {|r| results << r}
|
||||
[20, 30, 40].each do |a|
|
||||
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def test_kill_primary
|
||||
@coll = @db.collection("test-sets", :safe => {:w => 3, :wtimeout => 10000})
|
||||
@coll.save({:a => 20})
|
||||
@coll.save({:a => 30})
|
||||
assert_equal 2, @coll.find.to_a.length
|
||||
|
||||
# Should still be able to read immediately after killing master node
|
||||
self.rs.kill_primary
|
||||
assert_equal 2, @coll.find.to_a.length
|
||||
rescue_connection_failure do
|
||||
puts "@coll.save()"
|
||||
@coll.save({:a => 50}, :safe => {:w => 2, :wtimeout => 10000})
|
||||
end
|
||||
self.rs.restart_killed_nodes
|
||||
@coll.save({:a => 50}, :safe => {:w => 2, :wtimeout => 10000})
|
||||
assert_equal 4, @coll.find.to_a.length
|
||||
end
|
||||
|
||||
def test_kill_secondary
|
||||
@coll = @db.collection("test-sets", {:safe => {:w => 3, :wtimeout => 20000}})
|
||||
@coll.save({:a => 20})
|
||||
@coll.save({:a => 30})
|
||||
assert_equal 2, @coll.find.to_a.length
|
||||
|
||||
read_node = self.rs.get_node_from_port(@conn.read_pool.port)
|
||||
self.rs.kill(read_node)
|
||||
|
||||
# Should fail immediately on next read
|
||||
old_read_pool_port = @conn.read_pool.port
|
||||
assert_raise ConnectionFailure do
|
||||
@coll.find.to_a.length
|
||||
end
|
||||
|
||||
assert col.find.to_a
|
||||
col.remove
|
||||
# Should eventually reconnect and be able to read
|
||||
rescue_connection_failure do
|
||||
length = @coll.find.to_a.length
|
||||
assert_equal 2, length
|
||||
end
|
||||
new_read_pool_port = @conn.read_pool.port
|
||||
assert old_read_pool_port != new_read_pool_port
|
||||
end
|
||||
|
||||
def test_write_lots_of_data
|
||||
@coll = @db.collection("test-sets", {:safe => {:w => 2}})
|
||||
|
||||
6000.times do |n|
|
||||
@coll.save({:a => n})
|
||||
end
|
||||
|
||||
cursor = @coll.find()
|
||||
cursor.next
|
||||
cursor.close
|
||||
end
|
||||
|
||||
# TODO: enable this once we enable reads from tags.
|
||||
|
@ -50,7 +141,7 @@ class ReadPreferenceTest < Test::Unit::TestCase
|
|||
# end
|
||||
|
||||
#def teardown
|
||||
# RS.restart_killed_nodes
|
||||
# self.rs.restart_killed_nodes
|
||||
#end
|
||||
|
||||
end
|
||||
|
|
|
@ -2,16 +2,15 @@ $:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
|||
require './test/replica_sets/rs_test_helper'
|
||||
require 'benchmark'
|
||||
|
||||
# on ports TEST_PORT, RS.ports[1], and TEST + 2.
|
||||
class ReplicaSetRefreshTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
include ReplicaSetTest
|
||||
|
||||
def setup
|
||||
@conn = nil
|
||||
end
|
||||
|
||||
def teardown
|
||||
RS.restart_killed_nodes
|
||||
self.rs.restart_killed_nodes
|
||||
@conn.close if @conn
|
||||
end
|
||||
|
||||
|
@ -19,13 +18,13 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||
Benchmark.bm do |x|
|
||||
x.report("Connect") do
|
||||
10.times do
|
||||
ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]], :refresh_mode => false)
|
||||
ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]], :refresh_mode => false)
|
||||
end
|
||||
end
|
||||
|
||||
@con = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]], :refresh_mode => false)
|
||||
@con = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]], :refresh_mode => false)
|
||||
|
||||
x.report("manager") do
|
||||
man = Mongo::PoolManager.new(@con, @con.seeds)
|
||||
|
@ -37,11 +36,11 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||
end
|
||||
|
||||
def test_connect_and_manual_refresh_with_secondaries_down
|
||||
RS.kill_all_secondaries
|
||||
self.rs.kill_all_secondaries
|
||||
|
||||
rescue_connection_failure do
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]], :refresh_mode => false)
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]], :refresh_mode => false)
|
||||
end
|
||||
|
||||
assert_equal [], @conn.secondaries
|
||||
|
@ -54,7 +53,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||
assert @conn.connected?
|
||||
assert_equal @conn.read_pool, @conn.primary_pool
|
||||
|
||||
RS.restart_killed_nodes
|
||||
self.rs.restart_killed_nodes
|
||||
assert_equal [], @conn.secondaries
|
||||
assert @conn.connected?
|
||||
assert_equal @conn.read_pool, @conn.primary_pool
|
||||
|
@ -66,18 +65,18 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||
end
|
||||
|
||||
def test_automated_refresh_with_secondaries_down
|
||||
RS.kill_all_secondaries
|
||||
self.rs.kill_all_secondaries
|
||||
|
||||
rescue_connection_failure do
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||
end
|
||||
|
||||
assert_equal [], @conn.secondaries
|
||||
assert @conn.connected?
|
||||
assert_equal @conn.read_pool, @conn.primary_pool
|
||||
|
||||
RS.restart_killed_nodes
|
||||
self.rs.restart_killed_nodes
|
||||
sleep(4)
|
||||
|
||||
assert @conn.read_pool != @conn.primary_pool, "Read pool and primary pool are identical."
|
||||
|
@ -85,31 +84,31 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||
end
|
||||
|
||||
def test_automated_refresh_with_removed_node
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||
|
||||
@conn.secondary_pools
|
||||
assert_equal 2, @conn.secondary_pools.length
|
||||
assert_equal 2, @conn.secondaries.length
|
||||
|
||||
n = RS.remove_secondary_node
|
||||
n = self.rs.remove_secondary_node
|
||||
sleep(4)
|
||||
|
||||
assert_equal 1, @conn.secondaries.length
|
||||
assert_equal 1, @conn.secondary_pools.length
|
||||
|
||||
RS.add_node(n)
|
||||
self.rs.add_node(n)
|
||||
end
|
||||
|
||||
def test_adding_and_removing_nodes
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||
|
||||
RS.add_node
|
||||
self.rs.add_node
|
||||
sleep(4)
|
||||
|
||||
@conn2 = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||
@conn2 = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
|
||||
[self.rs.host, self.rs.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
|
||||
|
||||
assert @conn2.secondaries == @conn.secondaries
|
||||
assert_equal 3, @conn.secondary_pools.length
|
||||
|
@ -117,7 +116,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||
|
||||
config = @conn['admin'].command({:ismaster => 1})
|
||||
|
||||
RS.remove_secondary_node
|
||||
self.rs.remove_secondary_node
|
||||
sleep(4)
|
||||
config = @conn['admin'].command({:ismaster => 1})
|
||||
|
||||
|
|
|
@ -1,14 +1,11 @@
|
|||
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
||||
require './test/replica_sets/rs_test_helper'
|
||||
|
||||
# NOTE: This test expects a replica set of three nodes to be running on local host.
|
||||
class ReplicaSetAckTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
include ReplicaSetTest
|
||||
|
||||
def setup
|
||||
RS.ensure_up
|
||||
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]])
|
||||
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]])
|
||||
|
||||
@slave1 = Connection.new(@conn.secondary_pools[0].host,
|
||||
@conn.secondary_pools[0].port, :slave_ok => true)
|
||||
|
@ -21,7 +18,7 @@ class ReplicaSetAckTest < Test::Unit::TestCase
|
|||
end
|
||||
|
||||
def teardown
|
||||
RS.restart_killed_nodes
|
||||
self.rs.restart_killed_nodes
|
||||
@conn.close if @conn
|
||||
end
|
||||
|
||||
|
|
|
@ -2,12 +2,19 @@ $:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
|||
require './test/test_helper'
|
||||
require './test/tools/repl_set_manager'
|
||||
|
||||
unless defined? RS
|
||||
RS = ReplSetManager.new
|
||||
RS.start_set
|
||||
end
|
||||
module ReplicaSetTest
|
||||
|
||||
class Test::Unit::TestCase
|
||||
def self.rs
|
||||
unless defined?(@rs)
|
||||
@rs = ReplSetManager.new
|
||||
@rs.start_set
|
||||
end
|
||||
@rs
|
||||
end
|
||||
|
||||
def rs
|
||||
ReplicaSetTest.rs
|
||||
end
|
||||
|
||||
# Generic code for rescuing connection failures and retrying operations.
|
||||
# This could be combined with some timeout functionality.
|
||||
|
@ -23,5 +30,4 @@ class Test::Unit::TestCase
|
|||
retry
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -20,13 +20,14 @@ class ReplSetManager
|
|||
@retries = opts[:retries] || 30
|
||||
@config = {"_id" => @name, "members" => []}
|
||||
@durable = opts.fetch(:durable, false)
|
||||
@smallfiles = opts.fetch(:smallfiles, true)
|
||||
@path = File.join(File.expand_path(File.dirname(__FILE__)), "data")
|
||||
@oplog_size = opts.fetch(:oplog_size, 32)
|
||||
@oplog_size = opts.fetch(:oplog_size, 16)
|
||||
@tags = [{"dc" => "ny", "rack" => "a", "db" => "main"},
|
||||
{"dc" => "ny", "rack" => "b", "db" => "main"},
|
||||
{"dc" => "sf", "rack" => "a", "db" => "main"}]
|
||||
|
||||
@arbiter_count = opts[:arbiter_count] || 2
|
||||
@arbiter_count = opts[:arbiter_count] || 0
|
||||
@secondary_count = opts[:secondary_count] || 2
|
||||
@passive_count = opts[:passive_count] || 0
|
||||
@primary_count = 1
|
||||
|
@ -43,20 +44,11 @@ class ReplSetManager
|
|||
end
|
||||
|
||||
def start_set
|
||||
begin
|
||||
con = Mongo::Connection.new(@host, @start_port)
|
||||
rescue Mongo::ConnectionFailure
|
||||
con = false
|
||||
end
|
||||
|
||||
if con && ensure_up(1, con)
|
||||
should_start = false
|
||||
puts "** Replica set already started."
|
||||
else
|
||||
should_start = true
|
||||
system("killall mongod")
|
||||
puts "** Starting a replica set with #{@count} nodes"
|
||||
end
|
||||
system("killall mongod")
|
||||
sleep(1)
|
||||
p system("ps aux | grep mongod")
|
||||
should_start = true
|
||||
puts "** Starting a replica set with #{@count} nodes"
|
||||
|
||||
n = 0
|
||||
(@primary_count + @secondary_count).times do
|
||||
|
@ -82,15 +74,9 @@ class ReplSetManager
|
|||
n += 1
|
||||
end
|
||||
|
||||
if con && ensure_up(1, con)
|
||||
@mongods.each do |k, v|
|
||||
v['up'] = true
|
||||
v['pid'] = File.open(File.join(v['db_path'], 'mongod.lock')).read.strip
|
||||
end
|
||||
else
|
||||
initiate
|
||||
ensure_up
|
||||
end
|
||||
initiate
|
||||
p system("ps aux | grep mongod")
|
||||
ensure_up
|
||||
end
|
||||
|
||||
def cleanup_set
|
||||
|
@ -143,6 +129,7 @@ class ReplSetManager
|
|||
@mongods[n]['start'] = "mongod --replSet #{@name} --logpath '#{@mongods[n]['log_path']}' " +
|
||||
"--oplogSize #{@oplog_size} #{journal_switch} --dbpath #{@mongods[n]['db_path']} --port #{@mongods[n]['port']} --fork"
|
||||
@mongods[n]['start'] += " --dur" if @durable
|
||||
@mongods[n]['start'] += " --smallfiles" if @smallfiles
|
||||
@mongods[n]['start']
|
||||
end
|
||||
|
||||
|
@ -254,21 +241,42 @@ class ReplSetManager
|
|||
print "** Ensuring members are up..."
|
||||
|
||||
attempt(n) do
|
||||
con = connection || get_connection
|
||||
status = con['admin'].command({'replSetGetStatus' => 1})
|
||||
print "."
|
||||
con = connection || get_connection
|
||||
status = con['admin'].command({:replSetGetStatus => 1})
|
||||
if status['members'].all? { |m| m['health'] == 1 &&
|
||||
[1, 2, 7].include?(m['state']) } &&
|
||||
status['members'].any? { |m| m['state'] == 1 }
|
||||
print "all members up!\n\n"
|
||||
con.close
|
||||
return status
|
||||
else
|
||||
con.close
|
||||
raise Mongo::OperationFailure
|
||||
end
|
||||
end
|
||||
[1, 2, 7].include?(m['state']) } &&
|
||||
status['members'].any? { |m| m['state'] == 1 }
|
||||
|
||||
connections = []
|
||||
states = []
|
||||
status['members'].each do |member|
|
||||
begin
|
||||
host, port = member['name'].split(':')
|
||||
port = port.to_i
|
||||
con = Mongo::Connection.new(host, port, :slave_ok => true)
|
||||
connections << con
|
||||
state = con['admin'].command({:ismaster => 1})
|
||||
states << state
|
||||
rescue ConnectionFailure
|
||||
connections.each {|c| c.close }
|
||||
raise Mongo::OperationFailure
|
||||
end
|
||||
end
|
||||
|
||||
if states.any? {|s| s['ismaster']}
|
||||
print "all members up!\n\n"
|
||||
connections.each {|c| c.close }
|
||||
con.close
|
||||
return status
|
||||
else
|
||||
raise Mongo::OperationFailure
|
||||
end
|
||||
else
|
||||
con.close
|
||||
raise Mongo::OperationFailure
|
||||
end
|
||||
end
|
||||
return false
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue