From 3f2e948c5f06aec91ad745ac0d586f8f7ded1a95 Mon Sep 17 00:00:00 2001 From: mperham Date: Fri, 9 Jul 2010 05:40:20 +0800 Subject: [PATCH] Support for asynchronous ActiveRecord via Fibers and EM --- CHANGELOG.md | 3 + README.rdoc | 5 + .../connection_adapters/em_mysql2_adapter.rb | 59 ++++++++++ lib/active_record/fiber_patches.rb | 104 ++++++++++++++++++ 4 files changed, 171 insertions(+) create mode 100644 lib/active_record/connection_adapters/em_mysql2_adapter.rb create mode 100644 lib/active_record/fiber_patches.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index cff082a..73f12c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.9 (HEAD) +* Support async ActiveRecord access with fibers and EventMachine (mperham) + ## 0.1.8 (June 2nd, 2010) * fixes for AR adapter for timezone juggling * fixes to be able to run benchmarks and specs under 1.9.2 diff --git a/README.rdoc b/README.rdoc index bb2bbd6..dac5e14 100644 --- a/README.rdoc +++ b/README.rdoc @@ -80,6 +80,11 @@ If you need multiple query concurrency take a look at using a connection pool. To use the ActiveRecord driver, all you should need to do is have this gem installed and set the adapter in your database.yml to "mysql2". That was easy right? :) +== Asynchronous ActiveRecord + +You can also use Mysql2 with asynchronous Rails (first introduced at http://www.mikeperham.com/2010/04/03/introducing-phat-an-asynchronous-rails-app/) by +setting the adapter in your database.yml to "em_mysql2". You must be running Ruby 1.9, thin and the rack-fiber_pool middleware for it to work. + == EventMachine The mysql2 EventMachine deferrable api allows you to make async queries using EventMachine, diff --git a/lib/active_record/connection_adapters/em_mysql2_adapter.rb b/lib/active_record/connection_adapters/em_mysql2_adapter.rb new file mode 100644 index 0000000..0077e13 --- /dev/null +++ b/lib/active_record/connection_adapters/em_mysql2_adapter.rb @@ -0,0 +1,59 @@ +# encoding: utf-8 + +# AR adapter for using a fibered mysql2 connection with EM +# This adapter should be used within Thin or Unicorn with the rack-fiber_pool middleware. +# Just update your database.yml's adapter to be 'em_mysql2' + +module ActiveRecord + class Base + def self.em_mysql2_connection(config) + client = ::Mysql2::Fibered::Client.new(config.symbolize_keys) + options = [config[:host], config[:username], config[:password], config[:database], config[:port], config[:socket], 0] + ConnectionAdapters::Mysql2Adapter.new(client, logger, options, config) + end + end +end + +require 'fiber' +require 'eventmachine' unless defined? EventMachine +require 'mysql2' unless defined? Mysql2 +require 'active_record/connection_adapters/mysql2_adapter' +require 'active_record/fiber_patches' + +module Mysql2 + module Fibered + class Client < ::Mysql2::Client + module Watcher + def initialize(client, deferable) + @client = client + @deferable = deferable + end + + def notify_readable + begin + detach + results = @client.async_result + @deferable.succeed(results) + rescue Exception => e + puts e.backtrace.join("\n\t") + @deferable.fail(e) + end + end + end + + def query(sql, opts={}) + super(sql, opts.merge(:async => true)) + deferable = ::EM::DefaultDeferrable.new + ::EM.watch(self.socket, Watcher, self, deferable).notify_readable = true + fiber = Fiber.current + deferable.callback do |result| + fiber.resume(result) + end + deferable.errback do |err| + fiber.resume(err) + end + Fiber.yield + end + end + end +end \ No newline at end of file diff --git a/lib/active_record/fiber_patches.rb b/lib/active_record/fiber_patches.rb new file mode 100644 index 0000000..3afabed --- /dev/null +++ b/lib/active_record/fiber_patches.rb @@ -0,0 +1,104 @@ +# Necessary monkeypatching to make AR fiber-friendly. + +module ActiveRecord + module ConnectionAdapters + + def self.fiber_pools + @fiber_pools ||= [] + end + def self.register_fiber_pool(fp) + fiber_pools << fp + end + + class FiberedMonitor + class Queue + def initialize + @queue = [] + end + + def wait(timeout) + t = timeout || 5 + fiber = Fiber.current + x = EM::Timer.new(t) do + @queue.delete(fiber) + fiber.resume(false) + end + @queue << fiber + returning Fiber.yield do + x.cancel + end + end + + def signal + fiber = @queue.pop + fiber.resume(true) if fiber + end + end + + def synchronize + yield + end + + def new_cond + Queue.new + end + end + + # ActiveRecord's connection pool is based on threads. Since we are working + # with EM and a single thread, multiple fiber design, we need to provide + # our own connection pool that keys off of Fiber.current so that different + # fibers running in the same thread don't try to use the same connection. + class ConnectionPool + def initialize(spec) + @spec = spec + + # The cache of reserved connections mapped to threads + @reserved_connections = {} + + # The mutex used to synchronize pool access + @connection_mutex = FiberedMonitor.new + @queue = @connection_mutex.new_cond + + # default 5 second timeout unless on ruby 1.9 + @timeout = spec.config[:wait_timeout] || 5 + + # default max pool size to 5 + @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5 + + @connections = [] + @checked_out = [] + end + + private + + def current_connection_id #:nodoc: + Fiber.current.object_id + end + + # Remove stale fibers from the cache. + def remove_stale_cached_threads!(cache, &block) + keys = Set.new(cache.keys) + + ActiveRecord::ConnectionAdapters.fiber_pools.each do |pool| + pool.busy_fibers.each_pair do |object_id, fiber| + keys.delete(object_id) + end + end + + keys.each do |key| + next unless cache.has_key?(key) + block.call(key, cache[key]) + cache.delete(key) + end + end + + def checkout_and_verify(c) + @checked_out << c + c.run_callbacks :checkout + c.verify! + c + end + end + + end +end \ No newline at end of file