diff --git a/hydra.gemspec b/hydra.gemspec index 764670a..ed0266a 100644 --- a/hydra.gemspec +++ b/hydra.gemspec @@ -50,6 +50,7 @@ Gem::Specification.new do |s| "lib/hydra/spec/hydra_formatter.rb", "lib/hydra/ssh.rb", "lib/hydra/stdio.rb", + "lib/hydra/sync.rb", "lib/hydra/tasks.rb", "lib/hydra/trace.rb", "lib/hydra/worker.rb", @@ -70,6 +71,7 @@ Gem::Specification.new do |s| "test/pipe_test.rb", "test/runner_test.rb", "test/ssh_test.rb", + "test/sync_test.rb", "test/test_helper.rb", "test/worker_test.rb" ] @@ -94,6 +96,7 @@ Gem::Specification.new do |s| "test/test_helper.rb", "test/master_test.rb", "test/runner_test.rb", + "test/sync_test.rb", "test/worker_test.rb" ] diff --git a/lib/hydra.rb b/lib/hydra.rb index 2b61522..c8b7abf 100644 --- a/lib/hydra.rb +++ b/lib/hydra.rb @@ -7,6 +7,7 @@ require 'hydra/safe_fork' require 'hydra/runner' require 'hydra/worker' require 'hydra/master' +require 'hydra/sync' require 'hydra/listener/abstract' require 'hydra/listener/minimal_output' require 'hydra/listener/report_generator' diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index 76891b2..9c62334 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -10,6 +10,7 @@ module Hydra #:nodoc: include Hydra::Messages::Master include Open3 traceable('MASTER') + # Create a new Master # # Options: @@ -134,41 +135,26 @@ module Hydra #:nodoc: end def boot_ssh_worker(worker) + Sync.new worker, @sync, @verbose + + connect, ssh_opts, directory = self.class.remote_connection_opts(worker) runners = worker.fetch('runners') { raise "You must specify the number of runners" } - connect = worker.fetch('connect') { raise "You must specify an SSH connection target" } - ssh_opts = worker.fetch('ssh_opts') { "" } - directory = worker.fetch('directory') { raise "You must specify a remote directory" } command = worker.fetch('command') { "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\"" } - if @sync - @sync.stringify_keys! - trace "Synchronizing with #{connect}\n\t#{@sync.inspect}" - local_dir = @sync.fetch('directory') { - raise "You must specify a synchronization directory" - } - exclude_paths = @sync.fetch('exclude') { [] } - exclude_opts = exclude_paths.inject(''){|memo, path| memo += "--exclude=#{path} "} - - rsync_command = [ - 'rsync', - '-avz', - '--delete', - exclude_opts, - File.expand_path(local_dir)+'/', - "-e \"ssh #{ssh_opts}\"", - "#{connect}:#{directory}" - ].join(" ") - trace rsync_command - trace `#{rsync_command}` - end - trace "Booting SSH worker" ssh = Hydra::SSH.new("#{ssh_opts} #{connect}", directory, command) return { :io => ssh, :idle => false, :type => :ssh } end + def self.remote_connection_opts worker_opts + connect = worker_opts.fetch('connect') { raise "You must specify an SSH connection target" } + ssh_opts = worker_opts.fetch('ssh_opts') { "" } + directory = worker_opts.fetch('directory') { raise "You must specify a remote directory" } + [connect, ssh_opts, directory] + end + def shutdown_all_workers trace "Shutting down all workers" @workers.each do |worker| diff --git a/lib/hydra/sync.rb b/lib/hydra/sync.rb new file mode 100644 index 0000000..479f141 --- /dev/null +++ b/lib/hydra/sync.rb @@ -0,0 +1,106 @@ +require 'yaml' +module Hydra #:nodoc: + # Hydra class responsible for delegate work down to workers. + # + # The Sync is run once for each remote worker. + class Sync + traceable('SYNC') + class << self + traceable('SYNC') + end + + # Create a new Sync instance to rsync source from the local machine to a remote worker + # + # Arguments: + # * :worker + # * A hash of the configuration options for a worker. + # * :sync + # * A hash of settings specifically for copying the source directory to be tested + # to the remote worked + # * :verbose + # * Set to true to see lots of Hydra output (for debugging) + def initialize(worker, sync, verbose = false) + @verbose = verbose + + trace "Initialized" + trace " Worker: (#{worker.inspect})" + trace " Sync: (#{sync.inspect})" + trace " Verbose: (#{@verbose.inspect})" + + sync(worker, sync) + end + + def sync worker_opts, sync_opts + return unless sync_opts and worker_opts + sync_opts.stringify_keys! + worker_opts.stringify_keys! + @verbose = sync_opts.fetch('verbose') { false } + + connect, ssh_opts, directory = Master.remote_connection_opts(worker_opts) + + trace "Synchronizing with #{connect}\n\t#{sync_opts.inspect}" + local_dir = sync_opts.fetch('directory') { + raise "You must specify a synchronization directory" + } + exclude_paths = sync_opts.fetch('exclude') { [] } + exclude_opts = exclude_paths.inject(''){|memo, path| memo += "--exclude=#{path} "} + + rsync_command = [ + 'rsync', + '-avz', + '--delete', + exclude_opts, + File.expand_path(local_dir)+'/', + "-e \"ssh #{ssh_opts}\"", + "#{connect}:#{directory}" + ].join(" ") + trace rsync_command + trace `#{rsync_command}` + end + + def self.sync_many opts + opts.stringify_keys! + config_file = opts.delete('config') { nil } + if config_file + opts.merge!(YAML.load_file(config_file).stringify_keys!) + end + @verbose = opts.fetch('verbose') { false } + @sync = opts.fetch('sync') { nil } + + # default is one worker that is configured to use a pipe with one runner + worker_opts = opts.fetch('workers') { [ { 'type' => 'local', 'runners' => 1} ] } + @workers = [] + worker_opts.each do |worker| + worker.stringify_keys! + trace "worker opts #{worker.inspect}" + type = worker.fetch('type') { 'local' } + if type.to_s == 'ssh' + @workers << worker + end + end + + trace "Initialized" + trace " Sync: (#{@sync.inspect})" + trace " Workers: (#{@workers.inspect})" + trace " Verbose: (#{@verbose.inspect})" + + Thread.abort_on_exception = true + trace "Processing workers" + @listeners = [] + @workers.each do |worker| + @listeners << Thread.new do + begin + trace "Syncing #{worker.inspect}" + Sync.new worker, @sync, @verbose + rescue + trace "Syncing failed [#{worker.inspect}]" + Thread.exit + end + end + end + + @listeners.each{|l| l.join} + end + + end +end diff --git a/lib/hydra/tasks.rb b/lib/hydra/tasks.rb index 0365dcc..80ca5b7 100644 --- a/lib/hydra/tasks.rb +++ b/lib/hydra/tasks.rb @@ -103,6 +103,43 @@ module Hydra #:nodoc: end end + # Define a sync task that uses hydra to rsync the source tree under test to remote workers. + # + # This task is very useful to run before a remote db:reset task to make sure the db/schema.rb + # file is up to date on the remote workers. + # + # Hydra::SyncTask.new('hydra:sync') do |t| + # t.verbose = false # optionally set to true for lots of debug messages + # end + class SyncTask < Hydra::Task + + # Create a new SyncTestTask + def initialize(name = :sync) + @name = name + @verbose = false + + yield self if block_given? + + @config = find_config_file + + @opts = { + :verbose => @verbose + } + @opts.merge!(:config => @config) if @config + + define + end + + private + # Create the rake task defined by this HydraSyncTask + def define + desc "Hydra Tests" + (@name == :hydra ? "" : " for #{@name}") + task @name do + Hydra::Sync.sync_many(@opts) + end + end + end + # Setup a task that will be run across all remote workers # Hydra::RemoteTask.new('db:reset') # diff --git a/test/sync_test.rb b/test/sync_test.rb new file mode 100644 index 0000000..f98a435 --- /dev/null +++ b/test/sync_test.rb @@ -0,0 +1,113 @@ +require File.join(File.dirname(__FILE__), 'test_helper') + +class SyncTest < Test::Unit::TestCase + context "with a file to test and a destination to verify" do + setup do + # avoid having other tests interfering with us + sleep(0.2) + #FileUtils.rm_f(target_file) + end + + teardown do + #FileUtils.rm_f(target_file) + end + + should "synchronize a test file over ssh with rsync" do + local = File.join(Dir.tmpdir, 'hydra', 'local') + remote = File.join(Dir.tmpdir, 'hydra', 'remote') + sync_test = File.join(File.dirname(__FILE__), 'fixtures', 'sync_test.rb') + [local, remote].each{|f| FileUtils.rm_rf f; FileUtils.mkdir_p f} + + # setup the folders: + # local: + # - test_a + # - test_c + # remote: + # - test_b + # + # add test_c to exludes + FileUtils.cp(sync_test, File.join(local, 'test_a.rb')) + FileUtils.cp(sync_test, File.join(local, 'test_c.rb')) + FileUtils.cp(sync_test, File.join(remote, 'test_b.rb')) + + # ensure a is not on remote + assert !File.exists?(File.join(remote, 'test_a.rb')), "A should not be on remote" + # ensure c is not on remote + assert !File.exists?(File.join(remote, 'test_c.rb')), "C should not be on remote" + # ensure b is on remote + assert File.exists?(File.join(remote, 'test_b.rb')), "B should be on remote" + + Hydra::Sync.new( + { + :type => :ssh, + :connect => 'localhost', + :directory => remote, + :runners => 1 + }, + { + :directory => local, + :exclude => ['test_c.rb'] + } + ) + # ensure a is copied + assert File.exists?(File.join(remote, 'test_a.rb')), "A was not copied" + # ensure c is not copied + assert !File.exists?(File.join(remote, 'test_c.rb')), "C was copied, should be excluded" + # ensure b is deleted + assert !File.exists?(File.join(remote, 'test_b.rb')), "B was not deleted" + end + + should "synchronize a test file over ssh with rsync to multiple workers" do + local = File.join(Dir.tmpdir, 'hydra', 'local') + remote_a = File.join(Dir.tmpdir, 'hydra', 'remote_a') + remote_b = File.join(Dir.tmpdir, 'hydra', 'remote_b') + sync_test = File.join(File.dirname(__FILE__), 'fixtures', 'sync_test.rb') + [local, remote_a, remote_b].each{|f| FileUtils.rm_rf f; FileUtils.mkdir_p f} + + # setup the folders: + # local: + # - test_a + # remote_a: + # - test_b + # remote_b: + # - test_c + # + # add test_c to exludes + FileUtils.cp(sync_test, File.join(local, 'test_a.rb')) + FileUtils.cp(sync_test, File.join(remote_a, 'test_b.rb')) + FileUtils.cp(sync_test, File.join(remote_b, 'test_c.rb')) + + # ensure a is not on remotes + assert !File.exists?(File.join(remote_a, 'test_a.rb')), "A should not be on remote_a" + assert !File.exists?(File.join(remote_b, 'test_a.rb')), "A should not be on remote_b" + # ensure b is on remote_a + assert File.exists?(File.join(remote_a, 'test_b.rb')), "B should be on remote_a" + # ensure c is on remote_b + assert File.exists?(File.join(remote_b, 'test_c.rb')), "C should be on remote_b" + + Hydra::Sync.sync_many( + :workers => [{ + :type => :ssh, + :connect => 'localhost', + :directory => remote_a, + :runners => 1 + }, + { + :type => :ssh, + :connect => 'localhost', + :directory => remote_b, + :runners => 1 + }], + :sync => { + :directory => local + } + ) + # ensure a is copied to both remotes + assert File.exists?(File.join(remote_a, 'test_a.rb')), "A was not copied to remote_a" + assert File.exists?(File.join(remote_b, 'test_a.rb')), "A was not copied to remote_b" + # ensure b and c are deleted from remotes + assert !File.exists?(File.join(remote_a, 'test_b.rb')), "B was not deleted from remote_a" + assert !File.exists?(File.join(remote_b, 'test_c.rb')), "C was not deleted from remote_b" + end + end +end