Replica Set connection improvements for refresh and multi-threaded apps.
This commit is contained in:
parent
a370f3abed
commit
83eaa4d51b
@ -53,7 +53,7 @@ module Mongo
|
||||
def initialize(name, db, opts={})
|
||||
if db.is_a?(String) && name.is_a?(Mongo::DB)
|
||||
warn "Warning: the order of parameters to initialize a collection have changed. " +
|
||||
"Please specify the collection name first, followed by the db. This will be made permanent"
|
||||
"Please specify the collection name first, followed by the db. This will be made permanent" +
|
||||
"in v2.0."
|
||||
db, name = name, db
|
||||
end
|
||||
|
@ -30,9 +30,6 @@ module Mongo
|
||||
Mutex = ::Mutex
|
||||
ConditionVariable = ::ConditionVariable
|
||||
|
||||
# Abort connections if a ConnectionError is raised.
|
||||
Thread.abort_on_exception = true
|
||||
|
||||
DEFAULT_PORT = 27017
|
||||
STANDARD_HEADER_SIZE = 16
|
||||
RESPONSE_HEADER_SIZE = 20
|
||||
|
@ -23,7 +23,7 @@ module Mongo
|
||||
# Instantiates and manages connections to a MongoDB replica set.
|
||||
class ReplSetConnection < Connection
|
||||
attr_reader :nodes, :secondaries, :arbiters, :secondary_pools,
|
||||
:replica_set_name, :read_pool, :seeds, :tags_to_pools, :refresh_interval
|
||||
:replica_set_name, :read_pool, :seeds, :tags_to_pools, :refresh_interval, :auto_refresh
|
||||
|
||||
# Create a connection to a MongoDB replica set.
|
||||
#
|
||||
@ -62,6 +62,8 @@ module Mongo
|
||||
# prevent you from having to reconnect manually.
|
||||
# @option opts [Integer] :refresh_interval (90) If :auto_refresh is enabled, this is the number of seconds
|
||||
# that the background thread will sleep between calls to check the replica set's state.
|
||||
# @option opts [Boolean] :require_primary (true) If true, require a primary node for the connection
|
||||
# to succeed. Otherwise, connection will succeed as long as there's at least one secondary.
|
||||
#
|
||||
# @example Connect to a replica set and provide two seed nodes. Note that the number of seed nodes does
|
||||
# not have to be equal to the number of replica set members. The purpose of seed nodes is to permit
|
||||
@ -148,6 +150,9 @@ module Mongo
|
||||
@replica_set_name = opts[:name]
|
||||
end
|
||||
|
||||
# Require a primary node to connect?
|
||||
@require_primary = opts.fetch(:require_primary, false)
|
||||
|
||||
setup(opts)
|
||||
end
|
||||
|
||||
@ -158,6 +163,7 @@ module Mongo
|
||||
|
||||
# Initiate a connection to the replica set.
|
||||
def connect
|
||||
log(:debug, "Connecting.")
|
||||
sync_synchronize(:EX) do
|
||||
return if @connected
|
||||
manager = PoolManager.new(self, @seeds)
|
||||
@ -166,8 +172,10 @@ module Mongo
|
||||
update_config(manager)
|
||||
initiate_auto_refresh
|
||||
|
||||
if @primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
||||
if @require_primary && @primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
||||
raise ConnectionFailure, "Failed to connect to primary node."
|
||||
elsif !@read_pool
|
||||
raise ConnectionFailure, "Failed to connect to any node."
|
||||
else
|
||||
@connected = true
|
||||
end
|
||||
@ -205,11 +213,14 @@ module Mongo
|
||||
end
|
||||
|
||||
background_manager = Thread.current[:background]
|
||||
if background_manager.update_required?(@hosts)
|
||||
sync_synchronize(:EX) do
|
||||
background_manager.connect
|
||||
update_config(background_manager)
|
||||
end
|
||||
|
||||
# Return if another thread is already in the process of refreshing.
|
||||
return if sync_exclusive?
|
||||
|
||||
sync_synchronize(:EX) do
|
||||
log(:debug, "Refreshing...")
|
||||
background_manager.connect
|
||||
update_config(background_manager)
|
||||
end
|
||||
end
|
||||
|
||||
@ -259,35 +270,36 @@ module Mongo
|
||||
# Close the connection to the database.
|
||||
# TODO: we should get an exclusive lock here.
|
||||
def close
|
||||
@connected = false
|
||||
sync_synchronize(:EX) do
|
||||
@connected = false
|
||||
super
|
||||
|
||||
super
|
||||
|
||||
if @refresh_thread
|
||||
@refresh_thread.kill
|
||||
@refresh_thread = nil
|
||||
end
|
||||
|
||||
if @nodes
|
||||
@nodes.each do |member|
|
||||
member.disconnect
|
||||
if @refresh_thread
|
||||
@refresh_thread.kill
|
||||
@refresh_thread = nil
|
||||
end
|
||||
end
|
||||
|
||||
@nodes = []
|
||||
@read_pool = nil
|
||||
|
||||
if @secondary_pools
|
||||
@secondary_pools.each do |pool|
|
||||
pool.close
|
||||
if @nodes
|
||||
@nodes.each do |member|
|
||||
member.disconnect
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@secondaries = []
|
||||
@secondary_pools = []
|
||||
@arbiters = []
|
||||
@tags_to_pools.clear
|
||||
@sockets_to_pools.clear
|
||||
@nodes = []
|
||||
@read_pool = nil
|
||||
|
||||
if @secondary_pools
|
||||
@secondary_pools.each do |pool|
|
||||
pool.close
|
||||
end
|
||||
end
|
||||
|
||||
@secondaries = []
|
||||
@secondary_pools = []
|
||||
@arbiters = []
|
||||
@tags_to_pools.clear
|
||||
@sockets_to_pools.clear
|
||||
end
|
||||
end
|
||||
|
||||
# If a ConnectionFailure is raised, this method will be called
|
||||
@ -342,17 +354,25 @@ module Mongo
|
||||
# if no read pool has been defined.
|
||||
def checkout_reader
|
||||
connect unless connected?
|
||||
socket = get_socket_from_pool(@read_pool)
|
||||
|
||||
sync_synchronize(:SH) do
|
||||
socket = @read_pool.checkout
|
||||
@sockets_to_pools[socket] = @read_pool
|
||||
if !socket
|
||||
refresh
|
||||
socket = get_socket_from_pool(@primary_pool)
|
||||
end
|
||||
|
||||
if socket
|
||||
socket
|
||||
else
|
||||
raise ConnectionFailure.new("Could not connect to a node for reading.")
|
||||
end
|
||||
end
|
||||
|
||||
# Checkout a socket connected to a node with one of
|
||||
# the provided tags. If no such node exists, raise
|
||||
# an exception.
|
||||
#
|
||||
# NOTE: will be available in driver release v2.0.
|
||||
def checkout_tagged(tags)
|
||||
sync_synchronize(:SH) do
|
||||
tags.each do |k, v|
|
||||
@ -372,13 +392,33 @@ module Mongo
|
||||
# Checkout a socket for writing (i.e., a primary node).
|
||||
def checkout_writer
|
||||
connect unless connected?
|
||||
socket = get_socket_from_pool(@primary_pool)
|
||||
|
||||
sync_synchronize(:SH) do
|
||||
if @primary_pool
|
||||
socket = @primary_pool.checkout
|
||||
@sockets_to_pools[socket] = @primary_pool
|
||||
socket
|
||||
if !socket
|
||||
refresh
|
||||
socket = get_socket_from_pool(@primary_pool)
|
||||
end
|
||||
|
||||
if socket
|
||||
socket
|
||||
else
|
||||
raise ConnectionFailure.new("Could not connect to primary node.")
|
||||
end
|
||||
end
|
||||
|
||||
def get_socket_from_pool(pool)
|
||||
begin
|
||||
sync_synchronize(:SH) do
|
||||
if pool
|
||||
socket = pool.checkout
|
||||
@sockets_to_pools[socket] = pool
|
||||
socket
|
||||
end
|
||||
end
|
||||
|
||||
rescue ConnectionFailure => ex
|
||||
log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}")
|
||||
return nil
|
||||
end
|
||||
end
|
||||
|
||||
@ -397,8 +437,12 @@ module Mongo
|
||||
end
|
||||
|
||||
def checkin(socket)
|
||||
if pool = @sockets_to_pools[socket]
|
||||
pool.checkin(socket)
|
||||
sync_synchronize(:SH) do
|
||||
if pool = @sockets_to_pools[socket]
|
||||
pool.checkin(socket)
|
||||
elsif socket
|
||||
socket.close
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -2,7 +2,7 @@ module Mongo
|
||||
module Logging
|
||||
|
||||
# Log a message with the given level.
|
||||
def log(level, message)
|
||||
def log(level, msg)
|
||||
return unless @logger
|
||||
case level
|
||||
when :debug then
|
||||
|
@ -20,6 +20,10 @@ module Mongo
|
||||
end
|
||||
alias :== :eql?
|
||||
|
||||
def close
|
||||
self.socket.close if self.socket
|
||||
end
|
||||
|
||||
def host_string
|
||||
address
|
||||
end
|
||||
|
@ -52,17 +52,19 @@ module Mongo
|
||||
end
|
||||
|
||||
def close
|
||||
@sockets.each do |sock|
|
||||
begin
|
||||
sock.close
|
||||
rescue IOError => ex
|
||||
warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}"
|
||||
@connection_mutex.synchronize do
|
||||
(@sockets - @checked_out).each do |sock|
|
||||
begin
|
||||
sock.close
|
||||
rescue IOError => ex
|
||||
warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}"
|
||||
end
|
||||
end
|
||||
@host = @port = nil
|
||||
@sockets.clear
|
||||
@pids.clear
|
||||
@checked_out.clear
|
||||
end
|
||||
@host = @port = nil
|
||||
@sockets.clear
|
||||
@pids.clear
|
||||
@checked_out.clear
|
||||
end
|
||||
|
||||
def inspect
|
||||
@ -188,7 +190,7 @@ module Mongo
|
||||
if @pids[socket] != Process.pid
|
||||
@pids[socket] = nil
|
||||
@sockets.delete(socket)
|
||||
socket.close
|
||||
socket.close if socket
|
||||
checkout_new_socket
|
||||
else
|
||||
@checked_out << socket
|
||||
|
@ -23,31 +23,29 @@ module Mongo
|
||||
@members = members
|
||||
end
|
||||
|
||||
# Ensure that the view of the replica set is current by
|
||||
# running the ismaster command and checking to see whether
|
||||
# we've connected to all known nodes. If not, automatically
|
||||
# connect to these unconnected nodes. This is handy when we've
|
||||
# connected to a replica set with no primary or when a secondary
|
||||
# node comes up after we've connected.
|
||||
#
|
||||
# If we're connected to nodes that are no longer part of the set,
|
||||
# remove these from our set of secondary pools.
|
||||
def update_required?(hosts)
|
||||
if !@refresh_node || !@refresh_node.set_config
|
||||
begin
|
||||
@refresh_node = get_valid_seed_node
|
||||
rescue ConnectionFailure
|
||||
warn "Could not refresh config because no valid seed node was available."
|
||||
return
|
||||
end
|
||||
end
|
||||
|
||||
hosts != @refresh_node.node_list
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def initialize_data
|
||||
begin
|
||||
if @primary_pool
|
||||
@primary_pool.close
|
||||
end
|
||||
|
||||
if @secondary_pools
|
||||
@secondary_pools.each do |pool|
|
||||
pool.close
|
||||
end
|
||||
end
|
||||
|
||||
if @members
|
||||
@members.each do |member|
|
||||
member.close
|
||||
end
|
||||
end
|
||||
|
||||
rescue ConnectionFailure
|
||||
end
|
||||
|
||||
@primary = nil
|
||||
@primary_pool = nil
|
||||
@read_pool = nil
|
||||
|
@ -6,6 +6,10 @@ require 'benchmark'
|
||||
class ReplicaSetRefreshTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
|
||||
def setup
|
||||
@conn = nil
|
||||
end
|
||||
|
||||
def teardown
|
||||
RS.restart_killed_nodes
|
||||
@conn.close if @conn
|
||||
@ -98,7 +102,6 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
|
||||
end
|
||||
|
||||
def test_adding_and_removing_nodes
|
||||
puts "ADDING AND REMOVING"
|
||||
@conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]],
|
||||
[RS.host, RS.ports[2]], :refresh_interval => 2, :auto_refresh => true)
|
||||
|
||||
|
@ -21,7 +21,7 @@ class ReplSetManager
|
||||
@config = {"_id" => @name, "members" => []}
|
||||
@durable = opts.fetch(:durable, false)
|
||||
@path = File.join(File.expand_path(File.dirname(__FILE__)), "data")
|
||||
@oplog_size = opts.fetch(:oplog_size, 512)
|
||||
@oplog_size = opts.fetch(:oplog_size, 32)
|
||||
@tags = [{"dc" => "ny", "rack" => "a", "db" => "main"},
|
||||
{"dc" => "ny", "rack" => "b", "db" => "main"},
|
||||
{"dc" => "sf", "rack" => "a", "db" => "main"}]
|
||||
|
Loading…
Reference in New Issue
Block a user