YAML config loading. SSH worker termination fixes.
This commit is contained in:
parent
e8e309a2c8
commit
2bee1dde2d
|
@ -0,0 +1,14 @@
|
||||||
|
class Hash
|
||||||
|
def stringify_keys
|
||||||
|
inject({}) do |options, (key, value)|
|
||||||
|
options[key.to_s] = value
|
||||||
|
options
|
||||||
|
end
|
||||||
|
end
|
||||||
|
def stringify_keys!
|
||||||
|
keys.each do |key|
|
||||||
|
self[key.to_s] = delete(key)
|
||||||
|
end
|
||||||
|
self
|
||||||
|
end
|
||||||
|
end
|
|
@ -1,3 +1,4 @@
|
||||||
|
require 'hydra/hash'
|
||||||
module Hydra #:nodoc:
|
module Hydra #:nodoc:
|
||||||
# Hydra class responsible for delegate work down to workers.
|
# Hydra class responsible for delegate work down to workers.
|
||||||
#
|
#
|
||||||
|
@ -15,16 +16,23 @@ module Hydra #:nodoc:
|
||||||
# * An array of hashes. Each hash should be the configuration options
|
# * An array of hashes. Each hash should be the configuration options
|
||||||
# for a worker.
|
# for a worker.
|
||||||
def initialize(opts = { })
|
def initialize(opts = { })
|
||||||
@files = opts.fetch(:files) { [] }
|
opts.stringify_keys!
|
||||||
|
config_file = opts.delete('config') { nil }
|
||||||
|
if config_file
|
||||||
|
opts.merge!(YAML.load_file(config_file).stringify_keys!)
|
||||||
|
end
|
||||||
|
@files = opts.fetch('files') { [] }
|
||||||
|
@incomplete_files = @files.dup
|
||||||
@workers = []
|
@workers = []
|
||||||
@listeners = []
|
@listeners = []
|
||||||
@verbose = opts.fetch(:verbose) { false }
|
@verbose = opts.fetch('verbose') { false }
|
||||||
# default is one worker that is configured to use a pipe with one runner
|
# default is one worker that is configured to use a pipe with one runner
|
||||||
worker_cfg = opts.fetch(:workers) {
|
worker_cfg = opts.fetch('workers') { [ { 'type' => 'local', 'runners' => 1} ] }
|
||||||
[ { :type => :local, :runners => 1} ]
|
|
||||||
}
|
|
||||||
|
|
||||||
$stdout.write "MASTER| Initialized\n" if @verbose
|
$stdout.write "MASTER| Initialized\n" if @verbose
|
||||||
|
$stdout.write "MASTER| Files: (#{@files.inspect})\n" if @verbose
|
||||||
|
$stdout.write "MASTER| Workers: (#{worker_cfg.inspect})\n" if @verbose
|
||||||
|
$stdout.write "MASTER| Verbose: (#{@verbose.inspect})\n" if @verbose
|
||||||
|
|
||||||
boot_workers worker_cfg
|
boot_workers worker_cfg
|
||||||
process_messages
|
process_messages
|
||||||
|
@ -36,43 +44,57 @@ module Hydra #:nodoc:
|
||||||
# worker down.
|
# worker down.
|
||||||
def send_file(worker)
|
def send_file(worker)
|
||||||
f = @files.pop
|
f = @files.pop
|
||||||
if f
|
worker[:io].write(RunFile.new(:file => f)) if f
|
||||||
worker[:io].write(RunFile.new(:file => f))
|
end
|
||||||
|
|
||||||
|
# Process the results coming back from the worker.
|
||||||
|
def process_results(worker, message)
|
||||||
|
$stdout.write message.output
|
||||||
|
# only delete one
|
||||||
|
@incomplete_files.delete_at(@incomplete_files.index(message.file))
|
||||||
|
$stdout.write "MASTER| #{@incomplete_files.size} Files Remaining\n" if @verbose
|
||||||
|
if @incomplete_files.empty?
|
||||||
|
shutdown_all_workers
|
||||||
else
|
else
|
||||||
worker[:io].write(Shutdown.new)
|
send_file(worker)
|
||||||
Thread.exit
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def boot_workers(workers)
|
def boot_workers(workers)
|
||||||
$stdout.write "MASTER| Booting workers\n" if @verbose
|
$stdout.write "MASTER| Booting #{workers.size} workers\n" if @verbose
|
||||||
workers.select{|worker| worker[:type] == :local}.each do |worker|
|
workers.each do |worker|
|
||||||
|
worker.stringify_keys!
|
||||||
|
$stdout.write "MASTER| worker opts #{worker.inspect}\n" if @verbose
|
||||||
|
type = worker.fetch('type') { 'local' }
|
||||||
|
if type.to_s == 'local'
|
||||||
boot_local_worker(worker)
|
boot_local_worker(worker)
|
||||||
end
|
elsif type.to_s == 'ssh'
|
||||||
workers.select{|worker| worker[:type] == :ssh}.each do |worker|
|
|
||||||
@workers << worker # will boot later, during the listening phase
|
@workers << worker # will boot later, during the listening phase
|
||||||
|
else
|
||||||
|
raise "Worker type not recognized: (#{type.to_s})"
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def boot_local_worker(worker)
|
def boot_local_worker(worker)
|
||||||
runners = worker.fetch(:runners) { raise "You must specify the number of runners" }
|
runners = worker.fetch('runners') { raise "You must specify the number of runners" }
|
||||||
$stdout.write "MASTER| Booting local worker\n" if @verbose
|
$stdout.write "MASTER| Booting local worker\n" if @verbose
|
||||||
pipe = Hydra::Pipe.new
|
pipe = Hydra::Pipe.new
|
||||||
child = Process.fork do
|
child = Process.fork do
|
||||||
pipe.identify_as_child
|
pipe.identify_as_child
|
||||||
Hydra::Worker.new(:io => pipe, :runners => runners)
|
Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose)
|
||||||
end
|
end
|
||||||
pipe.identify_as_parent
|
pipe.identify_as_parent
|
||||||
@workers << { :pid => child, :io => pipe, :idle => false, :type => :local }
|
@workers << { :pid => child, :io => pipe, :idle => false, :type => :local }
|
||||||
end
|
end
|
||||||
|
|
||||||
def boot_ssh_worker(worker)
|
def boot_ssh_worker(worker)
|
||||||
runners = worker.fetch(:runners) { raise "You must specify the number of runners" }
|
runners = worker.fetch('runners') { raise "You must specify the number of runners" }
|
||||||
connect = worker.fetch(:connect) { raise "You must specify SSH connection options" }
|
connect = worker.fetch('connect') { raise "You must specify SSH connection options" }
|
||||||
directory = worker.fetch(:directory) { raise "You must specify a remote directory" }
|
directory = worker.fetch('directory') { raise "You must specify a remote directory" }
|
||||||
command = worker.fetch(:command) {
|
command = worker.fetch('command') {
|
||||||
"ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\""
|
"ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,25 +103,34 @@ module Hydra #:nodoc:
|
||||||
return { :io => ssh, :idle => false, :type => :ssh }
|
return { :io => ssh, :idle => false, :type => :ssh }
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def shutdown_all_workers
|
||||||
|
$stdout.write "MASTER| Shutting down all workers\n" if @verbose
|
||||||
|
@workers.each do |worker|
|
||||||
|
worker[:io].write(Shutdown.new) if worker[:io]
|
||||||
|
worker[:io].close if worker[:io]
|
||||||
|
end
|
||||||
|
@listeners.each{|t| t.exit}
|
||||||
|
end
|
||||||
|
|
||||||
def process_messages
|
def process_messages
|
||||||
Thread.abort_on_exception = true
|
Thread.abort_on_exception = true
|
||||||
|
|
||||||
$stdout.write "MASTER| Processing Messages\n" if @verbose
|
$stdout.write "MASTER| Processing Messages\n" if @verbose
|
||||||
$stdout.write "MASTER| Workers: #{@workers}\n" if @verbose
|
$stdout.write "MASTER| Workers: #{@workers.inspect}\n" if @verbose
|
||||||
@workers.each do |worker|
|
@workers.each do |worker|
|
||||||
@listeners << Thread.new do
|
@listeners << Thread.new do
|
||||||
$stdout.write "MASTER| Listening to #{worker.inspect}\n" if @verbose
|
$stdout.write "MASTER| Listening to #{worker.inspect}\n" if @verbose
|
||||||
worker = boot_ssh_worker(worker) if worker.fetch(:type){ :local } == :ssh
|
if worker.fetch('type') { 'local' }.to_s == 'ssh'
|
||||||
|
worker = boot_ssh_worker(worker)
|
||||||
|
@workers << worker
|
||||||
|
end
|
||||||
while true
|
while true
|
||||||
begin
|
begin
|
||||||
$stdout.write "MASTER| listen....\n" if @verbose
|
|
||||||
message = worker[:io].gets
|
message = worker[:io].gets
|
||||||
$stdout.write "MASTER| got message: #{message}\n" if @verbose
|
$stdout.write "MASTER| got message: #{message}\n" if @verbose
|
||||||
message.handle(self, worker) if message
|
message.handle(self, worker) if message
|
||||||
rescue IOError => ex
|
rescue IOError
|
||||||
$stderr.write "Master lost Worker [#{worker.inspect}]\n"
|
$stderr.write "MASTER| lost Worker [#{worker.inspect}]\n" if @verbose
|
||||||
worker[:io].close
|
|
||||||
@workers.delete(worker)
|
|
||||||
Thread.exit
|
Thread.exit
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -30,8 +30,7 @@ module Hydra #:nodoc:
|
||||||
# Message relaying the results of a worker up to the master
|
# Message relaying the results of a worker up to the master
|
||||||
class Results < Hydra::Messages::Runner::Results
|
class Results < Hydra::Messages::Runner::Results
|
||||||
def handle(master, worker) #:nodoc:
|
def handle(master, worker) #:nodoc:
|
||||||
$stdout.write output
|
master.process_results(worker, self)
|
||||||
master.send_file(worker)
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -102,6 +102,7 @@ module Hydra #:nodoc:
|
||||||
$stdout.write " | #{message.inspect}\n" if @verbose
|
$stdout.write " | #{message.inspect}\n" if @verbose
|
||||||
message.handle(self)
|
message.handle(self)
|
||||||
else
|
else
|
||||||
|
$stdout.write "WORKER| Nothing from Master, Pinging\n" if @verbose
|
||||||
@io.write Ping.new
|
@io.write Ping.new
|
||||||
end
|
end
|
||||||
rescue IOError => ex
|
rescue IOError => ex
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
---
|
||||||
|
workers:
|
||||||
|
- type: local
|
||||||
|
runners: 2
|
|
@ -21,9 +21,7 @@ class MasterTest < Test::Unit::TestCase
|
||||||
should "run a test 6 times on 1 worker with 2 runners" do
|
should "run a test 6 times on 1 worker with 2 runners" do
|
||||||
Hydra::Master.new(
|
Hydra::Master.new(
|
||||||
:files => [test_file]*6,
|
:files => [test_file]*6,
|
||||||
:local => {
|
:workers => [ { :type => :local, :runners => 2 } ]
|
||||||
:runners => 2
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
assert File.exists?(target_file)
|
assert File.exists?(target_file)
|
||||||
assert_equal "HYDRA"*6, File.read(target_file)
|
assert_equal "HYDRA"*6, File.read(target_file)
|
||||||
|
@ -73,5 +71,14 @@ class MasterTest < Test::Unit::TestCase
|
||||||
assert File.exists?(target_file)
|
assert File.exists?(target_file)
|
||||||
assert_equal "HYDRA", File.read(target_file)
|
assert_equal "HYDRA", File.read(target_file)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
should "run a test with config from a yaml file" do
|
||||||
|
Hydra::Master.new(
|
||||||
|
:files => [test_file],
|
||||||
|
:config => File.join(File.dirname(__FILE__), 'fixtures', 'config.yml')
|
||||||
|
)
|
||||||
|
assert File.exists?(target_file)
|
||||||
|
assert_equal "HYDRA", File.read(target_file)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue