diff --git a/lib/hydra/message/runner_messages.rb b/lib/hydra/message/runner_messages.rb index ee91a0d..389d459 100644 --- a/lib/hydra/message/runner_messages.rb +++ b/lib/hydra/message/runner_messages.rb @@ -27,7 +27,7 @@ module Hydra #:nodoc: super(:output => @output, :file => @file) end def handle(worker, runner) #:nodoc: - worker.relay_results(self) + worker.relay_results(self, runner) end end diff --git a/lib/hydra/message/worker_messages.rb b/lib/hydra/message/worker_messages.rb index 580f9ca..1ebafc5 100644 --- a/lib/hydra/message/worker_messages.rb +++ b/lib/hydra/message/worker_messages.rb @@ -1,4 +1,4 @@ -module Hydra #:nodoc: +module Hydra #:nodoc: module Messages #:nodoc: module Worker #:nodoc: # Message indicating that a work needs a file to delegate to a runner @@ -16,6 +16,7 @@ module Hydra #:nodoc: class Results < Hydra::Messages::Runner::Results end + # Message telling the worker to shut down. class Shutdown < Hydra::Messages::Runner::Shutdown def handle(worker) worker.shutdown diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb index 850360a..f9abea8 100644 --- a/lib/hydra/worker.rb +++ b/lib/hydra/worker.rb @@ -1,5 +1,9 @@ module Hydra #:nodoc: + # Hydra class responsible to dispatching runners and communicating with the master. class Worker + # Create a new worker. + # * io: The IO object to use to communicate with the master + # * num_runners: The number of runners to launch def initialize(io, num_runners) @io = io @runners = [] @@ -9,7 +13,47 @@ module Hydra #:nodoc: @runners.each{|r| Process.wait r[:pid] } end - def boot_runners(num_runners) + + # message handling methods + + # When a runner wants a file, it hits this method with a message. + # Then the worker bubbles the file request up to the master. + def request_file(message, runner) + @io.write(Hydra::Messages::Worker::RequestFile.new) + runner[:idle] = true + end + + # When the master sends a file down to the worker, it hits this + # method. Then the worker delegates the file down to a runner. + def delegate_file(message) + r = idle_runner + r[:idle] = false + r[:io].write(Hydra::Messages::Runner::RunFile.new(eval(message.serialize))) + end + + # When a runner finishes, it sends the results up to the worker. Then the + # worker sends the results up to the master. + # TODO: when we relay results, it should trigger a RunFile or Shutdown from + # the master implicitly + def relay_results(message, runner) + runner[:idle] = true + @io.write(Hydra::Messages::Worker::Results.new(eval(message.serialize))) + end + + # When a master issues a shutdown order, it hits this method, which causes + # the worker to send shutdown messages to its runners. + # TODO: implement a ShutdownComplete message, so that we can kill the + # processes if necessary. + def shutdown + @running = false + @runners.each do |r| + r[:io].write(Hydra::Messages::Runner::Shutdown.new) + end + end + + private + + def boot_runners(num_runners) #:nodoc: num_runners.times do pipe = Hydra::Pipe.new child = Process.fork do @@ -21,9 +65,13 @@ module Hydra #:nodoc: end end - def process_messages + # Continuously process messages + def process_messages #:nodoc: @running = true + # Abort the worker if one of the runners has an exception + # TODO: catch this exception, return a dying message to the master + # then shutdown Thread.abort_on_exception = true # Worker listens and handles messages @@ -39,15 +87,22 @@ module Hydra #:nodoc: @runners.each do |r| @listeners << Thread.new do while @running - message = r[:io].gets - message.handle(self, r) if message + begin + message = r[:io].gets + message.handle(self, r) if message + rescue IOError => ex + # If the other end of the pipe closes + # we will continue, because we're probably + # not @running anymore + end end end end @listeners.each{|l| l.join } end - def idle_runner + # Get the next idle runner + def idle_runner #:nodoc: idle_r = nil while idle_r.nil? idle_r = @runners.detect{|r| r[:idle]} @@ -55,29 +110,5 @@ module Hydra #:nodoc: end return idle_r end - - # message handling methods - - def request_file(message, runner) - @io.write(Hydra::Messages::Worker::RequestFile.new) - runner[:idle] = true - end - - def delegate_file(message) - r = idle_runner - r[:idle] = false - r[:io].write(Hydra::Messages::Runner::RunFile.new(eval(message.serialize))) - end - - def relay_results(message) - @io.write(Hydra::Messages::Worker::Results.new(eval(message.serialize))) - end - - def shutdown - @running = false - @runners.each do |r| - r[:io].write(Hydra::Messages::Runner::Shutdown.new) - end - end end end diff --git a/test/worker_test.rb b/test/worker_test.rb index 80e2dcd..242c30c 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -14,14 +14,17 @@ class WorkerTest < Test::Unit::TestCase end should "run a test" do + num_runners = 4 @pipe = Hydra::Pipe.new @child = Process.fork do @pipe.identify_as_child - Hydra::Worker.new(@pipe, 1) + Hydra::Worker.new(@pipe, num_runners) @pipe.close end @pipe.identify_as_parent - assert @pipe.gets.is_a?(Hydra::Messages::Worker::RequestFile) + num_runners.times do + assert @pipe.gets.is_a?(Hydra::Messages::Worker::RequestFile) + end @pipe.write(Hydra::Messages::Worker::RunFile.new(:file => TESTFILE)) response = @pipe.gets @@ -32,8 +35,8 @@ class WorkerTest < Test::Unit::TestCase assert File.exists?(TARGET) assert_equal "HYDRA", File.read(TARGET) - @pipe.close Process.wait(@child) + @pipe.close end end end