refactored out the Rsync functionality of the Master into the Sync class and added a rake task to invoke syncing

This commit is contained in:
Sean Kirby 2010-04-30 19:03:00 -04:00 committed by Tanzeeb Khalili
parent 60c2d7344b
commit 7821852a52
6 changed files with 271 additions and 25 deletions

View File

@ -50,6 +50,7 @@ Gem::Specification.new do |s|
"lib/hydra/spec/hydra_formatter.rb",
"lib/hydra/ssh.rb",
"lib/hydra/stdio.rb",
"lib/hydra/sync.rb",
"lib/hydra/tasks.rb",
"lib/hydra/trace.rb",
"lib/hydra/worker.rb",
@ -70,6 +71,7 @@ Gem::Specification.new do |s|
"test/pipe_test.rb",
"test/runner_test.rb",
"test/ssh_test.rb",
"test/sync_test.rb",
"test/test_helper.rb",
"test/worker_test.rb"
]
@ -94,6 +96,7 @@ Gem::Specification.new do |s|
"test/test_helper.rb",
"test/master_test.rb",
"test/runner_test.rb",
"test/sync_test.rb",
"test/worker_test.rb"
]

View File

@ -7,6 +7,7 @@ require 'hydra/safe_fork'
require 'hydra/runner'
require 'hydra/worker'
require 'hydra/master'
require 'hydra/sync'
require 'hydra/listener/abstract'
require 'hydra/listener/minimal_output'
require 'hydra/listener/report_generator'

View File

@ -10,6 +10,7 @@ module Hydra #:nodoc:
include Hydra::Messages::Master
include Open3
traceable('MASTER')
# Create a new Master
#
# Options:
@ -134,41 +135,26 @@ module Hydra #:nodoc:
end
def boot_ssh_worker(worker)
Sync.new worker, @sync, @verbose
connect, ssh_opts, directory = self.class.remote_connection_opts(worker)
runners = worker.fetch('runners') { raise "You must specify the number of runners" }
connect = worker.fetch('connect') { raise "You must specify an SSH connection target" }
ssh_opts = worker.fetch('ssh_opts') { "" }
directory = worker.fetch('directory') { raise "You must specify a remote directory" }
command = worker.fetch('command') {
"ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Worker.new(:io => Hydra::Stdio.new, :runners => #{runners}, :verbose => #{@verbose});\""
}
if @sync
@sync.stringify_keys!
trace "Synchronizing with #{connect}\n\t#{@sync.inspect}"
local_dir = @sync.fetch('directory') {
raise "You must specify a synchronization directory"
}
exclude_paths = @sync.fetch('exclude') { [] }
exclude_opts = exclude_paths.inject(''){|memo, path| memo += "--exclude=#{path} "}
rsync_command = [
'rsync',
'-avz',
'--delete',
exclude_opts,
File.expand_path(local_dir)+'/',
"-e \"ssh #{ssh_opts}\"",
"#{connect}:#{directory}"
].join(" ")
trace rsync_command
trace `#{rsync_command}`
end
trace "Booting SSH worker"
ssh = Hydra::SSH.new("#{ssh_opts} #{connect}", directory, command)
return { :io => ssh, :idle => false, :type => :ssh }
end
def self.remote_connection_opts worker_opts
connect = worker_opts.fetch('connect') { raise "You must specify an SSH connection target" }
ssh_opts = worker_opts.fetch('ssh_opts') { "" }
directory = worker_opts.fetch('directory') { raise "You must specify a remote directory" }
[connect, ssh_opts, directory]
end
def shutdown_all_workers
trace "Shutting down all workers"
@workers.each do |worker|

106
lib/hydra/sync.rb Normal file
View File

