# encoding: UTF-8

# --
# Copyright (C) 2008-2010 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ++

module Mongo

  # Instantiates and manages connections to a MongoDB replica set.
  #
  # NOTE(review): several methods call +super+ and rely on helpers that are
  # not defined in this file (+is_primary?+, +set_primary+, +connected?+,
  # +[]+). They are presumably supplied by a superclass or an earlier
  # definition of this class -- confirm before changing the class header.
  class ReplSetConnection
    attr_reader :nodes, :secondaries, :arbiters, :read_pool, :secondary_pools

    # Record the seed nodes and prepare the per-node caches.
    #
    # @param args [Array] the seed nodes (each is later destructured as
    #   +host, port+), optionally followed by an options Hash. Only the
    #   +:rs_name+ option is read here.
    def initialize(*args)
      # Default to an empty Hash so that omitting the options does not
      # raise NoMethodError on the +options[:rs_name]+ lookup below.
      options = args.last.is_a?(Hash) ? args.pop : {}

      @nodes = args

      # Replica set name
      @replica_set_name = options[:rs_name]

      # Cache the various node types when connecting to a replica set.
      @secondaries = []
      @arbiters    = []

      # Connection pools for each secondary node
      @secondary_pools = []
      @read_pool = nil

      # Bare +super+ forwards +args+, from which the options Hash has
      # already been popped.
      super
    end

    # Query every known node with +ismaster+ and classify it as the primary
    # or as an auxiliary (secondary / arbiter) node.
    #
    # If connecting to a replica set, this method will replace the
    # initially-provided seed list with any nodes known to the set.
    #
    # @raise [ConnectionFailure] if unable to connect to any host or port.
    def connect
      reset_connection
      @nodes_to_try = @nodes.clone

      # Keep going after a primary is found: the remaining nodes still need
      # to be classified, and #check_is_master may add newly learned nodes.
      while connecting?
        node   = @nodes_to_try.shift
        config = check_is_master(node)

        if is_primary?(config)
          set_primary(node)
        else
          set_auxillary(node, config)
        end
      end

      pick_secondary_for_read if @read_secondary

      raise ConnectionFailure, "failed to connect to any given host:port" unless connected?
    end

    # @return [Boolean] true while there are still nodes left to query.
    def connecting?
      @nodes_to_try.length > 0
    end

    # Close the connection to the database, including every secondary pool.
    def close
      super
      @read_pool = nil
      @secondary_pools.each do |pool|
        pool.close
      end
    end

    # If a ConnectionFailure is raised, this method will be called
    # to close the connection and reset connection values.
    def reset_connection
      super
      @secondaries     = []
      @secondary_pools = []
      @arbiters        = []
      @nodes_tried     = []
      @nodes_to_try    = []
    end

    private

    # Open a short-lived socket to +node+ and run +ismaster+ against it.
    #
    # Network and operation failures are swallowed so that the caller can
    # move on to the next node; a replica set name mismatch (raised by
    # #check_set_name) is not rescued and propagates.
    #
    # @param node [Array] a +[host, port]+ pair.
    #
    # @return [Hash, nil] the +ismaster+ response, or nil if the node could
    #   not be queried.
    def check_is_master(node)
      begin
        host, port = *node
        socket = TCPSocket.new(host, port)
        socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

        config = self['admin'].command({:ismaster => 1}, :sock => socket)

        check_set_name(config, socket)
      rescue OperationFailure, SocketError, SystemCallError, IOError
        close unless connected?
      ensure
        # Runs on success and on failure alike, so a node is never retried
        # and the probe socket is always released.
        @nodes_tried << node
        if config
          update_node_list(config['hosts']) if config['hosts']
          if config['msg'] && @logger
            @logger.warn("MONGODB #{config['msg']}")
          end
        end

        socket.close if socket
      end

      config
    end

    # Pick a node randomly from the set of possible secondaries.
    # Leaves @read_pool untouched when no secondary pool exists.
    def pick_secondary_for_read
      if (size = @secondary_pools.size) > 0
        @read_pool = @secondary_pools[rand(size)]
      end
    end

    # Make sure that we're connected to the expected replica set.
    # Does nothing unless a replica set name was given at construction.
    #
    # @param config [Hash] the +ismaster+ response (not consulted here).
    # @param socket [TCPSocket] the socket on which to run +replSetGetStatus+.
    #
    # @raise [ReplicaSetConnectionError] if the status command fails or the
    #   reported set name differs from the expected one.
    def check_set_name(config, socket)
      return unless @replica_set_name

      status = self['admin'].command({:replSetGetStatus => 1},
                 :sock => socket, :check_response => false)

      if !Mongo::Support.ok?(status)
        raise ReplicaSetConnectionError, status['errmsg']
      elsif status['set'] != @replica_set_name
        raise ReplicaSetConnectionError,
          "Attempting to connect to replica set '#{status['set']}' but expected '#{@replica_set_name}'"
      end
    end

    # Determines what kind of node we have and caches its host
    # and port so that users can easily connect manually.
    #
    # @param node [Array] a +[host, port]+ pair.
    # @param config [Hash, nil] the +ismaster+ response for +node+; a nil
    #   value (node unreachable) is ignored.
    def set_auxillary(node, config)
      return unless config

      if config['secondary']
        # Create the pool only for a newly seen secondary so that
        # @secondaries and @secondary_pools never accumulate duplicates.
        unless @secondaries.include?(node)
          host, port = *node
          @secondaries << node
          @secondary_pools << Pool.new(self, host, port, :size => @pool_size, :timeout => @timeout)
        end
      elsif config['arbiterOnly']
        @arbiters << node unless @arbiters.include?(node)
      end
    end

    # Update the list of known nodes. Only applies to replica sets,
    # where the response to the ismaster command will return a list
    # of known hosts.
    #
    # @param hosts [Array] a list of hosts, specified as string-encoded
    #   host-port values. Example: ["myserver-1.org:27017", "myserver-2.org:27017"]
    #
    # @return [Array] the known nodes that have not been tried yet.
    def update_node_list(hosts)
      parsed = hosts.map do |entry|
        unless entry.respond_to?(:split)
          warn "Could not parse host #{entry.inspect}."
          next
        end

        host, port = entry.split(':')
        [host, port.to_i]
      end

      # Skipped entries come back from the block as nil; drop them so that
      # no nil "node" is ever queued for a connection attempt.
      new_nodes = parsed.compact

      # Replace the list of seed nodes with the canonical list.
      @nodes = new_nodes.clone

      @nodes_to_try = new_nodes - @nodes_tried
    end

  end
end