master can now delegate files to workers

This commit is contained in:
Nick Gauthier 2010-01-29 11:45:42 -05:00
parent 7767b76a70
commit cbc5bb7a8e
11 changed files with 169 additions and 60 deletions

71
TODO
View File

@ -1,54 +1,21 @@
# runners = []
# cores.each do |c|
# in = pipes[c][0]
# out = pipes[c][1]
# runners << TestRunner.new(in, out)
# end
#
# files = [ ... ]
# results = []
#
# runners.each do |r|
# Thread.new do
# while !files.empty?
# results << r.run_file(files.pop)
# end
# r.shutdown
# end
# end
#
# puts results.join("\n")
#
Master
boot up workers
listen for worker messages
add worker messages to message queue
process message queue
"reply" to a message allows sending a message back down to worker
When worker asks for file but no files left, send shutdown message to worker
when worker connection breaks, end thread
wait on all threads
when all threads are done, all workers must be done
# Master
# boot up workers
# listen for worker messages
# add worker messages to message queue
# process message queue
# "reply" to a message allows sending a message back down to worker
#
# When worker asks for file but no files left, send shutdown message to worker
# when worker connection breaks, end thread
# wait on all threads
# when all threads are done, all workers must be done
#
#
# Worker
# boot up runners
# listen for runner messages
# add runner messages to message queue
# process message queue
# "reply" to a message allows sending message back down to runner
#
# when a runner asks for file but master responds with shutdown, mark self
# as terminated, shut down runners. Any runner that asks for a file is
# auto-terminated
# wait for runner threads to finish
# then exit, breaking master connection
#
# Runner
# when booted, ask for a file
# then process messages on the queue
# when it's a file, run it and send a results message
# when it's a shutdown, break main loop
when a runner asks for file but master responds with shutdown, mark self
as terminated, shut down runners. Any runner that asks for a file is
auto-terminated
wait for runner threads to finish
then exit, breaking master connection
Test individual messages? Ensure they handle the right method? May not be worth it.

View File

@ -3,4 +3,5 @@ require 'hydra/ssh'
require 'hydra/message'
require 'hydra/runner'
require 'hydra/worker'
require 'hydra/master'

64
lib/hydra/master.rb Normal file
View File

@ -0,0 +1,64 @@
module Hydra #:nodoc:
class Master
def initialize(opts = { })
@files = opts.fetch(:files) { [] }
@workers = []
@listeners = []
boot_workers
process_messages
end
# Message handling
# Send a file down to a worker
def send_file(worker)
f = @files.pop
if f
worker[:io].write(Hydra::Messages::Worker::RunFile.new(:file => f))
else
worker[:io].write(Hydra::Messages::Worker::Shutdown.new)
Thread.exit
end
end
private
def boot_workers
# simple on worker method for now
# TODO: read config
pipe = Hydra::Pipe.new
child = Process.fork do
pipe.identify_as_child
# TODO num runners opt in next line
Hydra::Worker.new(pipe, 1)
end
pipe.identify_as_parent
@workers << { :pid => child, :io => pipe, :idle => false }
end
def process_messages
@running = true
# TODO catch exceptions and report to stdout
Thread.abort_on_exception = true
@workers.each do |w|
@listeners << Thread.new do
while @running
begin
message = w[:io].gets
message.handle(self, w) if message
rescue IOError => ex
$stderr.write "Master lost Worker [#{w.inspect}]\n"
w[:io].close
@workers.delete(w)
Thread.exit
end
end
end
end
@listeners.each{|l| l.join}
end
end
end

View File

@ -43,4 +43,5 @@ end
require 'hydra/message/runner_messages'
require 'hydra/message/worker_messages'
require 'hydra/message/master_messages'

View File

@ -0,0 +1,6 @@
module Hydra #:nodoc:
module Messages #:nodoc:
module Master #:nodoc:
end
end
end

View File

