diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index 6fb41dc..5621c39 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -30,7 +30,7 @@ module Hydra #:nodoc: child = Process.fork do pipe.identify_as_child # TODO num runners opt in next line - Hydra::Worker.new(pipe, 1) + Hydra::Worker.new(:io => pipe, :runners => 1) end pipe.identify_as_parent @workers << { :pid => child, :io => pipe, :idle => false } diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index f6eb1c8..cbeecbf 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -3,22 +3,30 @@ module Hydra #:nodoc: class Runner # Boot up a runner. It takes an IO object (generally a pipe from its # parent) to send it messages on which files to execute. - def initialize(io) - @io = io + def initialize(opts = {}) + @io = opts.fetch(:io) { raise "No IO Object" } + @verbose = opts.fetch(:verbose) { false } + @io.write Hydra::Messages::Runner::RequestFile.new process_messages end # The runner will continually read messages and handle them. def process_messages + $stdout.write "RUNNER| Processing Messages\n" if @verbose @running = true while @running begin message = @io.gets - message.handle(self) if message - @io.write Hydra::Messages::Runner::Ping.new + if message + $stdout.write "RUNNER| Received message from worker\n" if @verbose + $stdout.write " | #{message.inspect}\n" if @verbose + message.handle(self) + else + @io.write Hydra::Messages::Runner::Ping.new + end rescue IOError => ex - $stderr.write "Runner lost Worker\n" + $stderr.write "Runner lost Worker\n" if @verbose @running = false end end diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb index dc30e3a..5ebc62c 100644 --- a/lib/hydra/worker.rb +++ b/lib/hydra/worker.rb @@ -4,12 +4,16 @@ module Hydra #:nodoc: # Create a new worker. # * io: The IO object to use to communicate with the master # * num_runners: The number of runners to launch - def initialize(io, num_runners) - @io = io + # TODO: options hash + def initialize(opts = {}) + @verbose = opts.fetch(:verbose) { false } + @io = opts.fetch(:io) { raise "No IO Object" } @runners = [] @listeners = [] - boot_runners(num_runners) + + boot_runners(opts.fetch(:runners) { 1 }) process_messages + @runners.each{|r| Process.wait r[:pid] } end @@ -46,28 +50,34 @@ module Hydra #:nodoc: # processes if necessary. def shutdown @running = false + $stdout.write "WORKER| Notifying #{@runners.size} Runners of Shutdown\n" if @verbose @runners.each do |r| + $stdout.write "WORKER| Sending Shutdown to Runner\n" if @verbose + $stdout.write " | #{r.inspect}\n" if @verbose r[:io].write(Hydra::Messages::Runner::Shutdown.new) - Thread.exit end + Thread.exit end private def boot_runners(num_runners) #:nodoc: + $stdout.write "WORKER| Booting #{num_runners} Runners\n" if @verbose num_runners.times do pipe = Hydra::Pipe.new child = Process.fork do pipe.identify_as_child - Hydra::Runner.new(pipe) + Hydra::Runner.new(:io => pipe, :verbose => @verbose) end pipe.identify_as_parent @runners << { :pid => child, :io => pipe, :idle => false } end + $stdout.write "WORKER| #{@runners.size} Runners booted\n" if @verbose end # Continuously process messages def process_messages #:nodoc: + $stdout.write "WORKER| Processing Messages\n" if @verbose @running = true # Abort the worker if one of the runners has an exception @@ -80,10 +90,15 @@ module Hydra #:nodoc: while @running begin message = @io.gets - message.handle(self) if message - @io.write Hydra::Messages::Worker::Ping.new + if message + $stdout.write "WORKER| Received Message from Master\n" if @verbose + $stdout.write " | #{message.inspect}\n" if @verbose + message.handle(self) + else + @io.write Hydra::Messages::Worker::Ping.new + end rescue IOError => ex - $stderr.write "Worker lost Master\n" + $stderr.write "Worker lost Master\n" if @verbose Thread.exit end end @@ -96,10 +111,13 @@ module Hydra #:nodoc: while @running begin message = r[:io].gets - message.handle(self, r) if message + if message + $stdout.write "WORKER| Received Message from Runner\n" if @verbose + $stdout.write " | #{message.inspect}\n" if @verbose + message.handle(self, r) + end rescue IOError => ex $stderr.write "Worker lost Runner [#{r.inspect}]\n" - @runners.delete(r) Thread.exit end end @@ -107,6 +125,7 @@ module Hydra #:nodoc: end @listeners.each{|l| l.join } @io.close + $stdout.write "WORKER| Done processing messages\n" if @verbose end # Get the next idle runner diff --git a/test/master_test.rb b/test/master_test.rb index 0568706..ea381f3 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -1,8 +1,5 @@ require File.join(File.dirname(__FILE__), 'test_helper') -TARGET = File.join(Dir.tmpdir, 'hydra_test.txt') -TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb') - class MasterTest < Test::Unit::TestCase context "with a file to test and a destination to verify" do setup do diff --git a/test/runner_test.rb b/test/runner_test.rb index 44f4314..f251a62 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -1,8 +1,5 @@ require File.join(File.dirname(__FILE__), 'test_helper') -TARGET = File.join(Dir.tmpdir, 'hydra_test.txt') -TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb') - class RunnerTest < Test::Unit::TestCase context "with a file to test and a destination to verify" do setup do @@ -55,14 +52,11 @@ class RunnerTest < Test::Unit::TestCase # ensure it ran assert File.exists?(TARGET) assert_equal "HYDRA", File.read(TARGET) - - pipe.close end def run_the_runner(pipe) pipe.identify_as_child - Hydra::Runner.new(pipe) - pipe.close + Hydra::Runner.new({:io => pipe}) end end include RunnerTestHelper diff --git a/test/test_helper.rb b/test/test_helper.rb index 5d5e8a1..58faa93 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -10,6 +10,9 @@ require 'hydra' class Test::Unit::TestCase end +TARGET = File.join(Dir.tmpdir, 'hydra_test.txt') +TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb') + module Hydra #:nodoc: module Messages #:nodoc: class TestMessage < Hydra::Message diff --git a/test/worker_test.rb b/test/worker_test.rb index eb1a7cc..5e696ea 100644 --- a/test/worker_test.rb +++ b/test/worker_test.rb @@ -1,8 +1,5 @@ require File.join(File.dirname(__FILE__), 'test_helper') -TARGET = File.join(Dir.tmpdir, 'hydra_test.txt') -TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb') - class WorkerTest < Test::Unit::TestCase context "with a file to test and a destination to verify" do setup do @@ -14,19 +11,18 @@ class WorkerTest < Test::Unit::TestCase end # run the worker in the foreground and the requests in the background - should "run a test" do + should "run a test in the foreground" do num_runners = 4 pipe = Hydra::Pipe.new child = Process.fork do request_a_file_and_verify_completion(pipe, num_runners) - pipe.close end run_the_worker(pipe, num_runners) Process.wait(child) end # inverse of the above test to run the worker in the background - should "be able to tell a worker to run a test" do + should "run a test in the background" do num_runners = 4 pipe = Hydra::Pipe.new child = Process.fork do @@ -34,15 +30,13 @@ class WorkerTest < Test::Unit::TestCase end request_a_file_and_verify_completion(pipe, num_runners) Process.wait(child) - pipe.close end end module WorkerTestHelper def run_the_worker(pipe, num_runners) pipe.identify_as_child - Hydra::Worker.new(pipe, num_runners) - pipe.close + Hydra::Worker.new({:io => pipe, :runners => num_runners}) end def request_a_file_and_verify_completion(pipe, num_runners) @@ -52,8 +46,7 @@ class WorkerTest < Test::Unit::TestCase end pipe.write(Hydra::Messages::Worker::RunFile.new(:file => TESTFILE)) - response = pipe.gets - assert response.is_a?(Hydra::Messages::Worker::Results) + assert pipe.gets.is_a?(Hydra::Messages::Worker::Results) pipe.write(Hydra::Messages::Worker::Shutdown.new)