minor: cleanup and test fix
This commit is contained in:
parent
b910e3e635
commit
3e3f05813a
@ -23,7 +23,8 @@ module Mongo
|
|||||||
# Instantiates and manages connections to a MongoDB replica set.
|
# Instantiates and manages connections to a MongoDB replica set.
|
||||||
class ReplSetConnection < Connection
|
class ReplSetConnection < Connection
|
||||||
attr_reader :nodes, :secondaries, :arbiters, :secondary_pools,
|
attr_reader :nodes, :secondaries, :arbiters, :secondary_pools,
|
||||||
:replica_set_name, :read_pool, :seeds, :tags_to_pools, :refresh_interval, :auto_refresh
|
:replica_set_name, :read_pool, :seeds, :tags_to_pools,
|
||||||
|
:refresh_interval, :background_refresh
|
||||||
|
|
||||||
# Create a connection to a MongoDB replica set.
|
# Create a connection to a MongoDB replica set.
|
||||||
#
|
#
|
||||||
@ -56,11 +57,12 @@ module Mongo
|
|||||||
# @option opts [Float] :connect_timeout (nil) The number of seconds to wait before timing out a
|
# @option opts [Float] :connect_timeout (nil) The number of seconds to wait before timing out a
|
||||||
# connection attempt.
|
# connection attempt.
|
||||||
# @option opts [Boolean] :ssl (false) If true, create the connection to the server using SSL.
|
# @option opts [Boolean] :ssl (false) If true, create the connection to the server using SSL.
|
||||||
# @option opts [Boolean] :auto_refresh (false) Set this to true to enable a background thread that
|
# @option opts [Boolean] :background_refresh (false) Set this to true to enable a background thread that
|
||||||
# periodically updates the state of the connection. If, for example, you initially connect while a secondary
|
# periodically updates the state of the connection. If, for example, you initially connect while a secondary
|
||||||
# is down, :auto_refresh will reconnect to that secondary behind the scenes to
|
# is down, :background_refresh will reconnect to that secondary behind the scenes to
|
||||||
# prevent you from having to reconnect manually.
|
# prevent you from having to reconnect manually. If set to +false+, background refresh will happen
|
||||||
# @option opts [Integer] :refresh_interval (90) If :auto_refresh is enabled, this is the number of seconds
|
# synchronously.
|
||||||
|
# @option opts [Integer] :refresh_interval (90) If :background_refresh is enabled, this is the number of seconds
|
||||||
# that the background thread will sleep between calls to check the replica set's state.
|
# that the background thread will sleep between calls to check the replica set's state.
|
||||||
# @option opts [Boolean] :require_primary (true) If true, require a primary node for the connection
|
# @option opts [Boolean] :require_primary (true) If true, require a primary node for the connection
|
||||||
# to succeed. Otherwise, connection will succeed as long as there's at least one secondary.
|
# to succeed. Otherwise, connection will succeed as long as there's at least one secondary.
|
||||||
@ -118,7 +120,7 @@ module Mongo
|
|||||||
@arbiters = []
|
@arbiters = []
|
||||||
|
|
||||||
# Refresh
|
# Refresh
|
||||||
@auto_refresh = opts.fetch(:auto_refresh, false)
|
@background_refresh = opts.fetch(:background_refresh, false)
|
||||||
@refresh_interval = opts[:refresh_interval] || 90
|
@refresh_interval = opts[:refresh_interval] || 90
|
||||||
|
|
||||||
# Are we allowing reads from secondaries?
|
# Are we allowing reads from secondaries?
|
||||||
@ -151,7 +153,7 @@ module Mongo
|
|||||||
end
|
end
|
||||||
|
|
||||||
# Require a primary node to connect?
|
# Require a primary node to connect?
|
||||||
@require_primary = opts.fetch(:require_primary, false)
|
@require_primary = opts.fetch(:require_primary, true)
|
||||||
|
|
||||||
setup(opts)
|
setup(opts)
|
||||||
end
|
end
|
||||||
@ -163,14 +165,14 @@ module Mongo
|
|||||||
|
|
||||||
# Initiate a connection to the replica set.
|
# Initiate a connection to the replica set.
|
||||||
def connect
|
def connect
|
||||||
log(:debug, "Connecting.")
|
log(:debug, "Connecting...")
|
||||||
sync_synchronize(:EX) do
|
sync_synchronize(:EX) do
|
||||||
return if @connected
|
return if @connected
|
||||||
manager = PoolManager.new(self, @seeds)
|
manager = PoolManager.new(self, @seeds)
|
||||||
manager.connect
|
manager.connect
|
||||||
|
|
||||||
update_config(manager)
|
update_config(manager)
|
||||||
initiate_auto_refresh
|
initiate_background_refresh
|
||||||
|
|
||||||
if @require_primary && @primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
if @require_primary && @primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
||||||
raise ConnectionFailure, "Failed to connect to primary node."
|
raise ConnectionFailure, "Failed to connect to primary node."
|
||||||
@ -241,7 +243,8 @@ module Mongo
|
|||||||
end
|
end
|
||||||
|
|
||||||
def nodes
|
def nodes
|
||||||
warn "DEPRECATED"
|
warn "ReplSetConnection#nodes is DEPRECATED and will be removed in v2.0. " +
|
||||||
|
"Please use ReplSetConnection#seeds instead."
|
||||||
@seeds
|
@seeds
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -261,7 +264,6 @@ module Mongo
|
|||||||
end
|
end
|
||||||
|
|
||||||
# Close the connection to the database.
|
# Close the connection to the database.
|
||||||
# TODO: we should get an exclusive lock here.
|
|
||||||
def close
|
def close
|
||||||
sync_synchronize(:EX) do
|
sync_synchronize(:EX) do
|
||||||
@connected = false
|
@connected = false
|
||||||
@ -331,8 +333,8 @@ module Mongo
|
|||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def initiate_auto_refresh
|
def initiate_background_refresh
|
||||||
if @auto_refresh
|
if @background_refresh
|
||||||
return if @refresh_thread && @refresh_thread.alive?
|
return if @refresh_thread && @refresh_thread.alive?
|
||||||
@refresh_thread = Thread.new do
|
@refresh_thread = Thread.new do
|
||||||
while true do
|
while true do
|
||||||
@ -441,7 +443,9 @@ module Mongo
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
if !@auto_refresh &&
|
# Refresh synchronously every @refresh_interval seconds
|
||||||
|
# if @background_refresh is false.
|
||||||
|
if !@background_refresh &&
|
||||||
((Time.now - @last_refresh) > @refresh_interval)
|
((Time.now - @last_refresh) > @refresh_interval)
|
||||||
refresh
|
refresh
|
||||||
end
|
end
|
||||||
|
@ -4,15 +4,15 @@ module Mongo
|
|||||||
attr_accessor :host, :port, :address, :config, :connection, :socket
|
attr_accessor :host, :port, :address, :config, :connection, :socket
|
||||||
|
|
||||||
def initialize(connection, data)
|
def initialize(connection, data)
|
||||||
self.connection = connection
|
@connection = connection
|
||||||
if data.is_a?(String)
|
if data.is_a?(String)
|
||||||
self.host, self.port = split_nodes(data)
|
@host, @port = split_nodes(data)
|
||||||
else
|
else
|
||||||
self.host = data[0]
|
@host = data[0]
|
||||||
self.port = data[1].nil? ? Connection::DEFAULT_PORT : data[1].to_i
|
@port = data[1].nil? ? Connection::DEFAULT_PORT : data[1].to_i
|
||||||
end
|
end
|
||||||
self.address = "#{host}:#{port}"
|
@address = "#{host}:#{port}"
|
||||||
self.config = nil
|
@config = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
def eql?(other)
|
def eql?(other)
|
||||||
@ -34,12 +34,12 @@ module Mongo
|
|||||||
def connect
|
def connect
|
||||||
begin
|
begin
|
||||||
socket = nil
|
socket = nil
|
||||||
if self.connection.connect_timeout
|
if @connection.connect_timeout
|
||||||
Mongo::TimeoutHandler.timeout(self.connection.connect_timeout, OperationTimeout) do
|
Mongo::TimeoutHandler.timeout(@connection.connect_timeout, OperationTimeout) do
|
||||||
socket = self.connection.socket_class.new(self.host, self.port)
|
socket = @connection.socket_class.new(@host, @port)
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
socket = self.connection.socket_class.new(self.host, self.port)
|
socket = @connection.socket_class.new(@host, @port)
|
||||||
end
|
end
|
||||||
|
|
||||||
if socket.nil?
|
if socket.nil?
|
||||||
@ -48,29 +48,29 @@ module Mongo
|
|||||||
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
||||||
end
|
end
|
||||||
rescue OperationTimeout, OperationFailure, SocketError, SystemCallError, IOError => ex
|
rescue OperationTimeout, OperationFailure, SocketError, SystemCallError, IOError => ex
|
||||||
self.connection.log(:debug, "Failed connection to #{host_string} with #{ex.class}, #{ex.message}.")
|
@connection.log(:debug, "Failed connection to #{host_string} with #{ex.class}, #{ex.message}.")
|
||||||
socket.close if socket
|
socket.close if socket
|
||||||
return nil
|
return nil
|
||||||
end
|
end
|
||||||
|
|
||||||
self.socket = socket
|
@socket = socket
|
||||||
end
|
end
|
||||||
|
|
||||||
def close
|
def close
|
||||||
if self.socket
|
if @socket
|
||||||
self.socket.close
|
@socket.close
|
||||||
self.socket = nil
|
@socket = nil
|
||||||
self.config = nil
|
@config = nil
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def connected?
|
def connected?
|
||||||
self.socket != nil
|
@socket != nil
|
||||||
end
|
end
|
||||||
|
|
||||||
def active?
|
def active?
|
||||||
begin
|
begin
|
||||||
result = self.connection['admin'].command({:ping => 1}, :socket => self.socket)
|
result = @connection['admin'].command({:ping => 1}, :socket => @socket)
|
||||||
return result['ok'] == 1
|
return result['ok'] == 1
|
||||||
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
|
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
|
||||||
return nil
|
return nil
|
||||||
@ -82,28 +82,28 @@ module Mongo
|
|||||||
# matches with the name provided.
|
# matches with the name provided.
|
||||||
def set_config
|
def set_config
|
||||||
begin
|
begin
|
||||||
self.config = self.connection['admin'].command({:ismaster => 1}, :socket => self.socket)
|
@config = @connection['admin'].command({:ismaster => 1}, :socket => @socket)
|
||||||
|
|
||||||
if self.config['msg'] && @logger
|
if @config['msg'] && @logger
|
||||||
self.connection.log(:warn, "#{config['msg']}")
|
@connection.log(:warn, "#{config['msg']}")
|
||||||
end
|
end
|
||||||
|
|
||||||
check_set_membership(config)
|
check_set_membership(config)
|
||||||
check_set_name(config)
|
check_set_name(config)
|
||||||
rescue ConnectionFailure, OperationFailure, SocketError, SystemCallError, IOError => ex
|
rescue ConnectionFailure, OperationFailure, SocketError, SystemCallError, IOError => ex
|
||||||
self.connection.log(:warn, "Attempted connection to node #{host_string} raised " +
|
@connection.log(:warn, "Attempted connection to node #{host_string} raised " +
|
||||||
"#{ex.class}: #{ex.message}")
|
"#{ex.class}: #{ex.message}")
|
||||||
return nil
|
return nil
|
||||||
end
|
end
|
||||||
|
|
||||||
self.config
|
@config
|
||||||
end
|
end
|
||||||
|
|
||||||
# Return a list of replica set nodes from the config.
|
# Return a list of replica set nodes from the config.
|
||||||
# Note: this excludes arbiters.
|
# Note: this excludes arbiters.
|
||||||
def node_list
|
def node_list
|
||||||
connect unless connected?
|
connect unless connected?
|
||||||
set_config unless self.config
|
set_config unless @config
|
||||||
|
|
||||||
return [] unless config
|
return [] unless config
|
||||||
|
|
||||||
@ -115,7 +115,7 @@ module Mongo
|
|||||||
|
|
||||||
def arbiters
|
def arbiters
|
||||||
connect unless connected?
|
connect unless connected?
|
||||||
set_config unless self.config
|
set_config unless @config
|
||||||
return [] unless config['arbiters']
|
return [] unless config['arbiters']
|
||||||
|
|
||||||
config['arbiters'].map do |arbiter|
|
config['arbiters'].map do |arbiter|
|
||||||
@ -125,22 +125,22 @@ module Mongo
|
|||||||
|
|
||||||
def tags
|
def tags
|
||||||
connect unless connected?
|
connect unless connected?
|
||||||
set_config unless self.config
|
set_config unless @config
|
||||||
return {} unless config['tags'] && !config['tags'].empty?
|
return {} unless config['tags'] && !config['tags'].empty?
|
||||||
|
|
||||||
config['tags']
|
config['tags']
|
||||||
end
|
end
|
||||||
|
|
||||||
def primary?
|
def primary?
|
||||||
self.config['ismaster'] == true || self.config['ismaster'] == 1
|
@config['ismaster'] == true || @config['ismaster'] == 1
|
||||||
end
|
end
|
||||||
|
|
||||||
def secondary?
|
def secondary?
|
||||||
self.config['secondary'] == true || self.config['secondary'] == 1
|
@config['secondary'] == true || @config['secondary'] == 1
|
||||||
end
|
end
|
||||||
|
|
||||||
def host_port
|
def host_port
|
||||||
[self.host, self.port]
|
[@host, @port]
|
||||||
end
|
end
|
||||||
|
|
||||||
def hash
|
def hash
|
||||||
@ -168,13 +168,13 @@ module Mongo
|
|||||||
|
|
||||||
# Ensure that this node is part of a replica set of the expected name.
|
# Ensure that this node is part of a replica set of the expected name.
|
||||||
def check_set_name(config)
|
def check_set_name(config)
|
||||||
if self.connection.replica_set_name
|
if @connection.replica_set_name
|
||||||
if !config['setName']
|
if !config['setName']
|
||||||
self.connection.log(:warn, "Could not verify replica set name for member #{host_string} " +
|
@connection.log(:warn, "Could not verify replica set name for member #{host_string} " +
|
||||||
"because ismaster does not return name in this version of MongoDB")
|
"because ismaster does not return name in this version of MongoDB")
|
||||||
elsif self.connection.replica_set_name != config['setName']
|
elsif @connection.replica_set_name != config['setName']
|
||||||
message = "Attempting to connect to replica set '#{config['setName']}' on member #{host_string} " +
|
message = "Attempting to connect to replica set '#{config['setName']}' on member #{host_string} " +
|
||||||
"but expected '#{self.connection.replica_set_name}'"
|
"but expected '#{@connection.replica_set_name}'"
|
||||||
raise ReplicaSetConnectionError, message
|
raise ReplicaSetConnectionError, message
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -20,12 +20,12 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||||||
x.report("Connect") do
|
x.report("Connect") do
|
||||||
10.times do
|
10.times do
|
||||||
ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||||
[RS.host, RS.ports[2]], :auto_refresh => false)
|
[RS.host, RS.ports[2]], :background_refresh => false)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@con = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
@con = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||||
[RS.host, RS.ports[2]], :auto_refresh => false)
|
[RS.host, RS.ports[2]], :background_refresh => false)
|
||||||
|
|
||||||
x.report("manager") do
|
x.report("manager") do
|
||||||
man = Mongo::PoolManager.new(@con, @con.seeds)
|
man = Mongo::PoolManager.new(@con, @con.seeds)
|
||||||
@ -41,7 +41,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||||||
|
|
||||||
rescue_connection_failure do
|
rescue_connection_failure do
|
||||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||||
[RS.host, RS.ports[2]], :auto_refresh => false)
|
[RS.host, RS.ports[2]], :background_refresh => false)
|
||||||
end
|
end
|
||||||
|
|
||||||
assert_equal [], @conn.secondaries
|
assert_equal [], @conn.secondaries
|
||||||
@ -70,7 +70,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||||||
|
|
||||||
rescue_connection_failure do
|
rescue_connection_failure do
|
||||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :auto_refresh => true)
|
[RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true)
|
||||||
end
|
end
|
||||||
|
|
||||||
assert_equal [], @conn.secondaries
|
assert_equal [], @conn.secondaries
|
||||||
@ -87,7 +87,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||||||
|
|
||||||
def test_automated_refresh_with_removed_node
|
def test_automated_refresh_with_removed_node
|
||||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :auto_refresh => true)
|
[RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true)
|
||||||
|
|
||||||
assert_equal 2, @conn.secondary_pools.length
|
assert_equal 2, @conn.secondary_pools.length
|
||||||
assert_equal 2, @conn.secondaries.length
|
assert_equal 2, @conn.secondaries.length
|
||||||
@ -103,13 +103,13 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
|||||||
|
|
||||||
def test_adding_and_removing_nodes
|
def test_adding_and_removing_nodes
|
||||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :auto_refresh => true)
|
[RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true)
|
||||||
|
|
||||||
RS.add_node
|
RS.add_node
|
||||||
sleep(5)
|
sleep(5)
|
||||||
|
|
||||||
@conn2 = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
@conn2 = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :auto_refresh => true)
|
[RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true)
|
||||||
|
|
||||||
assert @conn2.secondaries == @conn.secondaries
|
assert @conn2.secondaries == @conn.secondaries
|
||||||
assert_equal 3, @conn.secondary_pools.length
|
assert_equal 3, @conn.secondary_pools.length
|
||||||
|
@ -5,7 +5,7 @@ class PoolManagerTest < Test::Unit::TestCase
|
|||||||
|
|
||||||
context "Initialization: " do
|
context "Initialization: " do
|
||||||
|
|
||||||
def setup
|
should "populate pools correctly" do
|
||||||
TCPSocket.stubs(:new).returns(new_mock_socket)
|
TCPSocket.stubs(:new).returns(new_mock_socket)
|
||||||
@db = new_mock_db
|
@db = new_mock_db
|
||||||
|
|
||||||
@ -14,9 +14,7 @@ class PoolManagerTest < Test::Unit::TestCase
|
|||||||
@connection.stubs(:pool_size).returns(2)
|
@connection.stubs(:pool_size).returns(2)
|
||||||
@connection.stubs(:socket_class).returns(TCPSocket)
|
@connection.stubs(:socket_class).returns(TCPSocket)
|
||||||
@connection.stubs(:[]).returns(@db)
|
@connection.stubs(:[]).returns(@db)
|
||||||
end
|
|
||||||
|
|
||||||
should "populate pools correctly" do
|
|
||||||
@connection.stubs(:replica_set_name).returns(nil)
|
@connection.stubs(:replica_set_name).returns(nil)
|
||||||
@connection.stubs(:log)
|
@connection.stubs(:log)
|
||||||
@arbiters = ['localhost:27020']
|
@arbiters = ['localhost:27020']
|
||||||
|
Loading…
Reference in New Issue
Block a user