From 8aaed130d6e37b2ba9921ae6d95b4b9c8077c70a Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Wed, 24 Nov 2010 13:49:34 -0500 Subject: [PATCH] Cleanup for distributed reads on replica sets. --- docs/examples/replica_set.rb | 24 +++++++++++++++ lib/mongo/connection.rb | 59 +++++++++++++++++++----------------- lib/mongo/util/pool.rb | 2 +- test/unit/connection_test.rb | 2 +- 4 files changed, 58 insertions(+), 29 deletions(-) create mode 100644 docs/examples/replica_set.rb diff --git a/docs/examples/replica_set.rb b/docs/examples/replica_set.rb new file mode 100644 index 0000000..6ddfd1a --- /dev/null +++ b/docs/examples/replica_set.rb @@ -0,0 +1,24 @@ +# This code assumes a running replica set with at least on node at localhost:27017. +require 'mongo' + +cons = [] + +10.times do + cons << Mongo::Connection.multi([['localhost', 27017]], :read_secondary => true) +end + +ports = cons.map do |con| + con.read_pool.port +end + +puts "These ten connections will read from the following ports:" +p ports + +cons[rand(10)]['foo']['bar'].remove +100.times do |n| + cons[rand(10)]['foo']['bar'].insert({:a => n}) +end + +100.times do |n| + p cons[rand(10)]['foo']['bar'].find_one({:a => n}) +end diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 1146754..e078a94 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -39,20 +39,18 @@ module Mongo MONGODB_URI_SPEC = "mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/database]" attr_reader :logger, :size, :nodes, :auths, :primary, :secondaries, :arbiters, - :safe, :primary_pool, :secondary_pools, :read_pool + :safe, :primary_pool, :read_pool, :secondary_pools # Counter for generating unique request ids. @@current_request_id = 0 - # Create a connection to MongoDB. + # Create a connection to single MongoDB instance. # - # If connecting to just one server, you may specify whether connection to slave is permitted. + # You may specify whether connection to slave is permitted. # In all cases, the default host is "localhost" and the default port is 27017. # - # To specify more than one host pair to be used as seeds in a replica set - # or replica pair, use Connection.multi. If you're only specifying one node in the - # replica set, you can use Connection.new, as any other host known to the set will be - # cached. + # To specify more than one host pair to be used as seeds in a replica set, + # use Connection.multi. # # Once connected to a replica set, you can find out which nodes are primary, secondary, and # arbiters with the corresponding accessors: Connection#primary, Connection#secondaries, and @@ -69,12 +67,11 @@ module Mongo # @option options [Boolean] :slave_ok (false) Must be set to +true+ when connecting # to a single, slave node. # @option options [Logger, #debug] :logger (nil) Logger instance to receive driver operation log. - # @option options [String] :rs_name (nil) The name of the replica set to connect to. An exception will be - # raised if unable to connect to a replica set with this name. - # @option options [Integer] :pool_size (1) The maximum number of socket connections that can be - # opened to the database. - # @option options [Float] :timeout (5.0) When all of the connections to the pool are checked out, + # @option options [Integer] :pool_size (1) The maximum number of socket connections allowed per + # connection pool. Note: this setting is relevant only for multi-threaded applications. + # @option options [Float] :timeout (5.0) When all of the connections a pool are checked out, # this is the number of seconds to wait for a new connection to be released before throwing an exception. + # Note: this setting is relevant only for multi-threaded applications (which in Ruby are rare). # # @example localhost, 27017 # Connection.new @@ -88,7 +85,7 @@ module Mongo # @example localhost, 3000, where this node may be a slave # Connection.new("localhost", 3000, :slave_ok => true) # - # @see http://www.mongodb.org/display/DOCS/Replica+Pairs+in+Ruby Replica pairs in Ruby + # @see http://api.mongodb.org/ruby/current/file.REPLICA_SETS.html Replica sets in Ruby # # @raise [ReplicaSetConnectionError] This is raised if a replica set name is specified and the # driver fails to connect to a replica set with that name. @@ -113,7 +110,7 @@ module Mongo @id_lock = Mutex.new # Pool size and timeout. - @size = options[:pool_size] || 1 + @pool_size = options[:pool_size] || 1 @timeout = options[:timeout] || 5.0 # Mutex for synchronizing pool access @@ -156,20 +153,24 @@ module Mongo # Initialize a connection to a MongoDB replica set using an array of seed nodes. # - # Note that, even when connecting to a replica set, you can use Connection.new specifying - # just a single node. If the replica set is up, the remaining nodes in the set will be cached - # for failover. + # The seed nodes specified will be used on the initial connection to the replica set, but note + # that this list of nodes will be replced by the list of canonical nodes returned by running the + # is_master command on the replica set. # # @param nodes [Array] An array of arrays, each of which specifies a host and port. - # @param opts Takes the same options as Connection.new + # @param opts [Hash] Any of the available options that can be passed to Connection.new. + # + # @option options [String] :rs_name (nil) The name of the replica set to connect to. An exception will be + # raised if unable to connect to a replica set with this name. + # @option options [Boolean] :read_secondary (false) When true, this connection object will pick a random slave + # to send reads to. # # @example - # Connection.multi([["db1.example.com", 27017], - # ["db2.example.com", 27017]]) + # Connection.multi([["db1.example.com", 27017], ["db2.example.com", 27017]]) # - # @example + # @example This connection will read from a random secondary node. # Connection.multi([["db1.example.com", 27017], ["db2.example.com", 27017], ["db3.example.com", 27017]], - # :pool_size => 20, :timeout => 5) + # :read_secondary => true) # # @return [Mongo::Connection] def self.multi(nodes, opts={}) @@ -182,7 +183,7 @@ module Mongo new(nil, nil, opts) do |con| nodes.map do |node| con.instance_variable_set(:@replica_set, true) - con.instance_variable_set(:@read_secondaries, true) if opts[:read_secondaries] + con.instance_variable_set(:@read_secondary, true) if opts[:read_secondary] con.pair_val_to_connection(node) end end @@ -394,7 +395,7 @@ module Mongo # # @return [Boolean] def slave_ok? - @read_secondaries || @slave_ok + @read_secondary || @slave_ok end # Send a message to MongoDB, adding the necessary headers. @@ -633,7 +634,6 @@ module Mongo # Pick a node randomly from the set of possibly secondaries. def pick_secondary_for_read - srand(Time.now.to_i) if (size = @secondary_pools.size) > 1 @read_pool = @secondary_pools[rand(size)] end @@ -707,7 +707,7 @@ module Mongo def set_primary(node) host, port = *node @primary = [host, port] - @primary_pool = Pool.new(self, host, port) + @primary_pool = Pool.new(self, host, port, :size => @pool_size, :timeout => @timeout) apply_saved_authentication end @@ -718,7 +718,9 @@ module Mongo if config['secondary'] host, port = *node @secondaries << node unless @secondaries.include?(node) - @secondary_pools << Pool.new(self, host, port) if @read_secondaries + if @read_secondary + @secondary_pools << Pool.new(self, host, port, :size => @pool_size, :timeout => @timeout) + end elsif config['arbiterOnly'] @arbiters << node unless @arbiters.include?(node) end @@ -744,6 +746,9 @@ module Mongo [host, port.to_i] end + # Replace the list of seed nodes with the canonical list. + @nodes = new_nodes.clone + @nodes_to_try = new_nodes - @nodes_tried end diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index b1d235c..2bb0d22 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -28,7 +28,7 @@ module Mongo @host, @port = host, port # Pool size and timeout. - @size = options[:pool_size] || 1 + @size = options[:size] || 1 @timeout = options[:timeout] || 5.0 # Mutex for synchronizing pool access diff --git a/test/unit/connection_test.rb b/test/unit/connection_test.rb index 1891323..4edc2b0 100644 --- a/test/unit/connection_test.rb +++ b/test/unit/connection_test.rb @@ -44,7 +44,7 @@ class ConnectionTest < Test::Unit::TestCase context "connecting to a replica set" do setup do TCPSocket.stubs(:new).returns(new_mock_socket('localhost', 27017)) - @conn = Connection.multi([['localhost', 27017]], :connect => false, :read_secondaries => true) + @conn = Connection.multi([['localhost', 27017]], :connect => false, :read_secondary => true) admin_db = new_mock_db @hosts = ['localhost:27018', 'localhost:27019', 'localhost:27020']