minor: reorganization. warn if using replica sets RUBY-148
This commit is contained in:
parent
d8b34d7679
commit
476d856abb
|
@ -448,7 +448,6 @@ module Mongo
|
|||
result
|
||||
end
|
||||
|
||||
|
||||
# Create a new socket and attempt to connect to master.
|
||||
# If successful, sets host and port to master and returns the socket.
|
||||
#
|
||||
|
@ -459,16 +458,11 @@ module Mongo
|
|||
def connect
|
||||
reset_connection
|
||||
|
||||
first_node = @nodes.first
|
||||
if is_primary?(check_is_master(first_node))
|
||||
set_primary(first_node)
|
||||
else
|
||||
while !connected? && !(nodes_to_try = @nodes - @nodes_tried).empty?
|
||||
nodes_to_try.each do |node|
|
||||
if is_primary?(check_is_master(node))
|
||||
set_primary(node)
|
||||
break
|
||||
end
|
||||
while !connected? && !(nodes_to_try = @nodes - @nodes_tried).empty?
|
||||
nodes_to_try.each do |node|
|
||||
if is_primary?(check_is_master(node))
|
||||
set_primary(node)
|
||||
break
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -485,122 +479,6 @@ module Mongo
|
|||
connect
|
||||
end
|
||||
|
||||
def reset_connection
|
||||
close
|
||||
@host = nil
|
||||
@port = nil
|
||||
@nodes_tried = []
|
||||
end
|
||||
|
||||
def connected?
|
||||
@host && @port
|
||||
end
|
||||
|
||||
# Primary is defined as either a master node or a slave if
|
||||
# :slave_ok has been set to +true+.
|
||||
#
|
||||
# If a primary node is discovered, we set the the @host and @port and
|
||||
# apply any saved authentication.
|
||||
#
|
||||
# TODO: use the 'primary', and 'seconday' fields if we're in a replica set
|
||||
def is_primary?(config)
|
||||
config && (config['ismaster'] == 1 || config['ismaster'] == true) || @slave_ok
|
||||
end
|
||||
|
||||
# @return
|
||||
def check_is_master(node)
|
||||
begin
|
||||
host, port = *node
|
||||
socket = TCPSocket.new(host, port)
|
||||
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
||||
|
||||
config = self['admin'].command({:ismaster => 1}, :sock => socket)
|
||||
|
||||
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
|
||||
close
|
||||
ensure
|
||||
@nodes_tried << node
|
||||
update_node_list(config['hosts']) if config && config['hosts']
|
||||
socket.close if socket
|
||||
end
|
||||
|
||||
config
|
||||
end
|
||||
|
||||
def set_primary(node)
|
||||
@host, @port = *node
|
||||
apply_saved_authentication
|
||||
end
|
||||
|
||||
def update_node_list(hosts)
|
||||
new_nodes = hosts.map do |host|
|
||||
if !host.respond_to?(:split)
|
||||
warn "Could not parse host #{host.inspect}."
|
||||
next
|
||||
end
|
||||
|
||||
host, port = host.split(':')
|
||||
[host, port.to_i]
|
||||
end
|
||||
|
||||
@nodes |= new_nodes
|
||||
end
|
||||
|
||||
# Create a new socket and attempt to connect to master.
|
||||
# If successful, sets host and port to master and returns the socket.
|
||||
#
|
||||
# @raise [ConnectionFailure] if unable to connect to any host or port.
|
||||
def connect_to_master_old
|
||||
close
|
||||
@host = @port = nil
|
||||
for node_pair in @nodes
|
||||
host, port = *node_pair
|
||||
begin
|
||||
socket = TCPSocket.new(host, port)
|
||||
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
||||
|
||||
# If we're connected to master, set the @host and @port
|
||||
result = self['admin'].command({:ismaster => 1}, :check_response => false, :sock => socket)
|
||||
if Mongo::Support.ok?(result) &&
|
||||
((is_master = result['ismaster'] == 1) || @slave_ok)
|
||||
@host, @port = host, port
|
||||
apply_saved_authentication
|
||||
end
|
||||
|
||||
# Note: slave_ok can be true only when connecting to a single node.
|
||||
if @nodes.length == 1 && !is_master && !@slave_ok
|
||||
raise ConfigurationError, "Trying to connect directly to slave; " +
|
||||
"if this is what you want, specify :slave_ok => true."
|
||||
end
|
||||
|
||||
break if is_master || @slave_ok
|
||||
rescue SocketError, SystemCallError, IOError => ex
|
||||
close
|
||||
false
|
||||
ensure
|
||||
socket.close if socket
|
||||
end
|
||||
end
|
||||
raise ConnectionFailure, "failed to connect to any given host:port" unless socket
|
||||
end
|
||||
|
||||
def attempt_node(node)
|
||||
host, port = *node
|
||||
socket = TCPSocket.new(host, port)
|
||||
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
||||
|
||||
# If we're connected to master, set the @host and @port
|
||||
result = self['admin'].command({:ismaster => 1}, :check_response => false, :sock => socket)
|
||||
if Mongo::Support.ok?(result)
|
||||
if ((is_master = result['ismaster'] == 1) || @slave_ok)
|
||||
@host, @port = host, port
|
||||
end
|
||||
apply_saved_authentication
|
||||
end
|
||||
end
|
||||
|
||||
# Are we connected to MongoDB? This is determined by checking whether
|
||||
# host and port have values, since they're set to nil on calls to #close.
|
||||
def connected?
|
||||
@host && @port
|
||||
end
|
||||
|
@ -691,6 +569,79 @@ module Mongo
|
|||
|
||||
private
|
||||
|
||||
# If a ConnectionFailure is raised, this method will be called
|
||||
# to close the connection and reset connection values.
|
||||
def reset_connection
|
||||
close
|
||||
@host = nil
|
||||
@port = nil
|
||||
@nodes_tried = []
|
||||
end
|
||||
|
||||
# Primary is defined as either a master node or a slave if
|
||||
# :slave_ok has been set to +true+.
|
||||
#
|
||||
# If a primary node is discovered, we set the the @host and @port and
|
||||
# apply any saved authentication.
|
||||
#
|
||||
# TODO: use the 'primary', and 'seconday' fields if we're in a replica set
|
||||
def is_primary?(config)
|
||||
config && (config['ismaster'] == 1 || config['ismaster'] == true) || @slave_ok
|
||||
end
|
||||
|
||||
# @return
|
||||
def check_is_master(node)
|
||||
begin
|
||||
host, port = *node
|
||||
socket = TCPSocket.new(host, port)
|
||||
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
||||
|
||||
config = self['admin'].command({:ismaster => 1}, :sock => socket)
|
||||
|
||||
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
|
||||
close
|
||||
ensure
|
||||
@nodes_tried << node
|
||||
if config
|
||||
update_node_list(config['hosts']) if config['hosts']
|
||||
@logger.warn(config['msg']) if config['msg']
|
||||
end
|
||||
|
||||
socket.close if socket
|
||||
end
|
||||
|
||||
config
|
||||
end
|
||||
|
||||
# Set the specified node as primary, and
|
||||
# apply any saved authentication credentials.
|
||||
def set_primary(node)
|
||||
@host, @port = *node
|
||||
apply_saved_authentication
|
||||
end
|
||||
|
||||
# Update the list of known nodes. Only applies to replica sets,
|
||||
# where the response to the ismaster command will return a list
|
||||
# of known hosts.
|
||||
#
|
||||
# @param hosts [Array] a list of hosts, specified as string-encoded
|
||||
# host-port values. Example: ["myserver-1.org:27017", "myserver-1.org:27017"]
|
||||
#
|
||||
# @return [Array] the updated list of nodes
|
||||
def update_node_list(hosts)
|
||||
new_nodes = hosts.map do |host|
|
||||
if !host.respond_to?(:split)
|
||||
warn "Could not parse host #{host.inspect}."
|
||||
next
|
||||
end
|
||||
|
||||
host, port = host.split(':')
|
||||
[host, port.to_i]
|
||||
end
|
||||
|
||||
@nodes |= new_nodes
|
||||
end
|
||||
|
||||
# Return a socket to the pool.
|
||||
def checkin(socket)
|
||||
@connection_mutex.synchronize do
|
||||
|
|
Loading…
Reference in New Issue