mysql2/lib/active_record/fiber_patches.rb

104 lines
2.4 KiB
Ruby
Raw Normal View History

# Necessary monkeypatching to make AR fiber-friendly.
module ActiveRecord
module ConnectionAdapters
def self.fiber_pools
@fiber_pools ||= []
end
def self.register_fiber_pool(fp)
fiber_pools << fp
end
class FiberedMonitor
class Queue
def initialize
@queue = []
end
def wait(timeout)
t = timeout || 5
fiber = Fiber.current
x = EM::Timer.new(t) do
@queue.delete(fiber)
fiber.resume(false)
end
@queue << fiber
returning Fiber.yield do
x.cancel
end
end
def signal
fiber = @queue.pop
fiber.resume(true) if fiber
end
end
def synchronize
yield
end
def new_cond
Queue.new
end
end
# ActiveRecord's connection pool is based on threads. Since we are working
# with EM and a single thread, multiple fiber design, we need to provide
# our own connection pool that keys off of Fiber.current so that different
# fibers running in the same thread don't try to use the same connection.
class ConnectionPool
def initialize(spec)
@spec = spec
# The cache of reserved connections mapped to threads
@reserved_connections = {}
# The mutex used to synchronize pool access
@connection_mutex = FiberedMonitor.new
@queue = @connection_mutex.new_cond
# default 5 second timeout unless on ruby 1.9
@timeout = spec.config[:wait_timeout] || 5
# default max pool size to 5
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
@connections = []
@checked_out = []
end
private
def current_connection_id #:nodoc:
Fiber.current.object_id
end
# Remove stale fibers from the cache.
def remove_stale_cached_threads!(cache, &block)
keys = Set.new(cache.keys)
ActiveRecord::ConnectionAdapters.fiber_pools.each do |pool|
pool.busy_fibers.each_pair do |object_id, fiber|
keys.delete(object_id)
end
end
keys.each do |key|
next unless cache.has_key?(key)
block.call(key, cache[key])
cache.delete(key)
end
end
def checkout_and_verify(c)
@checked_out << c
c.run_callbacks :checkout
c.verify!
c
end
end
end
end