Two runner events implemented and tested at runner level
This commit is contained in:
parent
b51bb8c136
commit
2648c3679f
@ -13,4 +13,4 @@ require 'hydra/listener/minimal_output'
|
|||||||
require 'hydra/listener/report_generator'
|
require 'hydra/listener/report_generator'
|
||||||
require 'hydra/listener/notifier'
|
require 'hydra/listener/notifier'
|
||||||
require 'hydra/listener/progress_bar'
|
require 'hydra/listener/progress_bar'
|
||||||
|
require 'hydra/runner_listener/abstract'
|
||||||
|
@ -17,10 +17,14 @@ module Hydra #:nodoc:
|
|||||||
# parent) to send it messages on which files to execute.
|
# parent) to send it messages on which files to execute.
|
||||||
def initialize(opts = {})
|
def initialize(opts = {})
|
||||||
@io = opts.fetch(:io) { raise "No IO Object" }
|
@io = opts.fetch(:io) { raise "No IO Object" }
|
||||||
@verbose = opts.fetch(:verbose) { false }
|
@verbose = opts.fetch(:verbose) { false }
|
||||||
|
@event_listeners = Array( opts.fetch( :runner_listeners ) { nil } )
|
||||||
|
|
||||||
$stdout.sync = true
|
$stdout.sync = true
|
||||||
trace 'Booted. Sending Request for file'
|
trace 'Booted. Sending Request for file'
|
||||||
|
|
||||||
|
runner_begin
|
||||||
|
|
||||||
@io.write RequestFile.new
|
@io.write RequestFile.new
|
||||||
begin
|
begin
|
||||||
process_messages
|
process_messages
|
||||||
@ -30,6 +34,11 @@ module Hydra #:nodoc:
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def runner_begin
|
||||||
|
trace "Firing runner_begin event"
|
||||||
|
@event_listeners.each {|l| l.runner_begin }
|
||||||
|
end
|
||||||
|
|
||||||
# Run a test file and report the results
|
# Run a test file and report the results
|
||||||
def run_file(file)
|
def run_file(file)
|
||||||
trace "Running file: #{file}"
|
trace "Running file: #{file}"
|
||||||
@ -54,6 +63,12 @@ module Hydra #:nodoc:
|
|||||||
# Stop running
|
# Stop running
|
||||||
def stop
|
def stop
|
||||||
@running = false
|
@running = false
|
||||||
|
runner_end
|
||||||
|
end
|
||||||
|
|
||||||
|
def runner_end
|
||||||
|
trace "Firing runner_end event"
|
||||||
|
@event_listeners.each {|l| l.runner_end }
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
@ -74,7 +89,7 @@ module Hydra #:nodoc:
|
|||||||
end
|
end
|
||||||
rescue IOError => ex
|
rescue IOError => ex
|
||||||
trace "Runner lost Worker"
|
trace "Runner lost Worker"
|
||||||
@running = false
|
stop
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
23
lib/hydra/runner_listener/abstract.rb
Normal file
23
lib/hydra/runner_listener/abstract.rb
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
module Hydra #:nodoc:
|
||||||
|
module RunnerListener #:nodoc:
|
||||||
|
# Abstract listener that implements all the events
|
||||||
|
# but does nothing.
|
||||||
|
class Abstract
|
||||||
|
# Create a new listener.
|
||||||
|
#
|
||||||
|
# Output: The IO object for outputting any information.
|
||||||
|
# Defaults to STDOUT, but you could pass a file in, or STDERR
|
||||||
|
def initialize(output = $stdout)
|
||||||
|
@output = output
|
||||||
|
end
|
||||||
|
|
||||||
|
# Fired by the runner just before requesting the first file
|
||||||
|
def runner_begin
|
||||||
|
end
|
||||||
|
|
||||||
|
# Fired by the runner just after stoping
|
||||||
|
def runner_end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
17
test/fixtures/runner_listeners.rb
vendored
Normal file
17
test/fixtures/runner_listeners.rb
vendored
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
require File.join(File.dirname(__FILE__), '..', 'test_helper')
|
||||||
|
|
||||||
|
module RunnerListener
|
||||||
|
class RunnerBeginTest < Hydra::RunnerListener::Abstract
|
||||||
|
# Fired by the runner just before requesting the first file
|
||||||
|
def runner_begin
|
||||||
|
FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt'))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class RunnerEndTest < Hydra::RunnerListener::Abstract
|
||||||
|
# Fired by the runner just after stoping
|
||||||
|
def runner_end
|
||||||
|
FileUtils.touch File.expand_path(File.join(Dir.consistent_tmpdir, 'alternate_hydra_test.txt'))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
@ -1,4 +1,5 @@
|
|||||||
require File.join(File.dirname(__FILE__), 'test_helper')
|
require File.join(File.dirname(__FILE__), 'test_helper')
|
||||||
|
require File.join(File.dirname(__FILE__), 'fixtures', 'runner_listeners')
|
||||||
|
|
||||||
class RunnerTest < Test::Unit::TestCase
|
class RunnerTest < Test::Unit::TestCase
|
||||||
context "with a file to test and a destination to verify" do
|
context "with a file to test and a destination to verify" do
|
||||||
@ -96,32 +97,65 @@ class RunnerTest < Test::Unit::TestCase
|
|||||||
end
|
end
|
||||||
|
|
||||||
should "be able to run a runner over ssh" do
|
should "be able to run a runner over ssh" do
|
||||||
ssh = Hydra::SSH.new(
|
send_file_to_ssh_runner_and_verify_completion
|
||||||
'localhost',
|
end
|
||||||
remote_dir_path,
|
|
||||||
"ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Runner.new(:io => Hydra::Stdio.new, :verbose => true);\""
|
|
||||||
)
|
|
||||||
assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile)
|
|
||||||
ssh.write(Hydra::Messages::Worker::RunFile.new(:file => test_file))
|
|
||||||
|
|
||||||
# grab its response. This makes us wait for it to finish
|
|
||||||
echo = ssh.gets # get the ssh echo
|
|
||||||
response = ssh.gets
|
|
||||||
|
|
||||||
assert_equal Hydra::Messages::Runner::Results, response.class
|
context "using runner events" do
|
||||||
|
should "fire runner_begin event" do
|
||||||
# tell it to shut down
|
pipe = Hydra::Pipe.new
|
||||||
ssh.write(Hydra::Messages::Worker::Shutdown.new)
|
parent = Process.fork do
|
||||||
|
request_a_file_and_verify_completion(pipe, test_file)
|
||||||
|
end
|
||||||
|
|
||||||
ssh.close
|
run_the_runner(pipe, [RunnerListener::RunnerBeginTest.new] )
|
||||||
|
Process.wait(parent)
|
||||||
# ensure it ran
|
|
||||||
assert File.exists?(target_file)
|
# ensure runner_begin was fired
|
||||||
assert_equal "HYDRA", File.read(target_file)
|
assert File.exists?( alternate_target_file )
|
||||||
|
end
|
||||||
|
|
||||||
|
should "fire runner_end event after successful shutting down" do
|
||||||
|
send_file_to_ssh_runner_and_verify_completion ", :runner_listeners => [RunnerListener::RunnerEndTest.new]"
|
||||||
|
|
||||||
|
wait_for_file_for_a_while alternate_target_file, 2
|
||||||
|
|
||||||
|
# ensure runner_end was fired
|
||||||
|
assert File.exists?( alternate_target_file )
|
||||||
|
end
|
||||||
|
|
||||||
|
should "fire runner_end event after losing communication with worker" do
|
||||||
|
pipe = Hydra::Pipe.new
|
||||||
|
parent = Process.fork do
|
||||||
|
pipe.identify_as_parent
|
||||||
|
|
||||||
|
# grab its response.
|
||||||
|
response = pipe.gets
|
||||||
|
|
||||||
|
pipe.close #this will be detected by the runner and it should call runner_end
|
||||||
|
|
||||||
|
end
|
||||||
|
|
||||||
|
run_the_runner(pipe, [RunnerListener::RunnerEndTest.new] )
|
||||||
|
Process.wait(parent)
|
||||||
|
|
||||||
|
# ensure runner_begin was fired
|
||||||
|
assert File.exists?( alternate_target_file )
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
module RunnerTestHelper
|
module RunnerTestHelper
|
||||||
|
|
||||||
|
#this method allow us to wait for a file for a maximum number of time, so the
|
||||||
|
#test can pass in slower machines. This helps to speed up the tests
|
||||||
|
def wait_for_file_for_a_while file, time_to_wait
|
||||||
|
time_begin = Time.now
|
||||||
|
|
||||||
|
until Time.now - time_begin >= time_to_wait or File.exists?( file ) do
|
||||||
|
sleep 0.01
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def request_a_file_and_verify_completion(pipe, file)
|
def request_a_file_and_verify_completion(pipe, file)
|
||||||
pipe.identify_as_parent
|
pipe.identify_as_parent
|
||||||
|
|
||||||
@ -140,9 +174,35 @@ class RunnerTest < Test::Unit::TestCase
|
|||||||
assert_equal "HYDRA", File.read(target_file)
|
assert_equal "HYDRA", File.read(target_file)
|
||||||
end
|
end
|
||||||
|
|
||||||
def run_the_runner(pipe)
|
def run_the_runner(pipe, listeners = [])
|
||||||
pipe.identify_as_child
|
pipe.identify_as_child
|
||||||
Hydra::Runner.new(:io => pipe)
|
Hydra::Runner.new( :io => pipe, :runner_listeners => listeners )
|
||||||
|
end
|
||||||
|
|
||||||
|
def send_file_to_ssh_runner_and_verify_completion opts = ""
|
||||||
|
ssh = Hydra::SSH.new(
|
||||||
|
'localhost',
|
||||||
|
remote_dir_path,
|
||||||
|
"ruby -e \"require 'rubygems'; require 'hydra'; require '../test/fixtures/runner_listeners' ; Hydra::Runner.new(:io => Hydra::Stdio.new, :verbose => true #{opts} );\""
|
||||||
|
)
|
||||||
|
|
||||||
|
assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile)
|
||||||
|
ssh.write(Hydra::Messages::Worker::RunFile.new(:file => test_file))
|
||||||
|
|
||||||
|
# grab its response. This makes us wait for it to finish
|
||||||
|
echo = ssh.gets # get the ssh echo
|
||||||
|
response = ssh.gets
|
||||||
|
|
||||||
|
assert_equal Hydra::Messages::Runner::Results, response.class
|
||||||
|
|
||||||
|
# tell it to shut down
|
||||||
|
ssh.write(Hydra::Messages::Worker::Shutdown.new)
|
||||||
|
|
||||||
|
ssh.close
|
||||||
|
|
||||||
|
# ensure it ran
|
||||||
|
assert File.exists?(target_file)
|
||||||
|
assert_equal "HYDRA", File.read(target_file)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
include RunnerTestHelper
|
include RunnerTestHelper
|
||||||
|
Loading…
Reference in New Issue
Block a user