minor: test framework fixes; minor bug fixes

This commit is contained in:
Kyle Banker 2011-10-13 17:58:20 -04:00
parent 1647c89721
commit 1001e59e44
8 changed files with 45 additions and 94 deletions

View File

@ -30,6 +30,8 @@ module Mongo
Mutex = ::Mutex
ConditionVariable = ::ConditionVariable
Thread.abort_on_exception = true
DEFAULT_PORT = 27017
STANDARD_HEADER_SIZE = 16
RESPONSE_HEADER_SIZE = 20
@ -487,8 +489,10 @@ module Mongo
end
result = ''
send_message_on_socket(packed_message, sock)
result = receive(sock, request_id, exhaust)
@safe_mutexes[sock].synchronize do
send_message_on_socket(packed_message, sock)
result = receive(sock, request_id, exhaust)
end
ensure
if should_checkin
checkin(sock)
@ -628,6 +632,9 @@ module Mongo
# Default maximum BSON object size
@max_bson_size = Mongo::DEFAULT_MAX_BSON_SIZE
@safe_mutex_lock = Mutex.new
@safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}
# Determine whether to use SSL.
@ssl = opts.fetch(:ssl, false)
if @ssl

View File

@ -23,7 +23,7 @@ module Mongo
# Instantiates and manages connections to a MongoDB replica set.
class ReplSetConnection < Connection
attr_reader :secondaries, :arbiters, :secondary_pools,
:replica_set_name, :read_pool, :seeds, :primary_tag_pool,
:replica_set_name, :read_pool, :seeds, :tag_map,
:refresh_interval, :refresh_mode
# Create a connection to a MongoDB replica set.
@ -143,7 +143,7 @@ module Mongo
# Maps
@sockets_to_pools = {}
@primary_tag_pool = nil
@tag_map = nil
# Replica set name
if opts[:rs_name]
@ -197,7 +197,10 @@ module Mongo
# to get the refresh lock.
def refresh(opts={})
if !connected?
@logger.warn("Not connected")
log(:info, "Trying to refresh but not connected..." +
"skipping replica set health check.")
hard_refresh!
return true
end
log(:info, "Checking replica set connection health...")
@ -286,13 +289,6 @@ module Mongo
@refresh_thread = nil
end
if @nodes
@nodes.each do |member|
member.close
end
end
@nodes = []
@read_pool = nil
if @secondary_pools
@ -304,7 +300,7 @@ module Mongo
@secondaries = []
@secondary_pools = []
@arbiters = []
@primary_tag_pool = nil
@tag_map = nil
@sockets_to_pools.clear
end
end
@ -359,7 +355,7 @@ module Mongo
@primary_pool = manager.primary_pool
@read_pool = manager.read_pool
@secondary_pools = manager.secondary_pools
@primary_tag_pool = manager.primary_tag_pool
@tag_map = manager.tag_map
@seeds = manager.seeds
@manager = manager
@nodes = manager.nodes
@ -407,7 +403,7 @@ module Mongo
def checkout_tagged(tags)
sync_synchronize(:SH) do
tags.each do |k, v|
pool = @primary_tag_pool[{k.to_s => v}]
pool = @tag_map[{k.to_s => v}]
if pool
socket = pool.checkout
@sockets_to_pools[socket] = pool
@ -472,7 +468,11 @@ module Mongo
if pool = @sockets_to_pools[socket]
pool.checkin(socket)
elsif socket
begin
socket.close
rescue IOError
log(:info, "Tried to close socket #{socket} but already closed.")
end
end
end

View File

@ -14,6 +14,7 @@ module Mongo
end
@address = "#{host}:#{port}"
@config = nil
@socket = nil
end
def eql?(other)

View File

@ -3,12 +3,13 @@ module Mongo
attr_reader :connection, :seeds, :arbiters, :primary, :secondaries,
:primary_pool, :read_pool, :secondary_pools, :hosts, :nodes, :max_bson_size,
:tags_to_pools, :primary_tag_pool, :members
:tags_to_pools, :tag_map, :members
def initialize(connection, seeds)
@connection = connection
@seeds = seeds
@previously_connected = false
@refresh_required = false
end
def inspect
@ -25,7 +26,7 @@ module Mongo
initialize_pools(members)
update_seed_list(members)
set_read_pool
set_primary_tag_pools
set_tag_mappings
@members = members
@previously_connected = true
@ -36,8 +37,6 @@ module Mongo
# to our view. If any of these isn't the case,
# set @refresh_require to true, and return.
def check_connection_health
@refresh_required = false
begin
seed = get_valid_seed_node
rescue ConnectionFailure
@ -108,6 +107,8 @@ module Mongo
elsif member.last_state == :secondary &&
member.secondary?
return true
else # This node isn't what it used to be.
return false
end
end
end
@ -122,7 +123,7 @@ module Mongo
@hosts = Set.new
@members = Set.new
@tags_to_pools = {}
@primary_tag_pool = {}
@tag_map = {}
end
# Connect to each member of the replica set
@ -195,12 +196,12 @@ module Mongo
# If there's more than one pool associated with
# a given tag, choose a close one using the bucket method.
def set_primary_tag_pools
def set_tag_mappings
@tags_to_pools.each do |key, pool_list|
if pool_list.length == 1
@primary_tag_pool[key] = pool_list.first
@tag_map[key] = pool_list.first
else
@primary_tag_pool[key] = nearby_pool_from_set(pool_list)
@tag_map[key] = nearby_pool_from_set(pool_list)
end
end
end

