diff --git a/lib/hydra.rb b/lib/hydra.rb index b2c301d..26c9b78 100644 --- a/lib/hydra.rb +++ b/lib/hydra.rb @@ -2,4 +2,5 @@ require 'hydra/pipe' require 'hydra/ssh' require 'hydra/message' require 'hydra/runner' +require 'hydra/worker' diff --git a/lib/hydra/message.rb b/lib/hydra/message.rb index 9c0d2d4..98c727a 100644 --- a/lib/hydra/message.rb +++ b/lib/hydra/message.rb @@ -19,6 +19,7 @@ module Hydra #:nodoc: # are attributes of the message and the values are # set to the attribute. def initialize(opts = {}) + opts.delete :class opts.each do |variable,value| self.send("#{variable}=",value) end @@ -41,4 +42,5 @@ module Hydra #:nodoc: end require 'hydra/message/runner_messages' +require 'hydra/message/worker_messages' diff --git a/lib/hydra/message/runner_messages.rb b/lib/hydra/message/runner_messages.rb index 5ac7a06..ee91a0d 100644 --- a/lib/hydra/message/runner_messages.rb +++ b/lib/hydra/message/runner_messages.rb @@ -3,6 +3,9 @@ module Hydra #:nodoc: module Runner #:nodoc: # Message indicating that a Runner needs a file to run class RequestFile < Hydra::Message + def handle(worker, runner) #:nodoc: + worker.request_file(self, runner) + end end # Message telling the Runner to run a file @@ -23,6 +26,9 @@ module Hydra #:nodoc: def serialize #:nodoc: super(:output => @output, :file => @file) end + def handle(worker, runner) #:nodoc: + worker.relay_results(self) + end end # Message to tell the Runner to shut down diff --git a/lib/hydra/message/worker_messages.rb b/lib/hydra/message/worker_messages.rb new file mode 100644 index 0000000..580f9ca --- /dev/null +++ b/lib/hydra/message/worker_messages.rb @@ -0,0 +1,26 @@ +module Hydra #:nodoc: + module Messages #:nodoc: + module Worker #:nodoc: + # Message indicating that a work needs a file to delegate to a runner + class RequestFile < Hydra::Message + end + + # Message telling a worker to delegate a file to a runner + class RunFile < Hydra::Messages::Runner::RunFile + def handle(worker) + worker.delegate_file(self) + end + end + + # Message relaying the results of a worker up to the master + class Results < Hydra::Messages::Runner::Results + end + + class Shutdown < Hydra::Messages::Runner::Shutdown + def handle(worker) + worker.shutdown + end + end + end + end +end diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb new file mode 100644 index 0000000..850360a --- /dev/null +++ b/lib/hydra/worker.rb @@ -0,0 +1,83 @@ +module Hydra #:nodoc: + class Worker + def initialize(io, num_runners) + @io = io + @runners = [] + @listeners = [] + boot_runners(num_runners) + process_messages + @runners.each{|r| Process.wait r[:pid] } + end + + def boot_runners(num_runners) + num_runners.times do + pipe = Hydra::Pipe.new + child = Process.fork do + pipe.identify_as_child + Hydra::Runner.new(pipe) + end + pipe.identify_as_parent + @runners << { :pid => child, :io => pipe, :idle => false } + end + end + + def process_messages + @running = true + + Thread.abort_on_exception = true + + # Worker listens and handles messages + @listeners << Thread.new do + while @running + message = @io.gets + message.handle(self) if message + end + end + + # Runners listen, but when they handle they pass themselves + # so we can reference them when we deal with their messages + @runners.each do |r| + @listeners << Thread.new do + while @running + message = r[:io].gets + message.handle(self, r) if message + end + end + end + @listeners.each{|l| l.join } + end + + def idle_runner + idle_r = nil + while idle_r.nil? + idle_r = @runners.detect{|r| r[:idle]} + sleep(1) + end + return idle_r + end + + # message handling methods + + def request_file(message, runner) + @io.write(Hydra::Messages::Worker::RequestFile.new) + runner[:idle] = true + end + + def delegate_file(message) + r = idle_runner + r[:idle] = false + r[:io].write(Hydra::Messages::Runner::RunFile.new(eval(message.serialize))) + end + + def relay_results(message) + @io.write(Hydra::Messages::Worker::Results.new(eval(message.serialize))) + end + + def shutdown + @running = false + @runners.each do |r| + r[:io].write(Hydra::Messages::Runner::Shutdown.new) + end + end + end +end diff --git a/test/worker_test.rb b/test/worker_test.rb new file mode 100644 index 0000000..80e2dcd --- /dev/null +++ b/test/worker_test.rb @@ -0,0 +1,39 @@ +require File.join(File.dirname(__FILE__), 'test_helper') + +TARGET = File.join(Dir.tmpdir, 'hydra_test.txt') +TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb') + +class WorkerTest < Test::Unit::TestCase + context "with a file to test and a destination to verify" do + setup do + FileUtils.rm_f(TARGET) + end + + teardown do + FileUtils.rm_f(TARGET) + end + + should "run a test" do + @pipe = Hydra::Pipe.new + @child = Process.fork do + @pipe.identify_as_child + Hydra::Worker.new(@pipe, 1) + @pipe.close + end + @pipe.identify_as_parent + assert @pipe.gets.is_a?(Hydra::Messages::Worker::RequestFile) + @pipe.write(Hydra::Messages::Worker::RunFile.new(:file => TESTFILE)) + + response = @pipe.gets + assert response.is_a?(Hydra::Messages::Worker::Results) + + @pipe.write(Hydra::Messages::Worker::Shutdown.new) + + assert File.exists?(TARGET) + assert_equal "HYDRA", File.read(TARGET) + + @pipe.close + Process.wait(@child) + end + end +end