Support for asynchronous ActiveRecord via Fibers and EM

This commit is contained in:
mperham 2010-07-09 05:40:20 +08:00 committed by Brian Lopez
parent 39b4776a67
commit 3f2e948c5f
4 changed files with 171 additions and 0 deletions

View File

@ -1,5 +1,8 @@
# Changelog
## 0.1.9 (HEAD)
* Support async ActiveRecord access with fibers and EventMachine (mperham)
## 0.1.8 (June 2nd, 2010)
* fixes for AR adapter for timezone juggling
* fixes to be able to run benchmarks and specs under 1.9.2

View File

@ -80,6 +80,11 @@ If you need multiple query concurrency take a look at using a connection pool.
To use the ActiveRecord driver, all you should need to do is have this gem installed and set the adapter in your database.yml to "mysql2".
That was easy right? :)
== Asynchronous ActiveRecord
You can also use Mysql2 with asynchronous Rails (first introduced at http://www.mikeperham.com/2010/04/03/introducing-phat-an-asynchronous-rails-app/) by
setting the adapter in your database.yml to "em_mysql2". You must be running Ruby 1.9, thin and the rack-fiber_pool middleware for it to work.
== EventMachine
The mysql2 EventMachine deferrable api allows you to make async queries using EventMachine,

View File

@ -0,0 +1,59 @@
# encoding: utf-8
# AR adapter for using a fibered mysql2 connection with EM
# This adapter should be used within Thin or Unicorn with the rack-fiber_pool middleware.
# Just update your database.yml's adapter to be 'em_mysql2'
module ActiveRecord
class Base
def self.em_mysql2_connection(config)
client = ::Mysql2::Fibered::Client.new(config.symbolize_keys)
options = [config[:host], config[:username], config[:password], config[:database], config[:port], config[:socket], 0]
ConnectionAdapters::Mysql2Adapter.new(client, logger, options, config)
end
end
end
require 'fiber'
require 'eventmachine' unless defined? EventMachine
require 'mysql2' unless defined? Mysql2
require 'active_record/connection_adapters/mysql2_adapter'
require 'active_record/fiber_patches'
module Mysql2
module Fibered
class Client < ::Mysql2::Client
module Watcher
def initialize(client, deferable)
@client = client
@deferable = deferable
end
def notify_readable
begin
detach
results = @client.async_result
@deferable.succeed(results)
rescue Exception => e
puts e.backtrace.join("\n\t")
@deferable.fail(e)
end
end
end
def query(sql, opts={})
super(sql, opts.merge(:async => true))
deferable = ::EM::DefaultDeferrable.new
::EM.watch(self.socket, Watcher, self, deferable).notify_readable = true
fiber = Fiber.current
deferable.callback do |result|
fiber.resume(result)
end
deferable.errback do |err|
fiber.resume(err)
end
Fiber.yield
end
end
end
end

View File

@ -0,0 +1,104 @@
# Necessary monkeypatching to make AR fiber-friendly.
module ActiveRecord
module ConnectionAdapters
def self.fiber_pools
@fiber_pools ||= []
end
def self.register_fiber_pool(fp)
fiber_pools << fp
end
class FiberedMonitor
class Queue
def initialize
@queue = []
end
def wait(timeout)
t = timeout || 5
fiber = Fiber.current
x = EM::Timer.new(t) do
@queue.delete(fiber)
fiber.resume(false)
end
@queue << fiber
returning Fiber.yield do
x.cancel
end
end
def signal
fiber = @queue.pop
fiber.resume(true) if fiber
end
end
def synchronize
yield
end
def new_cond
Queue.new
end
end
# ActiveRecord's connection pool is based on threads. Since we are working
# with EM and a single thread, multiple fiber design, we need to provide
# our own connection pool that keys off of Fiber.current so that different
# fibers running in the same thread don't try to use the same connection.
class ConnectionPool
def initialize(spec)
@spec = spec
# The cache of reserved connections mapped to threads
@reserved_connections = {}
# The mutex used to synchronize pool access
@connection_mutex = FiberedMonitor.new
@queue = @connection_mutex.new_cond
# default 5 second timeout unless on ruby 1.9
@timeout = spec.config[:wait_timeout] || 5
# default max pool size to 5
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
@connections = []
@checked_out = []
end
private
def current_connection_id #:nodoc:
Fiber.current.object_id
end
# Remove stale fibers from the cache.
def remove_stale_cached_threads!(cache, &block)
keys = Set.new(cache.keys)
ActiveRecord::ConnectionAdapters.fiber_pools.each do |pool|
pool.busy_fibers.each_pair do |object_id, fiber|
keys.delete(object_id)
end
end
keys.each do |key|
next unless cache.has_key?(key)
block.call(key, cache[key])
cache.delete(key)
end
end
def checkout_and_verify(c)
@checked_out << c
c.run_callbacks :checkout
c.verify!
c
end
end
end
end