diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index 059e0f5..7505bdf 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -114,6 +114,7 @@ module Mongo # No connection manager by default. @manager = nil + @old_managers = [] # Lock for request ids. @id_lock = Mutex.new @@ -125,6 +126,7 @@ module Mongo @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new} @connect_mutex = Mutex.new + @refresh_mutex = Mutex.new check_opts(opts) setup(opts) @@ -147,14 +149,15 @@ module Mongo discovered_seeds = @manager ? @manager.seeds : [] @manager = PoolManager.new(self, discovered_seeds) + Thread.current[:manager] = @manager @manager.connect @refresh_version += 1 - if @require_primary && self.primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect. + if @require_primary && @manager.primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect. close raise ConnectionFailure, "Failed to connect to primary node." - elsif self.read_pool.nil? + elsif @manager.read_pool.nil? close raise ConnectionFailure, "Failed to connect to any node." else @@ -196,19 +199,19 @@ module Mongo def hard_refresh! log(:info, "Initiating hard refresh...") discovered_seeds = @manager ? @manager.seeds : [] - background_manager = PoolManager.new(self, discovered_seeds | @seeds) - background_manager.connect + new_manager = PoolManager.new(self, discovered_seeds | @seeds) + new_manager.connect # TODO: make sure that connect has succeeded - old_manager = @manager - @manager = background_manager - old_manager.close(:soft => true) + @old_managers << @manager + @manager = new_manager + @refresh_version += 1 return true end def connected? - @connected && (self.primary_pool || self.read_pool) + @connected && (@manager.primary_pool || @manager.read_pool) end # @deprecated @@ -221,14 +224,14 @@ module Mongo # # @return [String] def host - self.primary_pool.host + @manager.primary_pool.host end # The replica set primary's port. # # @return [Integer] def port - self.primary_pool.port + @manager.primary_pool.port end def nodes @@ -242,7 +245,7 @@ module Mongo # # @return [Boolean] def read_primary? - self.read_pool == self.primary_pool + @manager.read_pool == @manager.primary_pool end alias :primary? :read_primary? @@ -294,96 +297,68 @@ module Mongo end end + # Generic socket checkout + # Takes a block that returns a socket from pool + def checkout(&block) + if connected? + sync_refresh + else + connect + end + + begin + socket = block.call + rescue => ex + checkin(socket) if socket + raise ex + end + + if socket + socket + else + @connected = false + raise ConnectionFailure.new("Could not checkout a socket") + end + end + # Checkout a socket for reading (i.e., a secondary node). # Note that @read_pool might point to the primary pool # if no read pool has been defined. def checkout_reader - if connected? - sync_refresh - else - connect - end - begin - socket = get_socket_from_pool(self.read_pool) + checkout do + socket = get_socket_from_pool(:read) if !socket connect - socket = get_socket_from_pool(self.primary_pool) + socket = get_socket_from_pool(:primary) end - rescue => ex - checkin(socket) if socket - raise ex - end - - if socket socket - else - @connected = false - raise ConnectionFailure.new("Could not connect to a node for reading.") end end + # Checkout a socket from a secondary + # For :read_preference => :secondary_only def checkout_secondary - if connected? - sync_refresh - else - connect - end - begin - socket = get_socket_from_pool(self.secondary_pool) - rescue => ex - checkin(socket) if socket - raise ex - end - - if socket - socket - else - @connected = false - raise ConnectionFailure.new("Could not connect to a secondary for reading.") + checkout do + get_socket_from_pool(:secondary) end end # Checkout a socket for writing (i.e., a primary node). def checkout_writer - if connected? - sync_refresh - else - connect - end - begin - socket = get_socket_from_pool(self.primary_pool) - - if !socket - connect - socket = get_socket_from_pool(self.primary_pool) - end - rescue => ex - checkin(socket) - raise ex - end - - if socket - socket - else - @connected = false - raise ConnectionFailure.new("Could not connect to primary node.") + checkout do + get_socket_from_pool(:primary) end end # Checkin a socket used for reading. def checkin_reader(socket) - if !((self.read_pool && self.read_pool.checkin(socket)) || - (self.primary_pool && self.primary_pool.checkin(socket))) - close_socket(socket) - end + socket.pool.checkin(socket) sync_refresh end # Checkin a socket used for writing. def checkin_writer(socket) - if !self.primary_pool || !self.primary_pool.checkin(socket) - close_socket(socket) - end + socket.pool.checkin(socket) sync_refresh end @@ -395,7 +370,20 @@ module Mongo end end - def get_socket_from_pool(pool) + def get_socket_from_pool(pool_type) + if Thread.current[:manager] != @manager + Thread.current[:manager] = @manager + end + + pool = case pool_type + when :primary + primary_pool + when :secondary + secondary_pool + when :read + read_pool + end + begin if pool socket = pool.checkout @@ -406,47 +394,51 @@ module Mongo return nil end end + + def local_manager + Thread.current[:manager] + end def arbiters - @manager.arbiters.nil? ? [] : @manager.arbiters + local_manager.arbiters.nil? ? [] : local_manager.arbiters end def primary - @manager ? @manager.primary : nil + local_manager ? local_manager.primary : nil end # Note: might want to freeze these after connecting. def secondaries - @manager ? @manager.secondaries : [] + local_manager ? local_manager.secondaries : [] end def hosts - @manager ? @manager.hosts : [] + local_manager ? local_manager.hosts : [] end def primary_pool - @manager ? @manager.primary_pool : nil + local_manager ? local_manager.primary_pool : nil end def read_pool - @manager ? @manager.read_pool : nil + local_manager ? local_manager.read_pool : nil end def secondary_pool - @manager ? @manager.secondary_pool : nil + local_manager ? local_manager.secondary_pool : nil end def secondary_pools - @manager ? @manager.secondary_pools : [] + local_manager ? local_manager.secondary_pools : [] end def tag_map - @manager ? @manager.tag_map : {} + local_manager ? local_manager.tag_map : {} end def max_bson_size - if @manager && @manager.max_bson_size - @manager.max_bson_size + if local_manager && local_manager.max_bson_size + local_manager.max_bson_size else Mongo::DEFAULT_MAX_BSON_SIZE end @@ -513,12 +505,32 @@ module Mongo raise NodeWithTagsNotFound, "Could not find a connection tagged with #{tags}." end + + def prune_managers + @old_managers.each do |manager| + if manager != @manager + if manager.closed? + @old_managers.delete(manager) + else + manager.close(:soft => true) + end + end + end + end def sync_refresh if @refresh_mode == :sync && ((Time.now - @last_refresh) > @refresh_interval) @last_refresh = Time.now - refresh + + if @refresh_mutex.try_lock + begin + prune_managers + refresh + ensure + @refresh_mutex.unlock + end + end end end end diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index f04a705..2d77489 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -65,22 +65,12 @@ module Mongo # close only those sockets that are not checked out. def close(opts={}) @connection_mutex.synchronize do - if opts[:soft] - sockets_to_close = @sockets - @checked_out + if opts[:soft] && !@checked_out.empty? + close_sockets(@sockets - @checked_out) else - sockets_to_close = @sockets + close_sockets(@sockets) + @closed = true end - sockets_to_close.each do |sock| - begin - sock.close unless sock.closed? - rescue IOError => ex - warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}" - end - end - @sockets.clear - @pids.clear - @checked_out.clear - @closed = true end end @@ -256,9 +246,9 @@ module Mongo start_time = Time.now loop do if (Time.now - start_time) > @timeout - raise ConnectionTimeoutError, "could not obtain connection within " + - "#{@timeout} seconds. The max pool size is currently #{@size}; " + - "consider increasing the pool size or timeout." + raise ConnectionTimeoutError, "could not obtain connection within " + + "#{@timeout} seconds. The max pool size is currently #{@size}; " + + "consider increasing the pool size or timeout." end @connection_mutex.synchronize do @@ -300,5 +290,18 @@ module Mongo end end end + + private + + def close_sockets(sockets) + sockets.each do |socket| + begin + socket.close unless socket.closed? + rescue IOError => ex + warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}" + end + end + end + end end diff --git a/lib/mongo/util/pool_manager.rb b/lib/mongo/util/pool_manager.rb index 0525ec4..019ead2 100644 --- a/lib/mongo/util/pool_manager.rb +++ b/lib/mongo/util/pool_manager.rb @@ -84,6 +84,10 @@ module Mongo @refresh_required end + def closed? + pools.all? { |pool| pool.closed? } + end + def close(opts={}) begin if @primary_pool @@ -114,6 +118,10 @@ module Mongo private + def pools + [@primary_pool, *@secondary_pools] + end + def validate_existing_member(member) config = member.set_config if !config