diff --git a/TODO b/TODO index ff085c4..67f5254 100644 --- a/TODO +++ b/TODO @@ -1,54 +1,21 @@ -# runners = [] -# cores.each do |c| -# in = pipes[c][0] -# out = pipes[c][1] -# runners << TestRunner.new(in, out) -# end -# -# files = [ ... ] -# results = [] -# -# runners.each do |r| -# Thread.new do -# while !files.empty? -# results << r.run_file(files.pop) -# end -# r.shutdown -# end -# end -# -# puts results.join("\n") -# +Master +boot up workers +listen for worker messages +add worker messages to message queue +process message queue +"reply" to a message allows sending a message back down to worker + +When worker asks for file but no files left, send shutdown message to worker +when worker connection breaks, end thread +wait on all threads +when all threads are done, all workers must be done -# Master -# boot up workers -# listen for worker messages -# add worker messages to message queue -# process message queue -# "reply" to a message allows sending a message back down to worker -# -# When worker asks for file but no files left, send shutdown message to worker -# when worker connection breaks, end thread -# wait on all threads -# when all threads are done, all workers must be done -# -# -# Worker -# boot up runners -# listen for runner messages -# add runner messages to message queue -# process message queue -# "reply" to a message allows sending message back down to runner -# -# when a runner asks for file but master responds with shutdown, mark self -# as terminated, shut down runners. Any runner that asks for a file is -# auto-terminated -# wait for runner threads to finish -# then exit, breaking master connection -# -# Runner -# when booted, ask for a file -# then process messages on the queue -# when it's a file, run it and send a results message -# when it's a shutdown, break main loop + +when a runner asks for file but master responds with shutdown, mark self + as terminated, shut down runners. Any runner that asks for a file is + auto-terminated +wait for runner threads to finish +then exit, breaking master connection + +Test individual messages? Ensure they handle the right method? May not be worth it. diff --git a/lib/hydra.rb b/lib/hydra.rb index 26c9b78..71a4c4c 100644 --- a/lib/hydra.rb +++ b/lib/hydra.rb @@ -3,4 +3,5 @@ require 'hydra/ssh' require 'hydra/message' require 'hydra/runner' require 'hydra/worker' +require 'hydra/master' diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb new file mode 100644 index 0000000..6fb41dc --- /dev/null +++ b/lib/hydra/master.rb @@ -0,0 +1,64 @@ +module Hydra #:nodoc: + class Master + def initialize(opts = { }) + @files = opts.fetch(:files) { [] } + @workers = [] + @listeners = [] + boot_workers + process_messages + end + + # Message handling + + # Send a file down to a worker + def send_file(worker) + f = @files.pop + if f + worker[:io].write(Hydra::Messages::Worker::RunFile.new(:file => f)) + else + worker[:io].write(Hydra::Messages::Worker::Shutdown.new) + Thread.exit + end + end + + private + + def boot_workers + # simple on worker method for now + # TODO: read config + pipe = Hydra::Pipe.new + child = Process.fork do + pipe.identify_as_child + # TODO num runners opt in next line + Hydra::Worker.new(pipe, 1) + end + pipe.identify_as_parent + @workers << { :pid => child, :io => pipe, :idle => false } + end + + def process_messages + @running = true + + # TODO catch exceptions and report to stdout + Thread.abort_on_exception = true + + @workers.each do |w| + @listeners << Thread.new do + while @running + begin + message = w[:io].gets + message.handle(self, w) if message + rescue IOError => ex + $stderr.write "Master lost Worker [#{w.inspect}]\n" + w[:io].close + @workers.delete(w) + Thread.exit + end + end + end + end + + @listeners.each{|l| l.join} + end + end +end diff --git a/lib/hydra/message.rb b/lib/hydra/message.rb index 98c727a..b9a899c 100644 --- a/lib/hydra/message.rb +++ b/lib/hydra/message.rb @@ -43,4 +43,5 @@ end require 'hydra/message/runner_messages' require 'hydra/message/worker_messages' +require 'hydra/message/master_messages' diff --git a/lib/hydra/message/master_messages.rb b/lib/hydra/message/master_messages.rb new file mode 100644 index 0000000..e97347f --- /dev/null +++ b/lib/hydra/message/master_messages.rb @@ -0,0 +1,6 @@ +module Hydra #:nodoc: + module Messages #:nodoc: + module Master #:nodoc: + end + end +end diff --git a/lib/hydra/message/runner_messages.rb b/lib/hydra/message/runner_messages.rb index 389d459..6aec99a 100644 --- a/lib/hydra/message/runner_messages.rb +++ b/lib/hydra/message/runner_messages.rb @@ -9,6 +9,7 @@ module Hydra #:nodoc: end # Message telling the Runner to run a file + # TODO: move to worker class RunFile < Hydra::Message attr_accessor :file def serialize #:nodoc: @@ -32,11 +33,20 @@ module Hydra #:nodoc: end # Message to tell the Runner to shut down + # TODO: move to worker class Shutdown < Hydra::Message def handle(runner) #:nodoc: runner.stop end end + + # Message a runner sends to a worker to verify the connection + class Ping < Hydra::Message + # We don't do anything to handle a ping. It's just to test + # the connectivity of the IO + def handle(worker, runner) + end + end end end end diff --git a/lib/hydra/message/worker_messages.rb b/lib/hydra/message/worker_messages.rb index 1ebafc5..04bd1e1 100644 --- a/lib/hydra/message/worker_messages.rb +++ b/lib/hydra/message/worker_messages.rb @@ -1,11 +1,15 @@ module Hydra #:nodoc: module Messages #:nodoc: module Worker #:nodoc: - # Message indicating that a work needs a file to delegate to a runner + # Message indicating that a worker needs a file to delegate to a runner class RequestFile < Hydra::Message + def handle(master, worker) + master.send_file(worker) + end end # Message telling a worker to delegate a file to a runner + # TODO: move to master messages class RunFile < Hydra::Messages::Runner::RunFile def handle(worker) worker.delegate_file(self) @@ -14,14 +18,26 @@ module Hydra #:nodoc: # Message relaying the results of a worker up to the master class Results < Hydra::Messages::Runner::Results + def handle(master, worker) + master.send_file(worker) + end end # Message telling the worker to shut down. + # TODO: move to master class Shutdown < Hydra::Messages::Runner::Shutdown def handle(worker) worker.shutdown end end + + # Message a worker sends to a master to verify the connection + class Ping < Hydra::Message + # We don't do anything to handle a ping. It's just to test + # the connectivity of the IO + def handle(master, worker) + end + end end end end diff --git a/lib/hydra/pipe.rb b/lib/hydra/pipe.rb index 0dde552..a53f106 100644 --- a/lib/hydra/pipe.rb +++ b/lib/hydra/pipe.rb @@ -51,5 +51,11 @@ module Hydra #:nodoc: @reader = @parent_read @writer = @parent_write end + + # Output pipe nicely + def inspect + "#<#{self.class} @reader=#{@reader.to_s}, @writer=#{@writer.to_s}>" + end + end end diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index 71712d6..f6eb1c8 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -13,8 +13,14 @@ module Hydra #:nodoc: def process_messages @running = true while @running - message = @io.gets - message.handle(self) if message + begin + message = @io.gets + message.handle(self) if message + @io.write Hydra::Messages::Runner::Ping.new + rescue IOError => ex + $stderr.write "Runner lost Worker\n" + @running = false + end end end diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb index f9abea8..dc30e3a 100644 --- a/lib/hydra/worker.rb +++ b/lib/hydra/worker.rb @@ -48,6 +48,7 @@ module Hydra #:nodoc: @running = false @runners.each do |r| r[:io].write(Hydra::Messages::Runner::Shutdown.new) + Thread.exit end end @@ -77,8 +78,14 @@ module Hydra #:nodoc: # Worker listens and handles messages @listeners << Thread.new do while @running - message = @io.gets - message.handle(self) if message + begin + message = @io.gets + message.handle(self) if message + @io.write Hydra::Messages::Worker::Ping.new + rescue IOError => ex + $stderr.write "Worker lost Master\n" + Thread.exit + end end end @@ -91,14 +98,15 @@ module Hydra #:nodoc: message = r[:io].gets message.handle(self, r) if message rescue IOError => ex - # If the other end of the pipe closes - # we will continue, because we're probably - # not @running anymore + $stderr.write "Worker lost Runner [#{r.inspect}]\n" + @runners.delete(r) + Thread.exit end end end end @listeners.each{|l| l.join } + @io.close end # Get the next idle runner diff --git a/test/master_test.rb b/test/master_test.rb new file mode 100644 index 0000000..0568706 --- /dev/null +++ b/test/master_test.rb @@ -0,0 +1,24 @@ +require File.join(File.dirname(__FILE__), 'test_helper') + +TARGET = File.join(Dir.tmpdir, 'hydra_test.txt') +TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb') + +class MasterTest < Test::Unit::TestCase + context "with a file to test and a destination to verify" do + setup do + FileUtils.rm_f(TARGET) + end + + teardown do + FileUtils.rm_f(TARGET) + end + + should "run a test" do + m = Hydra::Master.new({ + :files => Array(TESTFILE) + }) + assert File.exists?(TARGET) + assert_equal "HYDRA", File.read(TARGET) + end + end +end