introduce custom listeners for worker_begin/worker_end

Conflicts:

	lib/hydra/listener/abstract.rb
	lib/hydra/message/worker_messages.rb
This commit is contained in:
Lee Bankewitz and Luke Melia 2010-06-08 11:02:57 -04:00
parent e5633f42ac
commit c8091718eb
4 changed files with 45 additions and 18 deletions

View File

@ -2,7 +2,7 @@ module Hydra #:nodoc:
module Listener #:nodoc: module Listener #:nodoc:
# Abstract listener that implements all the events # Abstract listener that implements all the events
# but does nothing. # but does nothing.
class Abstract class Abstract
# Create a new listener. # Create a new listener.
# #
# Output: The IO object for outputting any information. # Output: The IO object for outputting any information.
@ -10,14 +10,23 @@ module Hydra #:nodoc:
def initialize(output = $stdout) def initialize(output = $stdout)
@output = output @output = output
end end
# Fired when testing has started # Fired when testing has started
def testing_begin(files) def testing_begin(files)
end end
# Fired when testing finishes # Fired when testing finishes, after the workers shutdown
def testing_end def testing_end
end end
# Fired after runner processes have been started
def worker_begin(worker)
end
# Fired before shutting down the worker
def worker_end(worker)
end
# Fired when a file is started # Fired when a file is started
def file_begin(file) def file_begin(file)
end end

View File

@ -9,7 +9,7 @@ module Hydra #:nodoc:
# #
# The Master is run once for any given testing session. # The Master is run once for any given testing session.
class YmlLoadError < StandardError; end class YmlLoadError < StandardError; end
class Master class Master
include Hydra::Messages::Master include Hydra::Messages::Master
include Open3 include Open3
@ -37,7 +37,7 @@ module Hydra #:nodoc:
puts "Testing halted by user. Untested files:" puts "Testing halted by user. Untested files:"
puts @incomplete_files.join("\n") puts @incomplete_files.join("\n")
exit exit
end end
opts.stringify_keys! opts.stringify_keys!
config_file = opts.delete('config') { nil } config_file = opts.delete('config') { nil }
@ -54,7 +54,7 @@ module Hydra #:nodoc:
rescue StandardError => e rescue StandardError => e
raise(YmlLoadError,"config file was found, but could not be parsed.\n#{$!.inspect}") raise(YmlLoadError,"config file was found, but could not be parsed.\n#{$!.inspect}")
end end
opts.merge!(config_yml.stringify_keys!) opts.merge!(config_yml.stringify_keys!)
end end
@files = Array(opts.fetch('files') { nil }) @files = Array(opts.fetch('files') { nil })
@ -74,7 +74,7 @@ module Hydra #:nodoc:
@environment = opts.fetch('environment') { 'test' } @environment = opts.fetch('environment') { 'test' }
if @autosort if @autosort
sort_files_from_report sort_files_from_report
@event_listeners << Hydra::Listener::ReportGenerator.new(File.new(heuristic_file, 'w')) @event_listeners << Hydra::Listener::ReportGenerator.new(File.new(heuristic_file, 'w'))
end end
@ -93,8 +93,11 @@ module Hydra #:nodoc:
end end
# Message handling # Message handling
def worker_begin(worker)
# Send a file down to a worker. @event_listeners.each {|l| l.worker_begin(worker) }
end
# Send a file down to a worker.
def send_file(worker) def send_file(worker)
f = @files.shift f = @files.shift
if f if f
@ -120,6 +123,10 @@ module Hydra #:nodoc:
trace "#{@incomplete_files.size} Files Remaining" trace "#{@incomplete_files.size} Files Remaining"
@event_listeners.each{|l| l.file_end(message.file, message.output) } @event_listeners.each{|l| l.file_end(message.file, message.output) }
if @incomplete_files.empty? if @incomplete_files.empty?
@workers.each do |worker|
@event_listeners.each{|l| l.worker_end(worker) }
end
shutdown_all_workers shutdown_all_workers
else else
send_file(worker) send_file(worker)
@ -131,7 +138,7 @@ module Hydra #:nodoc:
attr_reader :report_text attr_reader :report_text
private private
def boot_workers(workers) def boot_workers(workers)
trace "Booting #{workers.size} workers" trace "Booting #{workers.size} workers"
workers.each do |worker| workers.each do |worker|
@ -150,12 +157,13 @@ module Hydra #:nodoc:
def boot_local_worker(worker) def boot_local_worker(worker)
runners = worker.fetch('runners') { raise "You must specify the number of runners" } runners = worker.fetch('runners') { raise "You must specify the number of runners" }
trace "Booting local worker" trace "Booting local worker"
pipe = Hydra::Pipe.new pipe = Hydra::Pipe.new
child = SafeFork.fork do child = SafeFork.fork do
pipe.identify_as_child pipe.identify_as_child
Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose) Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose)
end end
pipe.identify_as_parent pipe.identify_as_parent
@workers << { :pid => child, :io => pipe, :idle => false, :type => :local } @workers << { :pid => child, :io => pipe, :idle => false, :type => :local }
end end
@ -164,11 +172,11 @@ module Hydra #:nodoc:
sync = Sync.new(worker, @sync, @verbose) sync = Sync.new(worker, @sync, @verbose)
runners = worker.fetch('runners') { raise "You must specify the number of runners" } runners = worker.fetch('runners') { raise "You must specify the number of runners" }
command = worker.fetch('command') { command = worker.fetch('command') {
"RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\"" "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\""
} }
trace "Booting SSH worker" trace "Booting SSH worker"
ssh = Hydra::SSH.new("#{sync.ssh_opts} #{sync.connect}", sync.remote_dir, command) ssh = Hydra::SSH.new("#{sync.ssh_opts} #{sync.connect}", sync.remote_dir, command)
return { :io => ssh, :idle => false, :type => :ssh, :connect => sync.connect } return { :io => ssh, :idle => false, :type => :ssh, :connect => sync.connect }
end end
@ -177,7 +185,7 @@ module Hydra #:nodoc:
trace "Shutting down all workers" trace "Shutting down all workers"
@workers.each do |worker| @workers.each do |worker|
worker[:io].write(Shutdown.new) if worker[:io] worker[:io].write(Shutdown.new) if worker[:io]
worker[:io].close if worker[:io] worker[:io].close if worker[:io]
end end
@listeners.each{|t| t.exit} @listeners.each{|t| t.exit}
end end
@ -201,7 +209,7 @@ module Hydra #:nodoc:
# if it exists and its for me. # if it exists and its for me.
# SSH gives us back echoes, so we need to ignore our own messages # SSH gives us back echoes, so we need to ignore our own messages
if message and !message.class.to_s.index("Worker").nil? if message and !message.class.to_s.index("Worker").nil?
message.handle(self, worker) message.handle(self, worker)
end end
rescue IOError rescue IOError
trace "lost Worker [#{worker.inspect}]" trace "lost Worker [#{worker.inspect}]"
@ -210,7 +218,7 @@ module Hydra #:nodoc:
end end
end end
end end
@listeners.each{|l| l.join} @listeners.each{|l| l.join}
@event_listeners.each{|l| l.testing_end} @event_listeners.each{|l| l.testing_end}
end end