@ -0,0 +1,106 @@
require 'yaml'
module Hydra #:nodoc:
# Hydra class responsible for delegate work down to workers.
#
# The Sync is run once for each remote worker.
class Sync
traceable('SYNC')
class << self
traceable('SYNC')
end
# Create a new Sync instance to rsync source from the local machine to a remote worker
#
# Arguments:
# * :worker
# * A hash of the configuration options for a worker.
# * :sync
# * A hash of settings specifically for copying the source directory to be tested
# to the remote worked
# * :verbose
# * Set to true to see lots of Hydra output (for debugging)
def initialize(worker, sync, verbose = false)
@verbose = verbose
trace "Initialized"
trace " Worker: (#{worker.inspect})"
trace " Sync: (#{sync.inspect})"
trace " Verbose: (#{@verbose.inspect})"
sync(worker, sync)
end
def sync worker_opts, sync_opts
return unless sync_opts and worker_opts
sync_opts.stringify_keys!
worker_opts.stringify_keys!
@verbose = sync_opts.fetch('verbose') { false }
connect, ssh_opts, directory = Master.remote_connection_opts(worker_opts)
trace "Synchronizing with #{connect}\n\t#{sync_opts.inspect}"
local_dir = sync_opts.fetch('directory') {
raise "You must specify a synchronization directory"
}
exclude_paths = sync_opts.fetch('exclude') { [] }
exclude_opts = exclude_paths.inject(''){|memo, path| memo += "--exclude=#{path} "}
rsync_command = [
'rsync',
'-avz',
'--delete',
exclude_opts,
File.expand_path(local_dir)+'/',
"-e \"ssh #{ssh_opts}\"",
"#{connect}:#{directory}"
].join(" ")
trace rsync_command
trace `#{rsync_command}`
end
def self.sync_many opts
opts.stringify_keys!
config_file = opts.delete('config') { nil }
if config_file
opts.merge!(YAML.load_file(config_file).stringify_keys!)
end
@verbose = opts.fetch('verbose') { false }
@sync = opts.fetch('sync') { nil }
# default is one worker that is configured to use a pipe with one runner
worker_opts = opts.fetch('workers') { [ { 'type' => 'local', 'runners' => 1} ] }
@workers = []
worker_opts.each do |worker|
worker.stringify_keys!
trace "worker opts #{worker.inspect}"
type = worker.fetch('type') { 'local' }
if type.to_s == 'ssh'
@workers << worker
end
end
trace "Initialized"
trace " Sync: (#{@sync.inspect})"
trace " Workers: (#{@workers.inspect})"
trace " Verbose: (#{@verbose.inspect})"
Thread.abort_on_exception = true
trace "Processing workers"
@listeners = []
@workers.each do |worker|
@listeners << Thread.new do
begin
trace "Syncing #{worker.inspect}"
Sync.new worker, @sync, @verbose
rescue
trace "Syncing failed [#{worker.inspect}]"
Thread.exit
end
end
end
@listeners.each{|l| l.join}
end
end
end

View File

@ -103,6 +103,43 @@ module Hydra #:nodoc:
end
end
# Define a sync task that uses hydra to rsync the source tree under test to remote workers.
#
# This task is very useful to run before a remote db:reset task to make sure the db/schema.rb
# file is up to date on the remote workers.
#
# Hydra::SyncTask.new('hydra:sync') do |t|
# t.verbose = false # optionally set to true for lots of debug messages
# end
class SyncTask < Hydra::Task
# Create a new SyncTestTask
def initialize(name = :sync)
@name = name
@verbose = false
yield self if block_given?
@config = find_config_file
@opts = {
:verbose => @verbose
}
@opts.merge!(:config => @config) if @config
define
end
private
# Create the rake task defined by this HydraSyncTask
def define
desc "Hydra Tests" + (@name == :hydra ? "" : " for #{@name}")
task @name do
Hydra::Sync.sync_many(@opts)
end
end
end
# Setup a task that will be run across all remote workers
# Hydra::RemoteTask.new('db:reset')
#

113
test/sync_test.rb Normal file
View File

