master can delegate to workers. all tests green now

This commit is contained in:
Nick Gauthier 2010-01-29 13:01:53 -05:00
parent cbc5bb7a8e
commit e7db4fadee
7 changed files with 51 additions and 37 deletions

View File

@ -30,7 +30,7 @@ module Hydra #:nodoc:
child = Process.fork do
pipe.identify_as_child
# TODO num runners opt in next line
Hydra::Worker.new(pipe, 1)
Hydra::Worker.new(:io => pipe, :runners => 1)
end
pipe.identify_as_parent
@workers << { :pid => child, :io => pipe, :idle => false }

View File

@ -3,22 +3,30 @@ module Hydra #:nodoc:
class Runner
# Boot up a runner. It takes an IO object (generally a pipe from its
# parent) to send it messages on which files to execute.
def initialize(io)
@io = io
def initialize(opts = {})
@io = opts.fetch(:io) { raise "No IO Object" }
@verbose = opts.fetch(:verbose) { false }
@io.write Hydra::Messages::Runner::RequestFile.new
process_messages
end
# The runner will continually read messages and handle them.
def process_messages
$stdout.write "RUNNER| Processing Messages\n" if @verbose
@running = true
while @running
begin
message = @io.gets
message.handle(self) if message
@io.write Hydra::Messages::Runner::Ping.new
if message
$stdout.write "RUNNER| Received message from worker\n" if @verbose
$stdout.write " | #{message.inspect}\n" if @verbose
message.handle(self)
else
@io.write Hydra::Messages::Runner::Ping.new
end
rescue IOError => ex
$stderr.write "Runner lost Worker\n"
$stderr.write "Runner lost Worker\n" if @verbose
@running = false
end
end

View File

@ -4,12 +4,16 @@ module Hydra #:nodoc:
# Create a new worker.
# * io: The IO object to use to communicate with the master
# * num_runners: The number of runners to launch
def initialize(io, num_runners)
@io = io
# TODO: options hash
def initialize(opts = {})
@verbose = opts.fetch(:verbose) { false }
@io = opts.fetch(:io) { raise "No IO Object" }
@runners = []
@listeners = []
boot_runners(num_runners)
boot_runners(opts.fetch(:runners) { 1 })
process_messages
@runners.each{|r| Process.wait r[:pid] }
end
@ -46,28 +50,34 @@ module Hydra #:nodoc:
# processes if necessary.
def shutdown
@running = false
$stdout.write "WORKER| Notifying #{@runners.size} Runners of Shutdown\n" if @verbose
@runners.each do |r|
$stdout.write "WORKER| Sending Shutdown to Runner\n" if @verbose
$stdout.write " | #{r.inspect}\n" if @verbose
r[:io].write(Hydra::Messages::Runner::Shutdown.new)
Thread.exit
end
Thread.exit
end
private
def boot_runners(num_runners) #:nodoc:
$stdout.write "WORKER| Booting #{num_runners} Runners\n" if @verbose
num_runners.times do
pipe = Hydra::Pipe.new
child = Process.fork do
pipe.identify_as_child
Hydra::Runner.new(pipe)
Hydra::Runner.new(:io => pipe, :verbose => @verbose)
end
pipe.identify_as_parent
@runners << { :pid => child, :io => pipe, :idle => false }
end
$stdout.write "WORKER| #{@runners.size} Runners booted\n" if @verbose
end
# Continuously process messages
def process_messages #:nodoc:
$stdout.write "WORKER| Processing Messages\n" if @verbose
@running = true
# Abort the worker if one of the runners has an exception
@ -80,10 +90,15 @@ module Hydra #:nodoc:
while @running
begin
message = @io.gets
message.handle(self) if message
@io.write Hydra::Messages::Worker::Ping.new
if message
$stdout.write "WORKER| Received Message from Master\n" if @verbose
$stdout.write " | #{message.inspect}\n" if @verbose
message.handle(self)
else
@io.write Hydra::Messages::Worker::Ping.new
end
rescue IOError => ex
$stderr.write "Worker lost Master\n"
$stderr.write "Worker lost Master\n" if @verbose
Thread.exit
end
end
@ -96,10 +111,13 @@ module Hydra #:nodoc:
while @running
begin
message = r[:io].gets
message.handle(self, r) if message
if message
$stdout.write "WORKER| Received Message from Runner\n" if @verbose
$stdout.write " | #{message.inspect}\n" if @verbose
message.handle(self, r)
end
rescue IOError => ex
$stderr.write "Worker lost Runner [#{r.inspect}]\n"
@runners.delete(r)
Thread.exit
end
end
@ -107,6 +125,7 @@ module Hydra #:nodoc:
end
@listeners.each{|l| l.join }
@io.close
$stdout.write "WORKER| Done processing messages\n" if @verbose
end
# Get the next idle runner