View File

@ -8,6 +8,12 @@ module Hydra #:nodoc:
end end
end end
class WorkerBegin < Hydra::Message
def handle(master, worker)
master.worker_begin(worker)
end
end
# Message telling the Runner to run a file # Message telling the Runner to run a file
class RunFile < Hydra::Message class RunFile < Hydra::Message
# The file that should be run # The file that should be run

View File

@ -9,6 +9,8 @@ module Hydra #:nodoc:
class Worker class Worker
include Hydra::Messages::Worker include Hydra::Messages::Worker
traceable('WORKER') traceable('WORKER')
attr_reader :runners
# Create a new worker. # Create a new worker.
# * io: The IO object to use to communicate with the master # * io: The IO object to use to communicate with the master
# * num_runners: The number of runners to launch # * num_runners: The number of runners to launch
@ -19,14 +21,16 @@ module Hydra #:nodoc:
@listeners = [] @listeners = []
boot_runners(opts.fetch(:runners) { 1 }) boot_runners(opts.fetch(:runners) { 1 })
@io.write(Hydra::Messages::Worker::WorkerBegin.new)
process_messages process_messages
@runners.each{|r| Process.wait r[:pid] } @runners.each{|r| Process.wait r[:pid] }
end end
# message handling methods # message handling methods
# When a runner wants a file, it hits this method with a message. # When a runner wants a file, it hits this method with a message.
# Then the worker bubbles the file request up to the master. # Then the worker bubbles the file request up to the master.
def request_file(message, runner) def request_file(message, runner)
@ -99,7 +103,7 @@ module Hydra #:nodoc:
begin begin
message = @io.gets message = @io.gets
if message and !message.class.to_s.index("Master").nil? if message and !message.class.to_s.index("Master").nil?
trace "Received Message from Master" trace "Received Message from Master"
trace "\t#{message.inspect}" trace "\t#{message.inspect}"
message.handle(self) message.handle(self)
else else