@ -0,0 +1,113 @@
require File.join(File.dirname(__FILE__), 'test_helper')
class SyncTest < Test::Unit::TestCase
context "with a file to test and a destination to verify" do
setup do
# avoid having other tests interfering with us
sleep(0.2)
#FileUtils.rm_f(target_file)
end
teardown do
#FileUtils.rm_f(target_file)
end
should "synchronize a test file over ssh with rsync" do
local = File.join(Dir.tmpdir, 'hydra', 'local')
remote = File.join(Dir.tmpdir, 'hydra', 'remote')
sync_test = File.join(File.dirname(__FILE__), 'fixtures', 'sync_test.rb')
[local, remote].each{|f| FileUtils.rm_rf f; FileUtils.mkdir_p f}
# setup the folders:
# local:
# - test_a
# - test_c
# remote:
# - test_b
#
# add test_c to exludes
FileUtils.cp(sync_test, File.join(local, 'test_a.rb'))
FileUtils.cp(sync_test, File.join(local, 'test_c.rb'))
FileUtils.cp(sync_test, File.join(remote, 'test_b.rb'))
# ensure a is not on remote
assert !File.exists?(File.join(remote, 'test_a.rb')), "A should not be on remote"
# ensure c is not on remote
assert !File.exists?(File.join(remote, 'test_c.rb')), "C should not be on remote"
# ensure b is on remote
assert File.exists?(File.join(remote, 'test_b.rb')), "B should be on remote"
Hydra::Sync.new(
{
:type => :ssh,
:connect => 'localhost',
:directory => remote,
:runners => 1
},
{
:directory => local,
:exclude => ['test_c.rb']
}
)
# ensure a is copied
assert File.exists?(File.join(remote, 'test_a.rb')), "A was not copied"
# ensure c is not copied
assert !File.exists?(File.join(remote, 'test_c.rb')), "C was copied, should be excluded"
# ensure b is deleted
assert !File.exists?(File.join(remote, 'test_b.rb')), "B was not deleted"
end
should "synchronize a test file over ssh with rsync to multiple workers" do
local = File.join(Dir.tmpdir, 'hydra', 'local')
remote_a = File.join(Dir.tmpdir, 'hydra', 'remote_a')
remote_b = File.join(Dir.tmpdir, 'hydra', 'remote_b')
sync_test = File.join(File.dirname(__FILE__), 'fixtures', 'sync_test.rb')
[local, remote_a, remote_b].each{|f| FileUtils.rm_rf f; FileUtils.mkdir_p f}
# setup the folders:
# local:
# - test_a
# remote_a:
# - test_b
# remote_b:
# - test_c
#
# add test_c to exludes
FileUtils.cp(sync_test, File.join(local, 'test_a.rb'))
FileUtils.cp(sync_test, File.join(remote_a, 'test_b.rb'))
FileUtils.cp(sync_test, File.join(remote_b, 'test_c.rb'))
# ensure a is not on remotes
assert !File.exists?(File.join(remote_a, 'test_a.rb')), "A should not be on remote_a"
assert !File.exists?(File.join(remote_b, 'test_a.rb')), "A should not be on remote_b"
# ensure b is on remote_a
assert File.exists?(File.join(remote_a, 'test_b.rb')), "B should be on remote_a"
# ensure c is on remote_b
assert File.exists?(File.join(remote_b, 'test_c.rb')), "C should be on remote_b"
Hydra::Sync.sync_many(
:workers => [{
:type => :ssh,
:connect => 'localhost',
:directory => remote_a,
:runners => 1
},
{
:type => :ssh,
:connect => 'localhost',
:directory => remote_b,
:runners => 1
}],
:sync => {
:directory => local
}
)
# ensure a is copied to both remotes
assert File.exists?(File.join(remote_a, 'test_a.rb')), "A was not copied to remote_a"
assert File.exists?(File.join(remote_b, 'test_a.rb')), "A was not copied to remote_b"
# ensure b and c are deleted from remotes
assert !File.exists?(File.join(remote_a, 'test_b.rb')), "B was not deleted from remote_a"
assert !File.exists?(File.join(remote_b, 'test_c.rb')), "C was not deleted from remote_b"
end
end
end