2010-12-10 21:00:35 +00:00
|
|
|
# encoding: UTF-8
|
|
|
|
|
|
|
|
# --
|
2011-01-17 17:26:32 +00:00
|
|
|
# Copyright (C) 2008-2011 10gen Inc.
|
2010-12-10 21:00:35 +00:00
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
# You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
# ++
|
|
|
|
|
|
|
|
module Mongo
|
|
|
|
|
2010-12-15 19:14:06 +00:00
|
|
|
# Instantiates and manages connections to a MongoDB replica set.
|
2010-12-13 19:07:32 +00:00
|
|
|
class ReplSetConnection < Connection
|
2011-08-24 22:34:00 +00:00
|
|
|
attr_reader :nodes, :secondaries, :arbiters, :secondary_pools,
|
|
|
|
:replica_set_name, :read_pool
|
2010-12-10 21:00:35 +00:00
|
|
|
|
2010-12-15 19:14:06 +00:00
|
|
|
# Create a connection to a MongoDB replica set.
|
|
|
|
#
|
|
|
|
# Once connected to a replica set, you can find out which nodes are primary, secondary, and
|
|
|
|
# arbiters with the corresponding accessors: Connection#primary, Connection#secondaries, and
|
|
|
|
# Connection#arbiters. This is useful if your application needs to connect manually to nodes other
|
|
|
|
# than the primary.
|
|
|
|
#
|
2011-03-12 13:40:29 +00:00
|
|
|
# @param [Array] args A list of host-port pairs to be used as seed nodes followed by a
|
|
|
|
# hash containing any options. See the examples below for exactly how to use the constructor.
|
2010-12-15 19:14:06 +00:00
|
|
|
#
|
2010-12-30 20:40:50 +00:00
|
|
|
# @option options [String] :rs_name (nil) The name of the replica set to connect to. You
|
|
|
|
# can use this option to verify that you're connecting to the right replica set.
|
2010-12-15 19:14:06 +00:00
|
|
|
# @option options [Boolean, Hash] :safe (false) Set the default safe-mode options
|
|
|
|
# propogated to DB objects instantiated off of this Connection. This
|
|
|
|
# default can be overridden upon instantiation of any DB by explicity setting a :safe value
|
|
|
|
# on initialization.
|
|
|
|
# @option options [Boolean] :read_secondary(false) If true, a random secondary node will be chosen,
|
|
|
|
# and all reads will be directed to that node.
|
|
|
|
# @option options [Logger, #debug] :logger (nil) Logger instance to receive driver operation log.
|
|
|
|
# @option options [Integer] :pool_size (1) The maximum number of socket connections allowed per
|
|
|
|
# connection pool. Note: this setting is relevant only for multi-threaded applications.
|
2011-06-15 20:17:42 +00:00
|
|
|
# @option options [Float] :pool_timeout (5.0) When all of the connections a pool are checked out,
|
2010-12-15 19:14:06 +00:00
|
|
|
# this is the number of seconds to wait for a new connection to be released before throwing an exception.
|
|
|
|
# Note: this setting is relevant only for multi-threaded applications.
|
2011-06-15 18:20:11 +00:00
|
|
|
# @option opts [Float] :op_timeout (nil) The number of seconds to wait for a read operation to time out.
|
|
|
|
# Disabled by default.
|
|
|
|
# @option opts [Float] :connect_timeout (nil) The number of seconds to wait before timing out a
|
|
|
|
# connection attempt.
|
2011-08-24 22:34:00 +00:00
|
|
|
# @option opts [Boolean] :auto_refresh (false) Set this to true to enable a background thread that
|
|
|
|
# periodically updates the state of the connection. If, for example, you initially connect while a secondary
|
|
|
|
# is down, :auto_refresh will reconnect to that secondary behind the scenes to
|
|
|
|
# prevent you from having to reconnect manually.
|
|
|
|
# @option opts [Integer] :refresh_interval (90) If :auto_refresh is enabled, this is the number of seconds
|
|
|
|
# that the background thread will sleep between calls to check the replica set's state.
|
2010-12-15 19:14:06 +00:00
|
|
|
#
|
2011-03-12 13:40:29 +00:00
|
|
|
# @example Connect to a replica set and provide two seed nodes. Note that the number of seed nodes does
|
|
|
|
# not have to be equal to the number of replica set members. The purpose of seed nodes is to permit
|
|
|
|
# the driver to find at least one replica set member even if a member is down.
|
2010-12-15 19:14:06 +00:00
|
|
|
# ReplSetConnection.new(['localhost', 30000], ['localhost', 30001])
|
|
|
|
#
|
2011-01-05 16:30:20 +00:00
|
|
|
# @example Connect to a replica set providing two seed nodes and ensuring a connection to the replica set named 'prod':
|
|
|
|
# ReplSetConnection.new(['localhost', 30000], ['localhost', 30001], :rs_name => 'prod')
|
|
|
|
#
|
|
|
|
# @example Connect to a replica set providing two seed nodes and allowing reads from a secondary node:
|
2010-12-15 19:14:06 +00:00
|
|
|
# ReplSetConnection.new(['localhost', 30000], ['localhost', 30001], :read_secondary => true)
|
|
|
|
#
|
|
|
|
# @see http://api.mongodb.org/ruby/current/file.REPLICA_SETS.html Replica sets in Ruby
|
|
|
|
#
|
|
|
|
# @raise [ReplicaSetConnectionError] This is raised if a replica set name is specified and the
|
|
|
|
# driver fails to connect to a replica set with that name.
|
2010-12-10 21:00:35 +00:00
|
|
|
def initialize(*args)
|
|
|
|
if args.last.is_a?(Hash)
|
2010-12-13 19:07:32 +00:00
|
|
|
opts = args.pop
|
|
|
|
else
|
|
|
|
opts = {}
|
|
|
|
end
|
|
|
|
|
|
|
|
unless args.length > 0
|
2011-08-24 22:34:00 +00:00
|
|
|
raise MongoArgumentError, "A ReplSetConnection requires at least one seed node."
|
2010-12-10 21:00:35 +00:00
|
|
|
end
|
|
|
|
|
2011-08-24 22:34:00 +00:00
|
|
|
# The list of seed nodes
|
2011-08-16 20:47:07 +00:00
|
|
|
@seeds = args
|
2010-12-10 21:00:35 +00:00
|
|
|
|
2011-08-16 20:47:07 +00:00
|
|
|
# The members of the replica set, stored as instances of Mongo::Node.
|
2011-08-22 15:52:11 +00:00
|
|
|
@nodes = []
|
2010-12-10 21:00:35 +00:00
|
|
|
|
2011-08-24 22:34:00 +00:00
|
|
|
# Connection pool for primary node
|
2011-08-16 20:47:07 +00:00
|
|
|
@primary = nil
|
|
|
|
@primary_pool = nil
|
2010-12-10 21:00:35 +00:00
|
|
|
|
|
|
|
# Connection pools for each secondary node
|
2011-08-16 20:47:07 +00:00
|
|
|
@secondaries = []
|
2010-12-10 21:00:35 +00:00
|
|
|
@secondary_pools = []
|
2011-08-16 20:47:07 +00:00
|
|
|
|
|
|
|
# The secondary pool to which we'll be sending reads.
|
2011-08-24 22:34:00 +00:00
|
|
|
# This may be identical to the primary pool.
|
2010-12-10 21:00:35 +00:00
|
|
|
@read_pool = nil
|
|
|
|
|
2011-08-24 22:34:00 +00:00
|
|
|
# A list of arbiter addresses (for client information only)
|
2011-08-16 20:47:07 +00:00
|
|
|
@arbiters = []
|
|
|
|
|
2011-08-24 22:34:00 +00:00
|
|
|
# Refresh
|
|
|
|
@auto_refresh = opts.fetch(:auto_refresh, true)
|
|
|
|
@refresh_interval = opts[:refresh_interval] || 90
|
2011-08-22 15:52:11 +00:00
|
|
|
|
2010-12-13 19:07:32 +00:00
|
|
|
# Are we allowing reads from secondaries?
|
|
|
|
@read_secondary = opts.fetch(:read_secondary, false)
|
|
|
|
|
2011-08-24 22:34:00 +00:00
|
|
|
# Lock around changes to the global config
|
|
|
|
@connection_lock = Mutex.new
|
|
|
|
@connected = false
|
|
|
|
|
|
|
|
# Store the refresher thread
|
|
|
|
@refresh_thread = nil
|
|
|
|
|
2011-08-16 20:47:07 +00:00
|
|
|
# Replica set name
|
|
|
|
if opts[:rs_name]
|
|
|
|
warn ":rs_name option has been deprecated and will be removed in 2.0. " +
|
|
|
|
"Please use :name instead."
|
|
|
|
@replica_set_name = opts[:rs_name]
|
|
|
|
else
|
|
|
|
@replica_set_name = opts[:name]
|
|
|
|
end
|
|
|
|
|
2010-12-13 19:07:32 +00:00
|
|
|
setup(opts)
|
2010-12-10 21:00:35 +00:00
|
|
|
end
|
|
|
|
|
2011-08-24 22:34:00 +00:00
|
|
|
# Initiate a connection to the replica set.
|
2010-12-10 21:00:35 +00:00
|
|
|
def connect
|
2011-08-24 22:34:00 +00:00
|
|
|
@connection_lock.synchronize do
|
|
|
|
return if @connected
|
|
|
|
manager = PoolManager.new(self, @seeds)
|
|
|
|
manager.connect
|
2010-12-10 21:00:35 +00:00
|
|
|
|
2011-08-24 22:34:00 +00:00
|
|
|
update_config(manager)
|
|
|
|
#BSON::BSON_CODER.update_max_bson_size(self)
|
|
|
|
initiate_auto_refresh
|
2011-08-16 20:47:07 +00:00
|
|
|
|
2011-08-24 22:34:00 +00:00
|
|
|
if @primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
2010-12-14 18:14:45 +00:00
|
|
|
raise ConnectionFailure, "Failed to connect to primary node."
|
2011-08-16 20:47:07 +00:00
|
|
|
else
|
2011-08-24 22:34:00 +00:00
|
|
|
@connected = true
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
# Note: this method must be called from within
|
|
|
|
# a locked @connection_lock
|
|
|
|
def update_config(manager)
|
|
|
|
@arbiters = manager.arbiters.nil? ? [] : manager.arbiters.dup
|
|
|
|
@primary = manager.primary.nil? ? nil : manager.primary.dup
|
|
|
|
@secondaries = manager.secondaries.dup
|
|
|
|
|
|
|
|
@primary_pool = manager.primary_pool
|
|
|
|
@read_pool = manager.read_pool
|
|
|
|
@secondary_pools = manager.secondary_pools
|
|
|
|
@seeds = manager.seeds
|
|
|
|
@manager = manager
|
|
|
|
@hosts = manager.hosts
|
|
|
|
end
|
|
|
|
|
|
|
|
# If ismaster doesn't match our current view
|
|
|
|
# then create a new PoolManager, passing in our
|
|
|
|
# existing view. It should be able to do the diff.
|
|
|
|
# Then take out the connection lock and replace
|
|
|
|
# our current values.
|
|
|
|
def refresh
|
|
|
|
background_manager = PoolManager.new(self, @seeds)
|
|
|
|
|
|
|
|
if update_struct = background_manager.update_required?(@hosts)
|
|
|
|
@connection_lock.synchronize do
|
|
|
|
background_manager.update(@manager, update_struct)
|
|
|
|
update_config(background_manager)
|
2010-12-14 18:14:45 +00:00
|
|
|
end
|
|
|
|
end
|
2010-12-10 21:00:35 +00:00
|
|
|
end
|
|
|
|
|
2011-08-16 20:47:07 +00:00
|
|
|
def connected?
|
2011-08-24 22:34:00 +00:00
|
|
|
@connected && !@connection_lock.locked?
|
2011-08-16 20:47:07 +00:00
|
|
|
end
|
|
|
|
|
|
|
|
# @deprecated
|
2010-12-10 21:00:35 +00:00
|
|
|
def connecting?
|
2011-08-16 20:47:07 +00:00
|
|
|
false
|
2010-12-10 21:00:35 +00:00
|
|
|
end
|
|
|
|
|
2011-03-23 20:34:42 +00:00
|
|
|
# The replica set primary's host name.
|
|
|
|
#
|
|
|
|
# @return [String]
|
|
|
|
def host
|
|
|
|
super
|
|
|
|
end
|
|
|
|
|
|
|
|
# The replica set primary's port.
|
|
|
|
#
|
|
|
|
# @return [Integer]
|
|
|
|
def port
|
|
|
|
super
|
|
|
|
end
|
|
|
|
|
2010-12-29 18:01:05 +00:00
|
|
|
# Determine whether we're reading from a primary node. If false,
|
|
|
|
# this connection connects to a secondary node and @read_secondaries is true.
|
|
|
|
#
|
|
|
|
# @return [Boolean]
|
|
|
|
def read_primary?
|
2011-08-24 22:34:00 +00:00
|
|
|
@read_pool == @primary_pool
|
2010-12-29 18:01:05 +00:00
|
|
|
end
|
2011-01-17 18:37:41 +00:00
|
|
|
alias :primary? :read_primary?
|
2010-12-29 18:01:05 +00:00
|
|
|
|
2010-12-10 21:00:35 +00:00
|
|
|
# Close the connection to the database.
|
|
|
|
def close
|
|
|
|
super
|
2011-08-24 22:34:00 +00:00
|
|
|
|
|
|
|
@connected = false
|
|
|
|
if @refresh_thread
|
|
|
|
@refresh_thread.kill
|
|
|
|
@refresh_thread = nil
|
|
|
|
end
|
|
|
|
|
2011-08-22 15:52:11 +00:00
|
|
|
@nodes.each do |member|
|
2011-08-16 20:47:07 +00:00
|
|
|
member.disconnect
|
|
|
|
end
|
2011-08-24 22:34:00 +00:00
|
|
|
|
2011-08-22 15:52:11 +00:00
|
|
|
@nodes = []
|
2010-12-10 21:00:35 +00:00
|
|
|
@read_pool = nil
|
2011-08-24 22:34:00 +00:00
|
|
|
|
2010-12-10 21:00:35 +00:00
|
|
|
@secondary_pools.each do |pool|
|
|
|
|
pool.close
|
|
|
|
end
|
2011-08-24 22:34:00 +00:00
|
|
|
|
2010-12-10 21:00:35 +00:00
|
|
|
@secondaries = []
|
|
|
|
@secondary_pools = []
|
|
|
|
@arbiters = []
|
|
|
|
end
|
|
|
|
|
2011-03-29 16:18:58 +00:00
|
|
|
# If a ConnectionFailure is raised, this method will be called
|
|
|
|
# to close the connection and reset connection values.
|
|
|
|
# @deprecated
|
|
|
|
def reset_connection
|
|
|
|
close
|
|
|
|
warn "ReplSetConnection#reset_connection is now deprecated. " +
|
|
|
|
"Use ReplSetConnection#close instead."
|
|
|
|
end
|
|
|
|
|
2010-12-15 17:36:43 +00:00
|
|
|
# Is it okay to connect to a slave?
|
|
|
|
#
|
|
|
|
# @return [Boolean]
|
|
|
|
def slave_ok?
|
2011-06-15 15:31:10 +00:00
|
|
|
@read_secondary
|
2010-12-15 17:36:43 +00:00
|
|
|
end
|
|
|
|
|
2011-01-31 19:47:05 +00:00
|
|
|
def authenticate_pools
|
|
|
|
super
|
|
|
|
@secondary_pools.each do |pool|
|
|
|
|
pool.authenticate_existing
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
def logout_pools(db)
|
|
|
|
super
|
|
|
|
@secondary_pools.each do |pool|
|
|
|
|
pool.logout_existing(db)
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
2010-12-10 21:00:35 +00:00
|
|
|
private
|
|
|
|
|
2011-08-24 22:34:00 +00:00
|
|
|
def initiate_auto_refresh
|
|
|
|
return if @refresh_thread && @refresh_thread.alive?
|
|
|
|
@refresh_thread = Thread.new do
|
|
|
|
sleep(@refresh_interval)
|
|
|
|
refresh
|
2010-12-10 21:00:35 +00:00
|
|
|
end
|
2011-08-22 15:52:11 +00:00
|
|
|
end
|
|
|
|
|
2010-12-15 17:36:43 +00:00
|
|
|
# Checkout a socket for reading (i.e., a secondary node).
|
2011-08-24 22:34:00 +00:00
|
|
|
# Note that @read_pool might point to the primary pool
|
|
|
|
# if no read pool has been defined. That's okay; we don't
|
|
|
|
# want to have to check for the existence of the @read_pool
|
|
|
|
# because that introduces concurrency issues.
|
2010-12-15 17:36:43 +00:00
|
|
|
def checkout_reader
|
|
|
|
connect unless connected?
|
|
|
|
|
2011-08-22 15:52:11 +00:00
|
|
|
if @read_secondary && @read_pool
|
2011-08-24 22:34:00 +00:00
|
|
|
begin
|
|
|
|
return @read_pool.checkout
|
|
|
|
rescue NoMethodError
|
|
|
|
warn "Read pool was not available."
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
begin
|
|
|
|
return @primary_pool.checkout
|
|
|
|
rescue NoMethodError
|
|
|
|
raise ConnectionFailure, "Not connected to any nodes."
|
2010-12-15 17:36:43 +00:00
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
# Checkout a socket for writing (i.e., a primary node).
|
|
|
|
def checkout_writer
|
|
|
|
connect unless connected?
|
|
|
|
|
2011-08-24 22:34:00 +00:00
|
|
|
if @primary_pool
|
|
|
|
begin
|
|
|
|
return @primary_pool.checkout
|
|
|
|
rescue NoMethodError
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
raise ConnectionFailure, "Failed to connect to primary node."
|
2010-12-15 17:36:43 +00:00
|
|
|
end
|
|
|
|
|
|
|
|
# Checkin a socket used for reading.
|
|
|
|
def checkin_reader(socket)
|
2011-08-24 22:34:00 +00:00
|
|
|
if @read_secondary
|
2010-12-15 17:36:43 +00:00
|
|
|
@read_pool.checkin(socket)
|
|
|
|
else
|
|
|
|
checkin_writer(socket)
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
# Checkin a socket used for writing.
|
|
|
|
def checkin_writer(socket)
|
|
|
|
if @primary_pool
|
|
|
|
@primary_pool.checkin(socket)
|
|
|
|
end
|
|
|
|
end
|
2010-12-10 21:00:35 +00:00
|
|
|
end
|
|
|
|
end
|