Stop using 'sync' library. Don't trust it. Too complex.
This commit is contained in:
parent
1f068ce127
commit
3655a94934
@ -16,8 +16,6 @@
|
||||
# limitations under the License.
|
||||
# ++
|
||||
|
||||
require 'sync'
|
||||
|
||||
module Mongo
|
||||
|
||||
# Instantiates and manages connections to a MongoDB replica set.
|
||||
@ -83,8 +81,6 @@ module Mongo
|
||||
# @raise [ReplicaSetConnectionError] This is raised if a replica set name is specified and the
|
||||
# driver fails to connect to a replica set with that name.
|
||||
def initialize(*args)
|
||||
extend Sync_m
|
||||
|
||||
if args.last.is_a?(Hash)
|
||||
opts = args.pop
|
||||
else
|
||||
@ -157,23 +153,21 @@ module Mongo
|
||||
# Initiate a connection to the replica set.
|
||||
def connect
|
||||
log(:info, "Connecting...")
|
||||
sync_synchronize(:EX) do
|
||||
return if @connected
|
||||
manager = PoolManager.new(self, @seeds)
|
||||
manager.connect
|
||||
return if @connected
|
||||
manager = PoolManager.new(self, @seeds)
|
||||
manager.connect
|
||||
|
||||
update_config(manager)
|
||||
initiate_refresh_mode
|
||||
update_config(manager)
|
||||
initiate_refresh_mode
|
||||
|
||||
if @require_primary && self.primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
||||
close
|
||||
raise ConnectionFailure, "Failed to connect to primary node."
|
||||
elsif self.read_pool.nil?
|
||||
close
|
||||
raise ConnectionFailure, "Failed to connect to any node."
|
||||
else
|
||||
@connected = true
|
||||
end
|
||||
if @require_primary && self.primary.nil? #TODO: in v2.0, we'll let this be optional and do a lazy connect.
|
||||
close
|
||||
raise ConnectionFailure, "Failed to connect to primary node."
|
||||
elsif self.read_pool.nil?
|
||||
close
|
||||
raise ConnectionFailure, "Failed to connect to any node."
|
||||
else
|
||||
@connected = true
|
||||
end
|
||||
end
|
||||
|
||||
@ -208,13 +202,11 @@ module Mongo
|
||||
# occurred. +false+ is returned when unable
|
||||
# to get the refresh lock.
|
||||
def hard_refresh!
|
||||
return false if sync_exclusive?
|
||||
|
||||
log(:info, "Initiating hard refresh...")
|
||||
background_manager = PoolManager.new(self, @seeds)
|
||||
background_manager.connect
|
||||
|
||||
# TODO: make sure that connect as succeeded
|
||||
# TODO: make sure that connect has succeeded
|
||||
update_config(background_manager)
|
||||
initiate_refresh_mode
|
||||
|
||||
@ -235,14 +227,14 @@ module Mongo
|
||||
#
|
||||
# @return [String]
|
||||
def host
|
||||
super
|
||||
self.primary_pool.host
|
||||
end
|
||||
|
||||
# The replica set primary's port.
|
||||
#
|
||||
# @return [Integer]
|
||||
def port
|
||||
super
|
||||
self.primary_pool.port
|
||||
end
|
||||
|
||||
def nodes
|
||||
@ -266,18 +258,15 @@ module Mongo
|
||||
|
||||
# Close the connection to the database.
|
||||
def close
|
||||
sync_synchronize(:EX) do
|
||||
@connected = false
|
||||
super
|
||||
@connected = false
|
||||
|
||||
if @refresh_thread
|
||||
@refresh_thread.kill
|
||||
@refresh_thread = nil
|
||||
end
|
||||
|
||||
@manager.close if @manager
|
||||
@sockets_to_pools.clear
|
||||
if @refresh_thread
|
||||
@refresh_thread.kill
|
||||
@refresh_thread = nil
|
||||
end
|
||||
|
||||
@manager.close if @manager
|
||||
@sockets_to_pools.clear
|
||||
end
|
||||
|
||||
# If a ConnectionFailure is raised, this method will be called
|
||||
@ -301,14 +290,14 @@ module Mongo
|
||||
end
|
||||
|
||||
def authenticate_pools
|
||||
super
|
||||
self.primary_pool.authenticate_existing
|
||||
self.secondary_pools.each do |pool|
|
||||
pool.authenticate_existing
|
||||
end
|
||||
end
|
||||
|
||||
def logout_pools(db)
|
||||
super
|
||||
self.primary_pool.logout_existing(db)
|
||||
self.secondary_pools.each do |pool|
|
||||
pool.logout_existing(db)
|
||||
end
|
||||
@ -351,16 +340,13 @@ module Mongo
|
||||
end
|
||||
|
||||
def checkin(socket)
|
||||
sync_synchronize(:SH) do
|
||||
if pool = @sockets_to_pools[socket]
|
||||
pool.checkin(socket)
|
||||
elsif socket
|
||||
begin
|
||||
socket.close
|
||||
rescue IOError
|
||||
log(:info, "Tried to close socket #{socket} but already closed.")
|
||||
end
|
||||
if pool = @sockets_to_pools[socket]
|
||||
pool.checkin(socket)
|
||||
if !@sockets_to_pools[socket]
|
||||
close_socket(socket)
|
||||
end
|
||||
elsif socket
|
||||
close_socket(socket)
|
||||
end
|
||||
|
||||
# Refresh synchronously every @refresh_interval seconds
|
||||
@ -372,16 +358,21 @@ module Mongo
|
||||
end
|
||||
end
|
||||
|
||||
def close_socket(socket)
|
||||
begin
|
||||
socket.close
|
||||
rescue IOError
|
||||
log(:info, "Tried to close socket #{socket} but already closed.")
|
||||
end
|
||||
end
|
||||
|
||||
def get_socket_from_pool(pool)
|
||||
begin
|
||||
sync_synchronize(:SH) do
|
||||
if pool
|
||||
socket = pool.checkout
|
||||
@sockets_to_pools[socket] = pool
|
||||
socket
|
||||
end
|
||||
if pool
|
||||
socket = pool.checkout
|
||||
@sockets_to_pools[socket] = pool
|
||||
socket
|
||||
end
|
||||
|
||||
rescue ConnectionFailure => ex
|
||||
log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}")
|
||||
return nil
|
||||
@ -463,14 +454,12 @@ module Mongo
|
||||
#
|
||||
# NOTE: will be available in driver release v2.0.
|
||||
def checkout_tagged(tags)
|
||||
sync_synchronize(:SH) do
|
||||
tags.each do |k, v|
|
||||
pool = self.tag_map[{k.to_s => v}]
|
||||
if pool
|
||||
socket = pool.checkout
|
||||
@sockets_to_pools[socket] = pool
|
||||
return socket
|
||||
end
|
||||
tags.each do |k, v|
|
||||
pool = self.tag_map[{k.to_s => v}]
|
||||
if pool
|
||||
socket = pool.checkout
|
||||
@sockets_to_pools[socket] = pool
|
||||
return socket
|
||||
end
|
||||
end
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user