From c8091718eb5e7f8c39bae756481db7d0585864c9 Mon Sep 17 00:00:00 2001 From: Lee Bankewitz and Luke Melia Date: Tue, 8 Jun 2010 11:02:57 -0400 Subject: [PATCH] introduce custom listeners for worker_begin/worker_end Conflicts: lib/hydra/listener/abstract.rb lib/hydra/message/worker_messages.rb --- lib/hydra/listener/abstract.rb | 13 +++++++++-- lib/hydra/master.rb | 34 +++++++++++++++++----------- lib/hydra/message/worker_messages.rb | 6 +++++ lib/hydra/worker.rb | 10 +++++--- 4 files changed, 45 insertions(+), 18 deletions(-) diff --git a/lib/hydra/listener/abstract.rb b/lib/hydra/listener/abstract.rb index 9fc87ed..9d81ffa 100644 --- a/lib/hydra/listener/abstract.rb +++ b/lib/hydra/listener/abstract.rb @@ -2,7 +2,7 @@ module Hydra #:nodoc: module Listener #:nodoc: # Abstract listener that implements all the events # but does nothing. - class Abstract + class Abstract # Create a new listener. # # Output: The IO object for outputting any information. @@ -10,14 +10,23 @@ module Hydra #:nodoc: def initialize(output = $stdout) @output = output end + # Fired when testing has started def testing_begin(files) end - # Fired when testing finishes + # Fired when testing finishes, after the workers shutdown def testing_end end + # Fired after runner processes have been started + def worker_begin(worker) + end + + # Fired before shutting down the worker + def worker_end(worker) + end + # Fired when a file is started def file_begin(file) end diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index e1858de..837d78f 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -9,7 +9,7 @@ module Hydra #:nodoc: # # The Master is run once for any given testing session. class YmlLoadError < StandardError; end - + class Master include Hydra::Messages::Master include Open3 @@ -37,7 +37,7 @@ module Hydra #:nodoc: puts "Testing halted by user. Untested files:" puts @incomplete_files.join("\n") exit - end + end opts.stringify_keys! config_file = opts.delete('config') { nil } @@ -54,7 +54,7 @@ module Hydra #:nodoc: rescue StandardError => e raise(YmlLoadError,"config file was found, but could not be parsed.\n#{$!.inspect}") end - + opts.merge!(config_yml.stringify_keys!) end @files = Array(opts.fetch('files') { nil }) @@ -74,7 +74,7 @@ module Hydra #:nodoc: @environment = opts.fetch('environment') { 'test' } if @autosort - sort_files_from_report + sort_files_from_report @event_listeners << Hydra::Listener::ReportGenerator.new(File.new(heuristic_file, 'w')) end @@ -93,8 +93,11 @@ module Hydra #:nodoc: end # Message handling - - # Send a file down to a worker. + def worker_begin(worker) + @event_listeners.each {|l| l.worker_begin(worker) } + end + + # Send a file down to a worker. def send_file(worker) f = @files.shift if f @@ -120,6 +123,10 @@ module Hydra #:nodoc: trace "#{@incomplete_files.size} Files Remaining" @event_listeners.each{|l| l.file_end(message.file, message.output) } if @incomplete_files.empty? + @workers.each do |worker| + @event_listeners.each{|l| l.worker_end(worker) } + end + shutdown_all_workers else send_file(worker) @@ -131,7 +138,7 @@ module Hydra #:nodoc: attr_reader :report_text private - + def boot_workers(workers) trace "Booting #{workers.size} workers" workers.each do |worker| @@ -150,12 +157,13 @@ module Hydra #:nodoc: def boot_local_worker(worker) runners = worker.fetch('runners') { raise "You must specify the number of runners" } - trace "Booting local worker" + trace "Booting local worker" pipe = Hydra::Pipe.new child = SafeFork.fork do pipe.identify_as_child Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose) end + pipe.identify_as_parent @workers << { :pid => child, :io => pipe, :idle => false, :type => :local } end @@ -164,11 +172,11 @@ module Hydra #:nodoc: sync = Sync.new(worker, @sync, @verbose) runners = worker.fetch('runners') { raise "You must specify the number of runners" } - command = worker.fetch('command') { + command = worker.fetch('command') { "RAILS_ENV=#{@environment} ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\"" } - trace "Booting SSH worker" + trace "Booting SSH worker" ssh = Hydra::SSH.new("#{sync.ssh_opts} #{sync.connect}", sync.remote_dir, command) return { :io => ssh, :idle => false, :type => :ssh, :connect => sync.connect } end @@ -177,7 +185,7 @@ module Hydra #:nodoc: trace "Shutting down all workers" @workers.each do |worker| worker[:io].write(Shutdown.new) if worker[:io] - worker[:io].close if worker[:io] + worker[:io].close if worker[:io] end @listeners.each{|t| t.exit} end @@ -201,7 +209,7 @@ module Hydra #:nodoc: # if it exists and its for me. # SSH gives us back echoes, so we need to ignore our own messages if message and !message.class.to_s.index("Worker").nil? - message.handle(self, worker) + message.handle(self, worker) end rescue IOError trace "lost Worker [#{worker.inspect}]" @@ -210,7 +218,7 @@ module Hydra #:nodoc: end end end - + @listeners.each{|l| l.join} @event_listeners.each{|l| l.testing_end} end diff --git a/lib/hydra/message/worker_messages.rb b/lib/hydra/message/worker_messages.rb index c786b67..fd89a4f 100644 --- a/lib/hydra/message/worker_messages.rb +++ b/lib/hydra/message/worker_messages.rb @@ -8,6 +8,12 @@ module Hydra #:nodoc: end end + class WorkerBegin < Hydra::Message + def handle(master, worker) + master.worker_begin(worker) + end + end + # Message telling the Runner to run a file class RunFile < Hydra::Message # The file that should be run diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb index 94448c3..b4f7b47 100644 --- a/lib/hydra/worker.rb +++ b/lib/hydra/worker.rb @@ -9,6 +9,8 @@ module Hydra #:nodoc: class Worker include Hydra::Messages::Worker traceable('WORKER') + + attr_reader :runners # Create a new worker. # * io: The IO object to use to communicate with the master # * num_runners: The number of runners to launch @@ -19,14 +21,16 @@ module Hydra #:nodoc: @listeners = [] boot_runners(opts.fetch(:runners) { 1 }) + @io.write(Hydra::Messages::Worker::WorkerBegin.new) + process_messages - + @runners.each{|r| Process.wait r[:pid] } end # message handling methods - + # When a runner wants a file, it hits this method with a message. # Then the worker bubbles the file request up to the master. def request_file(message, runner) @@ -99,7 +103,7 @@ module Hydra #:nodoc: begin message = @io.gets if message and !message.class.to_s.index("Master").nil? - trace "Received Message from Master" + trace "Received Message from Master" trace "\t#{message.inspect}" message.handle(self) else