Simpify replica set connection code.

This commit is contained in:
Kyle Banker 2011-08-16 16:47:07 -04:00
parent 2557a575eb
commit 3027e29f46
5 changed files with 216 additions and 147 deletions

View File

@ -35,7 +35,8 @@ module Mongo
STANDARD_HEADER_SIZE = 16
RESPONSE_HEADER_SIZE = 20
attr_reader :logger, :size, :auths, :primary, :safe, :primary_pool, :host_to_try, :pool_size
attr_reader :logger, :size, :auths, :primary, :safe, :primary_pool,
:host_to_try, :pool_size, :connect_timeout
# Counter for generating unique request ids.
@@current_request_id = 0

View File

@ -1,19 +1,133 @@
module Mongo
class Node
attr_accessor :host, :port, :address
attr_accessor :host, :port, :address, :config, :repl_set_status, :connection, :socket
def initialize(data)
data = data.split(':') if data.is_a?(String)
self.host = data[0]
self.port = data[1] ? data[1].to_i : Connection::DEFAULT_PORT
def initialize(connection, data)
self.connection = connection
if data.is_a?(String)
self.host, self.port = split_nodes(data)
else
self.host, self.port = data
end
self.address = "#{host}:#{port}"
end
def eql?(other)
other.is_a?(Node) && host == other.host && port == other.port
end
alias :== :eql?
# Create a connection to the provided node,
# and, if successful, return the socket. Otherwise,
# return nil.
def connect
begin
if self.connection.connect_timeout
Mongo::TimeoutHandler.timeout(self.connection.connect_timeout, OperationTimeout) do
socket = TCPSocket.new(self.host, self.port)
end
else
socket = TCPSocket.new(self.host, self.port)
end
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
return nil
end
self.socket = socket
end
def disconnect
if self.socket
self.socket.close
self.socket = nil
self.config = nil
end
end
def connected?
self.socket != nil
end
# Get the configuration for the provided node as returned by the
# ismaster command. Additionally, check that the replica set name
# matches with the name provided.
def set_config
begin
self.config = self.connection['admin'].command({:ismaster => 1}, :socket => self.socket)
if self.config['msg'] && @logger
self.connection.logger.warn("MONGODB #{config['msg']}")
end
check_set_name
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
return nil
end
self.config
end
# Return a list of replica set nodes from the config.
# Note: this excludes arbiters.
def node_list
connect unless connected?
return [] unless self.config
nodes = []
nodes += config['hosts'] if config['hosts']
nodes += config['passives'] if config['passives']
nodes
end
def arbiters
connect unless connected?
config['arbiters'].map do |arbiter|
split_nodes(arbiter)
end
end
def primary?
self.config['ismaster'] == true || self.config['ismaster'] == 1
end
def secondary?
self.config['secondary'] == true || self.config['secondary'] == 1
end
def host_port
[self.host, self.port]
end
def hash
address.hash
end
private
def split_nodes(host_string)
data = host_string.split(":")
host = data[0]
port = data[1].to_i || Connection::DEFAULT_PORT
[host, port]
end
# Make sure that we're connected to the expected replica set.
def check_set_name
if self.connection.replica_set_name
if !self.config['setName']
self.connection.logger.warn("MONGODB [warning] could not verify replica set name " +
"because ismaster does not return name in this version of MongoDB")
elsif self.connection.replica_set_name != self.config['setName']
raise ReplicaSetConnectionError,
"Attempting to connect to replica set '#{config['setName']}' " +
"but expected '#{self.connection.replica_set_name}'"
end
end
end
end
end

View File

