diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index 173aac2..aa6d221 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -49,24 +49,23 @@ module Hydra #:nodoc: def boot_workers(workers) $stdout.write "MASTER| Booting workers\n" if @verbose workers.select{|worker| worker[:type] == :local}.each do |worker| - $stdout.write "MASTER| Booting local worker\n" if @verbose boot_local_worker(worker) end workers.select{|worker| worker[:type] == :ssh}.each do |worker| - $stdout.write "MASTER| Booting ssh worker\n" if @verbose - boot_ssh_worker(worker) + @workers << worker # will boot later, during the listening phase end end def boot_local_worker(worker) runners = worker.fetch(:runners) { raise "You must specify the number of runners" } + $stdout.write "MASTER| Booting local worker\n" if @verbose pipe = Hydra::Pipe.new child = Process.fork do pipe.identify_as_child Hydra::Worker.new(:io => pipe, :runners => runners) end pipe.identify_as_parent - @workers << { :pid => child, :io => pipe, :idle => false } + @workers << { :pid => child, :io => pipe, :idle => false, :type => :local } end def boot_ssh_worker(worker) @@ -74,24 +73,28 @@ module Hydra #:nodoc: connect = worker.fetch(:connect) { raise "You must specify SSH connection options" } directory = worker.fetch(:directory) { raise "You must specify a remote directory" } command = worker.fetch(:command) { - "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners});\"" + "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\"" } - ssh = nil - child = Process.fork do - ssh = Hydra::SSH.new(connect, directory, command) - end - @workers << { :pid => child, :io => ssh, :idle => false } + $stdout.write "MASTER| Booting SSH worker\n" if @verbose + ssh = Hydra::SSH.new(connect, directory, command) + return { :io => ssh, :idle => false, :type => :ssh } end def process_messages Thread.abort_on_exception = true + $stdout.write "MASTER| Processing Messages\n" if @verbose + $stdout.write "MASTER| Workers: #{@workers}\n" if @verbose @workers.each do |worker| @listeners << Thread.new do + $stdout.write "MASTER| Listening to #{worker.inspect}\n" if @verbose + worker = boot_ssh_worker(worker) if worker.fetch(:type){ :local } == :ssh while true begin + $stdout.write "MASTER| listen....\n" if @verbose message = worker[:io].gets + $stdout.write "MASTER| got message: #{message}\n" if @verbose message.handle(self, worker) if message rescue IOError => ex $stderr.write "Master lost Worker [#{worker.inspect}]\n" diff --git a/lib/hydra/messaging_io.rb b/lib/hydra/messaging_io.rb index 9d34dc6..4b5a468 100644 --- a/lib/hydra/messaging_io.rb +++ b/lib/hydra/messaging_io.rb @@ -12,9 +12,9 @@ module Hydra #:nodoc: message = @reader.gets return nil unless message return Message.build(eval(message.chomp)) - rescue SyntaxError => ex + rescue SyntaxError, NameError $stderr.write "Not a message: [#{message.inspect}]\n" - return nil + return gets end # Write a Message to the output IO object. It will automatically diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index 62d57c1..5670eb9 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -1,4 +1,5 @@ require 'test/unit' +require 'test/unit/testresult' module Hydra #:nodoc: # Hydra class responsible for running test files. # @@ -16,13 +17,21 @@ module Hydra #:nodoc: @verbose = opts.fetch(:verbose) { false } Test::Unit.run = true + $stdout.sync = true + $stdout.write "RUNNER| Booted. Sending Request for file\n" if @verbose @io.write RequestFile.new - process_messages + begin + process_messages + rescue => ex + $stdout.write "#{ex.to_s}\n" if @verbose + raise ex + end end # Run a test file and report the results def run_file(file) + $stdout.write "RUNNER| Running file: #{file}\n" if @verbose require file output = [] @result = Test::Unit::TestResult.new diff --git a/lib/hydra/stdio.rb b/lib/hydra/stdio.rb index e2bd789..394fb05 100644 --- a/lib/hydra/stdio.rb +++ b/lib/hydra/stdio.rb @@ -8,6 +8,8 @@ module Hydra #:nodoc: def initialize() @reader = $stdin @writer = $stdout + @reader.sync = true + @writer.sync = true end end end diff --git a/test/master_test.rb b/test/master_test.rb index f7a2504..cacb11c 100644 --- a/test/master_test.rb +++ b/test/master_test.rb @@ -66,10 +66,9 @@ class MasterTest < Test::Unit::TestCase :workers => [{ :type => :ssh, :connect => 'localhost', - :directory => File.expand_path(File.join(File.dirname(__FILE__), '..')), + :directory => File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')), :runners => 1 - }], - :verbose => true + }] ) assert File.exists?(target_file) assert_equal "HYDRA", File.read(target_file) diff --git a/test/runner_test.rb b/test/runner_test.rb index a31244d..a8bbec3 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -11,7 +11,7 @@ class RunnerTest < Test::Unit::TestCase end - should "run a test" do + should "run a test in the foreground" do # flip it around to the parent is in the fork, this gives # us more direct control over the runner and proper test # coverage output @@ -25,7 +25,7 @@ class RunnerTest < Test::Unit::TestCase # this flips the above test, so that the main process runs a bit of the parent # code, but only with minimal assertion - should "be able to tell a runner to run a test" do + should "run a test in the background" do pipe = Hydra::Pipe.new child = Process.fork do run_the_runner(pipe) @@ -33,6 +33,26 @@ class RunnerTest < Test::Unit::TestCase request_a_file_and_verify_completion(pipe) Process.wait(child) end + + should "be able to run a runner over ssh" do + ssh = Hydra::SSH.new( + 'localhost', + File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')), + "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Runner.new(:io => Hydra::Stdio.new);\"" + ) + assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile) + ssh.write(Hydra::Messages::Worker::RunFile.new(:file => test_file)) + + # grab its response. This makes us wait for it to finish + response = ssh.gets + + # tell it to shut down + ssh.write(Hydra::Messages::Worker::Shutdown.new) + + # ensure it ran + assert File.exists?(target_file) + assert_equal "HYDRA", File.read(target_file) + end end module RunnerTestHelper @@ -56,7 +76,7 @@ class RunnerTest < Test::Unit::TestCase def run_the_runner(pipe) pipe.identify_as_child - Hydra::Runner.new({:io => pipe}) + Hydra::Runner.new(:io => pipe) end end include RunnerTestHelper diff --git a/test/test_helper.rb b/test/test_helper.rb index 82080a2..4fe607a 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -9,11 +9,11 @@ require 'hydra' class Test::Unit::TestCase def target_file - File.join(Dir.tmpdir, 'hydra_test.txt') + File.expand_path(File.join(Dir.tmpdir, 'hydra_test.txt')) end def test_file - File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb') + File.expand_path(File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb')) end end