# Necessary monkeypatching to make AR fiber-friendly.

require 'fiber' # Fiber.current (needed on Ruby 1.9/2.x; harmless on newer Rubies)
require 'set'   # Set is used by ConnectionPool#remove_stale_cached_threads!

module ActiveRecord
  module ConnectionAdapters
    # Registry of fiber pools. ConnectionPool#remove_stale_cached_threads!
    # reads each registered pool's +busy_fibers+ to decide which cache keys
    # are still live.
    def self.fiber_pools
      @fiber_pools ||= []
    end

    # Adds +fp+ to the registry. +fp+ must respond to +busy_fibers+, which is
    # iterated with +each_pair+ yielding (object_id, fiber).
    def self.register_fiber_pool(fp)
      fiber_pools << fp
    end

    # Stand-in for the monitor used by the stock connection pool. It takes no
    # lock at all; its condition object parks and resumes fibers instead of
    # blocking.
    class FiberedMonitor
      # Condition-variable replacement that suspends the calling fiber.
      class Queue
        def initialize
          @queue = []
        end

        # Suspends the current fiber until #signal resumes it or +timeout+
        # seconds elapse (5 when +timeout+ is nil).
        #
        # Returns true when woken by #signal and false when the timer fired.
        def wait(timeout)
          seconds = timeout || 5
          fiber = Fiber.current
          timer = EM::Timer.new(seconds) do
            # Timed out: stop waiting and wake the fiber with a falsy result.
            @queue.delete(fiber)
            fiber.resume(false)
          end
          @queue << fiber

          # Plain Ruby instead of ActiveSupport's +returning+, which was
          # deprecated in Rails 2.3 and removed in Rails 3.
          signalled = Fiber.yield
          timer.cancel
          signalled
        end

        # Wakes the longest-waiting fiber, if any. Waiters are served in FIFO
        # order so an early waiter is not starved by later arrivals.
        def signal
          fiber = @queue.shift
          fiber.resume(true) if fiber
        end
      end

      # Yields immediately without locking.
      def synchronize
        yield
      end

      # Returns a new Queue to be used as the condition variable.
      def new_cond
        Queue.new
      end
    end

    # ActiveRecord's connection pool is based on threads. Since we are working
    # with EM and a single thread, multiple fiber design, we need to provide
    # our own connection pool that keys off of Fiber.current so that different
    # fibers running in the same thread don't try to use the same connection.
    class ConnectionPool
      # +spec+ must respond to +config+, a hash that may contain
      # +:wait_timeout+ (seconds) and +:pool+ (maximum size).
      def initialize(spec)
        @spec = spec

        # The cache of reserved connections
        @reserved_connections = {}

        # Fiber-aware replacement for the mutex used to synchronize pool access
        @connection_mutex = FiberedMonitor.new
        @queue = @connection_mutex.new_cond

        # default to a 5 second wait timeout
        @timeout = spec.config[:wait_timeout] || 5

        # default max pool size to 5
        @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5

        @connections = []
        @checked_out = []
      end

      private

      # Identifies the current execution context by fiber rather than thread.
      def current_connection_id #:nodoc:
        Fiber.current.object_id
      end

      # Remove stale fibers from the cache.
      #
      # Every key of +cache+ that is not the object_id of a busy fiber in any
      # registered fiber pool is passed to the block as (key, value) and then
      # deleted from +cache+.
      def remove_stale_cached_threads!(cache, &block)
        stale_keys = Set.new(cache.keys)
        ActiveRecord::ConnectionAdapters.fiber_pools.each do |pool|
          pool.busy_fibers.each_pair do |object_id, _fiber|
            stale_keys.delete(object_id)
          end
        end

        stale_keys.each do |key|
          # The block may itself have removed entries from the cache.
          next unless cache.key?(key)

          block.call(key, cache[key])
          cache.delete(key)
        end
      end

      # Records +c+ as checked out, runs its :checkout callbacks, verifies it
      # and returns it.
      def checkout_and_verify(c)
        @checked_out << c
        c.run_callbacks :checkout
        c.verify!
        c
      end
    end
  end
end