restructured messages

This commit is contained in:
Nick Gauthier 2010-01-29 13:30:25 -05:00
parent 1c25d7c73c
commit 847c77ef0f
7 changed files with 37 additions and 43 deletions

View File

@ -25,9 +25,9 @@ module Hydra #:nodoc:
def send_file(worker) def send_file(worker)
f = @files.pop f = @files.pop
if f if f
worker[:io].write(Hydra::Messages::Worker::RunFile.new(:file => f)) worker[:io].write(Hydra::Messages::Master::RunFile.new(:file => f))
else else
worker[:io].write(Hydra::Messages::Worker::Shutdown.new) worker[:io].write(Hydra::Messages::Master::Shutdown.new)
Thread.exit Thread.exit
end end
end end
@ -50,7 +50,6 @@ module Hydra #:nodoc:
def process_messages def process_messages
@running = true @running = true
# TODO catch exceptions and report to stdout
Thread.abort_on_exception = true Thread.abort_on_exception = true
@workers.each do |w| @workers.each do |w|

View File

@ -1,6 +1,19 @@
module Hydra #:nodoc: module Hydra #:nodoc:
module Messages #:nodoc: module Messages #:nodoc:
module Master #:nodoc: module Master #:nodoc:
# Message telling a worker to delegate a file to a runner
class RunFile < Hydra::Messages::Worker::RunFile
def handle(worker) #:nodoc:
worker.delegate_file(self)
end
end
# Message telling the worker to shut down.
class Shutdown < Hydra::Messages::Worker::Shutdown
def handle(worker) #:nodoc:
worker.shutdown
end
end
end end
end end
end end

View File

@ -8,19 +8,6 @@ module Hydra #:nodoc:
end end
end end
# Message telling the Runner to run a file
# TODO: move to worker
class RunFile < Hydra::Message
# The file that should be run
attr_accessor :file
def serialize #:nodoc:
super(:file => @file)
end
def handle(runner) #:nodoc:
runner.run_file(@file)
end
end
# Message for the Runner to respond with its results # Message for the Runner to respond with its results
class Results < Hydra::Message class Results < Hydra::Message
# The output from running the test # The output from running the test
@ -35,14 +22,6 @@ module Hydra #:nodoc:
end end
end end
# Message to tell the Runner to shut down
# TODO: move to worker
class Shutdown < Hydra::Message
def handle(runner) #:nodoc:
runner.stop
end
end
# Message a runner sends to a worker to verify the connection # Message a runner sends to a worker to verify the connection
class Ping < Hydra::Message class Ping < Hydra::Message
def handle(worker, runner) #:nodoc: def handle(worker, runner) #:nodoc:

View File

@ -8,11 +8,22 @@ module Hydra #:nodoc:
end end
end end
# Message telling a worker to delegate a file to a runner # Message telling the Runner to run a file
# TODO: move to master messages class RunFile < Hydra::Message
class RunFile < Hydra::Messages::Runner::RunFile # The file that should be run
def handle(worker) #:nodoc: attr_accessor :file
worker.delegate_file(self) def serialize #:nodoc:
super(:file => @file)
end
def handle(runner) #:nodoc:
runner.run_file(@file)
end
end
# Message to tell the Runner to shut down
class Shutdown < Hydra::Message
def handle(runner) #:nodoc:
runner.stop
end end
end end
@ -23,14 +34,6 @@ module Hydra #:nodoc:
end end
end end
# Message telling the worker to shut down.
# TODO: move to master
class Shutdown < Hydra::Messages::Runner::Shutdown
def handle(worker) #:nodoc:
worker.shutdown
end
end
# Message a worker sends to a master to verify the connection # Message a worker sends to a master to verify the connection
class Ping < Hydra::Message class Ping < Hydra::Message
def handle(master, worker) #:nodoc: def handle(master, worker) #:nodoc:

View File

@ -38,7 +38,7 @@ module Hydra #:nodoc:
def delegate_file(message) def delegate_file(message)
r = idle_runner r = idle_runner
r[:idle] = false r[:idle] = false
r[:io].write(Hydra::Messages::Runner::RunFile.new(eval(message.serialize))) r[:io].write(Hydra::Messages::Worker::RunFile.new(eval(message.serialize)))
end end
# When a runner finishes, it sends the results up to the worker. Then the # When a runner finishes, it sends the results up to the worker. Then the
@ -60,7 +60,7 @@ module Hydra #:nodoc:
@runners.each do |r| @runners.each do |r|
$stdout.write "WORKER| Sending Shutdown to Runner\n" if @verbose $stdout.write "WORKER| Sending Shutdown to Runner\n" if @verbose
$stdout.write " | #{r.inspect}\n" if @verbose $stdout.write " | #{r.inspect}\n" if @verbose
r[:io].write(Hydra::Messages::Runner::Shutdown.new) r[:io].write(Hydra::Messages::Worker::Shutdown.new)
end end
Thread.exit Thread.exit
end end

View File

@ -41,13 +41,13 @@ class RunnerTest < Test::Unit::TestCase
# make sure it asks for a file, then give it one # make sure it asks for a file, then give it one
assert pipe.gets.is_a?(Hydra::Messages::Runner::RequestFile) assert pipe.gets.is_a?(Hydra::Messages::Runner::RequestFile)
pipe.write(Hydra::Messages::Runner::RunFile.new(:file => TESTFILE)) pipe.write(Hydra::Messages::Worker::RunFile.new(:file => TESTFILE))
# grab its response. This makes us wait for it to finish # grab its response. This makes us wait for it to finish
response = pipe.gets response = pipe.gets
# tell it to shut down # tell it to shut down
pipe.write(Hydra::Messages::Runner::Shutdown.new) pipe.write(Hydra::Messages::Worker::Shutdown.new)
# ensure it ran # ensure it ran
assert File.exists?(TARGET) assert File.exists?(TARGET)

View File

@ -44,11 +44,11 @@ class WorkerTest < Test::Unit::TestCase
num_runners.times do num_runners.times do
assert pipe.gets.is_a?(Hydra::Messages::Worker::RequestFile) assert pipe.gets.is_a?(Hydra::Messages::Worker::RequestFile)
end end
pipe.write(Hydra::Messages::Worker::RunFile.new(:file => TESTFILE)) pipe.write(Hydra::Messages::Master::RunFile.new(:file => TESTFILE))
assert pipe.gets.is_a?(Hydra::Messages::Worker::Results) assert pipe.gets.is_a?(Hydra::Messages::Worker::Results)
pipe.write(Hydra::Messages::Worker::Shutdown.new) pipe.write(Hydra::Messages::Master::Shutdown.new)
assert File.exists?(TARGET) assert File.exists?(TARGET)
assert_equal "HYDRA", File.read(TARGET) assert_equal "HYDRA", File.read(TARGET)