@ -20,7 +20,8 @@ module Mongo
# Instantiates and manages connections to a MongoDB replica set.
class ReplSetConnection < Connection
attr_reader :nodes, :secondaries, :arbiters, :read_pool, :secondary_pools
attr_reader :nodes, :secondaries, :arbiters, :read_pool, :secondary_pools,
:replica_set_name, :members
# Create a connection to a MongoDB replica set.
#
@ -77,66 +78,68 @@ module Mongo
raise MongoArgumentError, "A ReplSetConnection requires at least one node."
end
# Get seed nodes
@nodes = args
# Get the list of seed nodes
@seeds = args
# Replica set name
@replica_set = opts[:rs_name]
# The members of the replica set, stored as instances of Mongo::Node.
@members = []
# Cache the various node types when connecting to a replica set.
@secondaries = []
@arbiters = []
# Connection pool for primay node
@primary = nil
@primary_pool = nil
# Connection pools for each secondary node
@secondaries = []
@secondary_pools = []
# The secondary pool to which we'll be sending reads.
@read_pool = nil
# A list of arbiter address (for client information only)
@arbiters = []
# Are we allowing reads from secondaries?
@read_secondary = opts.fetch(:read_secondary, false)
# Replica set name
if opts[:rs_name]
warn ":rs_name option has been deprecated and will be removed in 2.0. " +
"Please use :name instead."
@replica_set_name = opts[:rs_name]
else
@replica_set_name = opts[:name]
end
setup(opts)
end
# Create a new socket and attempt to connect to master.
# If successful, sets host and port to master and returns the socket.
#
# If connecting to a replica set, this method will replace the
# initially-provided seed list with any nodes known to the set.
#
# @raise [ConnectionFailure] if unable to connect to any host or port.
# Use the provided seed nodes to initiate a connection
# to the replica set.
def connect
close
@nodes_to_try = @nodes.clone
while connecting?
node = @nodes_to_try.shift
config = check_is_master(node)
if is_primary?(config)
set_primary(node)
else
set_auxillary(node, config)
end
end
pick_secondary_for_read if @read_secondary
connect_to_members
initialize_pools
pick_secondary_for_read
if connected?
BSON::BSON_CODER.update_max_bson_size(self)
else
if @secondary_pools.empty?
close # close any existing pools and sockets
raise ConnectionFailure, "Failed to connect any given host:port"
else
close # close any existing pools and sockets
close
if @primary.nil?
raise ConnectionFailure, "Failed to connect to primary node."
else
raise ConnectionFailure, "Failed to connect to any given member."
end
end
end
alias :reconnect :connect
def connected?
@primary_pool || (@read_pool && @read_secondary)
end
# @deprecated
def connecting?
@nodes_to_try.length > 0
false
end
# The replica set primary's host name.
@ -165,6 +168,10 @@ module Mongo
# Close the connection to the database.
def close
super
@members.each do |member|
member.disconnect
end
@members = []
@read_pool = nil
@secondary_pools.each do |pool|
pool.close
@ -172,8 +179,6 @@ module Mongo
@secondaries = []
@secondary_pools = []
@arbiters = []
@nodes_tried = []
@nodes_to_try = []
end
# If a ConnectionFailure is raised, this method will be called
@ -208,115 +213,64 @@ module Mongo
private
def check_is_master(node)
begin
host, port = *node
if @connect_timeout
Mongo::TimeoutHandler.timeout(@connect_timeout, OperationTimeout) do
socket = TCPSocket.new(host, port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
end
else
socket = TCPSocket.new(host, port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
end
config = self['admin'].command({:ismaster => 1}, :socket => socket)
check_set_name(config, socket)
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
# It's necessary to rescue here. The #connect method will keep trying
# until it has no more nodes to try and raise a ConnectionFailure if
# it can't connect to a primary.
ensure
socket.close if socket
@nodes_tried << node
if config
nodes = []
nodes += config['hosts'] if config['hosts']
nodes += config['arbiters'] if config['arbiters']
nodes += config['passives'] if config['passives']
update_node_list(nodes)
if config['msg'] && @logger
@logger.warn("MONGODB #{config['msg']}")
end
# Iterate through the list of provided seed
# nodes until we've gotten a response from the
# replica set we're trying to connect to.
#
# If we don't get a response, raise an exception.
def get_valid_seed_node
@seeds.each do |seed|
node = Mongo::Node.new(self, seed)
if node.connect && node.set_config
return node
end
end
config
raise ConnectionFailure, "Cannot connect to a replica set with name using seed nodes " +
"#{@seeds.map {|s| "#{s[0]}:#{s[1]}" }.join(',')}"
end
# Primary, when connecting to a replica can, can only be a true primary node.
# (And not a slave, which is possible when connecting with the standard
# Connection class.
def is_primary?(config)
config && (config['ismaster'] == 1 || config['ismaster'] == true)
# Connect to each member of the replica set
# as reported by the given seed node, and cache
# those connections in the @members array.
def connect_to_members
seed = get_valid_seed_node
seed.node_list.each do |host|
node = Mongo::Node.new(self, host)
if node.connect && node.set_config
@members << node
end
end
end
# Initialize the connection pools to the primary and secondary nodes.
def initialize_pools
if @members.empty?
raise ConnectionFailure, "Failed to connect to any given member."
end
@arbiters = @members.first.arbiters
@members.each do |member|
if member.primary?
@primary = member.host_port
@primary_pool = Pool.new(self, member.host, member.port, :size => @pool_size, :timeout => @timeout)
elsif member.secondary? && !@secondaries.include?(member.host_port)
@secondaries << member.host_port
@secondary_pools << Pool.new(self, member.host, member.port, :size => @pool_size, :timeout => @timeout)
end
end
end
# Pick a node randomly from the set of possible secondaries.
def pick_secondary_for_read
return unless @read_secondary
if (size = @secondary_pools.size) > 0
@read_pool = @secondary_pools[rand(size)]
end
end
# Make sure that we're connected to the expected replica set.
def check_set_name(config, socket)
if @replica_set
config = self['admin'].command({:replSetGetStatus => 1},
:socket => socket, :check_response => false)
if !Mongo::Support.ok?(config)
raise ReplicaSetConnectionError, config['errmsg']
elsif config['set'] != @replica_set
raise ReplicaSetConnectionError,
"Attempting to connect to replica set '#{config['set']}' but expected '#{@replica_set}'"
end
end
end
# Determines what kind of node we have and caches its host
# and port so that users can easily connect manually.
def set_auxillary(node, config)
if config
if config['secondary']
host, port = *node
@secondaries << node unless @secondaries.include?(node)
@secondary_pools << Pool.new(self, host, port, :size => @pool_size, :timeout => @timeout)
elsif config['arbiterOnly']
@arbiters << node unless @arbiters.include?(node)
end
end
end
# Update the list of known nodes. Only applies to replica sets,
# where the response to the ismaster command will return a list
# of known hosts.
#
# @param hosts [Array] a list of hosts, specified as string-encoded
# host-port values. Example: ["myserver-1.org:27017", "myserver-1.org:27017"]
#
# @return [Array] the updated list of nodes
def update_node_list(hosts)
new_nodes = hosts.map do |host|
if !host.respond_to?(:split)
warn "Could not parse host #{host.inspect}."
next
end
host, port = host.split(':')
[host, port ? port.to_i : Connection::DEFAULT_PORT]
end
# Replace the list of seed nodes with the canonical list.
@nodes = new_nodes.clone
@nodes_to_try = new_nodes - @nodes_tried
end
# Checkout a socket for reading (i.e., a secondary node).
def checkout_reader
connect unless connected?

View File

@ -7,7 +7,7 @@ class ReplicaSetQueryTest < Test::Unit::TestCase
include Mongo
def setup
@conn = ReplSetConnection.new([RS.host, RS.ports[0]])
@conn = ReplSetConnection.new([RS.host, RS.ports[0], RS.ports[1]])
@db = @conn.db(MONGO_TEST_DB)
@db.drop_collection("test-sets")
@coll = @db.collection("test-sets")

View File

@ -33,15 +33,15 @@ class ReplicaSetAckTest < Test::Unit::TestCase
end
def test_safe_mode_replication_ack
@col.insert({:baz => "bar"}, :safe => {:w => 2, :wtimeout => 5000})
@col.insert({:baz => "bar"}, :safe => {:w => 3, :wtimeout => 5000})
assert @col.insert({:foo => "0" * 5000}, :safe => {:w => 2, :wtimeout => 5000})
assert @col.insert({:foo => "0" * 5000}, :safe => {:w => 3, :wtimeout => 5000})
assert_equal 2, @slave1[MONGO_TEST_DB]["test-sets"].count
assert @col.update({:baz => "bar"}, {:baz => "foo"}, :safe => {:w => 2, :wtimeout => 5000})
assert @col.update({:baz => "bar"}, {:baz => "foo"}, :safe => {:w => 3, :wtimeout => 5000})
assert @slave1[MONGO_TEST_DB]["test-sets"].find_one({:baz => "foo"})
assert @col.remove({}, :safe => {:w => 2, :wtimeout => 5000})
assert @col.remove({}, :safe => {:w => 3, :wtimeout => 5000})
assert_equal 0, @slave1[MONGO_TEST_DB]["test-sets"].count
end