Cleanup for distributed reads on replica sets.
This commit is contained in:
parent
c5862da061
commit
8aaed130d6
|
@ -0,0 +1,24 @@
|
||||||
|
# This code assumes a running replica set with at least on node at localhost:27017.
|
||||||
|
require 'mongo'
|
||||||
|
|
||||||
|
cons = []
|
||||||
|
|
||||||
|
10.times do
|
||||||
|
cons << Mongo::Connection.multi([['localhost', 27017]], :read_secondary => true)
|
||||||
|
end
|
||||||
|
|
||||||
|
ports = cons.map do |con|
|
||||||
|
con.read_pool.port
|
||||||
|
end
|
||||||
|
|
||||||
|
puts "These ten connections will read from the following ports:"
|
||||||
|
p ports
|
||||||
|
|
||||||
|
cons[rand(10)]['foo']['bar'].remove
|
||||||
|
100.times do |n|
|
||||||
|
cons[rand(10)]['foo']['bar'].insert({:a => n})
|
||||||
|
end
|
||||||
|
|
||||||
|
100.times do |n|
|
||||||
|
p cons[rand(10)]['foo']['bar'].find_one({:a => n})
|
||||||
|
end
|
|
@ -39,20 +39,18 @@ module Mongo
|
||||||
MONGODB_URI_SPEC = "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/database]"
|
MONGODB_URI_SPEC = "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/database]"
|
||||||
|
|
||||||
attr_reader :logger, :size, :nodes, :auths, :primary, :secondaries, :arbiters,
|
attr_reader :logger, :size, :nodes, :auths, :primary, :secondaries, :arbiters,
|
||||||
:safe, :primary_pool, :secondary_pools, :read_pool
|
:safe, :primary_pool, :read_pool, :secondary_pools
|
||||||
|
|
||||||
# Counter for generating unique request ids.
|
# Counter for generating unique request ids.
|
||||||
@@current_request_id = 0
|
@@current_request_id = 0
|
||||||
|
|
||||||
# Create a connection to MongoDB.
|
# Create a connection to single MongoDB instance.
|
||||||
#
|
#
|
||||||
# If connecting to just one server, you may specify whether connection to slave is permitted.
|
# You may specify whether connection to slave is permitted.
|
||||||
# In all cases, the default host is "localhost" and the default port is 27017.
|
# In all cases, the default host is "localhost" and the default port is 27017.
|
||||||
#
|
#
|
||||||
# To specify more than one host pair to be used as seeds in a replica set
|
# To specify more than one host pair to be used as seeds in a replica set,
|
||||||
# or replica pair, use Connection.multi. If you're only specifying one node in the
|
# use Connection.multi.
|
||||||
# replica set, you can use Connection.new, as any other host known to the set will be
|
|
||||||
# cached.
|
|
||||||
#
|
#
|
||||||
# Once connected to a replica set, you can find out which nodes are primary, secondary, and
|
# Once connected to a replica set, you can find out which nodes are primary, secondary, and
|
||||||
# arbiters with the corresponding accessors: Connection#primary, Connection#secondaries, and
|
# arbiters with the corresponding accessors: Connection#primary, Connection#secondaries, and
|
||||||
|
@ -69,12 +67,11 @@ module Mongo
|
||||||
# @option options [Boolean] :slave_ok (false) Must be set to +true+ when connecting
|
# @option options [Boolean] :slave_ok (false) Must be set to +true+ when connecting
|
||||||
# to a single, slave node.
|
# to a single, slave node.
|
||||||
# @option options [Logger, #debug] :logger (nil) Logger instance to receive driver operation log.
|
# @option options [Logger, #debug] :logger (nil) Logger instance to receive driver operation log.
|
||||||
# @option options [String] :rs_name (nil) The name of the replica set to connect to. An exception will be
|
# @option options [Integer] :pool_size (1) The maximum number of socket connections allowed per
|
||||||
# raised if unable to connect to a replica set with this name.
|
# connection pool. Note: this setting is relevant only for multi-threaded applications.
|
||||||
# @option options [Integer] :pool_size (1) The maximum number of socket connections that can be
|
# @option options [Float] :timeout (5.0) When all of the connections a pool are checked out,
|
||||||
# opened to the database.
|
|
||||||
# @option options [Float] :timeout (5.0) When all of the connections to the pool are checked out,
|
|
||||||
# this is the number of seconds to wait for a new connection to be released before throwing an exception.
|
# this is the number of seconds to wait for a new connection to be released before throwing an exception.
|
||||||
|
# Note: this setting is relevant only for multi-threaded applications (which in Ruby are rare).
|
||||||
#
|
#
|
||||||
# @example localhost, 27017
|
# @example localhost, 27017
|
||||||
# Connection.new
|
# Connection.new
|
||||||
|
@ -88,7 +85,7 @@ module Mongo
|
||||||
# @example localhost, 3000, where this node may be a slave
|
# @example localhost, 3000, where this node may be a slave
|
||||||
# Connection.new("localhost", 3000, :slave_ok => true)
|
# Connection.new("localhost", 3000, :slave_ok => true)
|
||||||
#
|
#
|
||||||
# @see http://www.mongodb.org/display/DOCS/Replica+Pairs+in+Ruby Replica pairs in Ruby
|
# @see http://api.mongodb.org/ruby/current/file.REPLICA_SETS.html Replica sets in Ruby
|
||||||
#
|
#
|
||||||
# @raise [ReplicaSetConnectionError] This is raised if a replica set name is specified and the
|
# @raise [ReplicaSetConnectionError] This is raised if a replica set name is specified and the
|
||||||
# driver fails to connect to a replica set with that name.
|
# driver fails to connect to a replica set with that name.
|
||||||
|
@ -113,7 +110,7 @@ module Mongo
|
||||||
@id_lock = Mutex.new
|
@id_lock = Mutex.new
|
||||||
|
|
||||||
# Pool size and timeout.
|
# Pool size and timeout.
|
||||||
@size = options[:pool_size] || 1
|
@pool_size = options[:pool_size] || 1
|
||||||
@timeout = options[:timeout] || 5.0
|
@timeout = options[:timeout] || 5.0
|
||||||
|
|
||||||
# Mutex for synchronizing pool access
|
# Mutex for synchronizing pool access
|
||||||
|
@ -156,20 +153,24 @@ module Mongo
|
||||||
|
|
||||||
# Initialize a connection to a MongoDB replica set using an array of seed nodes.
|
# Initialize a connection to a MongoDB replica set using an array of seed nodes.
|
||||||
#
|
#
|
||||||
# Note that, even when connecting to a replica set, you can use Connection.new specifying
|
# The seed nodes specified will be used on the initial connection to the replica set, but note
|
||||||
# just a single node. If the replica set is up, the remaining nodes in the set will be cached
|
# that this list of nodes will be replced by the list of canonical nodes returned by running the
|
||||||
# for failover.
|
# is_master command on the replica set.
|
||||||
#
|
#
|
||||||
# @param nodes [Array] An array of arrays, each of which specifies a host and port.
|
# @param nodes [Array] An array of arrays, each of which specifies a host and port.
|
||||||
# @param opts Takes the same options as Connection.new
|
# @param opts [Hash] Any of the available options that can be passed to Connection.new.
|
||||||
|
#
|
||||||
|
# @option options [String] :rs_name (nil) The name of the replica set to connect to. An exception will be
|
||||||
|
# raised if unable to connect to a replica set with this name.
|
||||||
|
# @option options [Boolean] :read_secondary (false) When true, this connection object will pick a random slave
|
||||||
|
# to send reads to.
|
||||||
#
|
#
|
||||||
# @example
|
# @example
|
||||||
# Connection.multi([["db1.example.com", 27017],
|
# Connection.multi([["db1.example.com", 27017], ["db2.example.com", 27017]])
|
||||||
# ["db2.example.com", 27017]])
|
|
||||||
#
|
#
|
||||||
# @example
|
# @example This connection will read from a random secondary node.
|
||||||
# Connection.multi([["db1.example.com", 27017], ["db2.example.com", 27017], ["db3.example.com", 27017]],
|
# Connection.multi([["db1.example.com", 27017], ["db2.example.com", 27017], ["db3.example.com", 27017]],
|
||||||
# :pool_size => 20, :timeout => 5)
|
# :read_secondary => true)
|
||||||
#
|
#
|
||||||
# @return [Mongo::Connection]
|
# @return [Mongo::Connection]
|
||||||
def self.multi(nodes, opts={})
|
def self.multi(nodes, opts={})
|
||||||
|
@ -182,7 +183,7 @@ module Mongo
|
||||||
new(nil, nil, opts) do |con|
|
new(nil, nil, opts) do |con|
|
||||||
nodes.map do |node|
|
nodes.map do |node|
|
||||||
con.instance_variable_set(:@replica_set, true)
|
con.instance_variable_set(:@replica_set, true)
|
||||||
con.instance_variable_set(:@read_secondaries, true) if opts[:read_secondaries]
|
con.instance_variable_set(:@read_secondary, true) if opts[:read_secondary]
|
||||||
con.pair_val_to_connection(node)
|
con.pair_val_to_connection(node)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -394,7 +395,7 @@ module Mongo
|
||||||
#
|
#
|
||||||
# @return [Boolean]
|
# @return [Boolean]
|
||||||
def slave_ok?
|
def slave_ok?
|
||||||
@read_secondaries || @slave_ok
|
@read_secondary || @slave_ok
|
||||||
end
|
end
|
||||||
|
|
||||||
# Send a message to MongoDB, adding the necessary headers.
|
# Send a message to MongoDB, adding the necessary headers.
|
||||||
|
@ -633,7 +634,6 @@ module Mongo
|
||||||
|
|
||||||
# Pick a node randomly from the set of possibly secondaries.
|
# Pick a node randomly from the set of possibly secondaries.
|
||||||
def pick_secondary_for_read
|
def pick_secondary_for_read
|
||||||
srand(Time.now.to_i)
|
|
||||||
if (size = @secondary_pools.size) > 1
|
if (size = @secondary_pools.size) > 1
|
||||||
@read_pool = @secondary_pools[rand(size)]
|
@read_pool = @secondary_pools[rand(size)]
|
||||||
end
|
end
|
||||||
|
@ -707,7 +707,7 @@ module Mongo
|
||||||
def set_primary(node)
|
def set_primary(node)
|
||||||
host, port = *node
|
host, port = *node
|
||||||
@primary = [host, port]
|
@primary = [host, port]
|
||||||
@primary_pool = Pool.new(self, host, port)
|
@primary_pool = Pool.new(self, host, port, :size => @pool_size, :timeout => @timeout)
|
||||||
apply_saved_authentication
|
apply_saved_authentication
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -718,7 +718,9 @@ module Mongo
|
||||||
if config['secondary']
|
if config['secondary']
|
||||||
host, port = *node
|
host, port = *node
|
||||||
@secondaries << node unless @secondaries.include?(node)
|
@secondaries << node unless @secondaries.include?(node)
|
||||||
@secondary_pools << Pool.new(self, host, port) if @read_secondaries
|
if @read_secondary
|
||||||
|
@secondary_pools << Pool.new(self, host, port, :size => @pool_size, :timeout => @timeout)
|
||||||
|
end
|
||||||
elsif config['arbiterOnly']
|
elsif config['arbiterOnly']
|
||||||
@arbiters << node unless @arbiters.include?(node)
|
@arbiters << node unless @arbiters.include?(node)
|
||||||
end
|
end
|
||||||
|
@ -744,6 +746,9 @@ module Mongo
|
||||||
[host, port.to_i]
|
[host, port.to_i]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Replace the list of seed nodes with the canonical list.
|
||||||
|
@nodes = new_nodes.clone
|
||||||
|
|
||||||
@nodes_to_try = new_nodes - @nodes_tried
|
@nodes_to_try = new_nodes - @nodes_tried
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ module Mongo
|
||||||
@host, @port = host, port
|
@host, @port = host, port
|
||||||
|
|
||||||
# Pool size and timeout.
|
# Pool size and timeout.
|
||||||
@size = options[:pool_size] || 1
|
@size = options[:size] || 1
|
||||||
@timeout = options[:timeout] || 5.0
|
@timeout = options[:timeout] || 5.0
|
||||||
|
|
||||||
# Mutex for synchronizing pool access
|
# Mutex for synchronizing pool access
|
||||||
|
|
|
@ -44,7 +44,7 @@ class ConnectionTest < Test::Unit::TestCase
|
||||||
context "connecting to a replica set" do
|
context "connecting to a replica set" do
|
||||||
setup do
|
setup do
|
||||||
TCPSocket.stubs(:new).returns(new_mock_socket('localhost', 27017))
|
TCPSocket.stubs(:new).returns(new_mock_socket('localhost', 27017))
|
||||||
@conn = Connection.multi([['localhost', 27017]], :connect => false, :read_secondaries => true)
|
@conn = Connection.multi([['localhost', 27017]], :connect => false, :read_secondary => true)
|
||||||
|
|
||||||
admin_db = new_mock_db
|
admin_db = new_mock_db
|
||||||
@hosts = ['localhost:27018', 'localhost:27019', 'localhost:27020']
|
@hosts = ['localhost:27018', 'localhost:27019', 'localhost:27020']
|
||||||
|
|
Loading…
Reference in New Issue