hydra/lib/hydra/worker.rb

147 lines
4.2 KiB
Ruby

module Hydra #:nodoc:
# Hydra class responsible to dispatching runners and communicating with the master.
#
# The Worker is never run directly by a user. Workers are created by a
# Master to delegate to Runners.
#
# The general convention is to have one Worker per machine on a distributed
# network.
class Worker
include Hydra::Messages::Worker
traceable('WORKER')
# Create a new worker.
# * io: The IO object to use to communicate with the master
# * num_runners: The number of runners to launch
def initialize(opts = {})
@verbose = opts.fetch(:verbose) { false }
@io = opts.fetch(:io) { raise "No IO Object" }
@runners = []
@listeners = []
boot_runners(opts.fetch(:runners) { 1 })
process_messages
@runners.each{|r| Process.wait r[:pid] }
end
# message handling methods
# When a runner wants a file, it hits this method with a message.
# Then the worker bubbles the file request up to the master.
def request_file(message, runner)
@io.write(RequestFile.new)
runner[:idle] = true
end
# When the master sends a file down to the worker, it hits this
# method. Then the worker delegates the file down to a runner.
def delegate_file(message)
runner = idle_runner
runner[:idle] = false
runner[:io].write(RunFile.new(eval(message.serialize)))
end
# When a runner finishes, it sends the results up to the worker. Then the
# worker sends the results up to the master.
def relay_results(message, runner)
runner[:idle] = true
@io.write(Results.new(eval(message.serialize)))
end
# When a master issues a shutdown order, it hits this method, which causes
# the worker to send shutdown messages to its runners.
def shutdown
@running = false
trace "Notifying #{@runners.size} Runners of Shutdown"
@runners.each do |r|
trace "Sending Shutdown to Runner"
trace "\t#{r.inspect}"
r[:io].write(Shutdown.new)
end
Thread.exit
end
private
def boot_runners(num_runners) #:nodoc:
trace "Booting #{num_runners} Runners"
num_runners.times do
pipe = Hydra::Pipe.new
child = SafeFork.fork do
pipe.identify_as_child
Hydra::Runner.new(:io => pipe, :verbose => @verbose)
end
pipe.identify_as_parent
@runners << { :pid => child, :io => pipe, :idle => false }
end
trace "#{@runners.size} Runners booted"
end
# Continuously process messages
def process_messages #:nodoc:
trace "Processing Messages"
@running = true
Thread.abort_on_exception = true
process_messages_from_master
process_messages_from_runners
@listeners.each{|l| l.join }
@io.close
trace "Done processing messages"
end
def process_messages_from_master
@listeners << Thread.new do
while @running
begin
message = @io.gets
if message and !message.class.to_s.index("Master").nil?
trace "Received Message from Master"
trace "\t#{message.inspect}"
message.handle(self)
else
trace "Nothing from Master, Pinging"
@io.write Ping.new
end
rescue IOError => ex
trace "Worker lost Master"
Thread.exit
end
end
end
end
def process_messages_from_runners
@runners.each do |r|
@listeners << Thread.new do
while @running
begin
message = r[:io].gets
if message and !message.class.to_s.index("Runner").nil?
trace "Received Message from Runner"
trace "\t#{message.inspect}"
message.handle(self, r)
end
rescue IOError => ex
trace "Worker lost Runner [#{r.inspect}]"
Thread.exit
end
end
end
end
end
# Get the next idle runner
def idle_runner #:nodoc:
idle_r = nil
while idle_r.nil?
idle_r = @runners.detect{|runner| runner[:idle]}
end
return idle_r
end
end
end