diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index a58f4ce..d91e186 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -53,7 +53,7 @@ module Mongo def initialize(name, db, opts={}) if db.is_a?(String) && name.is_a?(Mongo::DB) warn "Warning: the order of parameters to initialize a collection have changed. " + - "Please specify the collection name first, followed by the db. This will be made permanent" + "Please specify the collection name first, followed by the db. This will be made permanent" + "in v2.0." db, name = name, db end diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 63afd39..5e97d2f 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -30,9 +30,6 @@ module Mongo Mutex = ::Mutex ConditionVariable = ::ConditionVariable - # Abort connections if a ConnectionError is raised. - Thread.abort_on_exception = true - DEFAULT_PORT = 27017 STANDARD_HEADER_SIZE = 16 RESPONSE_HEADER_SIZE = 20 diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index bf63c50..012cd3a 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -23,7 +23,7 @@ module Mongo # Instantiates and manages connections to a MongoDB replica set. class ReplSetConnection < Connection attr_reader :nodes, :secondaries, :arbiters, :secondary_pools, - :replica_set_name, :read_pool, :seeds, :tags_to_pools, :refresh_interval + :replica_set_name, :read_pool, :seeds, :tags_to_pools, :refresh_interval, :auto_refresh # Create a connection to a MongoDB replica set. # @@ -62,6 +62,8 @@ module Mongo # prevent you from having to reconnect manually. # @option opts [Integer] :refresh_interval (90) If :auto_refresh is enabled, this is the number of seconds # that the background thread will sleep between calls to check the replica set's state. + # @option opts [Boolean] :require_primary (true) If true, require a primary node for the connection + # to succeed. Otherwise, connection will succeed as long as there's at least one secondary. # # @example Connect to a replica set and provide two seed nodes. Note that the number of seed nodes does # not have to be equal to the number of replica set members. The purpose of seed nodes is to permit @@ -148,6 +150,9 @@ module Mongo @replica_set_name = opts[:name] end + # Require a primary node to connect? + @require_primary = opts.fetch(:require_primary, false) + setup(opts) end @@ -158,6 +163,7 @@ module Mongo # Initiate a connection to the replica set. def connect + log(:debug, "Connecting.") sync_synchronize(:EX) do return if @connected manager = PoolManager.new(self, @seeds) @@ -166,8 +172,10 @@ module Mongo update_config(manager) initiate_auto_refresh - if @primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect. + if @require_primary && @primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect. raise ConnectionFailure, "Failed to connect to primary node." + elsif !@read_pool + raise ConnectionFailure, "Failed to connect to any node." else @connected = true end @@ -205,11 +213,14 @@ module Mongo end background_manager = Thread.current[:background] - if background_manager.update_required?(@hosts) - sync_synchronize(:EX) do - background_manager.connect - update_config(background_manager) - end + + # Return if another thread is already in the process of refreshing. + return if sync_exclusive? + + sync_synchronize(:EX) do + log(:debug, "Refreshing...") + background_manager.connect + update_config(background_manager) end end @@ -259,35 +270,36 @@ module Mongo # Close the connection to the database. # TODO: we should get an exclusive lock here. def close - @connected = false + sync_synchronize(:EX) do + @connected = false + super - super - - if @refresh_thread - @refresh_thread.kill - @refresh_thread = nil - end - - if @nodes - @nodes.each do |member| - member.disconnect + if @refresh_thread + @refresh_thread.kill + @refresh_thread = nil end - end - @nodes = [] - @read_pool = nil - - if @secondary_pools - @secondary_pools.each do |pool| - pool.close + if @nodes + @nodes.each do |member| + member.disconnect + end end - end - @secondaries = [] - @secondary_pools = [] - @arbiters = [] - @tags_to_pools.clear - @sockets_to_pools.clear + @nodes = [] + @read_pool = nil + + if @secondary_pools + @secondary_pools.each do |pool| + pool.close + end + end + + @secondaries = [] + @secondary_pools = [] + @arbiters = [] + @tags_to_pools.clear + @sockets_to_pools.clear + end end # If a ConnectionFailure is raised, this method will be called @@ -342,17 +354,25 @@ module Mongo # if no read pool has been defined. def checkout_reader connect unless connected? + socket = get_socket_from_pool(@read_pool) - sync_synchronize(:SH) do - socket = @read_pool.checkout - @sockets_to_pools[socket] = @read_pool + if !socket + refresh + socket = get_socket_from_pool(@primary_pool) + end + + if socket socket + else + raise ConnectionFailure.new("Could not connect to a node for reading.") end end # Checkout a socket connected to a node with one of # the provided tags. If no such node exists, raise # an exception. + # + # NOTE: will be available in driver release v2.0. def checkout_tagged(tags) sync_synchronize(:SH) do tags.each do |k, v| @@ -372,13 +392,33 @@ module Mongo # Checkout a socket for writing (i.e., a primary node). def checkout_writer connect unless connected? + socket = get_socket_from_pool(@primary_pool) - sync_synchronize(:SH) do - if @primary_pool - socket = @primary_pool.checkout - @sockets_to_pools[socket] = @primary_pool - socket + if !socket + refresh + socket = get_socket_from_pool(@primary_pool) + end + + if socket + socket + else + raise ConnectionFailure.new("Could not connect to primary node.") + end + end + + def get_socket_from_pool(pool) + begin + sync_synchronize(:SH) do + if pool + socket = pool.checkout + @sockets_to_pools[socket] = pool + socket + end end + + rescue ConnectionFailure => ex + log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}") + return nil end end @@ -397,8 +437,12 @@ module Mongo end def checkin(socket) - if pool = @sockets_to_pools[socket] - pool.checkin(socket) + sync_synchronize(:SH) do + if pool = @sockets_to_pools[socket] + pool.checkin(socket) + elsif socket + socket.close + end end end end diff --git a/lib/mongo/util/logging.rb b/lib/mongo/util/logging.rb index 714565a..e3fde4c 100644 --- a/lib/mongo/util/logging.rb +++ b/lib/mongo/util/logging.rb @@ -2,7 +2,7 @@ module Mongo module Logging # Log a message with the given level. - def log(level, message) + def log(level, msg) return unless @logger case level when :debug then diff --git a/lib/mongo/util/node.rb b/lib/mongo/util/node.rb index fc7f3fd..5b4c81c 100644 --- a/lib/mongo/util/node.rb +++ b/lib/mongo/util/node.rb @@ -20,6 +20,10 @@ module Mongo end alias :== :eql? + def close + self.socket.close if self.socket + end + def host_string address end diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index 0f962af..13f3f15 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -52,17 +52,19 @@ module Mongo end def close - @sockets.each do |sock| - begin - sock.close - rescue IOError => ex - warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}" + @connection_mutex.synchronize do + (@sockets - @checked_out).each do |sock| + begin + sock.close + rescue IOError => ex + warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}" + end end + @host = @port = nil + @sockets.clear + @pids.clear + @checked_out.clear end - @host = @port = nil - @sockets.clear - @pids.clear - @checked_out.clear end def inspect @@ -188,7 +190,7 @@ module Mongo if @pids[socket] != Process.pid @pids[socket] = nil @sockets.delete(socket) - socket.close + socket.close if socket checkout_new_socket else @checked_out << socket diff --git a/lib/mongo/util/pool_manager.rb b/lib/mongo/util/pool_manager.rb index a7ce367..0a57a6e 100644 --- a/lib/mongo/util/pool_manager.rb +++ b/lib/mongo/util/pool_manager.rb @@ -23,31 +23,29 @@ module Mongo @members = members end - # Ensure that the view of the replica set is current by - # running the ismaster command and checking to see whether - # we've connected to all known nodes. If not, automatically - # connect to these unconnected nodes. This is handy when we've - # connected to a replica set with no primary or when a secondary - # node comes up after we've connected. - # - # If we're connected to nodes that are no longer part of the set, - # remove these from our set of secondary pools. - def update_required?(hosts) - if !@refresh_node || !@refresh_node.set_config - begin - @refresh_node = get_valid_seed_node - rescue ConnectionFailure - warn "Could not refresh config because no valid seed node was available." - return - end - end - - hosts != @refresh_node.node_list - end - private def initialize_data + begin + if @primary_pool + @primary_pool.close + end + + if @secondary_pools + @secondary_pools.each do |pool| + pool.close + end + end + + if @members + @members.each do |member| + member.close + end + end + + rescue ConnectionFailure + end + @primary = nil @primary_pool = nil @read_pool = nil diff --git a/test/replica_sets/refresh_test.rb b/test/replica_sets/refresh_test.rb index f35d04d..7368574 100644 --- a/test/replica_sets/refresh_test.rb +++ b/test/replica_sets/refresh_test.rb @@ -6,6 +6,10 @@ require 'benchmark' class ReplicaSetRefreshTest < Test::Unit::TestCase include Mongo + def setup + @conn = nil + end + def teardown RS.restart_killed_nodes @conn.close if @conn @@ -98,7 +102,6 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase end def test_adding_and_removing_nodes - puts "ADDING AND REMOVING" @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], [RS.host, RS.ports[2]], :refresh_interval => 2, :auto_refresh => true) diff --git a/test/tools/repl_set_manager.rb b/test/tools/repl_set_manager.rb index 5ee8e53..6369648 100644 --- a/test/tools/repl_set_manager.rb +++ b/test/tools/repl_set_manager.rb @@ -21,7 +21,7 @@ class ReplSetManager @config = {"_id" => @name, "members" => []} @durable = opts.fetch(:durable, false) @path = File.join(File.expand_path(File.dirname(__FILE__)), "data") - @oplog_size = opts.fetch(:oplog_size, 512) + @oplog_size = opts.fetch(:oplog_size, 32) @tags = [{"dc" => "ny", "rack" => "a", "db" => "main"}, {"dc" => "ny", "rack" => "b", "db" => "main"}, {"dc" => "sf", "rack" => "a", "db" => "main"}]