Added some read preference tests; refactoring; nice Pool#inspect.
This commit is contained in:
parent
2ff4169a50
commit
fbeea87c47
|
@ -349,11 +349,11 @@ module Mongo
|
|||
def checkout_tagged(tags)
|
||||
sync_synchronize(:SH) do
|
||||
tags.each do |k, v|
|
||||
pools = @tags_to_pools[{k => v}]
|
||||
if !pools.empty?
|
||||
socket = pools.first.checkout
|
||||
@sockets_to_pools[socket] = pools.first
|
||||
socket
|
||||
pool = @tags_to_pools[{k.to_s => v}]
|
||||
if pool
|
||||
socket = pool.checkout
|
||||
@sockets_to_pools[socket] = pool
|
||||
return socket
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -65,6 +65,11 @@ module Mongo
|
|||
@checked_out.clear
|
||||
end
|
||||
|
||||
def inspect
|
||||
"#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} " +
|
||||
"@ping_time=#{ping_time} #{@checked_out.size}/#{@size} sockets available.>"
|
||||
end
|
||||
|
||||
def host_string
|
||||
"#{@host}:#{@port}"
|
||||
end
|
||||
|
|
|
@ -84,16 +84,6 @@ module Mongo
|
|||
end
|
||||
end
|
||||
|
||||
# Sort each tag pool entry in descending order
|
||||
# according to ping time.
|
||||
def sort_tag_pools!
|
||||
@tags_to_pools.each_value do |pool_list|
|
||||
pool_list.sort! do |a, b|
|
||||
a.ping_time <=> b.ping_time
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Initialize the connection pools for the primary and secondary nodes.
|
||||
def initialize_pools(members)
|
||||
members.each do |member|
|
||||
|
@ -117,24 +107,43 @@ module Mongo
|
|||
end
|
||||
end
|
||||
|
||||
sort_tag_pools!
|
||||
|
||||
@max_bson_size = members.first.config['maxBsonObjectSize'] ||
|
||||
Mongo::DEFAULT_MAX_BSON_SIZE
|
||||
@arbiters = members.first.arbiters
|
||||
choose_read_pool
|
||||
|
||||
set_read_pool
|
||||
set_primary_tag_pools
|
||||
end
|
||||
|
||||
# If there's more than one pool associated with
|
||||
# a given tag, choose a close one using the bucket method.
|
||||
def set_primary_tag_pools
|
||||
@tags_to_pools.each do |k, pool_list|
|
||||
if pool_list.length == 1
|
||||
@tags_to_pools[k] = pool_list.first
|
||||
else
|
||||
@tags_to_pools[k] = nearby_pool_from_set(pool_list)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Pick a node from the set of possible secondaries.
|
||||
# If more than one node is available, use the ping
|
||||
# time to figure out which nodes to choose from.
|
||||
def choose_read_pool
|
||||
def set_read_pool
|
||||
if @secondary_pools.empty?
|
||||
@read_pool = @primary_pool
|
||||
@read_pool = @primary_pool
|
||||
elsif @secondary_pools.size == 1
|
||||
@read_pool = @secondary_pools[0]
|
||||
@read_pool = @secondary_pools[0]
|
||||
else
|
||||
ping_ranges = Array.new(3) { |i| Array.new }
|
||||
@secondary_pools.each do |pool|
|
||||
@read_pool = nearby_pool_from_set(@secondary_pools)
|
||||
end
|
||||
end
|
||||
|
||||
def nearby_pool_from_set(pool_set)
|
||||
ping_ranges = Array.new(3) { |i| Array.new }
|
||||
pool_set.each do |pool|
|
||||
case pool.ping_time
|
||||
when 0..150
|
||||
ping_ranges[0] << pool
|
||||
|
@ -149,8 +158,7 @@ module Mongo
|
|||
break if !list.empty?
|
||||
end
|
||||
|
||||
@read_pool = list[rand(list.length)]
|
||||
end
|
||||
list[rand(list.length)]
|
||||
end
|
||||
|
||||
# Iterate through the list of provided seed
|
||||
|
|
|
@ -7,7 +7,7 @@ class ReplicaSetQuerySecondariesTest < Test::Unit::TestCase
|
|||
include Mongo
|
||||
|
||||
def setup
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], :read_secondary => true)
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], :read => :secondary)
|
||||
@db = @conn.db(MONGO_TEST_DB)
|
||||
@db.drop_collection("test-sets")
|
||||
end
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
||||
require './test/replica_sets/rs_test_helper'
|
||||
|
||||
class ReadPreferenceTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
|
||||
def setup
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0], RS.host, RS.ports[1]], :read => :secondary, :pool_size => 50)
|
||||
@db = @conn.db(MONGO_TEST_DB)
|
||||
@db.drop_collection("test-sets")
|
||||
end
|
||||
|
||||
def test_query_tagged
|
||||
col = @db['mongo-test']
|
||||
|
||||
col.insert({:a => 1}, :safe => {:w => 3})
|
||||
col.find_one({}, :read => {:db => "main"})
|
||||
col.find_one({}, :read => {:dc => "ny"})
|
||||
col.find_one({}, :read => {:dc => "sf"})
|
||||
|
||||
assert_raise Mongo::NodeWithTagsNotFound do
|
||||
col.find_one({}, :read => {:foo => "bar"})
|
||||
end
|
||||
|
||||
threads = []
|
||||
100.times do
|
||||
threads << Thread.new do
|
||||
col.find_one({}, :read => {:dc => "sf"})
|
||||
end
|
||||
end
|
||||
|
||||
threads.each {|t| t.join }
|
||||
|
||||
col.remove
|
||||
end
|
||||
|
||||
def teardown
|
||||
RS.restart_killed_nodes
|
||||
end
|
||||
|
||||
end
|
|
@ -43,13 +43,23 @@ class ReplSetManager
|
|||
end
|
||||
|
||||
def start_set
|
||||
puts "** Starting a replica set with #{@count} nodes"
|
||||
begin
|
||||
con = Mongo::Connection.new(@host, @start_port)
|
||||
rescue Mongo::ConnectionFailure
|
||||
end
|
||||
|
||||
system("killall mongod")
|
||||
if con && ensure_up(1, con)
|
||||
should_start = false
|
||||
puts "** Replica set already started."
|
||||
else
|
||||
should_start = true
|
||||
system("killall mongod")
|
||||
puts "** Starting a replica set with #{@count} nodes"
|
||||
end
|
||||
|
||||
n = 0
|
||||
(@primary_count + @secondary_count).times do
|
||||
init_node(n) do |attrs|
|
||||
init_node(n, should_start) do |attrs|
|
||||
if @version[0] >= 2
|
||||
attrs['tags'] = @tags[n % @tags.size]
|
||||
end
|
||||
|
@ -58,21 +68,28 @@ class ReplSetManager
|
|||
end
|
||||
|
||||
@passive_count.times do
|
||||
init_node(n) do |attrs|
|
||||
init_node(n, should_start) do |attrs|
|
||||
attrs['priority'] = 0
|
||||
end
|
||||
n += 1
|
||||
end
|
||||
|
||||
@arbiter_count.times do
|
||||
init_node(n) do |attrs|
|
||||
init_node(n, should_start) do |attrs|
|
||||
attrs['arbiterOnly'] = true
|
||||
end
|
||||
n += 1
|
||||
end
|
||||
|
||||
initiate
|
||||
ensure_up
|
||||
if con && ensure_up(1, con)
|
||||
@mongods.each do |k, v|
|
||||
v['up'] = true
|
||||
v['pid'] = File.open(File.join(v['db_path'], 'mongod.lock')).read.strip
|
||||
end
|
||||
else
|
||||
initiate
|
||||
ensure_up
|
||||
end
|
||||
end
|
||||
|
||||
def cleanup_set
|
||||
|
@ -82,18 +99,20 @@ class ReplSetManager
|
|||
end
|
||||
end
|
||||
|
||||
def init_node(n)
|
||||
def init_node(n, should_start=true)
|
||||
@mongods[n] ||= {}
|
||||
port = @start_port + n
|
||||
@ports << port
|
||||
@mongods[n]['port'] = port
|
||||
@mongods[n]['db_path'] = get_path("rs-#{port}")
|
||||
@mongods[n]['log_path'] = get_path("log-#{port}")
|
||||
system("rm -rf #{@mongods[n]['db_path']}")
|
||||
system("mkdir -p #{@mongods[n]['db_path']}")
|
||||
|
||||
@mongods[n]['start'] = start_cmd(n)
|
||||
start(n)
|
||||
|
||||
if should_start
|
||||
system("rm -rf #{@mongods[n]['db_path']}")
|
||||
system("mkdir -p #{@mongods[n]['db_path']}")
|
||||
start(n)
|
||||
end
|
||||
|
||||
member = {'_id' => n, 'host' => "#{@host}:#{@mongods[n]['port']}"}
|
||||
|
||||
|
@ -231,11 +250,11 @@ class ReplSetManager
|
|||
end
|
||||
alias :restart :start
|
||||
|
||||
def ensure_up
|
||||
def ensure_up(n=nil, connection=nil)
|
||||
print "** Ensuring members are up..."
|
||||
|
||||
attempt do
|
||||
con = get_connection
|
||||
attempt(n) do
|
||||
con = connection || get_connection
|
||||
status = con['admin'].command({'replSetGetStatus' => 1})
|
||||
print "."
|
||||
if status['members'].all? { |m| m['health'] == 1 &&
|
||||
|
@ -249,6 +268,8 @@ class ReplSetManager
|
|||
raise Mongo::OperationFailure
|
||||
end
|
||||
end
|
||||
|
||||
return false
|
||||
end
|
||||
|
||||
def primary
|
||||
|
@ -336,11 +357,11 @@ class ReplSetManager
|
|||
File.join(@path, name)
|
||||
end
|
||||
|
||||
def attempt
|
||||
def attempt(retries=nil)
|
||||
raise "No block given!" unless block_given?
|
||||
count = 0
|
||||
|
||||
while count < @retries do
|
||||
while count < (retries || @retries) do
|
||||
begin
|
||||
return yield
|
||||
rescue Mongo::OperationFailure, Mongo::ConnectionFailure => ex
|
||||
|
|
Loading…
Reference in New Issue