RUBY-291 automate local replica set reads by ping time

This commit is contained in:
Kyle Banker 2011-08-22 11:52:11 -04:00
parent 3027e29f46
commit 9ea718522f
5 changed files with 93 additions and 28 deletions

View File

@ -1,5 +1,6 @@
module Mongo module Mongo
class Node class Node
attr_accessor :host, :port, :address, :config, :repl_set_status, :connection, :socket attr_accessor :host, :port, :address, :config, :repl_set_status, :connection, :socket
def initialize(connection, data) def initialize(connection, data)
@ -22,7 +23,6 @@ module Mongo
# return nil. # return nil.
def connect def connect
begin begin
if self.connection.connect_timeout if self.connection.connect_timeout
Mongo::TimeoutHandler.timeout(self.connection.connect_timeout, OperationTimeout) do Mongo::TimeoutHandler.timeout(self.connection.connect_timeout, OperationTimeout) do
socket = TCPSocket.new(self.host, self.port) socket = TCPSocket.new(self.host, self.port)
@ -31,7 +31,11 @@ module Mongo
socket = TCPSocket.new(self.host, self.port) socket = TCPSocket.new(self.host, self.port)
end end
if socket.nil?
return nil
else
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
end
rescue OperationFailure, SocketError, SystemCallError, IOError => ex rescue OperationFailure, SocketError, SystemCallError, IOError => ex
return nil return nil
end end

View File

@ -21,7 +21,7 @@ module Mongo
# Instantiates and manages connections to a MongoDB replica set. # Instantiates and manages connections to a MongoDB replica set.
class ReplSetConnection < Connection class ReplSetConnection < Connection
attr_reader :nodes, :secondaries, :arbiters, :read_pool, :secondary_pools, attr_reader :nodes, :secondaries, :arbiters, :read_pool, :secondary_pools,
:replica_set_name, :members :replica_set_name, :ping_ranges
# Create a connection to a MongoDB replica set. # Create a connection to a MongoDB replica set.
# #
@ -82,7 +82,7 @@ module Mongo
@seeds = args @seeds = args
# The members of the replica set, stored as instances of Mongo::Node. # The members of the replica set, stored as instances of Mongo::Node.
@members = [] @nodes = []
# Connection pool for primay node # Connection pool for primay node
@primary = nil @primary = nil
@ -98,6 +98,9 @@ module Mongo
# A list of arbiter address (for client information only) # A list of arbiter address (for client information only)
@arbiters = [] @arbiters = []
# An array mapping secondaries by proximity
@ping_ranges = Array.new(3) { |i| Array.new }
# Are we allowing reads from secondaries? # Are we allowing reads from secondaries?
@read_secondary = opts.fetch(:read_secondary, false) @read_secondary = opts.fetch(:read_secondary, false)
@ -118,9 +121,10 @@ module Mongo
def connect def connect
connect_to_members connect_to_members
initialize_pools initialize_pools
pick_secondary_for_read
if connected? if connected?
choose_node_for_reads
update_seed_list
BSON::BSON_CODER.update_max_bson_size(self) BSON::BSON_CODER.update_max_bson_size(self)
else else
close close
@ -134,7 +138,7 @@ module Mongo
end end
def connected? def connected?
@primary_pool || (@read_pool && @read_secondary) @primary_pool || (!@secondary_pools.empty? && @read_secondary)
end end
# @deprecated # @deprecated
@ -168,10 +172,10 @@ module Mongo
# Close the connection to the database. # Close the connection to the database.
def close def close
super super
@members.each do |member| @nodes.each do |member|
member.disconnect member.disconnect
end end
@members = [] @nodes = []
@read_pool = nil @read_pool = nil
@secondary_pools.each do |pool| @secondary_pools.each do |pool|
pool.close pool.close
@ -226,56 +230,86 @@ module Mongo
end end
end end
raise ConnectionFailure, "Cannot connect to a replica set with name using seed nodes " + raise ConnectionFailure, "Cannot connect to a replica set using seeds " +
"#{@seeds.map {|s| "#{s[0]}:#{s[1]}" }.join(', ')}" "#{@seeds.map {|s| "#{s[0]}:#{s[1]}" }.join(', ')}"
end end
# Connect to each member of the replica set # Connect to each member of the replica set
# as reported by the given seed node, and cache # as reported by the given seed node, and cache
# those connections in the @members array. # those connections in the @nodes array.
def connect_to_members def connect_to_members
seed = get_valid_seed_node seed = get_valid_seed_node
seed.node_list.each do |host| seed.node_list.each do |host|
node = Mongo::Node.new(self, host) node = Mongo::Node.new(self, host)
if node.connect && node.set_config if node.connect && node.set_config
@members << node @nodes << node
end end
end end
end end
# Initialize the connection pools to the primary and secondary nodes. # Initialize the connection pools to the primary and secondary nodes.
def initialize_pools def initialize_pools
if @members.empty? if @nodes.empty?
raise ConnectionFailure, "Failed to connect to any given member." raise ConnectionFailure, "Failed to connect to any given member."
end end
@arbiters = @members.first.arbiters @arbiters = @nodes.first.arbiters
@members.each do |member| @nodes.each do |member|
if member.primary? if member.primary?
@primary = member.host_port @primary = member.host_port
@primary_pool = Pool.new(self, member.host, member.port, :size => @pool_size, :timeout => @timeout) @primary_pool = Pool.new(self, member.host, member.port,
:size => @pool_size,
:timeout => @timeout,
:node => member)
elsif member.secondary? && !@secondaries.include?(member.host_port) elsif member.secondary? && !@secondaries.include?(member.host_port)
@secondaries << member.host_port @secondaries << member.host_port
@secondary_pools << Pool.new(self, member.host, member.port, :size => @pool_size, :timeout => @timeout) @secondary_pools << Pool.new(self, member.host, member.port,
:size => @pool_size,
:timeout => @timeout,
:node => member)
end end
end end
end end
# Pick a node randomly from the set of possible secondaries. # Pick a node from the set of possible secondaries.
def pick_secondary_for_read # If more than one node is available, use the ping
return unless @read_secondary # time to figure out which nodes to choose from.
if (size = @secondary_pools.size) > 0 def choose_node_for_reads
@read_pool = @secondary_pools[rand(size)] return if @secondary_pools.empty?
if @secondary_pools.size == 1
@read_pool = @secondary_pools.first
else
@secondary_pools.each do |pool|
case pool.ping_time
when 0..150
@ping_ranges[0] << pool
when 150..1000
@ping_ranges[1] << pool
else
@ping_ranges[2] << pool
end end
end end
for list in @ping_ranges do
break if !list.empty?
end
@read_pool = list[rand(list.length)]
end
end
def update_seed_list
@seeds = @nodes.map { |n| n.host_port }
end
# Checkout a socket for reading (i.e., a secondary node). # Checkout a socket for reading (i.e., a secondary node).
def checkout_reader def checkout_reader
connect unless connected? connect unless connected?
if @read_pool if @read_secondary && @read_pool
@read_pool.checkout @read_pool.checkout
else else
checkout_writer checkout_writer
@ -291,7 +325,7 @@ module Mongo
# Checkin a socket used for reading. # Checkin a socket used for reading.
def checkin_reader(socket) def checkin_reader(socket)
if @read_pool if @read_secondary && @read_pool
@read_pool.checkin(socket) @read_pool.checkin(socket)
else else
checkin_writer(socket) checkin_writer(socket)

View File

@ -17,8 +17,9 @@
module Mongo module Mongo
class Pool class Pool
PING_ATTEMPTS = 6
attr_accessor :host, :port, :size, :timeout, :safe, :checked_out attr_accessor :host, :port, :size, :timeout, :safe, :checked_out, :connection
# Create a new pool of connections. # Create a new pool of connections.
# #
@ -27,6 +28,9 @@ module Mongo
@host, @port = host, port @host, @port = host, port
# A Mongo::Node object.
@node = opts[:node]
# Pool size and timeout. # Pool size and timeout.
@size = opts[:size] || 1 @size = opts[:size] || 1
@timeout = opts[:timeout] || 5.0 @timeout = opts[:timeout] || 5.0
@ -59,6 +63,30 @@ module Mongo
@checked_out.clear @checked_out.clear
end end
# Return the time it takes on average
# to do a round-trip against this node.
def ping_time
trials = []
begin
PING_ATTEMPTS.times do
t1 = Time.now
self.connection['admin'].command({:ping => 1}, :socket => @node.socket)
trials << (Time.now - t1) * 1000
end
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
return nil
end
trials.sort!
trials.delete_at(trials.length-1)
trials.delete_at(0)
total = 0.0
trials.each { |t| total += t }
(total / trials.length).floor
end
# Return a socket to the pool. # Return a socket to the pool.
def checkin(socket) def checkin(socket)
@connection_mutex.synchronize do @connection_mutex.synchronize do

View File

@ -46,8 +46,6 @@ class ConnectTest < Test::Unit::TestCase
@conn = ReplSetConnection.new([RS.host, RS.ports[1]], [RS.host, RS.ports[0]], @conn = ReplSetConnection.new([RS.host, RS.ports[1]], [RS.host, RS.ports[0]],
[RS.host, RS.ports[2]], :name => RS.name) [RS.host, RS.ports[2]], :name => RS.name)
assert @conn.connected? assert @conn.connected?
assert @conn.read_primary?
assert @conn.primary?
assert_equal RS.primary, @conn.primary assert_equal RS.primary, @conn.primary
assert_equal RS.secondaries.sort, @conn.secondaries.sort assert_equal RS.secondaries.sort, @conn.secondaries.sort

View File

@ -17,6 +17,7 @@ class Test::Unit::TestCase
yield yield
rescue Mongo::ConnectionFailure => ex rescue Mongo::ConnectionFailure => ex
puts "Rescue attempt #{retries}: from #{ex}" puts "Rescue attempt #{retries}: from #{ex}"
puts ex.backtrace
retries += 1 retries += 1
raise ex if retries > max_retries raise ex if retries > max_retries
sleep(1) sleep(1)