made master traceable

This commit is contained in:
Nick Gauthier 2010-02-04 11:18:06 -05:00
parent aa4a936f09
commit 307a57d176
4 changed files with 22 additions and 20 deletions

View File

@ -5,6 +5,7 @@ module Hydra #:nodoc:
# The Master is run once for any given testing session. # The Master is run once for any given testing session.
class Master class Master
include Hydra::Messages::Master include Hydra::Messages::Master
traceable('MASTER')
# Create a new Master # Create a new Master
# #
# Options: # Options:
@ -29,10 +30,10 @@ module Hydra #:nodoc:
# default is one worker that is configured to use a pipe with one runner # default is one worker that is configured to use a pipe with one runner
worker_cfg = opts.fetch('workers') { [ { 'type' => 'local', 'runners' => 1} ] } worker_cfg = opts.fetch('workers') { [ { 'type' => 'local', 'runners' => 1} ] }
$stdout.write "MASTER| Initialized\n" if @verbose trace "Initialized"
$stdout.write "MASTER| Files: (#{@files.inspect})\n" if @verbose trace " Files: (#{@files.inspect})"
$stdout.write "MASTER| Workers: (#{worker_cfg.inspect})\n" if @verbose trace " Workers: (#{worker_cfg.inspect})"
$stdout.write "MASTER| Verbose: (#{@verbose.inspect})\n" if @verbose trace " Verbose: (#{@verbose.inspect})"
boot_workers worker_cfg boot_workers worker_cfg
process_messages process_messages
@ -52,7 +53,7 @@ module Hydra #:nodoc:
$stdout.write message.output $stdout.write message.output
# only delete one # only delete one
@incomplete_files.delete_at(@incomplete_files.index(message.file)) @incomplete_files.delete_at(@incomplete_files.index(message.file))
$stdout.write "MASTER| #{@incomplete_files.size} Files Remaining\n" if @verbose trace "#{@incomplete_files.size} Files Remaining"
if @incomplete_files.empty? if @incomplete_files.empty?
shutdown_all_workers shutdown_all_workers
else else
@ -63,10 +64,10 @@ module Hydra #:nodoc:
private private
def boot_workers(workers) def boot_workers(workers)
$stdout.write "MASTER| Booting #{workers.size} workers\n" if @verbose trace "Booting #{workers.size} workers"
workers.each do |worker| workers.each do |worker|
worker.stringify_keys! worker.stringify_keys!
$stdout.write "MASTER| worker opts #{worker.inspect}\n" if @verbose trace "worker opts #{worker.inspect}"
type = worker.fetch('type') { 'local' } type = worker.fetch('type') { 'local' }
if type.to_s == 'local' if type.to_s == 'local'
boot_local_worker(worker) boot_local_worker(worker)
@ -80,7 +81,7 @@ module Hydra #:nodoc:
def boot_local_worker(worker) def boot_local_worker(worker)
runners = worker.fetch('runners') { raise "You must specify the number of runners" } runners = worker.fetch('runners') { raise "You must specify the number of runners" }
$stdout.write "MASTER| Booting local worker\n" if @verbose trace "Booting local worker"
pipe = Hydra::Pipe.new pipe = Hydra::Pipe.new
child = Process.fork do child = Process.fork do
pipe.identify_as_child pipe.identify_as_child
@ -98,13 +99,13 @@ module Hydra #:nodoc:
"ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\"" "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\""
} }
$stdout.write "MASTER| Booting SSH worker\n" if @verbose trace "Booting SSH worker"
ssh = Hydra::SSH.new(connect, directory, command) ssh = Hydra::SSH.new(connect, directory, command)
return { :io => ssh, :idle => false, :type => :ssh } return { :io => ssh, :idle => false, :type => :ssh }
end end
def shutdown_all_workers def shutdown_all_workers
$stdout.write "MASTER| Shutting down all workers\n" if @verbose trace "Shutting down all workers"
@workers.each do |worker| @workers.each do |worker|
worker[:io].write(Shutdown.new) if worker[:io] worker[:io].write(Shutdown.new) if worker[:io]
worker[:io].close if worker[:io] worker[:io].close if worker[:io]
@ -115,11 +116,11 @@ module Hydra #:nodoc:
def process_messages def process_messages
Thread.abort_on_exception = true Thread.abort_on_exception = true
$stdout.write "MASTER| Processing Messages\n" if @verbose trace "Processing Messages"
$stdout.write "MASTER| Workers: #{@workers.inspect}\n" if @verbose trace "Workers: #{@workers.inspect}"
@workers.each do |worker| @workers.each do |worker|
@listeners << Thread.new do @listeners << Thread.new do
$stdout.write "MASTER| Listening to #{worker.inspect}\n" if @verbose trace "Listening to #{worker.inspect}"
if worker.fetch('type') { 'local' }.to_s == 'ssh' if worker.fetch('type') { 'local' }.to_s == 'ssh'
worker = boot_ssh_worker(worker) worker = boot_ssh_worker(worker)
@workers << worker @workers << worker
@ -127,10 +128,10 @@ module Hydra #:nodoc:
while true while true
begin begin
message = worker[:io].gets message = worker[:io].gets
$stdout.write "MASTER| got message: #{message}\n" if @verbose trace "got message: #{message}"
message.handle(self, worker) if message message.handle(self, worker) if message
rescue IOError rescue IOError
$stderr.write "MASTER| lost Worker [#{worker.inspect}]\n" if @verbose trace "lost Worker [#{worker.inspect}]"
Thread.exit Thread.exit
end end
end end

View File

@ -8,6 +8,7 @@ module Hydra #:nodoc:
include Hydra::Trace::InstanceMethods include Hydra::Trace::InstanceMethods
class << self; attr_accessor :_traceable_prefix; end class << self; attr_accessor :_traceable_prefix; end
self._traceable_prefix = prefix self._traceable_prefix = prefix
$stdout.sync = true
end end
end end

View File

@ -2,7 +2,7 @@ require File.join(File.dirname(__FILE__), '..', 'test_helper')
class WriteFileTest < Test::Unit::TestCase class WriteFileTest < Test::Unit::TestCase
def test_slow def test_slow
sleep(2) sleep(5)
end end
end end

View File

@ -27,10 +27,10 @@ class MasterTest < Test::Unit::TestCase
assert_equal "HYDRA"*6, File.read(target_file) assert_equal "HYDRA"*6, File.read(target_file)
end end
# The test being run sleeps for 2 seconds. So, if this was run in # The test being run sleeps for 5 seconds. So, if this was run in
# series, it would take at least 20 seconds. This test ensures that # series, it would take at least 50 seconds. This test ensures that
# in runs in less than that amount of time. Since there are 10 # in runs in less than that amount of time. Since there are 10
# runners to run the file 10 times, it should only take 2-4 seconds # runners to run the file 10 times, it should only take 5-10 seconds
# based on overhead. # based on overhead.
should "run a slow test 10 times on 1 worker with 10 runners quickly" do should "run a slow test 10 times on 1 worker with 10 runners quickly" do
start = Time.now start = Time.now
@ -41,7 +41,7 @@ class MasterTest < Test::Unit::TestCase
] ]
) )
finish = Time.now finish = Time.now
assert (finish-start) < 15, "took #{finish-start} seconds" assert (finish-start) < 30, "took #{finish-start} seconds"
end end
should "run a slow test 10 times on 2 workers with 5 runners each quickly" do should "run a slow test 10 times on 2 workers with 5 runners each quickly" do