ssh-master working

This commit is contained in:
Nick Gauthier 2010-02-03 16:03:16 -05:00
parent 35cf4cc8e5
commit 3d6484e315
7 changed files with 54 additions and 21 deletions

View File

@ -49,24 +49,23 @@ module Hydra #:nodoc:
def boot_workers(workers) def boot_workers(workers)
$stdout.write "MASTER| Booting workers\n" if @verbose $stdout.write "MASTER| Booting workers\n" if @verbose
workers.select{|worker| worker[:type] == :local}.each do |worker| workers.select{|worker| worker[:type] == :local}.each do |worker|
$stdout.write "MASTER| Booting local worker\n" if @verbose
boot_local_worker(worker) boot_local_worker(worker)
end end
workers.select{|worker| worker[:type] == :ssh}.each do |worker| workers.select{|worker| worker[:type] == :ssh}.each do |worker|
$stdout.write "MASTER| Booting ssh worker\n" if @verbose @workers << worker # will boot later, during the listening phase
boot_ssh_worker(worker)
end end
end end
def boot_local_worker(worker) def boot_local_worker(worker)
runners = worker.fetch(:runners) { raise "You must specify the number of runners" } runners = worker.fetch(:runners) { raise "You must specify the number of runners" }
$stdout.write "MASTER| Booting local worker\n" if @verbose
pipe = Hydra::Pipe.new pipe = Hydra::Pipe.new
child = Process.fork do child = Process.fork do
pipe.identify_as_child pipe.identify_as_child
Hydra::Worker.new(:io => pipe, :runners => runners) Hydra::Worker.new(:io => pipe, :runners => runners)
end end
pipe.identify_as_parent pipe.identify_as_parent
@workers << { :pid => child, :io => pipe, :idle => false } @workers << { :pid => child, :io => pipe, :idle => false, :type => :local }
end end
def boot_ssh_worker(worker) def boot_ssh_worker(worker)
@ -74,24 +73,28 @@ module Hydra #:nodoc:
connect = worker.fetch(:connect) { raise "You must specify SSH connection options" } connect = worker.fetch(:connect) { raise "You must specify SSH connection options" }
directory = worker.fetch(:directory) { raise "You must specify a remote directory" } directory = worker.fetch(:directory) { raise "You must specify a remote directory" }
command = worker.fetch(:command) { command = worker.fetch(:command) {
"ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners});\"" "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\""
} }
ssh = nil $stdout.write "MASTER| Booting SSH worker\n" if @verbose
child = Process.fork do
ssh = Hydra::SSH.new(connect, directory, command) ssh = Hydra::SSH.new(connect, directory, command)
end return { :io => ssh, :idle => false, :type => :ssh }
@workers << { :pid => child, :io => ssh, :idle => false }
end end
def process_messages def process_messages
Thread.abort_on_exception = true Thread.abort_on_exception = true
$stdout.write "MASTER| Processing Messages\n" if @verbose
$stdout.write "MASTER| Workers: #{@workers}\n" if @verbose
@workers.each do |worker| @workers.each do |worker|
@listeners << Thread.new do @listeners << Thread.new do
$stdout.write "MASTER| Listening to #{worker.inspect}\n" if @verbose
worker = boot_ssh_worker(worker) if worker.fetch(:type){ :local } == :ssh
while true while true
begin begin
$stdout.write "MASTER| listen....\n" if @verbose
message = worker[:io].gets message = worker[:io].gets
$stdout.write "MASTER| got message: #{message}\n" if @verbose
message.handle(self, worker) if message message.handle(self, worker) if message
rescue IOError => ex rescue IOError => ex
$stderr.write "Master lost Worker [#{worker.inspect}]\n" $stderr.write "Master lost Worker [#{worker.inspect}]\n"

View File

@ -12,9 +12,9 @@ module Hydra #:nodoc:
message = @reader.gets message = @reader.gets
return nil unless message return nil unless message
return Message.build(eval(message.chomp)) return Message.build(eval(message.chomp))
rescue SyntaxError => ex rescue SyntaxError, NameError
$stderr.write "Not a message: [#{message.inspect}]\n" $stderr.write "Not a message: [#{message.inspect}]\n"
return nil return gets
end end
# Write a Message to the output IO object. It will automatically # Write a Message to the output IO object. It will automatically

View File

