104 lines
2.4 KiB
Ruby
104 lines
2.4 KiB
Ruby
# Necessary monkeypatching to make AR fiber-friendly.
|
|
|
|
module ActiveRecord
|
|
module ConnectionAdapters
|
|
|
|
def self.fiber_pools
|
|
@fiber_pools ||= []
|
|
end
|
|
def self.register_fiber_pool(fp)
|
|
fiber_pools << fp
|
|
end
|
|
|
|
class FiberedMonitor
|
|
class Queue
|
|
def initialize
|
|
@queue = []
|
|
end
|
|
|
|
def wait(timeout)
|
|
t = timeout || 5
|
|
fiber = Fiber.current
|
|
x = EM::Timer.new(t) do
|
|
@queue.delete(fiber)
|
|
fiber.resume(false)
|
|
end
|
|
@queue << fiber
|
|
returning Fiber.yield do
|
|
x.cancel
|
|
end
|
|
end
|
|
|
|
def signal
|
|
fiber = @queue.pop
|
|
fiber.resume(true) if fiber
|
|
end
|
|
end
|
|
|
|
def synchronize
|
|
yield
|
|
end
|
|
|
|
def new_cond
|
|
Queue.new
|
|
end
|
|
end
|
|
|
|
# ActiveRecord's connection pool is based on threads. Since we are working
|
|
# with EM and a single thread, multiple fiber design, we need to provide
|
|
# our own connection pool that keys off of Fiber.current so that different
|
|
# fibers running in the same thread don't try to use the same connection.
|
|
class ConnectionPool
|
|
def initialize(spec)
|
|
@spec = spec
|
|
|
|
# The cache of reserved connections mapped to threads
|
|
@reserved_connections = {}
|
|
|
|
# The mutex used to synchronize pool access
|
|
@connection_mutex = FiberedMonitor.new
|
|
@queue = @connection_mutex.new_cond
|
|
|
|
# default 5 second timeout unless on ruby 1.9
|
|
@timeout = spec.config[:wait_timeout] || 5
|
|
|
|
# default max pool size to 5
|
|
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
|
|
|
|
@connections = []
|
|
@checked_out = []
|
|
end
|
|
|
|
private
|
|
|
|
def current_connection_id #:nodoc:
|
|
Fiber.current.object_id
|
|
end
|
|
|
|
# Remove stale fibers from the cache.
|
|
def remove_stale_cached_threads!(cache, &block)
|
|
keys = Set.new(cache.keys)
|
|
|
|
ActiveRecord::ConnectionAdapters.fiber_pools.each do |pool|
|
|
pool.busy_fibers.each_pair do |object_id, fiber|
|
|
keys.delete(object_id)
|
|
end
|
|
end
|
|
|
|
keys.each do |key|
|
|
next unless cache.has_key?(key)
|
|
block.call(key, cache[key])
|
|
cache.delete(key)
|
|
end
|
|
end
|
|
|
|
def checkout_and_verify(c)
|
|
@checked_out << c
|
|
c.run_callbacks :checkout
|
|
c.verify!
|
|
c
|
|
end
|
|
end
|
|
|
|
end
|
|
end |