diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index 587d9a2..d9f21d3 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -39,11 +39,11 @@ module Hydra #:nodoc: private def boot_workers(workers) - workers.each do |w| + workers.each do |worker| pipe = Hydra::Pipe.new child = Process.fork do pipe.identify_as_child - Hydra::Worker.new(:io => pipe, :runners => w[:runners]) + Hydra::Worker.new(:io => pipe, :runners => worker[:runners]) end pipe.identify_as_parent @workers << { :pid => child, :io => pipe, :idle => false } @@ -51,20 +51,18 @@ module Hydra #:nodoc: end def process_messages - @running = true - Thread.abort_on_exception = true - @workers.each do |w| + @workers.each do |worker| @listeners << Thread.new do - while @running + while true begin - message = w[:io].gets - message.handle(self, w) if message + message = worker[:io].gets + message.handle(self, worker) if message rescue IOError => ex - $stderr.write "Master lost Worker [#{w.inspect}]\n" - w[:io].close - @workers.delete(w) + $stderr.write "Master lost Worker [#{worker.inspect}]\n" + worker[:io].close + @workers.delete(worker) Thread.exit end end diff --git a/lib/hydra/messaging_io.rb b/lib/hydra/messaging_io.rb index 551826d..95e10b9 100644 --- a/lib/hydra/messaging_io.rb +++ b/lib/hydra/messaging_io.rb @@ -20,11 +20,9 @@ module Hydra #:nodoc: def write(message) raise IOError unless @writer raise UnprocessableMessage unless message.is_a?(Hydra::Message) - begin - @writer.write(message.serialize+"\n") - rescue Errno::EPIPE - raise IOError - end + @writer.write(message.serialize+"\n") + rescue Errno::EPIPE + raise IOError end # Closes the IO object. diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb index f60864c..aabfe7b 100644 --- a/lib/hydra/worker.rb +++ b/lib/hydra/worker.rb @@ -36,9 +36,9 @@ module Hydra #:nodoc: # When the master sends a file down to the worker, it hits this # method. Then the worker delegates the file down to a runner. def delegate_file(message) - r = idle_runner - r[:idle] = false - r[:io].write(RunFile.new(eval(message.serialize))) + runner = idle_runner + runner[:idle] = false + runner[:io].write(RunFile.new(eval(message.serialize))) end # When a runner finishes, it sends the results up to the worker. Then the @@ -82,12 +82,19 @@ module Hydra #:nodoc: $stdout.write "WORKER| Processing Messages\n" if @verbose @running = true - # Abort the worker if one of the runners has an exception # TODO: catch this exception, return a dying message to the master # then shutdown Thread.abort_on_exception = true - # Worker listens and handles messages + process_messages_from_master + process_messages_from_runners + + @listeners.each{|l| l.join } + @io.close + $stdout.write "WORKER| Done processing messages\n" if @verbose + end + + def process_messages_from_master @listeners << Thread.new do while @running begin @@ -105,9 +112,9 @@ module Hydra #:nodoc: end end end + end - # Runners listen, but when they handle they pass themselves - # so we can reference them when we deal with their messages + def process_messages_from_runners @runners.each do |r| @listeners << Thread.new do while @running @@ -125,16 +132,13 @@ module Hydra #:nodoc: end end end - @listeners.each{|l| l.join } - @io.close - $stdout.write "WORKER| Done processing messages\n" if @verbose end # Get the next idle runner def idle_runner #:nodoc: idle_r = nil while idle_r.nil? - idle_r = @runners.detect{|r| r[:idle]} + idle_r = @runners.detect{|runner| runner[:idle]} sleep(1) end return idle_r