From 7c001ab4858f048c0579d2d4f1bff620f59579de Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Wed, 1 Jun 2011 12:45:43 -0400 Subject: [PATCH] Runner events implemented. Added at_exit hook to make sure runner_end is fired on abnormal termination --- lib/hydra/master.rb | 9 ++- lib/hydra/runner.rb | 25 ++++-- lib/hydra/runner_listener/abstract.rb | 4 +- lib/hydra/worker.rb | 13 +++- test/fixtures/hydra_worker_init.rb | 2 + test/fixtures/master_listeners.rb | 10 +++ test/fixtures/runner_listeners.rb | 24 +++--- test/master_test.rb | 108 ++++++++++++++++++++++++++ test/runner_test.rb | 8 +- test/test_helper.rb | 14 ++++ 10 files changed, 188 insertions(+), 29 deletions(-) create mode 100644 test/fixtures/hydra_worker_init.rb create mode 100644 test/fixtures/master_listeners.rb diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index b160363..0296e99 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -64,6 +64,9 @@ module Hydra #:nodoc: listener = eval(l) @event_listeners << listener if listener.is_a?(Hydra::Listener::Abstract) end + + @string_runner_event_listeners = Array( opts.fetch( 'runner_listeners' ) { nil } ) + @verbose = opts.fetch('verbose') { false } @autosort = opts.fetch('autosort') { true } @sync = opts.fetch('sync') { nil } @@ -160,7 +163,7 @@ module Hydra #:nodoc: pipe = Hydra::Pipe.new child = SafeFork.fork do pipe.identify_as_child - Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose) + Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose, :runner_listeners => @string_runner_event_listeners ) end pipe.identify_as_parent @@ -170,9 +173,11 @@ module Hydra #:nodoc: def boot_ssh_worker(worker) sync = Sync.new(worker, @sync, @verbose) +# @environment+=" bundle exec" #used for manually testing + runners = worker.fetch('runners') { raise "You must specify the number of runners" } command = worker.fetch('command') { - "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\"" + "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose}, :runner_listeners => \'#{@string_runner_event_listeners}\' );\"" } trace "Booting SSH worker" diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index 86d7027..432d77b 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -16,15 +16,16 @@ module Hydra #:nodoc: # Boot up a runner. It takes an IO object (generally a pipe from its # parent) to send it messages on which files to execute. def initialize(opts = {}) - @io = opts.fetch(:io) { raise "No IO Object" } + Runner.runner_instance = self # save Runner to make sure it stop at_exit + @io = opts.fetch(:io) { raise "No IO Object" } @verbose = opts.fetch(:verbose) { false } @event_listeners = Array( opts.fetch( :runner_listeners ) { nil } ) $stdout.sync = true - trace 'Booted. Sending Request for file' runner_begin + trace 'Booted. Sending Request for file' @io.write RequestFile.new begin process_messages @@ -36,7 +37,15 @@ module Hydra #:nodoc: def runner_begin trace "Firing runner_begin event" - @event_listeners.each {|l| l.runner_begin } + @event_listeners.each {|l| l.runner_begin( self ) } + end + + def self.runner_instance=( runner ) + @runner_instance = runner + end + + def self.runner_instance + @runner_instance end # Run a test file and report the results @@ -62,13 +71,13 @@ module Hydra #:nodoc: # Stop running def stop + runner_end if @running @running = false - runner_end end def runner_end - trace "Firing runner_end event" - @event_listeners.each {|l| l.runner_end } +# trace "Firing runner_end event" + @event_listeners.each {|l| l.runner_end( self ) } end private @@ -279,4 +288,8 @@ module Hydra #:nodoc: end.compact end end + + at_exit do + Runner.runner_instance.stop if Runner.runner_instance + end end diff --git a/lib/hydra/runner_listener/abstract.rb b/lib/hydra/runner_listener/abstract.rb index 64cb7fe..9a12d0f 100644 --- a/lib/hydra/runner_listener/abstract.rb +++ b/lib/hydra/runner_listener/abstract.rb @@ -12,11 +12,11 @@ module Hydra #:nodoc: end # Fired by the runner just before requesting the first file - def runner_begin + def runner_begin( runner ) end # Fired by the runner just after stoping - def runner_end + def runner_end( runner ) end end end diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb index c0b6f2b..7b3c5a5 100644 --- a/lib/hydra/worker.rb +++ b/lib/hydra/worker.rb @@ -21,6 +21,14 @@ module Hydra #:nodoc: @listeners = [] load_worker_initializer + + @runner_event_listeners = Array(opts.fetch(:runner_listeners) { nil }) + @runner_event_listeners.select{|l| l.is_a? String}.each do |l| + @runner_event_listeners.delete_at(@runner_event_listeners.index(l)) + listener = eval(l) + @runner_event_listeners << listener if listener.is_a?(Hydra::RunnerListener::Abstract) + end + boot_runners(opts.fetch(:runners) { 1 }) @io.write(Hydra::Messages::Worker::WorkerBegin.new) @@ -83,7 +91,7 @@ module Hydra #:nodoc: pipe = Hydra::Pipe.new child = SafeFork.fork do pipe.identify_as_child - Hydra::Runner.new(:io => pipe, :verbose => @verbose) + Hydra::Runner.new(:io => pipe, :verbose => @verbose, :runner_listeners => @runner_event_listeners ) end pipe.identify_as_parent @runners << { :pid => child, :io => pipe, :idle => false } @@ -121,7 +129,8 @@ module Hydra #:nodoc: end rescue IOError => ex trace "Worker lost Master" - Thread.exit + shutdown + #Thread.exit end end end diff --git a/test/fixtures/hydra_worker_init.rb b/test/fixtures/hydra_worker_init.rb new file mode 100644 index 0000000..3a61aaf --- /dev/null +++ b/test/fixtures/hydra_worker_init.rb @@ -0,0 +1,2 @@ +require '../test/fixtures/runner_listeners.rb' +require '../test/fixtures/master_listeners.rb' diff --git a/test/fixtures/master_listeners.rb b/test/fixtures/master_listeners.rb new file mode 100644 index 0000000..af673d1 --- /dev/null +++ b/test/fixtures/master_listeners.rb @@ -0,0 +1,10 @@ +module HydraExtension + module Listener + class WorkerBeganFlag < Hydra::Listener::Abstract + # Fired after runner processes have been started + def worker_begin(worker) + FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'worker_began_flag')) + end + end + end +end diff --git a/test/fixtures/runner_listeners.rb b/test/fixtures/runner_listeners.rb index 47929c6..49f8e81 100644 --- a/test/fixtures/runner_listeners.rb +++ b/test/fixtures/runner_listeners.rb @@ -1,17 +1,17 @@ -require File.join(File.dirname(__FILE__), '..', 'test_helper') - -module RunnerListener - class RunnerBeginTest < Hydra::RunnerListener::Abstract - # Fired by the runner just before requesting the first file - def runner_begin - FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) +module HydraExtension + module RunnerListener + class RunnerBeginTest < Hydra::RunnerListener::Abstract + # Fired by the runner just before requesting the first file + def runner_begin( runner ) + FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) + end end - end - class RunnerEndTest < Hydra::RunnerListener::Abstract - # Fired by the runner just after stoping - def runner_end - FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) + class RunnerEndTest < Hydra::RunnerListener::Abstract + # Fired by the runner just after stoping + def runner_end( runner ) + FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) + end end end end diff --git a/test/master_test.rb b/test/master_test.rb index e32a183..f082732 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -1,4 +1,6 @@ require File.join(File.dirname(__FILE__), 'test_helper') +require File.join(File.dirname(__FILE__), 'fixtures', 'runner_listeners') +require File.join(File.dirname(__FILE__), 'fixtures', 'master_listeners') class MasterTest < Test::Unit::TestCase context "with a file to test and a destination to verify" do @@ -178,4 +180,110 @@ class MasterTest < Test::Unit::TestCase assert !File.exists?(File.join(remote, 'test_b.rb')), "B was not deleted" end end + + context "with a runner_end event" do + setup do + # avoid having other tests interfering with us + sleep(0.2) + FileUtils.rm_f(target_file) + FileUtils.rm_f(alternate_target_file) + + @worker_began_flag = File.expand_path(File.join(Dir.consistent_tmpdir, 'worker_began_flag')) #used to know when the worker is ready + FileUtils.rm_f(@worker_began_flag) + + @runner_listener = 'HydraExtension::RunnerListener::RunnerEndTest.new' # runner_end method that creates alternate_target_file + @master_listener = HydraExtension::Listener::WorkerBeganFlag.new #used to know when the runner is up + end + + teardown do + FileUtils.rm_f(target_file) + FileUtils.rm_f(alternate_target_file) + end + + context "running a local worker" do + setup do + @pid = Process.fork do + Hydra::Master.new( + :files => [test_file] * 6, + :autosort => false, + :listeners => [@master_listener], + :runner_listeners => [@runner_listener], + :verbose => false + ) + end + end + + should "run runner_end on successful termination" do + Process.waitpid @pid + + assert File.exists?( target_file ) + + wait_for_file_for_a_while alternate_target_file, 2 + assert File.exists? alternate_target_file + end + + should "run runner_end after interruption signal" do + wait_for_runner_to_begin + Process.kill 'SIGINT', @pid + Process.waitpid @pid + + wait_for_file_for_a_while alternate_target_file, 2 + assert File.exists? alternate_target_file # runner_end should create this file + end + end + + context "running a remote worker" do + setup do + copy_worker_init_file # this method has a protection to avoid erasing an existing worker_init_file + @pid = Process.fork do + Hydra::Master.new( + :files => [test_file] * 6, + :autosort => false, + :listeners => [@master_listener], + :runner_listeners => [@runner_listener], + :workers => [{ + :type => :ssh, + :connect => 'localhost', + :directory => remote_dir_path, + :runners => 1 + }], + :verbose => false + ) + end + end + + teardown do + FileUtils.rm_f(@remote_init_file) unless @protect_init_file + end + + should "run runner_end on successful termination" do + Process.waitpid @pid + + wait_for_file_for_a_while alternate_target_file, 2 + assert File.exists? target_file + assert File.exists? alternate_target_file + end + end + end + + private + # this requires that a worker_begin listener creates a file named worker_began_flag in tmp directory + def wait_for_runner_to_begin + FileUtils.rm_f(@worker_began_flag) + + wait_for_file_for_a_while @worker_began_flag, 2 + assert File.exists?( @worker_began_flag ), "The worker didn't begin!!" + end + + # with a protection to avoid erasing something important in lib + def copy_worker_init_file + @remote_init_file = "#{remote_dir_path}/#{File.basename( hydra_worker_init_file )}" + if File.exists?( @remote_init_file ) + $stderr.puts "\nWARNING!!!: #{@remote_init_file} exits and this test needs to create a new file here. Make sure there is nothing inportant in that file and remove it before running this test\n\n" + @protect_init_file = true + exit + end + # copy the hydra_worker_init to the correct location + FileUtils.cp(hydra_worker_init_file, remote_dir_path) + end end diff --git a/test/runner_test.rb b/test/runner_test.rb index bbda5d8..a1df47a 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -107,7 +107,7 @@ class RunnerTest < Test::Unit::TestCase request_a_file_and_verify_completion(pipe, test_file) end - run_the_runner(pipe, [RunnerListener::RunnerBeginTest.new] ) + run_the_runner(pipe, [HydraExtension::RunnerListener::RunnerBeginTest.new] ) Process.wait(parent) # ensure runner_begin was fired @@ -115,7 +115,7 @@ class RunnerTest < Test::Unit::TestCase end should "fire runner_end event after successful shutting down" do - send_file_to_ssh_runner_and_verify_completion ", :runner_listeners => [RunnerListener::RunnerEndTest.new]" + send_file_to_ssh_runner_and_verify_completion ", :runner_listeners => [HydraExtension::RunnerListener::RunnerEndTest.new]" wait_for_file_for_a_while alternate_target_file, 2 @@ -130,12 +130,10 @@ class RunnerTest < Test::Unit::TestCase # grab its response. response = pipe.gets - pipe.close #this will be detected by the runner and it should call runner_end - end - run_the_runner(pipe, [RunnerListener::RunnerEndTest.new] ) + run_the_runner(pipe, [HydraExtension::RunnerListener::RunnerEndTest.new] ) Process.wait(parent) # ensure runner_begin was fired diff --git a/test/test_helper.rb b/test/test_helper.rb index 4d7c6d2..26fec8f 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -60,6 +60,20 @@ class Test::Unit::TestCase def remote_dir_path File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')) end + + def hydra_worker_init_file + File.expand_path(File.join(File.dirname(__FILE__), 'fixtures', 'hydra_worker_init.rb')) + end + + #this method allow us to wait for a file for a maximum number of time, so the + #test can pass in slower machines. This helps to speed up the tests + def wait_for_file_for_a_while file, time_to_wait + time_begin = Time.now + + until Time.now - time_begin >= time_to_wait or File.exists?( file ) do + sleep 0.01 + end + end end module Hydra #:nodoc: