From 75cca89d3f19de4f260f9eb4ea7c0979ed1601be Mon Sep 17 00:00:00 2001 From: Sean Kirby Date: Fri, 7 May 2010 14:41:17 -0400 Subject: [PATCH] refactored Sync object to own remote worker settings --- lib/hydra/master.rb | 12 ++----- lib/hydra/sync.rb | 78 +++++++++++++++++++++------------------------ 2 files changed, 38 insertions(+), 52 deletions(-) diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index 9c62334..dc1d850 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -135,26 +135,18 @@ module Hydra #:nodoc: end def boot_ssh_worker(worker) - Sync.new worker, @sync, @verbose + sync = Sync.new(worker, @sync, @verbose) - connect, ssh_opts, directory = self.class.remote_connection_opts(worker) runners = worker.fetch('runners') { raise "You must specify the number of runners" } command = worker.fetch('command') { "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\"" } trace "Booting SSH worker" - ssh = Hydra::SSH.new("#{ssh_opts} #{connect}", directory, command) + ssh = Hydra::SSH.new("#{sync.ssh_opts} #{sync.connect}", sync.remote_dir, command) return { :io => ssh, :idle => false, :type => :ssh } end - def self.remote_connection_opts worker_opts - connect = worker_opts.fetch('connect') { raise "You must specify an SSH connection target" } - ssh_opts = worker_opts.fetch('ssh_opts') { "" } - directory = worker_opts.fetch('directory') { raise "You must specify a remote directory" } - [connect, ssh_opts, directory] - end - def shutdown_all_workers trace "Shutting down all workers" @workers.each do |worker| diff --git a/lib/hydra/sync.rb b/lib/hydra/sync.rb index 479f141..f906a63 100644 --- a/lib/hydra/sync.rb +++ b/lib/hydra/sync.rb @@ -5,54 +5,52 @@ module Hydra #:nodoc: # The Sync is run once for each remote worker. class Sync traceable('SYNC') - class << self - traceable('SYNC') - end + self.class.traceable('SYNC MANY') + + attr_reader :connect, :ssh_opts, :remote_dir # Create a new Sync instance to rsync source from the local machine to a remote worker # # Arguments: - # * :worker + # * :worker_opts # * A hash of the configuration options for a worker. # * :sync # * A hash of settings specifically for copying the source directory to be tested # to the remote worked # * :verbose # * Set to true to see lots of Hydra output (for debugging) - def initialize(worker, sync, verbose = false) + def initialize(worker_opts, sync_opts, verbose = false) + worker_opts ||= {} + worker_opts.stringify_keys! @verbose = verbose + @connect = worker_opts.fetch('connect') { raise "You must specify an SSH connection target" } + @ssh_opts = worker_opts.fetch('ssh_opts') { "" } + @remote_dir = worker_opts.fetch('directory') { raise "You must specify a remote directory" } + + return unless sync_opts + sync_opts.stringify_keys! + @local_dir = sync_opts.fetch('directory') { raise "You must specify a synchronization directory" } + @exclude_paths = sync_opts.fetch('exclude') { [] } trace "Initialized" - trace " Worker: (#{worker.inspect})" - trace " Sync: (#{sync.inspect})" - trace " Verbose: (#{@verbose.inspect})" + trace " Worker: (#{worker_opts.inspect})" + trace " Sync: (#{sync_opts.inspect})" - sync(worker, sync) + sync end - def sync worker_opts, sync_opts - return unless sync_opts and worker_opts - sync_opts.stringify_keys! - worker_opts.stringify_keys! - @verbose = sync_opts.fetch('verbose') { false } - - connect, ssh_opts, directory = Master.remote_connection_opts(worker_opts) - - trace "Synchronizing with #{connect}\n\t#{sync_opts.inspect}" - local_dir = sync_opts.fetch('directory') { - raise "You must specify a synchronization directory" - } - exclude_paths = sync_opts.fetch('exclude') { [] } - exclude_opts = exclude_paths.inject(''){|memo, path| memo += "--exclude=#{path} "} + def sync + #trace "Synchronizing with #{connect}\n\t#{sync_opts.inspect}" + exclude_opts = @exclude_paths.inject(''){|memo, path| memo += "--exclude=#{path} "} rsync_command = [ 'rsync', '-avz', '--delete', exclude_opts, - File.expand_path(local_dir)+'/', - "-e \"ssh #{ssh_opts}\"", - "#{connect}:#{directory}" + File.expand_path(@local_dir)+'/', + "-e \"ssh #{@ssh_opts}\"", + "#{@connect}:#{@remote_dir}" ].join(" ") trace rsync_command trace `#{rsync_command}` @@ -65,35 +63,31 @@ module Hydra #:nodoc: opts.merge!(YAML.load_file(config_file).stringify_keys!) end @verbose = opts.fetch('verbose') { false } - @sync = opts.fetch('sync') { nil } + @sync = opts.fetch('sync') { {} } - # default is one worker that is configured to use a pipe with one runner - worker_opts = opts.fetch('workers') { [ { 'type' => 'local', 'runners' => 1} ] } - @workers = [] - worker_opts.each do |worker| - worker.stringify_keys! - trace "worker opts #{worker.inspect}" - type = worker.fetch('type') { 'local' } - if type.to_s == 'ssh' - @workers << worker + workers_opts = opts.fetch('workers') { [] } + @remote_worker_opts = [] + workers_opts.each do |worker_opts| + worker_opts.stringify_keys! + if worker_opts['type'].to_s == 'ssh' + @remote_worker_opts << worker_opts end end trace "Initialized" trace " Sync: (#{@sync.inspect})" - trace " Workers: (#{@workers.inspect})" - trace " Verbose: (#{@verbose.inspect})" + trace " Workers: (#{@remote_worker_opts.inspect})" Thread.abort_on_exception = true trace "Processing workers" @listeners = [] - @workers.each do |worker| + @remote_worker_opts.each do |worker_opts| @listeners << Thread.new do begin - trace "Syncing #{worker.inspect}" - Sync.new worker, @sync, @verbose + trace "Syncing #{worker_opts.inspect}" + Sync.new worker_opts, @sync, @verbose rescue - trace "Syncing failed [#{worker.inspect}]" + trace "Syncing failed [#{worker_opts.inspect}]" Thread.exit end end