@ -9,6 +9,7 @@ module Hydra #:nodoc:
end
# Message telling the Runner to run a file
# TODO: move to worker
class RunFile < Hydra::Message
attr_accessor :file
def serialize #:nodoc:
@ -32,11 +33,20 @@ module Hydra #:nodoc:
end
# Message to tell the Runner to shut down
# TODO: move to worker
class Shutdown < Hydra::Message
def handle(runner) #:nodoc:
runner.stop
end
end
# Message a runner sends to a worker to verify the connection
class Ping < Hydra::Message
# We don't do anything to handle a ping. It's just to test
# the connectivity of the IO
def handle(worker, runner)
end
end
end
end
end

View File

@ -1,11 +1,15 @@
module Hydra #:nodoc:
module Messages #:nodoc:
module Worker #:nodoc:
# Message indicating that a work needs a file to delegate to a runner
# Message indicating that a worker needs a file to delegate to a runner
class RequestFile < Hydra::Message
def handle(master, worker)
master.send_file(worker)
end
end
# Message telling a worker to delegate a file to a runner
# TODO: move to master messages
class RunFile < Hydra::Messages::Runner::RunFile
def handle(worker)
worker.delegate_file(self)
@ -14,14 +18,26 @@ module Hydra #:nodoc:
# Message relaying the results of a worker up to the master
class Results < Hydra::Messages::Runner::Results
def handle(master, worker)
master.send_file(worker)
end
end
# Message telling the worker to shut down.
# TODO: move to master
class Shutdown < Hydra::Messages::Runner::Shutdown
def handle(worker)
worker.shutdown
end
end
# Message a worker sends to a master to verify the connection
class Ping < Hydra::Message
# We don't do anything to handle a ping. It's just to test
# the connectivity of the IO
def handle(master, worker)
end
end
end
end
end

View File

@ -51,5 +51,11 @@ module Hydra #:nodoc:
@reader = @parent_read
@writer = @parent_write
end
# Output pipe nicely
def inspect
"#<#{self.class} @reader=#{@reader.to_s}, @writer=#{@writer.to_s}>"
end
end
end

View File

@ -13,8 +13,14 @@ module Hydra #:nodoc:
def process_messages
@running = true
while @running
message = @io.gets
message.handle(self) if message
begin
message = @io.gets
message.handle(self) if message
@io.write Hydra::Messages::Runner::Ping.new
rescue IOError => ex
$stderr.write "Runner lost Worker\n"
@running = false
end
end
end

View File

@ -48,6 +48,7 @@ module Hydra #:nodoc:
@running = false
@runners.each do |r|
r[:io].write(Hydra::Messages::Runner::Shutdown.new)
Thread.exit
end
end
@ -77,8 +78,14 @@ module Hydra #:nodoc:
# Worker listens and handles messages
@listeners << Thread.new do
while @running
message = @io.gets
message.handle(self) if message
begin
message = @io.gets
message.handle(self) if message
@io.write Hydra::Messages::Worker::Ping.new
rescue IOError => ex
$stderr.write "Worker lost Master\n"
Thread.exit
end
end
end
@ -91,14 +98,15 @@ module Hydra #:nodoc:
message = r[:io].gets
message.handle(self, r) if message
rescue IOError => ex
# If the other end of the pipe closes
# we will continue, because we're probably
# not @running anymore
$stderr.write "Worker lost Runner [#{r.inspect}]\n"
@runners.delete(r)
Thread.exit
end
end
end
end
@listeners.each{|l| l.join }
@io.close
end
# Get the next idle runner

24
test/master_test.rb Normal file
View File

@ -0,0 +1,24 @@
require File.join(File.dirname(__FILE__), 'test_helper')
TARGET = File.join(Dir.tmpdir, 'hydra_test.txt')
TESTFILE = File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb')
class MasterTest < Test::Unit::TestCase
context "with a file to test and a destination to verify" do
setup do
FileUtils.rm_f(TARGET)
end
teardown do
FileUtils.rm_f(TARGET)
end
should "run a test" do
m = Hydra::Master.new({
:files => Array(TESTFILE)
})
assert File.exists?(TARGET)
assert_equal "HYDRA", File.read(TARGET)
end
end
end