RUBY-320 ReplSetConnection now caches tags and maps them to pools for reading.
This commit is contained in:
parent
45c40e7267
commit
8b2de82464
@ -12,6 +12,7 @@ module Mongo
|
||||
self.port = data[1].nil? ? Connection::DEFAULT_PORT : data[1].to_i
|
||||
end
|
||||
self.address = "#{host}:#{port}"
|
||||
self.config = nil
|
||||
end
|
||||
|
||||
def eql?(other)
|
||||
@ -98,7 +99,7 @@ module Mongo
|
||||
# Note: this excludes arbiters.
|
||||
def node_list
|
||||
connect unless connected?
|
||||
set_config
|
||||
set_config unless self.config
|
||||
|
||||
return [] unless config
|
||||
|
||||
@ -110,6 +111,7 @@ module Mongo
|
||||
|
||||
def arbiters
|
||||
connect unless connected?
|
||||
set_config unless self.config
|
||||
return [] unless config['arbiters']
|
||||
|
||||
config['arbiters'].map do |arbiter|
|
||||
@ -117,6 +119,14 @@ module Mongo
|
||||
end
|
||||
end
|
||||
|
||||
def tags
|
||||
connect unless connected?
|
||||
set_config unless self.config
|
||||
return {} unless config['tags'] && !config['tags'].empty?
|
||||
|
||||
config['tags']
|
||||
end
|
||||
|
||||
def primary?
|
||||
self.config['ismaster'] == true || self.config['ismaster'] == 1
|
||||
end
|
||||
|
@ -21,7 +21,7 @@ module Mongo
|
||||
# Instantiates and manages connections to a MongoDB replica set.
|
||||
class ReplSetConnection < Connection
|
||||
attr_reader :nodes, :secondaries, :arbiters, :secondary_pools,
|
||||
:replica_set_name, :read_pool, :seeds
|
||||
:replica_set_name, :read_pool, :seeds, :tags_to_pools
|
||||
|
||||
# Create a connection to a MongoDB replica set.
|
||||
#
|
||||
@ -172,6 +172,7 @@ module Mongo
|
||||
@primary_pool = manager.primary_pool
|
||||
@read_pool = manager.read_pool
|
||||
@secondary_pools = manager.secondary_pools
|
||||
@tags_to_pools = manager.tags_to_pools
|
||||
@seeds = manager.seeds
|
||||
@manager = manager
|
||||
@hosts = manager.hosts
|
||||
@ -334,9 +335,10 @@ module Mongo
|
||||
# an exception.
|
||||
def checkout_tagged(tags)
|
||||
tags.each do |k, v|
|
||||
if pool = @tags_to_pools[{k.to_s => v}]
|
||||
socket = pool.checkout
|
||||
@sockets_to_pools[socket] = pool
|
||||
pools = @tags_to_pools[{k => v}]
|
||||
if !pools.empty?
|
||||
socket = pools.first.checkout
|
||||
@sockets_to_pools[socket] = pools.first
|
||||
return socket
|
||||
end
|
||||
end
|
||||
|
@ -47,6 +47,8 @@ module Mongo
|
||||
@sockets = []
|
||||
@pids = {}
|
||||
@checked_out = []
|
||||
@ping_time = nil
|
||||
@last_ping = nil
|
||||
end
|
||||
|
||||
def close
|
||||
@ -71,9 +73,23 @@ module Mongo
|
||||
[@host, @port]
|
||||
end
|
||||
|
||||
# Refresh ping time only if we haven't
|
||||
# checked within the last five minutes.
|
||||
def ping_time
|
||||
if !@last_ping
|
||||
@last_ping = Time.now
|
||||
@ping_time = refresh_ping_time
|
||||
elsif Time.now - @last_ping > 300
|
||||
@last_ping = Time.now
|
||||
@ping_time = refresh_ping_time
|
||||
else
|
||||
@ping_time
|
||||
end
|
||||
end
|
||||
|
||||
# Return the time it takes on average
|
||||
# to do a round-trip against this node.
|
||||
def ping_time
|
||||
def refresh_ping_time
|
||||
trials = []
|
||||
begin
|
||||
PING_ATTEMPTS.times do
|
||||
@ -86,13 +102,15 @@ module Mongo
|
||||
end
|
||||
|
||||
trials.sort!
|
||||
|
||||
# Delete shortest and longest times
|
||||
trials.delete_at(trials.length-1)
|
||||
trials.delete_at(0)
|
||||
|
||||
total = 0.0
|
||||
trials.each { |t| total += t }
|
||||
|
||||
(total / trials.length).floor
|
||||
(total / trials.length).ceil
|
||||
end
|
||||
|
||||
# Return a socket to the pool.
|
||||
|
@ -2,7 +2,8 @@ module Mongo
|
||||
class PoolManager
|
||||
|
||||
attr_reader :connection, :seeds, :arbiters, :primary, :secondaries,
|
||||
:primary_pool, :read_pool, :secondary_pools, :hosts, :nodes, :max_bson_size
|
||||
:primary_pool, :read_pool, :secondary_pools, :hosts, :nodes, :max_bson_size,
|
||||
:tags_to_pools
|
||||
|
||||
def initialize(connection, seeds)
|
||||
@connection = connection
|
||||
@ -85,8 +86,6 @@ module Mongo
|
||||
|
||||
private
|
||||
|
||||
# Note that @arbiters and @read_pool will be
|
||||
# assigned automatically.
|
||||
def reference_manager_data(manager)
|
||||
@primary = manager.primary
|
||||
@primary_pool = manager.primary_pool
|
||||
@ -106,6 +105,7 @@ module Mongo
|
||||
@secondary_pools = []
|
||||
@hosts = []
|
||||
@members = []
|
||||
@tags_to_pools = {}
|
||||
end
|
||||
|
||||
# Connect to each member of the replica set
|
||||
@ -130,6 +130,23 @@ module Mongo
|
||||
members
|
||||
end
|
||||
|
||||
def associate_tags_with_pool(tags, pool)
|
||||
tags.each_key do |key|
|
||||
@tags_to_pools[{key => tags[key]}] ||= []
|
||||
@tags_to_pools[{key => tags[key]}] << pool
|
||||
end
|
||||
end
|
||||
|
||||
# Sort each tag pool entry in descending order
|
||||
# according to ping time.
|
||||
def sort_tag_pools!
|
||||
@tags_to_pools.each_value do |pool_list|
|
||||
pool_list.sort! do |a, b|
|
||||
a.ping_time <=> b.ping_time
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Initialize the connection pools for the primary and secondary nodes.
|
||||
def initialize_pools(members)
|
||||
members.each do |member|
|
||||
@ -141,15 +158,19 @@ module Mongo
|
||||
:size => self.connection.pool_size,
|
||||
:timeout => self.connection.connect_timeout,
|
||||
:node => member)
|
||||
associate_tags_with_pool(member.tags, @primary_pool)
|
||||
elsif member.secondary? && !@secondaries.include?(member.host_port)
|
||||
@secondaries << member.host_port
|
||||
@secondary_pools << Pool.new(self.connection, member.host, member.port,
|
||||
pool = Pool.new(self.connection, member.host, member.port,
|
||||
:size => self.connection.pool_size,
|
||||
:timeout => self.connection.connect_timeout,
|
||||
:node => member)
|
||||
@secondary_pools << pool
|
||||
associate_tags_with_pool(member.tags, pool)
|
||||
end
|
||||
end
|
||||
|
||||
sort_tag_pools!
|
||||
@max_bson_size = members.first.config['maxBsonObjectSize'] ||
|
||||
Mongo::DEFAULT_MAX_BSON_SIZE
|
||||
@arbiters = members.first.arbiters
|
||||
|
@ -10,7 +10,7 @@ end
|
||||
|
||||
class ReplSetManager
|
||||
|
||||
attr_accessor :host, :start_port, :ports, :name, :mongods
|
||||
attr_accessor :host, :start_port, :ports, :name, :mongods, :tags, :version
|
||||
|
||||
def initialize(opts={})
|
||||
@start_port = opts[:start_port] || 30000
|
||||
@ -21,6 +21,10 @@ class ReplSetManager
|
||||
@config = {"_id" => @name, "members" => []}
|
||||
@durable = opts.fetch(:durable, false)
|
||||
@path = File.join(File.expand_path(File.dirname(__FILE__)), "data")
|
||||
@oplog_size = opts.fetch(:oplog_size, 512)
|
||||
@tags = [{"dc" => "ny", "rack" => "a", "db" => "main"},
|
||||
{"dc" => "ny", "rack" => "b", "db" => "main"},
|
||||
{"dc" => "sf", "rack" => "a", "db" => "main"}]
|
||||
|
||||
@arbiter_count = opts[:arbiter_count] || 2
|
||||
@secondary_count = opts[:secondary_count] || 2
|
||||
@ -33,6 +37,9 @@ class ReplSetManager
|
||||
end
|
||||
|
||||
@mongods = {}
|
||||
version_string = `mongod --version`
|
||||
version_string =~ /(\d\.\d\.\d)/
|
||||
@version = $1.split(".").map {|d| d.to_i }
|
||||
end
|
||||
|
||||
def start_set
|
||||
@ -42,7 +49,11 @@ class ReplSetManager
|
||||
|
||||
n = 0
|
||||
(@primary_count + @secondary_count).times do
|
||||
init_node(n)
|
||||
init_node(n) do |attrs|
|
||||
if @version[0] >= 2
|
||||
attrs['tags'] = @tags[n % @tags.size]
|
||||
end
|
||||
end
|
||||
n += 1
|
||||
end
|
||||
|
||||
@ -96,9 +107,21 @@ class ReplSetManager
|
||||
@config['members'] << member
|
||||
end
|
||||
|
||||
def journal_switch
|
||||
if @version[0] >= 2
|
||||
if @durable
|
||||
"--journal"
|
||||
else
|
||||
"--nojournal"
|
||||
end
|
||||
elsif @durable
|
||||
"--journal"
|
||||
end
|
||||
end
|
||||
|
||||
def start_cmd(n)
|
||||
@mongods[n]['start'] = "mongod --replSet #{@name} --logpath '#{@mongods[n]['log_path']}' " +
|
||||
" --dbpath #{@mongods[n]['db_path']} --port #{@mongods[n]['port']} --fork"
|
||||
"--oplogSize #{@oplog_size} #{journal_switch} --dbpath #{@mongods[n]['db_path']} --port #{@mongods[n]['port']} --fork"
|
||||
@mongods[n]['start'] += " --dur" if @durable
|
||||
@mongods[n]['start']
|
||||
end
|
||||
|
Loading…
Reference in New Issue
Block a user