refactoring some bad smells

This commit is contained in:
Nick Gauthier 2010-01-29 13:57:20 -05:00
parent ca14dd4b9a
commit 7fbb72d0a8
3 changed files with 27 additions and 27 deletions

View File

@ -39,11 +39,11 @@ module Hydra #:nodoc:
private private
def boot_workers(workers) def boot_workers(workers)
workers.each do |w| workers.each do |worker|
pipe = Hydra::Pipe.new pipe = Hydra::Pipe.new
child = Process.fork do child = Process.fork do
pipe.identify_as_child pipe.identify_as_child
Hydra::Worker.new(:io => pipe, :runners => w[:runners]) Hydra::Worker.new(:io => pipe, :runners => worker[:runners])
end end
pipe.identify_as_parent pipe.identify_as_parent
@workers << { :pid => child, :io => pipe, :idle => false } @workers << { :pid => child, :io => pipe, :idle => false }
@ -51,20 +51,18 @@ module Hydra #:nodoc:
end end
def process_messages def process_messages
@running = true
Thread.abort_on_exception = true Thread.abort_on_exception = true
@workers.each do |w| @workers.each do |worker|
@listeners << Thread.new do @listeners << Thread.new do
while @running while true
begin begin
message = w[:io].gets message = worker[:io].gets
message.handle(self, w) if message message.handle(self, worker) if message
rescue IOError => ex rescue IOError => ex
$stderr.write "Master lost Worker [#{w.inspect}]\n" $stderr.write "Master lost Worker [#{worker.inspect}]\n"
w[:io].close worker[:io].close
@workers.delete(w) @workers.delete(worker)
Thread.exit Thread.exit
end end
end end

View File

@ -20,11 +20,9 @@ module Hydra #:nodoc:
def write(message) def write(message)
raise IOError unless @writer raise IOError unless @writer
raise UnprocessableMessage unless message.is_a?(Hydra::Message) raise UnprocessableMessage unless message.is_a?(Hydra::Message)
begin @writer.write(message.serialize+"\n")
@writer.write(message.serialize+"\n") rescue Errno::EPIPE
rescue Errno::EPIPE raise IOError
raise IOError
end
end end
# Closes the IO object. # Closes the IO object.

View File

@ -36,9 +36,9 @@ module Hydra #:nodoc:
# When the master sends a file down to the worker, it hits this # When the master sends a file down to the worker, it hits this
# method. Then the worker delegates the file down to a runner. # method. Then the worker delegates the file down to a runner.
def delegate_file(message) def delegate_file(message)
r = idle_runner runner = idle_runner
r[:idle] = false runner[:idle] = false
r[:io].write(RunFile.new(eval(message.serialize))) runner[:io].write(RunFile.new(eval(message.serialize)))
end end
# When a runner finishes, it sends the results up to the worker. Then the # When a runner finishes, it sends the results up to the worker. Then the
@ -82,12 +82,19 @@ module Hydra #:nodoc:
$stdout.write "WORKER| Processing Messages\n" if @verbose $stdout.write "WORKER| Processing Messages\n" if @verbose
@running = true @running = true
# Abort the worker if one of the runners has an exception
# TODO: catch this exception, return a dying message to the master # TODO: catch this exception, return a dying message to the master
# then shutdown # then shutdown
Thread.abort_on_exception = true Thread.abort_on_exception = true
# Worker listens and handles messages process_messages_from_master
process_messages_from_runners
@listeners.each{|l| l.join }
@io.close
$stdout.write "WORKER| Done processing messages\n" if @verbose
end
def process_messages_from_master
@listeners << Thread.new do @listeners << Thread.new do
while @running while @running
begin begin
@ -105,9 +112,9 @@ module Hydra #:nodoc:
end end
end end
end end
end
# Runners listen, but when they handle they pass themselves def process_messages_from_runners
# so we can reference them when we deal with their messages
@runners.each do |r| @runners.each do |r|
@listeners << Thread.new do @listeners << Thread.new do
while @running while @running
@ -125,16 +132,13 @@ module Hydra #:nodoc:
end end
end end
end end
@listeners.each{|l| l.join }
@io.close
$stdout.write "WORKER| Done processing messages\n" if @verbose
end end
# Get the next idle runner # Get the next idle runner
def idle_runner #:nodoc: def idle_runner #:nodoc:
idle_r = nil idle_r = nil
while idle_r.nil? while idle_r.nil?
idle_r = @runners.detect{|r| r[:idle]} idle_r = @runners.detect{|runner| runner[:idle]}
sleep(1) sleep(1)
end end
return idle_r return idle_r