@ -1,4 +1,5 @@
require 'test/unit' require 'test/unit'
require 'test/unit/testresult'
module Hydra #:nodoc: module Hydra #:nodoc:
# Hydra class responsible for running test files. # Hydra class responsible for running test files.
# #
@ -16,13 +17,21 @@ module Hydra #:nodoc:
@verbose = opts.fetch(:verbose) { false } @verbose = opts.fetch(:verbose) { false }
Test::Unit.run = true Test::Unit.run = true
$stdout.sync = true
$stdout.write "RUNNER| Booted. Sending Request for file\n" if @verbose
@io.write RequestFile.new @io.write RequestFile.new
begin
process_messages process_messages
rescue => ex
$stdout.write "#{ex.to_s}\n" if @verbose
raise ex
end
end end
# Run a test file and report the results # Run a test file and report the results
def run_file(file) def run_file(file)
$stdout.write "RUNNER| Running file: #{file}\n" if @verbose
require file require file
output = [] output = []
@result = Test::Unit::TestResult.new @result = Test::Unit::TestResult.new

View File

@ -8,6 +8,8 @@ module Hydra #:nodoc:
def initialize() def initialize()
@reader = $stdin @reader = $stdin
@writer = $stdout @writer = $stdout
@reader.sync = true
@writer.sync = true
end end
end end
end end

View File

@ -66,10 +66,9 @@ class MasterTest < Test::Unit::TestCase
:workers => [{ :workers => [{
:type => :ssh, :type => :ssh,
:connect => 'localhost', :connect => 'localhost',
:directory => File.expand_path(File.join(File.dirname(__FILE__), '..')), :directory => File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')),
:runners => 1 :runners => 1
}], }]
:verbose => true
) )
assert File.exists?(target_file) assert File.exists?(target_file)
assert_equal "HYDRA", File.read(target_file) assert_equal "HYDRA", File.read(target_file)

View File

@ -11,7 +11,7 @@ class RunnerTest < Test::Unit::TestCase
end end
should "run a test" do should "run a test in the foreground" do
# flip it around to the parent is in the fork, this gives # flip it around to the parent is in the fork, this gives
# us more direct control over the runner and proper test # us more direct control over the runner and proper test
# coverage output # coverage output
@ -25,7 +25,7 @@ class RunnerTest < Test::Unit::TestCase
# this flips the above test, so that the main process runs a bit of the parent # this flips the above test, so that the main process runs a bit of the parent
# code, but only with minimal assertion # code, but only with minimal assertion
should "be able to tell a runner to run a test" do should "run a test in the background" do
pipe = Hydra::Pipe.new pipe = Hydra::Pipe.new
child = Process.fork do child = Process.fork do
run_the_runner(pipe) run_the_runner(pipe)
@ -33,6 +33,26 @@ class RunnerTest < Test::Unit::TestCase
request_a_file_and_verify_completion(pipe) request_a_file_and_verify_completion(pipe)
Process.wait(child) Process.wait(child)
end end
should "be able to run a runner over ssh" do
ssh = Hydra::SSH.new(
'localhost',
File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')),
"ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Runner.new(:io => Hydra::Stdio.new);\""
)
assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile)
ssh.write(Hydra::Messages::Worker::RunFile.new(:file => test_file))
# grab its response. This makes us wait for it to finish
response = ssh.gets
# tell it to shut down
ssh.write(Hydra::Messages::Worker::Shutdown.new)
# ensure it ran
assert File.exists?(target_file)
assert_equal "HYDRA", File.read(target_file)
end
end end
module RunnerTestHelper module RunnerTestHelper
@ -56,7 +76,7 @@ class RunnerTest < Test::Unit::TestCase
def run_the_runner(pipe) def run_the_runner(pipe)
pipe.identify_as_child pipe.identify_as_child
Hydra::Runner.new({:io => pipe}) Hydra::Runner.new(:io => pipe)
end end
end end
include RunnerTestHelper include RunnerTestHelper

View File

@ -9,11 +9,11 @@ require 'hydra'
class Test::Unit::TestCase class Test::Unit::TestCase
def target_file def target_file
File.join(Dir.tmpdir, 'hydra_test.txt') File.expand_path(File.join(Dir.tmpdir, 'hydra_test.txt'))
end end
def test_file def test_file
File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb') File.expand_path(File.join(File.dirname(__FILE__), 'fixtures', 'write_file.rb'))
end end
end end