worker can now create runners and delegate work to them

This commit is contained in:
Nick Gauthier 2010-01-28 16:15:41 -05:00
parent 7f1aff1967
commit a29fb25fa7
6 changed files with 157 additions and 0 deletions

View File

@ -2,4 +2,5 @@ require 'hydra/pipe'
require 'hydra/ssh'
require 'hydra/message'
require 'hydra/runner'
require 'hydra/worker'

View File

@ -19,6 +19,7 @@ module Hydra #:nodoc:
# are attributes of the message and the values are
# set to the attribute.
def initialize(opts = {})
opts.delete :class
opts.each do |variable,value|
self.send("#{variable}=",value)
end
@ -41,4 +42,5 @@ module Hydra #:nodoc:
end
require 'hydra/message/runner_messages'
require 'hydra/message/worker_messages'

View File

@ -3,6 +3,9 @@ module Hydra #:nodoc:
module Runner #:nodoc:
# Message indicating that a Runner needs a file to run
class RequestFile < Hydra::Message
def handle(worker, runner) #:nodoc:
worker.request_file(self, runner)
end
end
# Message telling the Runner to run a file
@ -23,6 +26,9 @@ module Hydra #:nodoc:
def serialize #:nodoc:
super(:output => @output, :file => @file)
end
def handle(worker, runner) #:nodoc:
worker.relay_results(self)
end
end
# Message to tell the Runner to shut down

View File

@ -0,0 +1,26 @@
module Hydra #:nodoc:
module Messages #:nodoc:
module Worker #:nodoc:
# Message indicating that a work needs a file to delegate to a runner
class RequestFile < Hydra::Message
end
# Message telling a worker to delegate a file to a runner
class RunFile < Hydra::Messages::Runner::RunFile
def handle(worker)
worker.delegate_file(self)
end
end
# Message relaying the results of a worker up to the master
class Results < Hydra::Messages::Runner::Results
end
class Shutdown < Hydra::Messages::Runner::Shutdown
def handle(worker)
worker.shutdown
end
end
end
end
end

83
lib/hydra/worker.rb Normal file
View File

@ -0,0 +1,83 @@
module Hydra #:nodoc:
class Worker
def initialize(io, num_runners)
@io = io
@runners = []
@listeners = []
boot_runners(num_runners)
process_messages
@runners.each{|r| Process.wait r[:pid] }
end
def boot_runners(num_runners)
num_runners.times do
pipe = Hydra::Pipe.new
child = Process.fork do
pipe.identify_as_child
Hydra::Runner.new(pipe)
end
pipe.identify_as_parent
@runners << { :pid => child, :io => pipe, :idle => false }
end
end
def process_messages
@running = true
Thread.abort_on_exception = true
# Worker listens and handles messages
@listeners << Thread.new do
while @running
message = @io.gets
message.handle(self) if message
end
end
# Runners listen, but when they handle they pass themselves
# so we can reference them when we deal with their messages
@runners.each do |r|
@listeners << Thread.new do
while @running
message = r[:io].gets
message.handle(self, r) if message
end
end
end
@listeners.each{|l| l.join }
end
def idle_runner
idle_r = nil
while idle_r.nil?
idle_r = @runners.detect{|r| r[:idle]}
sleep(1)
end
return idle_r
end
# message handling methods
def request_file(message, runner)
@io.write(Hydra::Messages::Worker::RequestFile.new)
runner[:idle] = true
end
def delegate_file(message)
r = idle_runner
r[:idle] = false
r[:io].write(Hydra::Messages::Runner::RunFile.new(eval(message.serialize)))
end
def relay_results(message)
@io.write(Hydra::Messages::Worker::Results.new(eval(message.serialize)))
end
def shutdown
@running = false
@runners.each do |r|
r[:io].write(Hydra::Messages::Runner::Shutdown.new)
end
end
end
end

39
test/worker_test.rb Normal file
View File

@ -0,0 +1,39 @@
require File.join(File.dirname(__FILE__), 'test_helper')
TARGET = File.join(Dir.tmpdir, 'hydra_test.txt')
TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb')
class WorkerTest < Test::Unit::TestCase
context "with a file to test and a destination to verify" do
setup do
FileUtils.rm_f(TARGET)
end
teardown do
FileUtils.rm_f(TARGET)
end
should "run a test" do
@pipe = Hydra::Pipe.new
@child = Process.fork do
@pipe.identify_as_child
Hydra::Worker.new(@pipe, 1)
@pipe.close
end
@pipe.identify_as_parent
assert @pipe.gets.is_a?(Hydra::Messages::Worker::RequestFile)
@pipe.write(Hydra::Messages::Worker::RunFile.new(:file => TESTFILE))
response = @pipe.gets
assert response.is_a?(Hydra::Messages::Worker::Results)
@pipe.write(Hydra::Messages::Worker::Shutdown.new)
assert File.exists?(TARGET)
assert_equal "HYDRA", File.read(TARGET)
@pipe.close
Process.wait(@child)
end
end
end