diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 6eeab0c..33bbd7d 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -35,7 +35,8 @@ module Mongo STANDARD_HEADER_SIZE = 16 RESPONSE_HEADER_SIZE = 20 - attr_reader :logger, :size, :auths, :primary, :safe, :primary_pool, :host_to_try, :pool_size + attr_reader :logger, :size, :auths, :primary, :safe, :primary_pool, + :host_to_try, :pool_size, :connect_timeout # Counter for generating unique request ids. @@current_request_id = 0 diff --git a/lib/mongo/node.rb b/lib/mongo/node.rb index 375c452..ea938fc 100644 --- a/lib/mongo/node.rb +++ b/lib/mongo/node.rb @@ -1,19 +1,133 @@ module Mongo class Node - attr_accessor :host, :port, :address - - def initialize(data) - data = data.split(':') if data.is_a?(String) - self.host = data[0] - self.port = data[1] ? data[1].to_i : Connection::DEFAULT_PORT + attr_accessor :host, :port, :address, :config, :repl_set_status, :connection, :socket + + def initialize(connection, data) + self.connection = connection + if data.is_a?(String) + self.host, self.port = split_nodes(data) + else + self.host, self.port = data + end self.address = "#{host}:#{port}" end + def eql?(other) other.is_a?(Node) && host == other.host && port == other.port end alias :== :eql? + + # Create a connection to the provided node, + # and, if successful, return the socket. Otherwise, + # return nil. + def connect + begin + + if self.connection.connect_timeout + Mongo::TimeoutHandler.timeout(self.connection.connect_timeout, OperationTimeout) do + socket = TCPSocket.new(self.host, self.port) + end + else + socket = TCPSocket.new(self.host, self.port) + end + + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + rescue OperationFailure, SocketError, SystemCallError, IOError => ex + return nil + end + + self.socket = socket + end + + def disconnect + if self.socket + self.socket.close + self.socket = nil + self.config = nil + end + end + + def connected? + self.socket != nil + end + + # Get the configuration for the provided node as returned by the + # ismaster command. Additionally, check that the replica set name + # matches with the name provided. + def set_config + begin + self.config = self.connection['admin'].command({:ismaster => 1}, :socket => self.socket) + + if self.config['msg'] && @logger + self.connection.logger.warn("MONGODB #{config['msg']}") + end + + check_set_name + rescue OperationFailure, SocketError, SystemCallError, IOError => ex + return nil + end + + self.config + end + + # Return a list of replica set nodes from the config. + # Note: this excludes arbiters. + def node_list + connect unless connected? + return [] unless self.config + + nodes = [] + nodes += config['hosts'] if config['hosts'] + nodes += config['passives'] if config['passives'] + nodes + end + + def arbiters + connect unless connected? + + config['arbiters'].map do |arbiter| + split_nodes(arbiter) + end + end + + def primary? + self.config['ismaster'] == true || self.config['ismaster'] == 1 + end + + def secondary? + self.config['secondary'] == true || self.config['secondary'] == 1 + end + + def host_port + [self.host, self.port] + end + def hash address.hash end + + private + + def split_nodes(host_string) + data = host_string.split(":") + host = data[0] + port = data[1].to_i || Connection::DEFAULT_PORT + + [host, port] + end + + # Make sure that we're connected to the expected replica set. + def check_set_name + if self.connection.replica_set_name + if !self.config['setName'] + self.connection.logger.warn("MONGODB [warning] could not verify replica set name " + + "because ismaster does not return name in this version of MongoDB") + elsif self.connection.replica_set_name != self.config['setName'] + raise ReplicaSetConnectionError, + "Attempting to connect to replica set '#{config['setName']}' " + + "but expected '#{self.connection.replica_set_name}'" + end + end + end end -end \ No newline at end of file +end diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index 581768e..c44baf3 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -20,7 +20,8 @@ module Mongo # Instantiates and manages connections to a MongoDB replica set. class ReplSetConnection < Connection - attr_reader :nodes, :secondaries, :arbiters, :read_pool, :secondary_pools + attr_reader :nodes, :secondaries, :arbiters, :read_pool, :secondary_pools, + :replica_set_name, :members # Create a connection to a MongoDB replica set. # @@ -77,66 +78,68 @@ module Mongo raise MongoArgumentError, "A ReplSetConnection requires at least one node." end - # Get seed nodes - @nodes = args + # Get the list of seed nodes + @seeds = args - # Replica set name - @replica_set = opts[:rs_name] + # The members of the replica set, stored as instances of Mongo::Node. + @members = [] - # Cache the various node types when connecting to a replica set. - @secondaries = [] - @arbiters = [] + # Connection pool for primay node + @primary = nil + @primary_pool = nil # Connection pools for each secondary node + @secondaries = [] @secondary_pools = [] + + # The secondary pool to which we'll be sending reads. @read_pool = nil + # A list of arbiter address (for client information only) + @arbiters = [] + # Are we allowing reads from secondaries? @read_secondary = opts.fetch(:read_secondary, false) + # Replica set name + if opts[:rs_name] + warn ":rs_name option has been deprecated and will be removed in 2.0. " + + "Please use :name instead." + @replica_set_name = opts[:rs_name] + else + @replica_set_name = opts[:name] + end + setup(opts) end - # Create a new socket and attempt to connect to master. - # If successful, sets host and port to master and returns the socket. - # - # If connecting to a replica set, this method will replace the - # initially-provided seed list with any nodes known to the set. - # - # @raise [ConnectionFailure] if unable to connect to any host or port. + # Use the provided seed nodes to initiate a connection + # to the replica set. def connect - close - @nodes_to_try = @nodes.clone - - while connecting? - node = @nodes_to_try.shift - config = check_is_master(node) - - if is_primary?(config) - set_primary(node) - else - set_auxillary(node, config) - end - end - - pick_secondary_for_read if @read_secondary + connect_to_members + initialize_pools + pick_secondary_for_read if connected? BSON::BSON_CODER.update_max_bson_size(self) else - if @secondary_pools.empty? - close # close any existing pools and sockets - raise ConnectionFailure, "Failed to connect any given host:port" - else - close # close any existing pools and sockets + close + + if @primary.nil? raise ConnectionFailure, "Failed to connect to primary node." + else + raise ConnectionFailure, "Failed to connect to any given member." end end end - alias :reconnect :connect + def connected? + @primary_pool || (@read_pool && @read_secondary) + end + + # @deprecated def connecting? - @nodes_to_try.length > 0 + false end # The replica set primary's host name. @@ -165,6 +168,10 @@ module Mongo # Close the connection to the database. def close super + @members.each do |member| + member.disconnect + end + @members = [] @read_pool = nil @secondary_pools.each do |pool| pool.close @@ -172,8 +179,6 @@ module Mongo @secondaries = [] @secondary_pools = [] @arbiters = [] - @nodes_tried = [] - @nodes_to_try = [] end # If a ConnectionFailure is raised, this method will be called @@ -208,115 +213,64 @@ module Mongo private - def check_is_master(node) - begin - host, port = *node - - if @connect_timeout - Mongo::TimeoutHandler.timeout(@connect_timeout, OperationTimeout) do - socket = TCPSocket.new(host, port) - socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - end - else - socket = TCPSocket.new(host, port) - socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - end - - config = self['admin'].command({:ismaster => 1}, :socket => socket) - - check_set_name(config, socket) - rescue OperationFailure, SocketError, SystemCallError, IOError => ex - # It's necessary to rescue here. The #connect method will keep trying - # until it has no more nodes to try and raise a ConnectionFailure if - # it can't connect to a primary. - ensure - socket.close if socket - @nodes_tried << node - - if config - nodes = [] - nodes += config['hosts'] if config['hosts'] - nodes += config['arbiters'] if config['arbiters'] - nodes += config['passives'] if config['passives'] - update_node_list(nodes) - - if config['msg'] && @logger - @logger.warn("MONGODB #{config['msg']}") - end + # Iterate through the list of provided seed + # nodes until we've gotten a response from the + # replica set we're trying to connect to. + # + # If we don't get a response, raise an exception. + def get_valid_seed_node + @seeds.each do |seed| + node = Mongo::Node.new(self, seed) + if node.connect && node.set_config + return node end end - config + raise ConnectionFailure, "Cannot connect to a replica set with name using seed nodes " + + "#{@seeds.map {|s| "#{s[0]}:#{s[1]}" }.join(',')}" end - # Primary, when connecting to a replica can, can only be a true primary node. - # (And not a slave, which is possible when connecting with the standard - # Connection class. - def is_primary?(config) - config && (config['ismaster'] == 1 || config['ismaster'] == true) + # Connect to each member of the replica set + # as reported by the given seed node, and cache + # those connections in the @members array. + def connect_to_members + seed = get_valid_seed_node + + seed.node_list.each do |host| + node = Mongo::Node.new(self, host) + if node.connect && node.set_config + @members << node + end + end + end + + # Initialize the connection pools to the primary and secondary nodes. + def initialize_pools + if @members.empty? + raise ConnectionFailure, "Failed to connect to any given member." + end + + @arbiters = @members.first.arbiters + + @members.each do |member| + if member.primary? + @primary = member.host_port + @primary_pool = Pool.new(self, member.host, member.port, :size => @pool_size, :timeout => @timeout) + elsif member.secondary? && !@secondaries.include?(member.host_port) + @secondaries << member.host_port + @secondary_pools << Pool.new(self, member.host, member.port, :size => @pool_size, :timeout => @timeout) + end + end end # Pick a node randomly from the set of possible secondaries. def pick_secondary_for_read + return unless @read_secondary if (size = @secondary_pools.size) > 0 @read_pool = @secondary_pools[rand(size)] end end - # Make sure that we're connected to the expected replica set. - def check_set_name(config, socket) - if @replica_set - config = self['admin'].command({:replSetGetStatus => 1}, - :socket => socket, :check_response => false) - - if !Mongo::Support.ok?(config) - raise ReplicaSetConnectionError, config['errmsg'] - elsif config['set'] != @replica_set - raise ReplicaSetConnectionError, - "Attempting to connect to replica set '#{config['set']}' but expected '#{@replica_set}'" - end - end - end - - # Determines what kind of node we have and caches its host - # and port so that users can easily connect manually. - def set_auxillary(node, config) - if config - if config['secondary'] - host, port = *node - @secondaries << node unless @secondaries.include?(node) - @secondary_pools << Pool.new(self, host, port, :size => @pool_size, :timeout => @timeout) - elsif config['arbiterOnly'] - @arbiters << node unless @arbiters.include?(node) - end - end - end - - # Update the list of known nodes. Only applies to replica sets, - # where the response to the ismaster command will return a list - # of known hosts. - # - # @param hosts [Array] a list of hosts, specified as string-encoded - # host-port values. Example: ["myserver-1.org:27017", "myserver-1.org:27017"] - # - # @return [Array] the updated list of nodes - def update_node_list(hosts) - new_nodes = hosts.map do |host| - if !host.respond_to?(:split) - warn "Could not parse host #{host.inspect}." - next - end - - host, port = host.split(':') - [host, port ? port.to_i : Connection::DEFAULT_PORT] - end - - # Replace the list of seed nodes with the canonical list. - @nodes = new_nodes.clone - - @nodes_to_try = new_nodes - @nodes_tried - end - # Checkout a socket for reading (i.e., a secondary node). def checkout_reader connect unless connected? diff --git a/test/replica_sets/query_test.rb b/test/replica_sets/query_test.rb index 127f1f7..296cdda 100644 --- a/test/replica_sets/query_test.rb +++ b/test/replica_sets/query_test.rb @@ -7,7 +7,7 @@ class ReplicaSetQueryTest < Test::Unit::TestCase include Mongo def setup - @conn = ReplSetConnection.new([RS.host, RS.ports[0]]) + @conn = ReplSetConnection.new([RS.host, RS.ports[0], RS.ports[1]]) @db = @conn.db(MONGO_TEST_DB) @db.drop_collection("test-sets") @coll = @db.collection("test-sets") diff --git a/test/replica_sets/replication_ack_test.rb b/test/replica_sets/replication_ack_test.rb index 33f0bb8..d08ec86 100644 --- a/test/replica_sets/replication_ack_test.rb +++ b/test/replica_sets/replication_ack_test.rb @@ -33,15 +33,15 @@ class ReplicaSetAckTest < Test::Unit::TestCase end def test_safe_mode_replication_ack - @col.insert({:baz => "bar"}, :safe => {:w => 2, :wtimeout => 5000}) + @col.insert({:baz => "bar"}, :safe => {:w => 3, :wtimeout => 5000}) - assert @col.insert({:foo => "0" * 5000}, :safe => {:w => 2, :wtimeout => 5000}) + assert @col.insert({:foo => "0" * 5000}, :safe => {:w => 3, :wtimeout => 5000}) assert_equal 2, @slave1[MONGO_TEST_DB]["test-sets"].count - assert @col.update({:baz => "bar"}, {:baz => "foo"}, :safe => {:w => 2, :wtimeout => 5000}) + assert @col.update({:baz => "bar"}, {:baz => "foo"}, :safe => {:w => 3, :wtimeout => 5000}) assert @slave1[MONGO_TEST_DB]["test-sets"].find_one({:baz => "foo"}) - assert @col.remove({}, :safe => {:w => 2, :wtimeout => 5000}) + assert @col.remove({}, :safe => {:w => 3, :wtimeout => 5000}) assert_equal 0, @slave1[MONGO_TEST_DB]["test-sets"].count end