View File

@ -40,7 +40,7 @@ class ConnectTest < Test::Unit::TestCase
assert @conn.secondary_pools.include?(@conn.read_pool)
assert_equal seeds.sort {|a,b| a[1] <=> b[1]},
@conn.seeds.sort {|a,b| a[1] <=> b[1]}
assert_equal 5, @conn.tags_to_pools.keys.length
assert_equal 5, @conn.tag_map.keys.length
assert_equal 90, @conn.refresh_interval
assert_equal @conn.refresh_mode, :sync
end

View File

@ -1,58 +0,0 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require './test/replica_sets/rs_test_helper'
# NOTE: This test expects a replica set of three nodes to be running
# on the local host.
class ReplicaSetPooledInsertTest < Test::Unit::TestCase
include Mongo
def setup
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[RS.host, RS.ports[2]], :pool_size => 5, :timeout => 5)
@db = @conn.db(MONGO_TEST_DB)
@db.drop_collection("test-sets")
@coll = @db.collection("test-sets")
end
def teardown
RS.restart_killed_nodes
@conn.close if @conn
end
def test_insert
expected_results = [-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
@coll.save({:a => -1}, :safe => true)
RS.kill_primary
threads = []
10.times do |i|
threads[i] = Thread.new do
rescue_connection_failure do
@coll.save({:a => i}, :safe => true)
end
end
end
threads.each {|t| t.join}
# Restart the old master and wait for sync
RS.restart_killed_nodes
sleep(1)
results = []
rescue_connection_failure do
@coll.find.each {|r| results << r}
expected_results.each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end
end
@coll.save({:a => 10}, :safe => true)
@coll.find.each {|r| results << r}
(expected_results + [10]).each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a} on second find"
end
end
end

View File

@ -20,12 +20,12 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
x.report("Connect") do
10.times do
ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[RS.host, RS.ports[2]], :background_refresh => false)
[RS.host, RS.ports[2]], :refresh_mode => false)
end
end
@con = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[RS.host, RS.ports[2]], :background_refresh => false)
[RS.host, RS.ports[2]], :refresh_mode => false)
x.report("manager") do
man = Mongo::PoolManager.new(@con, @con.seeds)
@ -41,7 +41,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
rescue_connection_failure do
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[RS.host, RS.ports[2]], :background_refresh => false)
[RS.host, RS.ports[2]], :refresh_mode => false)
end
assert_equal [], @conn.secondaries
@ -70,7 +70,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
rescue_connection_failure do
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true)
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :sync)
end
assert_equal [], @conn.secondaries
@ -87,8 +87,9 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
def test_automated_refresh_with_removed_node
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true)
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :sync)
p @conn.secondary_pools
assert_equal 2, @conn.secondary_pools.length
assert_equal 2, @conn.secondaries.length
@ -103,13 +104,13 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
def test_adding_and_removing_nodes
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true)
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
RS.add_node
sleep(5)
@conn2 = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
[RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true)
[RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
assert @conn2.secondaries == @conn.secondaries
assert_equal 3, @conn.secondary_pools.length

View File

@ -46,6 +46,7 @@ class ReplSetManager
begin
con = Mongo::Connection.new(@host, @start_port)
rescue Mongo::ConnectionFailure
con = false
end
if con && ensure_up(1, con)
@ -187,10 +188,7 @@ class ReplSetManager
def kill(node, signal=2)
pid = @mongods[node]['pid']
puts "** Killing node with pid #{pid} at port #{@mongods[node]['port']}"
begin
get_connection(node)['admin'].command({'shutdown' => 1})
rescue Mongo::ConnectionFailure
end
system("kill -2 #{pid}")
@mongods[node]['up'] = false
end
@ -300,10 +298,11 @@ class ReplSetManager
private
def initiate
puts "Initiating replica set..."
con = get_connection
attempt do
con['admin'].command({'replSetInitiate' => @config})
p con['admin'].command({'replSetInitiate' => @config})
end
con.close