View File

@ -1,8 +1,5 @@
require File.join(File.dirname(__FILE__), 'test_helper')
TARGET = File.join(Dir.tmpdir, 'hydra_test.txt')
TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb')
class MasterTest < Test::Unit::TestCase
context "with a file to test and a destination to verify" do
setup do

View File

@ -1,8 +1,5 @@
require File.join(File.dirname(__FILE__), 'test_helper')
TARGET = File.join(Dir.tmpdir, 'hydra_test.txt')
TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb')
class RunnerTest < Test::Unit::TestCase
context "with a file to test and a destination to verify" do
setup do
@ -55,14 +52,11 @@ class RunnerTest < Test::Unit::TestCase
# ensure it ran
assert File.exists?(TARGET)
assert_equal "HYDRA", File.read(TARGET)
pipe.close
end
def run_the_runner(pipe)
pipe.identify_as_child
Hydra::Runner.new(pipe)
pipe.close
Hydra::Runner.new({:io => pipe})
end
end
include RunnerTestHelper

View File

@ -10,6 +10,9 @@ require 'hydra'
class Test::Unit::TestCase
end
TARGET = File.join(Dir.tmpdir, 'hydra_test.txt')
TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb')
module Hydra #:nodoc:
module Messages #:nodoc:
class TestMessage < Hydra::Message

View File

@ -1,8 +1,5 @@
require File.join(File.dirname(__FILE__), 'test_helper')
TARGET = File.join(Dir.tmpdir, 'hydra_test.txt')
TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb')
class WorkerTest < Test::Unit::TestCase
context "with a file to test and a destination to verify" do
setup do
@ -14,19 +11,18 @@ class WorkerTest < Test::Unit::TestCase
end
# run the worker in the foreground and the requests in the background
should "run a test" do
should "run a test in the foreground" do
num_runners = 4
pipe = Hydra::Pipe.new
child = Process.fork do
request_a_file_and_verify_completion(pipe, num_runners)
pipe.close
end
run_the_worker(pipe, num_runners)
Process.wait(child)
end
# inverse of the above test to run the worker in the background
should "be able to tell a worker to run a test" do
should "run a test in the background" do
num_runners = 4
pipe = Hydra::Pipe.new
child = Process.fork do
@ -34,15 +30,13 @@ class WorkerTest < Test::Unit::TestCase
end
request_a_file_and_verify_completion(pipe, num_runners)
Process.wait(child)
pipe.close
end
end
module WorkerTestHelper
def run_the_worker(pipe, num_runners)
pipe.identify_as_child
Hydra::Worker.new(pipe, num_runners)
pipe.close
Hydra::Worker.new({:io => pipe, :runners => num_runners})
end
def request_a_file_and_verify_completion(pipe, num_runners)
@ -52,8 +46,7 @@ class WorkerTest < Test::Unit::TestCase
end
pipe.write(Hydra::Messages::Worker::RunFile.new(:file => TESTFILE))
response = pipe.gets
assert response.is_a?(Hydra::Messages::Worker::Results)
assert pipe.gets.is_a?(Hydra::Messages::Worker::Results)
pipe.write(Hydra::Messages::Worker::Shutdown.new)