refactored Sync object to own remote worker settings

This commit is contained in:
Sean Kirby 2010-05-07 14:41:17 -04:00
parent 7821852a52
commit 75cca89d3f
2 changed files with 38 additions and 52 deletions

View File

@ -135,26 +135,18 @@ module Hydra #:nodoc:
end end
def boot_ssh_worker(worker) def boot_ssh_worker(worker)
Sync.new worker, @sync, @verbose sync = Sync.new(worker, @sync, @verbose)
connect, ssh_opts, directory = self.class.remote_connection_opts(worker)
runners = worker.fetch('runners') { raise "You must specify the number of runners" } runners = worker.fetch('runners') { raise "You must specify the number of runners" }
command = worker.fetch('command') { command = worker.fetch('command') {
"ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\"" "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\""
} }
trace "Booting SSH worker" trace "Booting SSH worker"
ssh = Hydra::SSH.new("#{ssh_opts} #{connect}", directory, command) ssh = Hydra::SSH.new("#{sync.ssh_opts} #{sync.connect}", sync.remote_dir, command)
return { :io => ssh, :idle => false, :type => :ssh } return { :io => ssh, :idle => false, :type => :ssh }
end end
def self.remote_connection_opts worker_opts
connect = worker_opts.fetch('connect') { raise "You must specify an SSH connection target" }
ssh_opts = worker_opts.fetch('ssh_opts') { "" }
directory = worker_opts.fetch('directory') { raise "You must specify a remote directory" }
[connect, ssh_opts, directory]
end
def shutdown_all_workers def shutdown_all_workers
trace "Shutting down all workers" trace "Shutting down all workers"
@workers.each do |worker| @workers.each do |worker|

View File

@ -5,54 +5,52 @@ module Hydra #:nodoc:
# The Sync is run once for each remote worker. # The Sync is run once for each remote worker.
class Sync class Sync
traceable('SYNC') traceable('SYNC')
class << self self.class.traceable('SYNC MANY')
traceable('SYNC')
end attr_reader :connect, :ssh_opts, :remote_dir
# Create a new Sync instance to rsync source from the local machine to a remote worker # Create a new Sync instance to rsync source from the local machine to a remote worker
# #
# Arguments: # Arguments:
# * :worker # * :worker_opts
# * A hash of the configuration options for a worker. # * A hash of the configuration options for a worker.
# * :sync # * :sync
# * A hash of settings specifically for copying the source directory to be tested # * A hash of settings specifically for copying the source directory to be tested
# to the remote worked # to the remote worked
# * :verbose # * :verbose
# * Set to true to see lots of Hydra output (for debugging) # * Set to true to see lots of Hydra output (for debugging)
def initialize(worker, sync, verbose = false) def initialize(worker_opts, sync_opts, verbose = false)
worker_opts ||= {}
worker_opts.stringify_keys!
@verbose = verbose @verbose = verbose
@connect = worker_opts.fetch('connect') { raise "You must specify an SSH connection target" }
@ssh_opts = worker_opts.fetch('ssh_opts') { "" }
@remote_dir = worker_opts.fetch('directory') { raise "You must specify a remote directory" }
return unless sync_opts
sync_opts.stringify_keys!
@local_dir = sync_opts.fetch('directory') { raise "You must specify a synchronization directory" }
@exclude_paths = sync_opts.fetch('exclude') { [] }
trace "Initialized" trace "Initialized"
trace " Worker: (#{worker.inspect})" trace " Worker: (#{worker_opts.inspect})"
trace " Sync: (#{sync.inspect})" trace " Sync: (#{sync_opts.inspect})"
trace " Verbose: (#{@verbose.inspect})"
sync(worker, sync) sync
end end
def sync worker_opts, sync_opts def sync
return unless sync_opts and worker_opts #trace "Synchronizing with #{connect}\n\t#{sync_opts.inspect}"
sync_opts.stringify_keys! exclude_opts = @exclude_paths.inject(''){|memo, path| memo += "--exclude=#{path} "}
worker_opts.stringify_keys!
@verbose = sync_opts.fetch('verbose') { false }
connect, ssh_opts, directory = Master.remote_connection_opts(worker_opts)
trace "Synchronizing with #{connect}\n\t#{sync_opts.inspect}"
local_dir = sync_opts.fetch('directory') {
raise "You must specify a synchronization directory"
}
exclude_paths = sync_opts.fetch('exclude') { [] }
exclude_opts = exclude_paths.inject(''){|memo, path| memo += "--exclude=#{path} "}
rsync_command = [ rsync_command = [
'rsync', 'rsync',
'-avz', '-avz',
'--delete', '--delete',
exclude_opts, exclude_opts,
File.expand_path(local_dir)+'/', File.expand_path(@local_dir)+'/',
"-e \"ssh #{ssh_opts}\"", "-e \"ssh #{@ssh_opts}\"",
"#{connect}:#{directory}" "#{@connect}:#{@remote_dir}"
].join(" ") ].join(" ")
trace rsync_command trace rsync_command
trace `#{rsync_command}` trace `#{rsync_command}`
@ -65,35 +63,31 @@ module Hydra #:nodoc:
opts.merge!(YAML.load_file(config_file).stringify_keys!) opts.merge!(YAML.load_file(config_file).stringify_keys!)
end end
@verbose = opts.fetch('verbose') { false } @verbose = opts.fetch('verbose') { false }
@sync = opts.fetch('sync') { nil } @sync = opts.fetch('sync') { {} }
# default is one worker that is configured to use a pipe with one runner workers_opts = opts.fetch('workers') { [] }
worker_opts = opts.fetch('workers') { [ { 'type' => 'local', 'runners' => 1} ] } @remote_worker_opts = []
@workers = [] workers_opts.each do |worker_opts|
worker_opts.each do |worker| worker_opts.stringify_keys!
worker.stringify_keys! if worker_opts['type'].to_s == 'ssh'
trace "worker opts #{worker.inspect}" @remote_worker_opts << worker_opts
type = worker.fetch('type') { 'local' }
if type.to_s == 'ssh'
@workers << worker
end end
end end
trace "Initialized" trace "Initialized"
trace " Sync: (#{@sync.inspect})" trace " Sync: (#{@sync.inspect})"
trace " Workers: (#{@workers.inspect})" trace " Workers: (#{@remote_worker_opts.inspect})"
trace " Verbose: (#{@verbose.inspect})"
Thread.abort_on_exception = true Thread.abort_on_exception = true
trace "Processing workers" trace "Processing workers"
@listeners = [] @listeners = []
@workers.each do |worker| @remote_worker_opts.each do |worker_opts|
@listeners << Thread.new do @listeners << Thread.new do
begin begin
trace "Syncing #{worker.inspect}" trace "Syncing #{worker_opts.inspect}"
Sync.new worker, @sync, @verbose Sync.new worker_opts, @sync, @verbose
rescue rescue
trace "Syncing failed [#{worker.inspect}]" trace "Syncing failed [#{worker_opts.inspect}]"
Thread.exit Thread.exit
end end
end end