From 2bee1dde2d34810329d9df92578a525f3a2ef872 Mon Sep 17 00:00:00 2001 From: Nick Gauthier Date: Thu, 4 Feb 2010 10:33:21 -0500 Subject: [PATCH] YAML config loading. SSH worker termination fixes. --- lib/hydra/hash.rb | 14 +++++ lib/hydra/master.rb | 87 +++++++++++++++++++--------- lib/hydra/message/worker_messages.rb | 3 +- lib/hydra/worker.rb | 1 + test/fixtures/config.yml | 4 ++ test/master_test.rb | 13 ++++- 6 files changed, 89 insertions(+), 33 deletions(-) create mode 100644 lib/hydra/hash.rb create mode 100644 test/fixtures/config.yml diff --git a/lib/hydra/hash.rb b/lib/hydra/hash.rb new file mode 100644 index 0000000..0358141 --- /dev/null +++ b/lib/hydra/hash.rb @@ -0,0 +1,14 @@ +class Hash + def stringify_keys + inject({}) do |options, (key, value)| + options[key.to_s] = value + options + end + end + def stringify_keys! + keys.each do |key| + self[key.to_s] = delete(key) + end + self + end +end diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index aa6d221..9fb68e2 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -1,3 +1,4 @@ +require 'hydra/hash' module Hydra #:nodoc: # Hydra class responsible for delegate work down to workers. # @@ -15,16 +16,23 @@ module Hydra #:nodoc: # * An array of hashes. Each hash should be the configuration options # for a worker. def initialize(opts = { }) - @files = opts.fetch(:files) { [] } + opts.stringify_keys! + config_file = opts.delete('config') { nil } + if config_file + opts.merge!(YAML.load_file(config_file).stringify_keys!) + end + @files = opts.fetch('files') { [] } + @incomplete_files = @files.dup @workers = [] @listeners = [] - @verbose = opts.fetch(:verbose) { false } + @verbose = opts.fetch('verbose') { false } # default is one worker that is configured to use a pipe with one runner - worker_cfg = opts.fetch(:workers) { - [ { :type => :local, :runners => 1} ] - } + worker_cfg = opts.fetch('workers') { [ { 'type' => 'local', 'runners' => 1} ] } $stdout.write "MASTER| Initialized\n" if @verbose + $stdout.write "MASTER| Files: (#{@files.inspect})\n" if @verbose + $stdout.write "MASTER| Workers: (#{worker_cfg.inspect})\n" if @verbose + $stdout.write "MASTER| Verbose: (#{@verbose.inspect})\n" if @verbose boot_workers worker_cfg process_messages @@ -36,43 +44,57 @@ module Hydra #:nodoc: # worker down. def send_file(worker) f = @files.pop - if f - worker[:io].write(RunFile.new(:file => f)) + worker[:io].write(RunFile.new(:file => f)) if f + end + + # Process the results coming back from the worker. + def process_results(worker, message) + $stdout.write message.output + # only delete one + @incomplete_files.delete_at(@incomplete_files.index(message.file)) + $stdout.write "MASTER| #{@incomplete_files.size} Files Remaining\n" if @verbose + if @incomplete_files.empty? + shutdown_all_workers else - worker[:io].write(Shutdown.new) - Thread.exit + send_file(worker) end end private def boot_workers(workers) - $stdout.write "MASTER| Booting workers\n" if @verbose - workers.select{|worker| worker[:type] == :local}.each do |worker| - boot_local_worker(worker) - end - workers.select{|worker| worker[:type] == :ssh}.each do |worker| - @workers << worker # will boot later, during the listening phase + $stdout.write "MASTER| Booting #{workers.size} workers\n" if @verbose + workers.each do |worker| + worker.stringify_keys! + $stdout.write "MASTER| worker opts #{worker.inspect}\n" if @verbose + type = worker.fetch('type') { 'local' } + if type.to_s == 'local' + boot_local_worker(worker) + elsif type.to_s == 'ssh' + @workers << worker # will boot later, during the listening phase + else + raise "Worker type not recognized: (#{type.to_s})" + end end end def boot_local_worker(worker) - runners = worker.fetch(:runners) { raise "You must specify the number of runners" } + runners = worker.fetch('runners') { raise "You must specify the number of runners" } $stdout.write "MASTER| Booting local worker\n" if @verbose pipe = Hydra::Pipe.new child = Process.fork do pipe.identify_as_child - Hydra::Worker.new(:io => pipe, :runners => runners) + Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose) end pipe.identify_as_parent @workers << { :pid => child, :io => pipe, :idle => false, :type => :local } end def boot_ssh_worker(worker) - runners = worker.fetch(:runners) { raise "You must specify the number of runners" } - connect = worker.fetch(:connect) { raise "You must specify SSH connection options" } - directory = worker.fetch(:directory) { raise "You must specify a remote directory" } - command = worker.fetch(:command) { + runners = worker.fetch('runners') { raise "You must specify the number of runners" } + connect = worker.fetch('connect') { raise "You must specify SSH connection options" } + directory = worker.fetch('directory') { raise "You must specify a remote directory" } + command = worker.fetch('command') { "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\"" } @@ -81,25 +103,34 @@ module Hydra #:nodoc: return { :io => ssh, :idle => false, :type => :ssh } end + def shutdown_all_workers + $stdout.write "MASTER| Shutting down all workers\n" if @verbose + @workers.each do |worker| + worker[:io].write(Shutdown.new) if worker[:io] + worker[:io].close if worker[:io] + end + @listeners.each{|t| t.exit} + end + def process_messages Thread.abort_on_exception = true $stdout.write "MASTER| Processing Messages\n" if @verbose - $stdout.write "MASTER| Workers: #{@workers}\n" if @verbose + $stdout.write "MASTER| Workers: #{@workers.inspect}\n" if @verbose @workers.each do |worker| @listeners << Thread.new do $stdout.write "MASTER| Listening to #{worker.inspect}\n" if @verbose - worker = boot_ssh_worker(worker) if worker.fetch(:type){ :local } == :ssh + if worker.fetch('type') { 'local' }.to_s == 'ssh' + worker = boot_ssh_worker(worker) + @workers << worker + end while true begin - $stdout.write "MASTER| listen....\n" if @verbose message = worker[:io].gets $stdout.write "MASTER| got message: #{message}\n" if @verbose message.handle(self, worker) if message - rescue IOError => ex - $stderr.write "Master lost Worker [#{worker.inspect}]\n" - worker[:io].close - @workers.delete(worker) + rescue IOError + $stderr.write "MASTER| lost Worker [#{worker.inspect}]\n" if @verbose Thread.exit end end diff --git a/lib/hydra/message/worker_messages.rb b/lib/hydra/message/worker_messages.rb index a687ba2..c786b67 100644 --- a/lib/hydra/message/worker_messages.rb +++ b/lib/hydra/message/worker_messages.rb @@ -30,8 +30,7 @@ module Hydra #:nodoc: # Message relaying the results of a worker up to the master class Results < Hydra::Messages::Runner::Results def handle(master, worker) #:nodoc: - $stdout.write output - master.send_file(worker) + master.process_results(worker, self) end end diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb index 63d39ea..5b2fa66 100644 --- a/lib/hydra/worker.rb +++ b/lib/hydra/worker.rb @@ -102,6 +102,7 @@ module Hydra #:nodoc: $stdout.write " | #{message.inspect}\n" if @verbose message.handle(self) else + $stdout.write "WORKER| Nothing from Master, Pinging\n" if @verbose @io.write Ping.new end rescue IOError => ex diff --git a/test/fixtures/config.yml b/test/fixtures/config.yml new file mode 100644 index 0000000..817c3bb --- /dev/null +++ b/test/fixtures/config.yml @@ -0,0 +1,4 @@ +--- + workers: + - type: local + runners: 2 diff --git a/test/master_test.rb b/test/master_test.rb index cacb11c..9544735 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -21,9 +21,7 @@ class MasterTest < Test::Unit::TestCase should "run a test 6 times on 1 worker with 2 runners" do Hydra::Master.new( :files => [test_file]*6, - :local => { - :runners => 2 - } + :workers => [ { :type => :local, :runners => 2 } ] ) assert File.exists?(target_file) assert_equal "HYDRA"*6, File.read(target_file) @@ -73,5 +71,14 @@ class MasterTest < Test::Unit::TestCase assert File.exists?(target_file) assert_equal "HYDRA", File.read(target_file) end + + should "run a test with config from a yaml file" do + Hydra::Master.new( + :files => [test_file], + :config => File.join(File.dirname(__FILE__), 'fixtures', 'config.yml') + ) + assert File.exists?(target_file) + assert_equal "HYDRA", File.read(target_file) + end end end