From b51bb8c136387b40b8a65c2a719136147150b3bd Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Mon, 30 May 2011 13:40:44 -0400 Subject: [PATCH 01/11] Some refactoring and added .rvmrc to .gitignore --- .gitignore | 1 + test/master_test.rb | 4 ++-- test/runner_test.rb | 2 +- test/test_helper.rb | 4 ++++ 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 8a5a523..cf7c4c0 100644 --- a/.gitignore +++ b/.gitignore @@ -18,5 +18,6 @@ coverage rdoc pkg tags +.rvmrc ## PROJECT::SPECIFIC diff --git a/test/master_test.rb b/test/master_test.rb index ca6091a..e32a183 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -115,8 +115,8 @@ class MasterTest < Test::Unit::TestCase :workers => [{ :type => :ssh, :connect => 'localhost', - :directory => File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')), - :runners => 1 + :directory => remote_dir_path, + :runners => 1 }] ) assert File.exists?(target_file) diff --git a/test/runner_test.rb b/test/runner_test.rb index 7f25acb..96f88ee 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -98,7 +98,7 @@ class RunnerTest < Test::Unit::TestCase should "be able to run a runner over ssh" do ssh = Hydra::SSH.new( 'localhost', - File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')), + remote_dir_path, "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Runner.new(:io => Hydra::Stdio.new, :verbose => true);\"" ) assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile) diff --git a/test/test_helper.rb b/test/test_helper.rb index a482fc0..4d7c6d2 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -56,6 +56,10 @@ class Test::Unit::TestCase def conflicting_test_file File.expand_path(File.join(File.dirname(__FILE__), 'fixtures', 'conflicting.rb')) end + + def remote_dir_path + File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')) + end end module Hydra #:nodoc: From 2648c3679fb10a1fb6d685bd84a78be9c4370317 Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Mon, 30 May 2011 18:08:50 -0400 Subject: [PATCH 02/11] Two runner events implemented and tested at runner level --- lib/hydra.rb | 2 +- lib/hydra/runner.rb | 19 ++++- lib/hydra/runner_listener/abstract.rb | 23 ++++++ test/fixtures/runner_listeners.rb | 17 +++++ test/runner_test.rb | 104 ++++++++++++++++++++------ 5 files changed, 140 insertions(+), 25 deletions(-) create mode 100644 lib/hydra/runner_listener/abstract.rb create mode 100644 test/fixtures/runner_listeners.rb diff --git a/lib/hydra.rb b/lib/hydra.rb index c8b7abf..b2a64ff 100644 --- a/lib/hydra.rb +++ b/lib/hydra.rb @@ -13,4 +13,4 @@ require 'hydra/listener/minimal_output' require 'hydra/listener/report_generator' require 'hydra/listener/notifier' require 'hydra/listener/progress_bar' - +require 'hydra/runner_listener/abstract' diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index 5205e1c..86d7027 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -17,10 +17,14 @@ module Hydra #:nodoc: # parent) to send it messages on which files to execute. def initialize(opts = {}) @io = opts.fetch(:io) { raise "No IO Object" } - @verbose = opts.fetch(:verbose) { false } + @verbose = opts.fetch(:verbose) { false } + @event_listeners = Array( opts.fetch( :runner_listeners ) { nil } ) + $stdout.sync = true trace 'Booted. Sending Request for file' + runner_begin + @io.write RequestFile.new begin process_messages @@ -30,6 +34,11 @@ module Hydra #:nodoc: end end + def runner_begin + trace "Firing runner_begin event" + @event_listeners.each {|l| l.runner_begin } + end + # Run a test file and report the results def run_file(file) trace "Running file: #{file}" @@ -54,6 +63,12 @@ module Hydra #:nodoc: # Stop running def stop @running = false + runner_end + end + + def runner_end + trace "Firing runner_end event" + @event_listeners.each {|l| l.runner_end } end private @@ -74,7 +89,7 @@ module Hydra #:nodoc: end rescue IOError => ex trace "Runner lost Worker" - @running = false + stop end end end diff --git a/lib/hydra/runner_listener/abstract.rb b/lib/hydra/runner_listener/abstract.rb new file mode 100644 index 0000000..64cb7fe --- /dev/null +++ b/lib/hydra/runner_listener/abstract.rb @@ -0,0 +1,23 @@ +module Hydra #:nodoc: + module RunnerListener #:nodoc: + # Abstract listener that implements all the events + # but does nothing. + class Abstract + # Create a new listener. + # + # Output: The IO object for outputting any information. + # Defaults to STDOUT, but you could pass a file in, or STDERR + def initialize(output = $stdout) + @output = output + end + + # Fired by the runner just before requesting the first file + def runner_begin + end + + # Fired by the runner just after stoping + def runner_end + end + end + end +end diff --git a/test/fixtures/runner_listeners.rb b/test/fixtures/runner_listeners.rb new file mode 100644 index 0000000..47929c6 --- /dev/null +++ b/test/fixtures/runner_listeners.rb @@ -0,0 +1,17 @@ +require File.join(File.dirname(__FILE__), '..', 'test_helper') + +module RunnerListener + class RunnerBeginTest < Hydra::RunnerListener::Abstract + # Fired by the runner just before requesting the first file + def runner_begin + FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) + end + end + + class RunnerEndTest < Hydra::RunnerListener::Abstract + # Fired by the runner just after stoping + def runner_end + FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) + end + end +end diff --git a/test/runner_test.rb b/test/runner_test.rb index 96f88ee..bbda5d8 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -1,4 +1,5 @@ require File.join(File.dirname(__FILE__), 'test_helper') +require File.join(File.dirname(__FILE__), 'fixtures', 'runner_listeners') class RunnerTest < Test::Unit::TestCase context "with a file to test and a destination to verify" do @@ -96,32 +97,65 @@ class RunnerTest < Test::Unit::TestCase end should "be able to run a runner over ssh" do - ssh = Hydra::SSH.new( - 'localhost', - remote_dir_path, - "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Runner.new(:io => Hydra::Stdio.new, :verbose => true);\"" - ) - assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile) - ssh.write(Hydra::Messages::Worker::RunFile.new(:file => test_file)) - - # grab its response. This makes us wait for it to finish - echo = ssh.gets # get the ssh echo - response = ssh.gets + send_file_to_ssh_runner_and_verify_completion + end - assert_equal Hydra::Messages::Runner::Results, response.class - - # tell it to shut down - ssh.write(Hydra::Messages::Worker::Shutdown.new) + context "using runner events" do + should "fire runner_begin event" do + pipe = Hydra::Pipe.new + parent = Process.fork do + request_a_file_and_verify_completion(pipe, test_file) + end - ssh.close - - # ensure it ran - assert File.exists?(target_file) - assert_equal "HYDRA", File.read(target_file) + run_the_runner(pipe, [RunnerListener::RunnerBeginTest.new] ) + Process.wait(parent) + + # ensure runner_begin was fired + assert File.exists?( alternate_target_file ) + end + + should "fire runner_end event after successful shutting down" do + send_file_to_ssh_runner_and_verify_completion ", :runner_listeners => [RunnerListener::RunnerEndTest.new]" + + wait_for_file_for_a_while alternate_target_file, 2 + + # ensure runner_end was fired + assert File.exists?( alternate_target_file ) + end + + should "fire runner_end event after losing communication with worker" do + pipe = Hydra::Pipe.new + parent = Process.fork do + pipe.identify_as_parent + + # grab its response. + response = pipe.gets + + pipe.close #this will be detected by the runner and it should call runner_end + + end + + run_the_runner(pipe, [RunnerListener::RunnerEndTest.new] ) + Process.wait(parent) + + # ensure runner_begin was fired + assert File.exists?( alternate_target_file ) + end end end module RunnerTestHelper + + #this method allow us to wait for a file for a maximum number of time, so the + #test can pass in slower machines. This helps to speed up the tests + def wait_for_file_for_a_while file, time_to_wait + time_begin = Time.now + + until Time.now - time_begin >= time_to_wait or File.exists?( file ) do + sleep 0.01 + end + end + def request_a_file_and_verify_completion(pipe, file) pipe.identify_as_parent @@ -140,9 +174,35 @@ class RunnerTest < Test::Unit::TestCase assert_equal "HYDRA", File.read(target_file) end - def run_the_runner(pipe) + def run_the_runner(pipe, listeners = []) pipe.identify_as_child - Hydra::Runner.new(:io => pipe) + Hydra::Runner.new( :io => pipe, :runner_listeners => listeners ) + end + + def send_file_to_ssh_runner_and_verify_completion opts = "" + ssh = Hydra::SSH.new( + 'localhost', + remote_dir_path, + "ruby -e \"require 'rubygems'; require 'hydra'; require '../test/fixtures/runner_listeners' ; Hydra::Runner.new(:io => Hydra::Stdio.new, :verbose => true #{opts} );\"" + ) + + assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile) + ssh.write(Hydra::Messages::Worker::RunFile.new(:file => test_file)) + + # grab its response. This makes us wait for it to finish + echo = ssh.gets # get the ssh echo + response = ssh.gets + + assert_equal Hydra::Messages::Runner::Results, response.class + + # tell it to shut down + ssh.write(Hydra::Messages::Worker::Shutdown.new) + + ssh.close + + # ensure it ran + assert File.exists?(target_file) + assert_equal "HYDRA", File.read(target_file) end end include RunnerTestHelper From 7c001ab4858f048c0579d2d4f1bff620f59579de Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Wed, 1 Jun 2011 12:45:43 -0400 Subject: [PATCH 03/11] Runner events implemented. Added at_exit hook to make sure runner_end is fired on abnormal termination --- lib/hydra/master.rb | 9 ++- lib/hydra/runner.rb | 25 ++++-- lib/hydra/runner_listener/abstract.rb | 4 +- lib/hydra/worker.rb | 13 +++- test/fixtures/hydra_worker_init.rb | 2 + test/fixtures/master_listeners.rb | 10 +++ test/fixtures/runner_listeners.rb | 24 +++--- test/master_test.rb | 108 ++++++++++++++++++++++++++ test/runner_test.rb | 8 +- test/test_helper.rb | 14 ++++ 10 files changed, 188 insertions(+), 29 deletions(-) create mode 100644 test/fixtures/hydra_worker_init.rb create mode 100644 test/fixtures/master_listeners.rb diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index b160363..0296e99 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -64,6 +64,9 @@ module Hydra #:nodoc: listener = eval(l) @event_listeners << listener if listener.is_a?(Hydra::Listener::Abstract) end + + @string_runner_event_listeners = Array( opts.fetch( 'runner_listeners' ) { nil } ) + @verbose = opts.fetch('verbose') { false } @autosort = opts.fetch('autosort') { true } @sync = opts.fetch('sync') { nil } @@ -160,7 +163,7 @@ module Hydra #:nodoc: pipe = Hydra::Pipe.new child = SafeFork.fork do pipe.identify_as_child - Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose) + Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose, :runner_listeners => @string_runner_event_listeners ) end pipe.identify_as_parent @@ -170,9 +173,11 @@ module Hydra #:nodoc: def boot_ssh_worker(worker) sync = Sync.new(worker, @sync, @verbose) +# @environment+=" bundle exec" #used for manually testing + runners = worker.fetch('runners') { raise "You must specify the number of runners" } command = worker.fetch('command') { - "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\"" + "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose}, :runner_listeners => \'#{@string_runner_event_listeners}\' );\"" } trace "Booting SSH worker" diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index 86d7027..432d77b 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -16,15 +16,16 @@ module Hydra #:nodoc: # Boot up a runner. It takes an IO object (generally a pipe from its # parent) to send it messages on which files to execute. def initialize(opts = {}) - @io = opts.fetch(:io) { raise "No IO Object" } + Runner.runner_instance = self # save Runner to make sure it stop at_exit + @io = opts.fetch(:io) { raise "No IO Object" } @verbose = opts.fetch(:verbose) { false } @event_listeners = Array( opts.fetch( :runner_listeners ) { nil } ) $stdout.sync = true - trace 'Booted. Sending Request for file' runner_begin + trace 'Booted. Sending Request for file' @io.write RequestFile.new begin process_messages @@ -36,7 +37,15 @@ module Hydra #:nodoc: def runner_begin trace "Firing runner_begin event" - @event_listeners.each {|l| l.runner_begin } + @event_listeners.each {|l| l.runner_begin( self ) } + end + + def self.runner_instance=( runner ) + @runner_instance = runner + end + + def self.runner_instance + @runner_instance end # Run a test file and report the results @@ -62,13 +71,13 @@ module Hydra #:nodoc: # Stop running def stop + runner_end if @running @running = false - runner_end end def runner_end - trace "Firing runner_end event" - @event_listeners.each {|l| l.runner_end } +# trace "Firing runner_end event" + @event_listeners.each {|l| l.runner_end( self ) } end private @@ -279,4 +288,8 @@ module Hydra #:nodoc: end.compact end end + + at_exit do + Runner.runner_instance.stop if Runner.runner_instance + end end diff --git a/lib/hydra/runner_listener/abstract.rb b/lib/hydra/runner_listener/abstract.rb index 64cb7fe..9a12d0f 100644 --- a/lib/hydra/runner_listener/abstract.rb +++ b/lib/hydra/runner_listener/abstract.rb @@ -12,11 +12,11 @@ module Hydra #:nodoc: end # Fired by the runner just before requesting the first file - def runner_begin + def runner_begin( runner ) end # Fired by the runner just after stoping - def runner_end + def runner_end( runner ) end end end diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb index c0b6f2b..7b3c5a5 100644 --- a/lib/hydra/worker.rb +++ b/lib/hydra/worker.rb @@ -21,6 +21,14 @@ module Hydra #:nodoc: @listeners = [] load_worker_initializer + + @runner_event_listeners = Array(opts.fetch(:runner_listeners) { nil }) + @runner_event_listeners.select{|l| l.is_a? String}.each do |l| + @runner_event_listeners.delete_at(@runner_event_listeners.index(l)) + listener = eval(l) + @runner_event_listeners << listener if listener.is_a?(Hydra::RunnerListener::Abstract) + end + boot_runners(opts.fetch(:runners) { 1 }) @io.write(Hydra::Messages::Worker::WorkerBegin.new) @@ -83,7 +91,7 @@ module Hydra #:nodoc: pipe = Hydra::Pipe.new child = SafeFork.fork do pipe.identify_as_child - Hydra::Runner.new(:io => pipe, :verbose => @verbose) + Hydra::Runner.new(:io => pipe, :verbose => @verbose, :runner_listeners => @runner_event_listeners ) end pipe.identify_as_parent @runners << { :pid => child, :io => pipe, :idle => false } @@ -121,7 +129,8 @@ module Hydra #:nodoc: end rescue IOError => ex trace "Worker lost Master" - Thread.exit + shutdown + #Thread.exit end end end diff --git a/test/fixtures/hydra_worker_init.rb b/test/fixtures/hydra_worker_init.rb new file mode 100644 index 0000000..3a61aaf --- /dev/null +++ b/test/fixtures/hydra_worker_init.rb @@ -0,0 +1,2 @@ +require '../test/fixtures/runner_listeners.rb' +require '../test/fixtures/master_listeners.rb' diff --git a/test/fixtures/master_listeners.rb b/test/fixtures/master_listeners.rb new file mode 100644 index 0000000..af673d1 --- /dev/null +++ b/test/fixtures/master_listeners.rb @@ -0,0 +1,10 @@ +module HydraExtension + module Listener + class WorkerBeganFlag < Hydra::Listener::Abstract + # Fired after runner processes have been started + def worker_begin(worker) + FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'worker_began_flag')) + end + end + end +end diff --git a/test/fixtures/runner_listeners.rb b/test/fixtures/runner_listeners.rb index 47929c6..49f8e81 100644 --- a/test/fixtures/runner_listeners.rb +++ b/test/fixtures/runner_listeners.rb @@ -1,17 +1,17 @@ -require File.join(File.dirname(__FILE__), '..', 'test_helper') - -module RunnerListener - class RunnerBeginTest < Hydra::RunnerListener::Abstract - # Fired by the runner just before requesting the first file - def runner_begin - FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) +module HydraExtension + module RunnerListener + class RunnerBeginTest < Hydra::RunnerListener::Abstract + # Fired by the runner just before requesting the first file + def runner_begin( runner ) + FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) + end end - end - class RunnerEndTest < Hydra::RunnerListener::Abstract - # Fired by the runner just after stoping - def runner_end - FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) + class RunnerEndTest < Hydra::RunnerListener::Abstract + # Fired by the runner just after stoping + def runner_end( runner ) + FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) + end end end end diff --git a/test/master_test.rb b/test/master_test.rb index e32a183..f082732 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -1,4 +1,6 @@ require File.join(File.dirname(__FILE__), 'test_helper') +require File.join(File.dirname(__FILE__), 'fixtures', 'runner_listeners') +require File.join(File.dirname(__FILE__), 'fixtures', 'master_listeners') class MasterTest < Test::Unit::TestCase context "with a file to test and a destination to verify" do @@ -178,4 +180,110 @@ class MasterTest < Test::Unit::TestCase assert !File.exists?(File.join(remote, 'test_b.rb')), "B was not deleted" end end + + context "with a runner_end event" do + setup do + # avoid having other tests interfering with us + sleep(0.2) + FileUtils.rm_f(target_file) + FileUtils.rm_f(alternate_target_file) + + @worker_began_flag = File.expand_path(File.join(Dir.consistent_tmpdir, 'worker_began_flag')) #used to know when the worker is ready + FileUtils.rm_f(@worker_began_flag) + + @runner_listener = 'HydraExtension::RunnerListener::RunnerEndTest.new' # runner_end method that creates alternate_target_file + @master_listener = HydraExtension::Listener::WorkerBeganFlag.new #used to know when the runner is up + end + + teardown do + FileUtils.rm_f(target_file) + FileUtils.rm_f(alternate_target_file) + end + + context "running a local worker" do + setup do + @pid = Process.fork do + Hydra::Master.new( + :files => [test_file] * 6, + :autosort => false, + :listeners => [@master_listener], + :runner_listeners => [@runner_listener], + :verbose => false + ) + end + end + + should "run runner_end on successful termination" do + Process.waitpid @pid + + assert File.exists?( target_file ) + + wait_for_file_for_a_while alternate_target_file, 2 + assert File.exists? alternate_target_file + end + + should "run runner_end after interruption signal" do + wait_for_runner_to_begin + Process.kill 'SIGINT', @pid + Process.waitpid @pid + + wait_for_file_for_a_while alternate_target_file, 2 + assert File.exists? alternate_target_file # runner_end should create this file + end + end + + context "running a remote worker" do + setup do + copy_worker_init_file # this method has a protection to avoid erasing an existing worker_init_file + @pid = Process.fork do + Hydra::Master.new( + :files => [test_file] * 6, + :autosort => false, + :listeners => [@master_listener], + :runner_listeners => [@runner_listener], + :workers => [{ + :type => :ssh, + :connect => 'localhost', + :directory => remote_dir_path, + :runners => 1 + }], + :verbose => false + ) + end + end + + teardown do + FileUtils.rm_f(@remote_init_file) unless @protect_init_file + end + + should "run runner_end on successful termination" do + Process.waitpid @pid + + wait_for_file_for_a_while alternate_target_file, 2 + assert File.exists? target_file + assert File.exists? alternate_target_file + end + end + end + + private + # this requires that a worker_begin listener creates a file named worker_began_flag in tmp directory + def wait_for_runner_to_begin + FileUtils.rm_f(@worker_began_flag) + + wait_for_file_for_a_while @worker_began_flag, 2 + assert File.exists?( @worker_began_flag ), "The worker didn't begin!!" + end + + # with a protection to avoid erasing something important in lib + def copy_worker_init_file + @remote_init_file = "#{remote_dir_path}/#{File.basename( hydra_worker_init_file )}" + if File.exists?( @remote_init_file ) + $stderr.puts "\nWARNING!!!: #{@remote_init_file} exits and this test needs to create a new file here. Make sure there is nothing inportant in that file and remove it before running this test\n\n" + @protect_init_file = true + exit + end + # copy the hydra_worker_init to the correct location + FileUtils.cp(hydra_worker_init_file, remote_dir_path) + end end diff --git a/test/runner_test.rb b/test/runner_test.rb index bbda5d8..a1df47a 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -107,7 +107,7 @@ class RunnerTest < Test::Unit::TestCase request_a_file_and_verify_completion(pipe, test_file) end - run_the_runner(pipe, [RunnerListener::RunnerBeginTest.new] ) + run_the_runner(pipe, [HydraExtension::RunnerListener::RunnerBeginTest.new] ) Process.wait(parent) # ensure runner_begin was fired @@ -115,7 +115,7 @@ class RunnerTest < Test::Unit::TestCase end should "fire runner_end event after successful shutting down" do - send_file_to_ssh_runner_and_verify_completion ", :runner_listeners => [RunnerListener::RunnerEndTest.new]" + send_file_to_ssh_runner_and_verify_completion ", :runner_listeners => [HydraExtension::RunnerListener::RunnerEndTest.new]" wait_for_file_for_a_while alternate_target_file, 2 @@ -130,12 +130,10 @@ class RunnerTest < Test::Unit::TestCase # grab its response. response = pipe.gets - pipe.close #this will be detected by the runner and it should call runner_end - end - run_the_runner(pipe, [RunnerListener::RunnerEndTest.new] ) + run_the_runner(pipe, [HydraExtension::RunnerListener::RunnerEndTest.new] ) Process.wait(parent) # ensure runner_begin was fired diff --git a/test/test_helper.rb b/test/test_helper.rb index 4d7c6d2..26fec8f 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -60,6 +60,20 @@ class Test::Unit::TestCase def remote_dir_path File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')) end + + def hydra_worker_init_file + File.expand_path(File.join(File.dirname(__FILE__), 'fixtures', 'hydra_worker_init.rb')) + end + + #this method allow us to wait for a file for a maximum number of time, so the + #test can pass in slower machines. This helps to speed up the tests + def wait_for_file_for_a_while file, time_to_wait + time_begin = Time.now + + until Time.now - time_begin >= time_to_wait or File.exists?( file ) do + sleep 0.01 + end + end end module Hydra #:nodoc: From 3a613ef1b2c16c1e41d8e672978374e8cf118a5f Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Wed, 1 Jun 2011 16:45:31 -0400 Subject: [PATCH 04/11] Removed the class methods --- lib/hydra/runner.rb | 19 ++++++++----------- test/fixtures/runner_listeners.rb | 1 + 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index 432d77b..696a232 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -16,7 +16,6 @@ module Hydra #:nodoc: # Boot up a runner. It takes an IO object (generally a pipe from its # parent) to send it messages on which files to execute. def initialize(opts = {}) - Runner.runner_instance = self # save Runner to make sure it stop at_exit @io = opts.fetch(:io) { raise "No IO Object" } @verbose = opts.fetch(:verbose) { false } @event_listeners = Array( opts.fetch( :runner_listeners ) { nil } ) @@ -25,6 +24,8 @@ module Hydra #:nodoc: runner_begin + reg_exit_hook + trace 'Booted. Sending Request for file' @io.write RequestFile.new begin @@ -40,12 +41,11 @@ module Hydra #:nodoc: @event_listeners.each {|l| l.runner_begin( self ) } end - def self.runner_instance=( runner ) - @runner_instance = runner - end - - def self.runner_instance - @runner_instance + def reg_exit_hook + at_exit do + # NOTE: do not use trace here + stop + end end # Run a test file and report the results @@ -71,6 +71,7 @@ module Hydra #:nodoc: # Stop running def stop + # NOTE: do not use trace here runner_end if @running @running = false end @@ -288,8 +289,4 @@ module Hydra #:nodoc: end.compact end end - - at_exit do - Runner.runner_instance.stop if Runner.runner_instance - end end diff --git a/test/fixtures/runner_listeners.rb b/test/fixtures/runner_listeners.rb index 49f8e81..fb4b12d 100644 --- a/test/fixtures/runner_listeners.rb +++ b/test/fixtures/runner_listeners.rb @@ -10,6 +10,7 @@ module HydraExtension class RunnerEndTest < Hydra::RunnerListener::Abstract # Fired by the runner just after stoping def runner_end( runner ) + # NOTE: do not use trace here FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) end end From 692c8cf7906d2f780804364c2d7c389450d9acdb Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Wed, 1 Jun 2011 17:29:45 -0400 Subject: [PATCH 05/11] redirecting some ugly output when running tests --- test/master_test.rb | 48 +++++++++++++++++++++++++-------------------- test/runner_test.rb | 28 ++++++++++++++------------ test/test_helper.rb | 13 ++++++++++++ 3 files changed, 55 insertions(+), 34 deletions(-) diff --git a/test/master_test.rb b/test/master_test.rb index f082732..79008fa 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -202,14 +202,16 @@ class MasterTest < Test::Unit::TestCase context "running a local worker" do setup do - @pid = Process.fork do - Hydra::Master.new( - :files => [test_file] * 6, - :autosort => false, - :listeners => [@master_listener], - :runner_listeners => [@runner_listener], - :verbose => false - ) + capture_stderr do # redirect stderr + @pid = Process.fork do + Hydra::Master.new( + :files => [test_file] * 6, + :autosort => false, + :listeners => [@master_listener], + :runner_listeners => [@runner_listener], + :verbose => false + ) + end end end @@ -224,7 +226,9 @@ class MasterTest < Test::Unit::TestCase should "run runner_end after interruption signal" do wait_for_runner_to_begin + Process.kill 'SIGINT', @pid + Process.waitpid @pid wait_for_file_for_a_while alternate_target_file, 2 @@ -235,20 +239,22 @@ class MasterTest < Test::Unit::TestCase context "running a remote worker" do setup do copy_worker_init_file # this method has a protection to avoid erasing an existing worker_init_file - @pid = Process.fork do - Hydra::Master.new( - :files => [test_file] * 6, - :autosort => false, - :listeners => [@master_listener], - :runner_listeners => [@runner_listener], - :workers => [{ - :type => :ssh, - :connect => 'localhost', - :directory => remote_dir_path, - :runners => 1 + capture_stderr do # redirect stderr + @pid = Process.fork do + Hydra::Master.new( + :files => [test_file] * 6, + :autosort => false, + :listeners => [@master_listener], + :runner_listeners => [@runner_listener], + :workers => [{ + :type => :ssh, + :connect => 'localhost', + :directory => remote_dir_path, + :runners => 1 }], - :verbose => false - ) + :verbose => false + ) + end end end diff --git a/test/runner_test.rb b/test/runner_test.rb index a1df47a..f1c0600 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -80,20 +80,22 @@ class RunnerTest < Test::Unit::TestCase # because of all the crap cucumber pulls in # we run this in a fork to not contaminate # the main test environment - pid = Process.fork do - runner = Hydra::Runner.new(:io => File.new('/dev/null', 'w')) - runner.run_file(cucumber_feature_file) - assert File.exists?(target_file) - assert_equal "HYDRA", File.read(target_file) - - FileUtils.rm_f(target_file) - - runner.run_file(alternate_cucumber_feature_file) - assert File.exists?(alternate_target_file) - assert_equal "HYDRA", File.read(alternate_target_file) - assert !File.exists?(target_file) + capture_stderr do # redirect stderr + pid = Process.fork do + runner = Hydra::Runner.new(:io => File.new('/dev/null', 'w')) + runner.run_file(cucumber_feature_file) + assert File.exists?(target_file) + assert_equal "HYDRA", File.read(target_file) + + FileUtils.rm_f(target_file) + + runner.run_file(alternate_cucumber_feature_file) + assert File.exists?(alternate_target_file) + assert_equal "HYDRA", File.read(alternate_target_file) + assert !File.exists?(target_file) + end + Process.wait pid end - Process.wait pid end should "be able to run a runner over ssh" do diff --git a/test/test_helper.rb b/test/test_helper.rb index 26fec8f..ca7383c 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -4,6 +4,7 @@ gem 'shoulda', '2.10.3' gem 'rspec', '2.0.0.beta.19' require 'shoulda' require 'tmpdir' +require "stringio" $LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) $LOAD_PATH.unshift(File.dirname(__FILE__)) @@ -65,6 +66,18 @@ class Test::Unit::TestCase File.expand_path(File.join(File.dirname(__FILE__), 'fixtures', 'hydra_worker_init.rb')) end + def capture_stderr + # The output stream must be an IO-like object. In this case we capture it in + # an in-memory IO object so we can return the string value. You can assign any + # IO object here. + previous_stderr, $stderr = $stderr, StringIO.new + yield + $stderr.string + ensure + # Restore the previous value of stderr (typically equal to STDERR). + $stderr = previous_stderr + end + #this method allow us to wait for a file for a maximum number of time, so the #test can pass in slower machines. This helps to speed up the tests def wait_for_file_for_a_while file, time_to_wait From 5bd2e5f32384a7120155bb33469ac784f07a35d2 Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Thu, 2 Jun 2011 14:18:54 -0400 Subject: [PATCH 06/11] Fixed indentation in master_test and a runner test --- test/master_test.rb | 36 ++++++++++---------- test/runner_test.rb | 82 ++++++++++++++++++++++----------------------- 2 files changed, 59 insertions(+), 59 deletions(-) diff --git a/test/master_test.rb b/test/master_test.rb index 79008fa..d4be7c4 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -205,12 +205,12 @@ class MasterTest < Test::Unit::TestCase capture_stderr do # redirect stderr @pid = Process.fork do Hydra::Master.new( - :files => [test_file] * 6, - :autosort => false, - :listeners => [@master_listener], - :runner_listeners => [@runner_listener], - :verbose => false - ) + :files => [test_file] * 6, + :autosort => false, + :listeners => [@master_listener], + :runner_listeners => [@runner_listener], + :verbose => false + ) end end end @@ -242,18 +242,18 @@ class MasterTest < Test::Unit::TestCase capture_stderr do # redirect stderr @pid = Process.fork do Hydra::Master.new( - :files => [test_file] * 6, - :autosort => false, - :listeners => [@master_listener], - :runner_listeners => [@runner_listener], - :workers => [{ - :type => :ssh, - :connect => 'localhost', - :directory => remote_dir_path, - :runners => 1 - }], - :verbose => false - ) + :files => [test_file] * 6, + :autosort => false, + :listeners => [@master_listener], + :runner_listeners => [@runner_listener], + :workers => [{ + :type => :ssh, + :connect => 'localhost', + :directory => remote_dir_path, + :runners => 1 + }], + :verbose => false + ) end end end diff --git a/test/runner_test.rb b/test/runner_test.rb index f1c0600..80eb796 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -99,30 +99,56 @@ class RunnerTest < Test::Unit::TestCase end should "be able to run a runner over ssh" do - send_file_to_ssh_runner_and_verify_completion + ssh = Hydra::SSH.new( + 'localhost', + File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')), + "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Runner.new(:io => Hydra::Stdio.new, :verbose => true);\"" + ) + assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile) + ssh.write(Hydra::Messages::Worker::RunFile.new(:file => test_file)) + + # grab its response. This makes us wait for it to finish + echo = ssh.gets # get the ssh echo + response = ssh.gets + + assert_equal Hydra::Messages::Runner::Results, response.class + + # tell it to shut down + ssh.write(Hydra::Messages::Worker::Shutdown.new) + + ssh.close + + # ensure it ran + assert File.exists?(target_file) + assert_equal "HYDRA", File.read(target_file) end context "using runner events" do - should "fire runner_begin event" do - pipe = Hydra::Pipe.new - parent = Process.fork do - request_a_file_and_verify_completion(pipe, test_file) + context "on successful termination" do + setup do + @pipe = Hydra::Pipe.new + @parent = Process.fork do + request_a_file_and_verify_completion(@pipe, test_file) + end end - run_the_runner(pipe, [HydraExtension::RunnerListener::RunnerBeginTest.new] ) - Process.wait(parent) + should "fire runner_begin event" do + run_the_runner(@pipe, [HydraExtension::RunnerListener::RunnerBeginTest.new] ) + Process.wait(@parent) - # ensure runner_begin was fired - assert File.exists?( alternate_target_file ) - end + # ensure runner_begin was fired + assert File.exists?( alternate_target_file ) + end - should "fire runner_end event after successful shutting down" do - send_file_to_ssh_runner_and_verify_completion ", :runner_listeners => [HydraExtension::RunnerListener::RunnerEndTest.new]" + should "fire runner_end event after successful shutting down" do + run_the_runner(@pipe, [HydraExtension::RunnerListener::RunnerEndTest.new] ) + Process.wait(@parent) - wait_for_file_for_a_while alternate_target_file, 2 + wait_for_file_for_a_while alternate_target_file, 2 - # ensure runner_end was fired - assert File.exists?( alternate_target_file ) + # ensure runner_end was fired + assert File.exists?( alternate_target_file ) + end end should "fire runner_end event after losing communication with worker" do @@ -178,32 +204,6 @@ class RunnerTest < Test::Unit::TestCase pipe.identify_as_child Hydra::Runner.new( :io => pipe, :runner_listeners => listeners ) end - - def send_file_to_ssh_runner_and_verify_completion opts = "" - ssh = Hydra::SSH.new( - 'localhost', - remote_dir_path, - "ruby -e \"require 'rubygems'; require 'hydra'; require '../test/fixtures/runner_listeners' ; Hydra::Runner.new(:io => Hydra::Stdio.new, :verbose => true #{opts} );\"" - ) - - assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile) - ssh.write(Hydra::Messages::Worker::RunFile.new(:file => test_file)) - - # grab its response. This makes us wait for it to finish - echo = ssh.gets # get the ssh echo - response = ssh.gets - - assert_equal Hydra::Messages::Runner::Results, response.class - - # tell it to shut down - ssh.write(Hydra::Messages::Worker::Shutdown.new) - - ssh.close - - # ensure it ran - assert File.exists?(target_file) - assert_equal "HYDRA", File.read(target_file) - end end include RunnerTestHelper end From d6ff3ea5d22265358a51bd4350bbb2ad2a1f3f98 Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Thu, 2 Jun 2011 15:02:58 -0400 Subject: [PATCH 07/11] Some fixes to the tests and removed some commented-out code --- lib/hydra/master.rb | 2 -- lib/hydra/worker.rb | 1 - test/master_test.rb | 46 +++++++++++++++++++++++++-------------------- test/runner_test.rb | 20 +++----------------- test/test_helper.rb | 4 +++- 5 files changed, 32 insertions(+), 41 deletions(-) diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index 0296e99..8fb0903 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -173,8 +173,6 @@ module Hydra #:nodoc: def boot_ssh_worker(worker) sync = Sync.new(worker, @sync, @verbose) -# @environment+=" bundle exec" #used for manually testing - runners = worker.fetch('runners') { raise "You must specify the number of runners" } command = worker.fetch('command') { "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose}, :runner_listeners => \'#{@string_runner_event_listeners}\' );\"" diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb index 7b3c5a5..01350cd 100644 --- a/lib/hydra/worker.rb +++ b/lib/hydra/worker.rb @@ -130,7 +130,6 @@ module Hydra #:nodoc: rescue IOError => ex trace "Worker lost Master" shutdown - #Thread.exit end end end diff --git a/test/master_test.rb b/test/master_test.rb index d4be7c4..2d9cc21 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -201,9 +201,8 @@ class MasterTest < Test::Unit::TestCase end context "running a local worker" do - setup do - capture_stderr do # redirect stderr - @pid = Process.fork do + should "run runner_end on successful termination" do + @pid = Process.fork do Hydra::Master.new( :files => [test_file] * 6, :autosort => false, @@ -212,27 +211,37 @@ class MasterTest < Test::Unit::TestCase :verbose => false ) end - end - end - - should "run runner_end on successful termination" do Process.waitpid @pid - assert File.exists?( target_file ) - - wait_for_file_for_a_while alternate_target_file, 2 - assert File.exists? alternate_target_file + assert_file_exists alternate_target_file end should "run runner_end after interruption signal" do + + class << @master_listener + def worker_begin( worker ) + super + sleep 1 while true #ensure the process doesn't finish before killing it + end + end + + capture_stderr do # redirect stderr + @pid = Process.fork do + Hydra::Master.new( + :files => [test_file], + :autosort => false, + :listeners => [@master_listener], + :runner_listeners => [@runner_listener], + :verbose => false + ) + end + end wait_for_runner_to_begin Process.kill 'SIGINT', @pid - Process.waitpid @pid - wait_for_file_for_a_while alternate_target_file, 2 - assert File.exists? alternate_target_file # runner_end should create this file + assert_file_exists alternate_target_file end end @@ -242,7 +251,7 @@ class MasterTest < Test::Unit::TestCase capture_stderr do # redirect stderr @pid = Process.fork do Hydra::Master.new( - :files => [test_file] * 6, + :files => [test_file], :autosort => false, :listeners => [@master_listener], :runner_listeners => [@runner_listener], @@ -265,9 +274,7 @@ class MasterTest < Test::Unit::TestCase should "run runner_end on successful termination" do Process.waitpid @pid - wait_for_file_for_a_while alternate_target_file, 2 - assert File.exists? target_file - assert File.exists? alternate_target_file + assert_file_exists alternate_target_file end end end @@ -277,8 +284,7 @@ class MasterTest < Test::Unit::TestCase def wait_for_runner_to_begin FileUtils.rm_f(@worker_began_flag) - wait_for_file_for_a_while @worker_began_flag, 2 - assert File.exists?( @worker_began_flag ), "The worker didn't begin!!" + assert_file_exists @worker_began_flag end # with a protection to avoid erasing something important in lib diff --git a/test/runner_test.rb b/test/runner_test.rb index 80eb796..3b04189 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -137,17 +137,14 @@ class RunnerTest < Test::Unit::TestCase Process.wait(@parent) # ensure runner_begin was fired - assert File.exists?( alternate_target_file ) + assert_file_exists alternate_target_file end - should "fire runner_end event after successful shutting down" do + should "fire runner_end event" do run_the_runner(@pipe, [HydraExtension::RunnerListener::RunnerEndTest.new] ) Process.wait(@parent) - wait_for_file_for_a_while alternate_target_file, 2 - - # ensure runner_end was fired - assert File.exists?( alternate_target_file ) + assert_file_exists alternate_target_file end end @@ -171,17 +168,6 @@ class RunnerTest < Test::Unit::TestCase end module RunnerTestHelper - - #this method allow us to wait for a file for a maximum number of time, so the - #test can pass in slower machines. This helps to speed up the tests - def wait_for_file_for_a_while file, time_to_wait - time_begin = Time.now - - until Time.now - time_begin >= time_to_wait or File.exists?( file ) do - sleep 0.01 - end - end - def request_a_file_and_verify_completion(pipe, file) pipe.identify_as_parent diff --git a/test/test_helper.rb b/test/test_helper.rb index ca7383c..a05954c 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -80,12 +80,14 @@ class Test::Unit::TestCase #this method allow us to wait for a file for a maximum number of time, so the #test can pass in slower machines. This helps to speed up the tests - def wait_for_file_for_a_while file, time_to_wait + def assert_file_exists file, time_to_wait = 2 time_begin = Time.now until Time.now - time_begin >= time_to_wait or File.exists?( file ) do sleep 0.01 end + + assert File.exists?( file ) end end From 49dfd644b81cab85de1b92f317c63e2d706d86c8 Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Fri, 3 Jun 2011 18:51:36 -0400 Subject: [PATCH 08/11] Sig Hup was implemented --- lib/hydra/runner.rb | 42 ++++++++++++++++++------------- test/fixtures/runner_listeners.rb | 5 ++++ test/master_test.rb | 40 +++++++++++++++-------------- test/runner_test.rb | 2 +- 4 files changed, 51 insertions(+), 38 deletions(-) diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index 696a232..dd22e1e 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -16,16 +16,16 @@ module Hydra #:nodoc: # Boot up a runner. It takes an IO object (generally a pipe from its # parent) to send it messages on which files to execute. def initialize(opts = {}) + redirect_output + reg_trap_sighup + @io = opts.fetch(:io) { raise "No IO Object" } @verbose = opts.fetch(:verbose) { false } @event_listeners = Array( opts.fetch( :runner_listeners ) { nil } ) $stdout.sync = true - runner_begin - reg_exit_hook - trace 'Booted. Sending Request for file' @io.write RequestFile.new begin @@ -36,18 +36,19 @@ module Hydra #:nodoc: end end + def reg_trap_sighup + trap :SIGHUP do + File.open("_log_output", 'a'){ |f| f << "SIGHUP trapped"} + stop + end + @runner_began = true + end + def runner_begin trace "Firing runner_begin event" @event_listeners.each {|l| l.runner_begin( self ) } end - def reg_exit_hook - at_exit do - # NOTE: do not use trace here - stop - end - end - # Run a test file and report the results def run_file(file) trace "Running file: #{file}" @@ -71,16 +72,19 @@ module Hydra #:nodoc: # Stop running def stop - # NOTE: do not use trace here - runner_end if @running - @running = false + runner_end if @runner_began + @runner_began = @running = false end def runner_end -# trace "Firing runner_end event" + trace "Ending runner #{self.inspect}" @event_listeners.each {|l| l.runner_end( self ) } end + def format_exception(ex) + "#{ex.class.name}: #{ex.message}\n #{ex.backtrace.join("\n ")}" + end + private # The runner will continually read messages and handle them. @@ -108,10 +112,6 @@ module Hydra #:nodoc: "Error in #{file}:\n #{format_exception(ex)}" end - def format_exception(ex) - "#{ex.class.name}: #{ex.message}\n #{ex.backtrace.join("\n ")}" - end - # Run all the Test::Unit Suites in a ruby file def run_test_unit_file(file) begin @@ -288,5 +288,11 @@ module Hydra #:nodoc: end end.compact end + + def redirect_output file_name = nil + file_name = 'log/hydra.log' if !file_name and File.exists? 'log/' + file_name = 'hydra.log' unless file_name + $stderr = $stdout = File.open(file_name, 'a') + end end end diff --git a/test/fixtures/runner_listeners.rb b/test/fixtures/runner_listeners.rb index fb4b12d..1221423 100644 --- a/test/fixtures/runner_listeners.rb +++ b/test/fixtures/runner_listeners.rb @@ -8,9 +8,14 @@ module HydraExtension end class RunnerEndTest < Hydra::RunnerListener::Abstract + # Fired by the runner just before requesting the first file + def runner_begin( runner ) + FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'runner_began_flag')) #used to know when the runner is ready + end # Fired by the runner just after stoping def runner_end( runner ) # NOTE: do not use trace here + #runner.trace "Ending runner" FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt')) end end diff --git a/test/master_test.rb b/test/master_test.rb index 2d9cc21..2881ada 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -188,8 +188,8 @@ class MasterTest < Test::Unit::TestCase FileUtils.rm_f(target_file) FileUtils.rm_f(alternate_target_file) - @worker_began_flag = File.expand_path(File.join(Dir.consistent_tmpdir, 'worker_began_flag')) #used to know when the worker is ready - FileUtils.rm_f(@worker_began_flag) + @runner_began_flag = File.expand_path(File.join(Dir.consistent_tmpdir, 'runner_began_flag')) #used to know when the worker is ready + FileUtils.rm_f(@runner_began_flag) @runner_listener = 'HydraExtension::RunnerListener::RunnerEndTest.new' # runner_end method that creates alternate_target_file @master_listener = HydraExtension::Listener::WorkerBeganFlag.new #used to know when the runner is up @@ -217,13 +217,7 @@ class MasterTest < Test::Unit::TestCase end should "run runner_end after interruption signal" do - - class << @master_listener - def worker_begin( worker ) - super - sleep 1 while true #ensure the process doesn't finish before killing it - end - end + add_infinite_worker_begin_to @master_listener capture_stderr do # redirect stderr @pid = Process.fork do @@ -248,6 +242,13 @@ class MasterTest < Test::Unit::TestCase context "running a remote worker" do setup do copy_worker_init_file # this method has a protection to avoid erasing an existing worker_init_file + end + + teardown do + FileUtils.rm_f(@remote_init_file) unless @protect_init_file + end + + should "run runner_end on successful termination" do capture_stderr do # redirect stderr @pid = Process.fork do Hydra::Master.new( @@ -265,13 +266,6 @@ class MasterTest < Test::Unit::TestCase ) end end - end - - teardown do - FileUtils.rm_f(@remote_init_file) unless @protect_init_file - end - - should "run runner_end on successful termination" do Process.waitpid @pid assert_file_exists alternate_target_file @@ -280,11 +274,19 @@ class MasterTest < Test::Unit::TestCase end private + + def add_infinite_worker_begin_to master_listener + class << master_listener + def worker_begin( worker ) + super + sleep 1 while true #ensure the process doesn't finish before killing it + end + end + end + # this requires that a worker_begin listener creates a file named worker_began_flag in tmp directory def wait_for_runner_to_begin - FileUtils.rm_f(@worker_began_flag) - - assert_file_exists @worker_began_flag + assert_file_exists @runner_began_flag end # with a protection to avoid erasing something important in lib diff --git a/test/runner_test.rb b/test/runner_test.rb index 3b04189..381229e 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -161,7 +161,7 @@ class RunnerTest < Test::Unit::TestCase run_the_runner(pipe, [HydraExtension::RunnerListener::RunnerEndTest.new] ) Process.wait(parent) - # ensure runner_begin was fired + # ensure runner_end was fired assert File.exists?( alternate_target_file ) end end From 0d4a1238f9021ca27da6c3dfbe8d4ab5db927db3 Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Sun, 5 Jun 2011 23:52:59 -0400 Subject: [PATCH 09/11] Implemented the runners log functionality. It needs to handle the case when an invalid path is given --- lib/hydra/master.rb | 5 +-- lib/hydra/runner.rb | 7 ++--- lib/hydra/tasks.rb | 7 ++++- lib/hydra/worker.rb | 3 +- test/master_test.rb | 74 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 88 insertions(+), 8 deletions(-) diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index 8fb0903..fe3ff9e 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -67,6 +67,7 @@ module Hydra #:nodoc: @string_runner_event_listeners = Array( opts.fetch( 'runner_listeners' ) { nil } ) + @runner_log_file = opts.fetch('runner_log_file') { nil } @verbose = opts.fetch('verbose') { false } @autosort = opts.fetch('autosort') { true } @sync = opts.fetch('sync') { nil } @@ -163,7 +164,7 @@ module Hydra #:nodoc: pipe = Hydra::Pipe.new child = SafeFork.fork do pipe.identify_as_child - Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose, :runner_listeners => @string_runner_event_listeners ) + Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose, :runner_listeners => @string_runner_event_listeners, :runner_log_file => @runner_log_file ) end pipe.identify_as_parent @@ -175,7 +176,7 @@ module Hydra #:nodoc: runners = worker.fetch('runners') { raise "You must specify the number of runners" } command = worker.fetch('command') { - "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose}, :runner_listeners => \'#{@string_runner_event_listeners}\' );\"" + "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose}, :runner_listeners => \'#{@string_runner_event_listeners}\', :runner_log_file => \'#{@runner_log_file}\' );\"" } trace "Booting SSH worker" diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index dd22e1e..f96cc08 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -16,7 +16,7 @@ module Hydra #:nodoc: # Boot up a runner. It takes an IO object (generally a pipe from its # parent) to send it messages on which files to execute. def initialize(opts = {}) - redirect_output + redirect_output( opts.fetch( :runner_log_file ) { nil } ) reg_trap_sighup @io = opts.fetch(:io) { raise "No IO Object" } @@ -289,9 +289,8 @@ module Hydra #:nodoc: end.compact end - def redirect_output file_name = nil - file_name = 'log/hydra.log' if !file_name and File.exists? 'log/' - file_name = 'hydra.log' unless file_name + def redirect_output file_name + file_name = '/dev/null' unless file_name and !file_name.empty? $stderr = $stdout = File.open(file_name, 'a') end end diff --git a/lib/hydra/tasks.rb b/lib/hydra/tasks.rb index 2dc7309..72733d0 100644 --- a/lib/hydra/tasks.rb +++ b/lib/hydra/tasks.rb @@ -41,6 +41,10 @@ module Hydra #:nodoc: # Set to false if you don't want to show the total running time attr_accessor :show_time + # Set to a valid file path if you want to save the output of the runners + # in a log file + attr_accessor :runner_log_file + # # Search for the hydra config file def find_config_file @@ -98,7 +102,8 @@ module Hydra #:nodoc: :autosort => @autosort, :files => @files, :listeners => @listeners, - :environment => @environment + :environment => @environment, + :runner_log_file => @runner_log_file } if @config @opts.merge!(:config => @config) diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb index 01350cd..df05c04 100644 --- a/lib/hydra/worker.rb +++ b/lib/hydra/worker.rb @@ -28,6 +28,7 @@ module Hydra #:nodoc: listener = eval(l) @runner_event_listeners << listener if listener.is_a?(Hydra::RunnerListener::Abstract) end + @runner_log_file = opts.fetch(:runner_log_file) { nil } boot_runners(opts.fetch(:runners) { 1 }) @io.write(Hydra::Messages::Worker::WorkerBegin.new) @@ -91,7 +92,7 @@ module Hydra #:nodoc: pipe = Hydra::Pipe.new child = SafeFork.fork do pipe.identify_as_child - Hydra::Runner.new(:io => pipe, :verbose => @verbose, :runner_listeners => @runner_event_listeners ) + Hydra::Runner.new(:io => pipe, :verbose => @verbose, :runner_listeners => @runner_event_listeners, :runner_log_file => @runner_log_file ) end pipe.identify_as_parent @runners << { :pid => child, :io => pipe, :idle => false } diff --git a/test/master_test.rb b/test/master_test.rb index 2881ada..2225e24 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -273,8 +273,82 @@ class MasterTest < Test::Unit::TestCase end end + context "redirecting runner's output and errors" do + setup do + # avoid having other tests interfering with us + sleep(0.2) + FileUtils.rm_f(target_file) + FileUtils.rm_f(runner_log_file) + FileUtils.rm_f("#{remote_dir_path}/#{runner_log_file}") + end + + teardown do + FileUtils.rm_f(target_file) + FileUtils.rm_f(runner_log_file) + FileUtils.rm_f("#{remote_dir_path}/#{runner_log_file}") + end + + should "create a runner log file when usign local worker and passing a log file name" do + @pid = Process.fork do + Hydra::Master.new( + :files => [test_file], + :runner_log_file => runner_log_file, + :verbose => false + ) + end + Process.waitpid @pid + + assert_file_exists target_file # ensure the test was successfully ran + assert_file_exists runner_log_file + end + + should "create a runner log file when usign remote worker and passing a log file name" do + @pid = Process.fork do + Hydra::Master.new( + :files => [test_file], + :workers => [{ + :type => :ssh, + :connect => 'localhost', + :directory => remote_dir_path, + :runners => 1 + }], + :verbose => false, + :runner_log_file => runner_log_file + ) + end + Process.waitpid @pid + + assert_file_exists target_file # ensure the test was successfully ran + assert_file_exists "#{remote_dir_path}/#{runner_log_file}" + end + + # should "NOT create a runner log file when passing a incorrect log file path, but it should run successfully" do + # @pid = Process.fork do + # Hydra::Master.new( + # :files => [test_file], + # :workers => [{ + # :type => :ssh, + # :connect => 'localhost', + # :directory => remote_dir_path, + # :runners => 1 + # }], + # :verbose => false, + # :runner_log_file => 'invalid-dir/runner.log' + # ) + # end + # Process.waitpid @pid + + # assert_file_exists target_file # ensure the test was successfully ran + # assert_file_exists runner_log_file + # end + end + private + def runner_log_file + "hydra_runner.log" + end + def add_infinite_worker_begin_to master_listener class << master_listener def worker_begin( worker ) From 8a536781b21aae116bf0166e3969b45a4c2c82a8 Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Mon, 6 Jun 2011 11:02:24 -0400 Subject: [PATCH 10/11] Added a default runner log, and a test --- .gitignore | 1 + lib/hydra/runner.rb | 13 +++++++++---- test/master_test.rb | 44 +++++++++++++++++++++++++------------------- 3 files changed, 35 insertions(+), 23 deletions(-) diff --git a/.gitignore b/.gitignore index cf7c4c0..0f26641 100644 --- a/.gitignore +++ b/.gitignore @@ -19,5 +19,6 @@ rdoc pkg tags .rvmrc +hydra-runner.log ## PROJECT::SPECIFIC diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index f96cc08..52b1398 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -13,10 +13,13 @@ module Hydra #:nodoc: class Runner include Hydra::Messages::Runner traceable('RUNNER') + + DEFAULT_LOG_FILE = 'hydra-runner.log' + # Boot up a runner. It takes an IO object (generally a pipe from its # parent) to send it messages on which files to execute. def initialize(opts = {}) - redirect_output( opts.fetch( :runner_log_file ) { nil } ) + redirect_output( opts.fetch( :runner_log_file ) { DEFAULT_LOG_FILE } ) reg_trap_sighup @io = opts.fetch(:io) { raise "No IO Object" } @@ -38,7 +41,6 @@ module Hydra #:nodoc: def reg_trap_sighup trap :SIGHUP do - File.open("_log_output", 'a'){ |f| f << "SIGHUP trapped"} stop end @runner_began = true @@ -290,8 +292,11 @@ module Hydra #:nodoc: end def redirect_output file_name - file_name = '/dev/null' unless file_name and !file_name.empty? - $stderr = $stdout = File.open(file_name, 'a') + begin + $stderr = $stdout = File.open(file_name, 'a') + rescue + $stderr = $stdout = File.open(DEFAULT_LOG_FILE, 'a') + end end end end diff --git a/test/master_test.rb b/test/master_test.rb index 2225e24..9458ef6 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -322,31 +322,37 @@ class MasterTest < Test::Unit::TestCase assert_file_exists "#{remote_dir_path}/#{runner_log_file}" end - # should "NOT create a runner log file when passing a incorrect log file path, but it should run successfully" do - # @pid = Process.fork do - # Hydra::Master.new( - # :files => [test_file], - # :workers => [{ - # :type => :ssh, - # :connect => 'localhost', - # :directory => remote_dir_path, - # :runners => 1 - # }], - # :verbose => false, - # :runner_log_file => 'invalid-dir/runner.log' - # ) - # end - # Process.waitpid @pid + should "create the default runner log file when passing an incorrect log file path" do + default_log_file = "#{remote_dir_path}/#{Hydra::Runner::DEFAULT_LOG_FILE}" # hydra-runner.log" + FileUtils.rm_f(default_log_file) - # assert_file_exists target_file # ensure the test was successfully ran - # assert_file_exists runner_log_file - # end + @pid = Process.fork do + Hydra::Master.new( + :files => [test_file], + :workers => [{ + :type => :ssh, + :connect => 'localhost', + :directory => remote_dir_path, + :runners => 1 + }], + :verbose => false, + :runner_log_file => 'invalid-dir/#{runner_log_file}' + ) + end + Process.waitpid @pid + + assert_file_exists target_file # ensure the test was successfully ran + assert_file_exists default_log_file #default log file + assert !File.exists?( "#{remote_dir_path}/#{runner_log_file}" ) + + FileUtils.rm_f(default_log_file) + end end private def runner_log_file - "hydra_runner.log" + "my-hydra-runner.log" end def add_infinite_worker_begin_to master_listener From 5f0b8dc4a51b3f645b025647db40b72a7b006155 Mon Sep 17 00:00:00 2001 From: Arturo Pie Date: Mon, 6 Jun 2011 11:44:12 -0400 Subject: [PATCH 11/11] Added functionality to handle unexpected termination when using local worker --- lib/hydra/runner.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index 52b1398..2749765 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -40,8 +40,10 @@ module Hydra #:nodoc: end def reg_trap_sighup - trap :SIGHUP do - stop + for sign in [:SIGHUP, :INT] + trap sign do + stop + end end @runner_began = true end @@ -295,6 +297,8 @@ module Hydra #:nodoc: begin $stderr = $stdout = File.open(file_name, 'a') rescue + # it should always redirect output in order to handle unexpected interruption + # successfully $stderr = $stdout = File.open(DEFAULT_LOG_FILE, 'a') end end