From 9ea718522f546fb7915bfdf0b7650837fb096a80 Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Mon, 22 Aug 2011 11:52:11 -0400 Subject: [PATCH] RUBY-291 automate local replica set reads by ping time --- lib/mongo/node.rb | 10 ++-- lib/mongo/repl_set_connection.rb | 78 +++++++++++++++++++++-------- lib/mongo/util/pool.rb | 30 ++++++++++- test/replica_sets/connect_test.rb | 2 - test/replica_sets/rs_test_helper.rb | 1 + 5 files changed, 93 insertions(+), 28 deletions(-) diff --git a/lib/mongo/node.rb b/lib/mongo/node.rb index ea938fc..81432f0 100644 --- a/lib/mongo/node.rb +++ b/lib/mongo/node.rb @@ -1,5 +1,6 @@ module Mongo class Node + attr_accessor :host, :port, :address, :config, :repl_set_status, :connection, :socket def initialize(connection, data) @@ -22,7 +23,6 @@ module Mongo # return nil. def connect begin - if self.connection.connect_timeout Mongo::TimeoutHandler.timeout(self.connection.connect_timeout, OperationTimeout) do socket = TCPSocket.new(self.host, self.port) @@ -31,8 +31,12 @@ module Mongo socket = TCPSocket.new(self.host, self.port) end - socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - rescue OperationFailure, SocketError, SystemCallError, IOError => ex + if socket.nil? + return nil + else + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + end + rescue OperationFailure, SocketError, SystemCallError, IOError => ex return nil end diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index c44baf3..ce07bc7 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -21,7 +21,7 @@ module Mongo # Instantiates and manages connections to a MongoDB replica set. class ReplSetConnection < Connection attr_reader :nodes, :secondaries, :arbiters, :read_pool, :secondary_pools, - :replica_set_name, :members + :replica_set_name, :ping_ranges # Create a connection to a MongoDB replica set. # @@ -82,7 +82,7 @@ module Mongo @seeds = args # The members of the replica set, stored as instances of Mongo::Node. - @members = [] + @nodes = [] # Connection pool for primay node @primary = nil @@ -98,6 +98,9 @@ module Mongo # A list of arbiter address (for client information only) @arbiters = [] + # An array mapping secondaries by proximity + @ping_ranges = Array.new(3) { |i| Array.new } + # Are we allowing reads from secondaries? @read_secondary = opts.fetch(:read_secondary, false) @@ -118,9 +121,10 @@ module Mongo def connect connect_to_members initialize_pools - pick_secondary_for_read if connected? + choose_node_for_reads + update_seed_list BSON::BSON_CODER.update_max_bson_size(self) else close @@ -134,7 +138,7 @@ module Mongo end def connected? - @primary_pool || (@read_pool && @read_secondary) + @primary_pool || (!@secondary_pools.empty? && @read_secondary) end # @deprecated @@ -168,10 +172,10 @@ module Mongo # Close the connection to the database. def close super - @members.each do |member| + @nodes.each do |member| member.disconnect end - @members = [] + @nodes = [] @read_pool = nil @secondary_pools.each do |pool| pool.close @@ -226,56 +230,86 @@ module Mongo end end - raise ConnectionFailure, "Cannot connect to a replica set with name using seed nodes " + - "#{@seeds.map {|s| "#{s[0]}:#{s[1]}" }.join(',')}" + raise ConnectionFailure, "Cannot connect to a replica set using seeds " + + "#{@seeds.map {|s| "#{s[0]}:#{s[1]}" }.join(', ')}" end # Connect to each member of the replica set # as reported by the given seed node, and cache - # those connections in the @members array. + # those connections in the @nodes array. def connect_to_members seed = get_valid_seed_node seed.node_list.each do |host| node = Mongo::Node.new(self, host) if node.connect && node.set_config - @members << node + @nodes << node end end end # Initialize the connection pools to the primary and secondary nodes. def initialize_pools - if @members.empty? + if @nodes.empty? raise ConnectionFailure, "Failed to connect to any given member." end - @arbiters = @members.first.arbiters + @arbiters = @nodes.first.arbiters - @members.each do |member| + @nodes.each do |member| if member.primary? @primary = member.host_port - @primary_pool = Pool.new(self, member.host, member.port, :size => @pool_size, :timeout => @timeout) + @primary_pool = Pool.new(self, member.host, member.port, + :size => @pool_size, + :timeout => @timeout, + :node => member) elsif member.secondary? && !@secondaries.include?(member.host_port) @secondaries << member.host_port - @secondary_pools << Pool.new(self, member.host, member.port, :size => @pool_size, :timeout => @timeout) + @secondary_pools << Pool.new(self, member.host, member.port, + :size => @pool_size, + :timeout => @timeout, + :node => member) end end end - # Pick a node randomly from the set of possible secondaries. - def pick_secondary_for_read - return unless @read_secondary - if (size = @secondary_pools.size) > 0 - @read_pool = @secondary_pools[rand(size)] + # Pick a node from the set of possible secondaries. + # If more than one node is available, use the ping + # time to figure out which nodes to choose from. + def choose_node_for_reads + return if @secondary_pools.empty? + + if @secondary_pools.size == 1 + @read_pool = @secondary_pools.first + else + @secondary_pools.each do |pool| + case pool.ping_time + when 0..150 + @ping_ranges[0] << pool + when 150..1000 + @ping_ranges[1] << pool + else + @ping_ranges[2] << pool + end + end + + for list in @ping_ranges do + break if !list.empty? + end + + @read_pool = list[rand(list.length)] end end + def update_seed_list + @seeds = @nodes.map { |n| n.host_port } + end + # Checkout a socket for reading (i.e., a secondary node). def checkout_reader connect unless connected? - if @read_pool + if @read_secondary && @read_pool @read_pool.checkout else checkout_writer @@ -291,7 +325,7 @@ module Mongo # Checkin a socket used for reading. def checkin_reader(socket) - if @read_pool + if @read_secondary && @read_pool @read_pool.checkin(socket) else checkin_writer(socket) diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index fae29f0..8745473 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -17,8 +17,9 @@ module Mongo class Pool + PING_ATTEMPTS = 6 - attr_accessor :host, :port, :size, :timeout, :safe, :checked_out + attr_accessor :host, :port, :size, :timeout, :safe, :checked_out, :connection # Create a new pool of connections. # @@ -27,6 +28,9 @@ module Mongo @host, @port = host, port + # A Mongo::Node object. + @node = opts[:node] + # Pool size and timeout. @size = opts[:size] || 1 @timeout = opts[:timeout] || 5.0 @@ -59,6 +63,30 @@ module Mongo @checked_out.clear end + # Return the time it takes on average + # to do a round-trip against this node. + def ping_time + trials = [] + begin + PING_ATTEMPTS.times do + t1 = Time.now + self.connection['admin'].command({:ping => 1}, :socket => @node.socket) + trials << (Time.now - t1) * 1000 + end + rescue OperationFailure, SocketError, SystemCallError, IOError => ex + return nil + end + + trials.sort! + trials.delete_at(trials.length-1) + trials.delete_at(0) + + total = 0.0 + trials.each { |t| total += t } + + (total / trials.length).floor + end + # Return a socket to the pool. def checkin(socket) @connection_mutex.synchronize do diff --git a/test/replica_sets/connect_test.rb b/test/replica_sets/connect_test.rb index 312b3da..532435a 100644 --- a/test/replica_sets/connect_test.rb +++ b/test/replica_sets/connect_test.rb @@ -46,8 +46,6 @@ class ConnectTest < Test::Unit::TestCase @conn = ReplSetConnection.new([RS.host, RS.ports[1]], [RS.host, RS.ports[0]], [RS.host, RS.ports[2]], :name => RS.name) assert @conn.connected? - assert @conn.read_primary? - assert @conn.primary? assert_equal RS.primary, @conn.primary assert_equal RS.secondaries.sort, @conn.secondaries.sort diff --git a/test/replica_sets/rs_test_helper.rb b/test/replica_sets/rs_test_helper.rb index 0864cf0..b517b83 100644 --- a/test/replica_sets/rs_test_helper.rb +++ b/test/replica_sets/rs_test_helper.rb @@ -17,6 +17,7 @@ class Test::Unit::TestCase yield rescue Mongo::ConnectionFailure => ex puts "Rescue attempt #{retries}: from #{ex}" + puts ex.backtrace retries += 1 raise ex if retries > max_retries sleep(1)