# 147 lines, 4.2 KiB, Ruby
module Hydra #:nodoc:
  # Hydra class responsible for dispatching runners and communicating with
  # the master.
  #
  # The Worker is never run directly by a user. Workers are created by a
  # Master to delegate to Runners.
  #
  # The general convention is to have one Worker per machine on a distributed
  # network.
  class Worker
    include Hydra::Messages::Worker
    traceable('WORKER')

    # Seconds to pause between scans of the runner list while waiting for a
    # runner to become idle. Keeps the wait from monopolising the CPU and
    # gives the runner listener threads a chance to flip the :idle flag.
    IDLE_POLL_INTERVAL = 0.01

    # Create a new worker.
    #
    # Options:
    # * :io      - The IO object to use to communicate with the master
    #              (required; a RuntimeError "No IO Object" is raised if absent)
    # * :runners - The number of runners to launch (default: 1)
    # * :verbose - Passed on to every Runner that is booted (default: false)
    #
    # Construction does all the work: it boots the runners, processes
    # messages until the listener threads finish, and then waits for every
    # runner process to exit before returning.
    def initialize(opts = {})
      @verbose = opts.fetch(:verbose) { false }
      @io = opts.fetch(:io) { raise "No IO Object" }
      @runners = []
      @listeners = []

      boot_runners(opts.fetch(:runners) { 1 })
      process_messages

      # Reap the forked runner processes.
      @runners.each { |r| Process.wait r[:pid] }
    end

    # message handling methods

    # When a runner wants a file, it hits this method with a message.
    # Then the worker bubbles the file request up to the master.
    #
    # * message: the runner's request (its contents are not inspected here)
    # * runner:  the entry from @runners that issued the request
    def request_file(message, runner)
      # Mark the runner idle *before* asking the master for work, so that a
      # reply arriving immediately already finds an idle runner to take it
      # (same ordering as relay_results).
      runner[:idle] = true
      @io.write(RequestFile.new)
    end

    # When the master sends a file down to the worker, it hits this
    # method. Then the worker delegates the file down to a runner.
    #
    # Waits (see idle_runner) until some runner is idle.
    def delegate_file(message)
      runner = idle_runner
      runner[:idle] = false
      # NOTE(review): eval of serialized message data read from the pipe.
      # If the peer is ever untrusted this executes arbitrary code; consider
      # copying the message attributes without eval.
      runner[:io].write(RunFile.new(eval(message.serialize)))
    end

    # When a runner finishes, it sends the results up to the worker. Then the
    # worker sends the results up to the master.
    #
    # * message: the runner's results message
    # * runner:  the entry from @runners that produced the results
    def relay_results(message, runner)
      runner[:idle] = true
      # NOTE(review): eval of serialized message data read from the pipe;
      # see the note in delegate_file.
      @io.write(Results.new(eval(message.serialize)))
    end

    # When a master issues a shutdown order, it hits this method, which causes
    # the worker to send shutdown messages to its runners.
    #
    # Clears @running so the listener loops stop, notifies every runner, and
    # then terminates the calling thread via Thread.exit.
    def shutdown
      @running = false
      trace "Notifying #{@runners.size} Runners of Shutdown"
      @runners.each do |r|
        trace "Sending Shutdown to Runner"
        trace "\t#{r.inspect}"
        r[:io].write(Shutdown.new)
      end
      Thread.exit
    end

    private

    # Fork +num_runners+ child processes, each running a Hydra::Runner that
    # is connected to this worker through its own Hydra::Pipe. Every runner
    # is recorded in @runners as { :pid, :io, :idle } and starts out not idle.
    def boot_runners(num_runners) #:nodoc:
      trace "Booting #{num_runners} Runners"
      num_runners.times do
        pipe = Hydra::Pipe.new
        child = SafeFork.fork do
          pipe.identify_as_child
          Hydra::Runner.new(:io => pipe, :verbose => @verbose)
        end
        pipe.identify_as_parent
        @runners << { :pid => child, :io => pipe, :idle => false }
      end
      trace "#{@runners.size} Runners booted"
    end

    # Continuously process messages
    #
    # Starts one listener thread for the master and one per runner, waits
    # for all of them to finish, then closes the master IO.
    def process_messages #:nodoc:
      trace "Processing Messages"
      @running = true

      # An unhandled exception in any listener thread should not go unnoticed.
      Thread.abort_on_exception = true

      process_messages_from_master
      process_messages_from_runners

      @listeners.each { |l| l.join }
      @io.close
      trace "Done processing messages"
    end

    # Start a listener thread that reads messages from the master while
    # @running is set. Messages whose class name contains "Master" are
    # handled; anything else (including no message) is answered with a Ping.
    # An IOError on the master IO ends the thread.
    def process_messages_from_master
      @listeners << Thread.new do
        while @running
          begin
            message = @io.gets
            if message && message.class.to_s.include?("Master")
              trace "Received Message from Master"
              trace "\t#{message.inspect}"
              message.handle(self)
            else
              trace "Nothing from Master, Pinging"
              @io.write Ping.new
            end
          rescue IOError
            trace "Worker lost Master"
            Thread.exit
          end
        end
      end
    end

    # Start one listener thread per runner. Each thread reads from its
    # runner's pipe while @running is set and handles messages whose class
    # name contains "Runner"; other reads are ignored. An IOError on the
    # runner's pipe ends that runner's thread.
    def process_messages_from_runners
      @runners.each do |r|
        @listeners << Thread.new do
          while @running
            begin
              message = r[:io].gets
              if message && message.class.to_s.include?("Runner")
                trace "Received Message from Runner"
                trace "\t#{message.inspect}"
                message.handle(self, r)
              end
            rescue IOError
              trace "Worker lost Runner [#{r.inspect}]"
              Thread.exit
            end
          end
        end
      end
    end

    # Get the next idle runner
    #
    # Returns the first entry of @runners whose :idle flag is set, polling
    # until one shows up. Sleeps briefly between scans instead of spinning.
    def idle_runner #:nodoc:
      loop do
        candidate = @runners.detect { |runner| runner[:idle] }
        return candidate if candidate
        sleep(IDLE_POLL_INTERVAL)
      end
    end
  end
end
|