added documentation for the worker

This commit is contained in:
Nick Gauthier 2010-01-28 16:32:06 -05:00
parent a29fb25fa7
commit 90b04216aa
4 changed files with 69 additions and 34 deletions

View File

@ -27,7 +27,7 @@ module Hydra #:nodoc:
super(:output => @output, :file => @file) super(:output => @output, :file => @file)
end end
def handle(worker, runner) #:nodoc: def handle(worker, runner) #:nodoc:
worker.relay_results(self) worker.relay_results(self, runner)
end end
end end

View File

@ -1,4 +1,4 @@
module Hydra #:nodoc: module Hydra #:nodoc:
module Messages #:nodoc: module Messages #:nodoc:
module Worker #:nodoc: module Worker #:nodoc:
# Message indicating that a work needs a file to delegate to a runner # Message indicating that a work needs a file to delegate to a runner
@ -16,6 +16,7 @@ module Hydra #:nodoc:
class Results < Hydra::Messages::Runner::Results class Results < Hydra::Messages::Runner::Results
end end
# Message telling the worker to shut down.
class Shutdown < Hydra::Messages::Runner::Shutdown class Shutdown < Hydra::Messages::Runner::Shutdown
def handle(worker) def handle(worker)
worker.shutdown worker.shutdown

View File

@ -1,5 +1,9 @@
module Hydra #:nodoc: module Hydra #:nodoc:
# Hydra class responsible to dispatching runners and communicating with the master.
class Worker class Worker
# Create a new worker.
# * io: The IO object to use to communicate with the master
# * num_runners: The number of runners to launch
def initialize(io, num_runners) def initialize(io, num_runners)
@io = io @io = io
@runners = [] @runners = []
@ -9,7 +13,47 @@ module Hydra #:nodoc:
@runners.each{|r| Process.wait r[:pid] } @runners.each{|r| Process.wait r[:pid] }
end end
def boot_runners(num_runners)
# message handling methods
# When a runner wants a file, it hits this method with a message.
# Then the worker bubbles the file request up to the master.
def request_file(message, runner)
@io.write(Hydra::Messages::Worker::RequestFile.new)
runner[:idle] = true
end
# When the master sends a file down to the worker, it hits this
# method. Then the worker delegates the file down to a runner.
def delegate_file(message)
r = idle_runner
r[:idle] = false
r[:io].write(Hydra::Messages::Runner::RunFile.new(eval(message.serialize)))
end
# When a runner finishes, it sends the results up to the worker. Then the
# worker sends the results up to the master.
# TODO: when we relay results, it should trigger a RunFile or Shutdown from
# the master implicitly
def relay_results(message, runner)
runner[:idle] = true
@io.write(Hydra::Messages::Worker::Results.new(eval(message.serialize)))
end
# When a master issues a shutdown order, it hits this method, which causes
# the worker to send shutdown messages to its runners.
# TODO: implement a ShutdownComplete message, so that we can kill the
# processes if necessary.
def shutdown
@running = false
@runners.each do |r|
r[:io].write(Hydra::Messages::Runner::Shutdown.new)
end
end
private
def boot_runners(num_runners) #:nodoc:
num_runners.times do num_runners.times do
pipe = Hydra::Pipe.new pipe = Hydra::Pipe.new
child = Process.fork do child = Process.fork do
@ -21,9 +65,13 @@ module Hydra #:nodoc:
end end
end end
def process_messages # Continuously process messages
def process_messages #:nodoc:
@running = true @running = true
# Abort the worker if one of the runners has an exception
# TODO: catch this exception, return a dying message to the master
# then shutdown
Thread.abort_on_exception = true Thread.abort_on_exception = true
# Worker listens and handles messages # Worker listens and handles messages
@ -39,15 +87,22 @@ module Hydra #:nodoc:
@runners.each do |r| @runners.each do |r|
@listeners << Thread.new do @listeners << Thread.new do
while @running while @running
message = r[:io].gets begin
message.handle(self, r) if message message = r[:io].gets
message.handle(self, r) if message
rescue IOError => ex
# If the other end of the pipe closes
# we will continue, because we're probably
# not @running anymore
end
end end
end end
end end
@listeners.each{|l| l.join } @listeners.each{|l| l.join }
end end
def idle_runner # Get the next idle runner
def idle_runner #:nodoc:
idle_r = nil idle_r = nil
while idle_r.nil? while idle_r.nil?
idle_r = @runners.detect{|r| r[:idle]} idle_r = @runners.detect{|r| r[:idle]}
@ -55,29 +110,5 @@ module Hydra #:nodoc:
end end
return idle_r return idle_r
end end
# message handling methods
def request_file(message, runner)
@io.write(Hydra::Messages::Worker::RequestFile.new)
runner[:idle] = true
end
def delegate_file(message)
r = idle_runner
r[:idle] = false
r[:io].write(Hydra::Messages::Runner::RunFile.new(eval(message.serialize)))
end
def relay_results(message)
@io.write(Hydra::Messages::Worker::Results.new(eval(message.serialize)))
end
def shutdown
@running = false
@runners.each do |r|
r[:io].write(Hydra::Messages::Runner::Shutdown.new)
end
end
end end
end end

View File

@ -14,14 +14,17 @@ class WorkerTest < Test::Unit::TestCase
end end
should "run a test" do should "run a test" do
num_runners = 4
@pipe = Hydra::Pipe.new @pipe = Hydra::Pipe.new
@child = Process.fork do @child = Process.fork do
@pipe.identify_as_child @pipe.identify_as_child
Hydra::Worker.new(@pipe, 1) Hydra::Worker.new(@pipe, num_runners)
@pipe.close @pipe.close
end end
@pipe.identify_as_parent @pipe.identify_as_parent
assert @pipe.gets.is_a?(Hydra::Messages::Worker::RequestFile) num_runners.times do
assert @pipe.gets.is_a?(Hydra::Messages::Worker::RequestFile)
end
@pipe.write(Hydra::Messages::Worker::RunFile.new(:file => TESTFILE)) @pipe.write(Hydra::Messages::Worker::RunFile.new(:file => TESTFILE))
response = @pipe.gets response = @pipe.gets
@ -32,8 +35,8 @@ class WorkerTest < Test::Unit::TestCase
assert File.exists?(TARGET) assert File.exists?(TARGET)
assert_equal "HYDRA", File.read(TARGET) assert_equal "HYDRA", File.read(TARGET)
@pipe.close
Process.wait(@child) Process.wait(@child)
@pipe.close
end end
end end
end end