added safe forking for active record. fixed ssh socket closing and tty

This commit is contained in:
Nick Gauthier 2010-02-05 15:54:48 -05:00
parent 71b4ce2845
commit 249e3d77ba
11 changed files with 65 additions and 30 deletions

View File

@ -9,7 +9,7 @@ Gem::Specification.new do |s|
s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version= s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
s.authors = ["Nick Gauthier"] s.authors = ["Nick Gauthier"]
s.date = %q{2010-02-04} s.date = %q{2010-02-05}
s.description = %q{Spread your tests over multiple machines to test your code faster.} s.description = %q{Spread your tests over multiple machines to test your code faster.}
s.email = %q{nick@smartlogicsolutions.com} s.email = %q{nick@smartlogicsolutions.com}
s.extra_rdoc_files = [ s.extra_rdoc_files = [

View File

@ -3,6 +3,7 @@ require 'hydra/pipe'
require 'hydra/ssh' require 'hydra/ssh'
require 'hydra/stdio' require 'hydra/stdio'
require 'hydra/message' require 'hydra/message'
require 'hydra/safe_fork'
require 'hydra/runner' require 'hydra/runner'
require 'hydra/worker' require 'hydra/worker'
require 'hydra/master' require 'hydra/master'

View File

@ -83,7 +83,7 @@ module Hydra #:nodoc:
runners = worker.fetch('runners') { raise "You must specify the number of runners" } runners = worker.fetch('runners') { raise "You must specify the number of runners" }
trace "Booting local worker" trace "Booting local worker"
pipe = Hydra::Pipe.new pipe = Hydra::Pipe.new
child = Process.fork do child = SafeFork.fork do
pipe.identify_as_child pipe.identify_as_child
Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose) Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose)
end end
@ -129,7 +129,11 @@ module Hydra #:nodoc:
begin begin
message = worker[:io].gets message = worker[:io].gets
trace "got message: #{message}" trace "got message: #{message}"
message.handle(self, worker) if message # if it exists and its for me.
# SSH gives us back echoes, so we need to ignore our own messages
if message and !message.class.to_s.index("Worker").nil?
message.handle(self, worker)
end
rescue IOError rescue IOError
trace "lost Worker [#{worker.inspect}]" trace "lost Worker [#{worker.inspect}]"
Thread.exit Thread.exit

View File

@ -13,7 +13,7 @@ module Hydra #:nodoc:
return nil unless message return nil unless message
return Message.build(eval(message.chomp)) return Message.build(eval(message.chomp))
rescue SyntaxError, NameError rescue SyntaxError, NameError
$stderr.write "Not a message: [#{message.inspect}]\n" #$stderr.write "Not a message: [#{message.inspect}]\n"
return gets return gets
end end

View File

@ -66,7 +66,7 @@ module Hydra #:nodoc:
while @running while @running
begin begin
message = @io.gets message = @io.gets
if message if message and !message.class.to_s.index("Worker").nil?
trace "Received message from worker" trace "Received message from worker"
trace "\t#{message.inspect}" trace "\t#{message.inspect}"
message.handle(self) message.handle(self)
@ -92,9 +92,12 @@ module Hydra #:nodoc:
eval(c.first) eval(c.first)
end end
rescue NameError rescue NameError
trace "Could not load [#{c.first}] from [#{f}]" # means we could not load [c.first], but thats ok, its just not
# one of the classes we want to test
nil
rescue SyntaxError rescue SyntaxError
trace "Could not load [#{c.first}] from [#{f}]" # see above
nil
end end
end end
return klasses.select{|k| k.respond_to? 'suite'} return klasses.select{|k| k.respond_to? 'suite'}

23
lib/hydra/safe_fork.rb Normal file
View File

@ -0,0 +1,23 @@
class SafeFork
def self.fork
begin
# remove our connection so it doesn't get cloned
ActiveRecord::Base.remove_connection if defined?(ActiveRecord)
# fork a process
child = Process.fork do
begin
# create a new connection and perform the action
ActiveRecord::Base.establish_connection if defined?(ActiveRecord)
yield
ensure
# make sure we remove the connection before we're done
ActiveRecord::Base.remove_connection if defined?(ActiveRecord)
end
end
ensure
# make sure we re-establish the connection before returning to the main instance
ActiveRecord::Base.establish_connection if defined?(ActiveRecord)
end
return child
end
end

View File

@ -24,9 +24,14 @@ module Hydra #:nodoc:
# Hydra::SSH.new('-p 3022 user@server.com') # Hydra::SSH.new('-p 3022 user@server.com')
# etc.. # etc..
def initialize(connection_options, directory, command) def initialize(connection_options, directory, command)
@writer, @reader, @error = popen3("ssh #{connection_options}") @writer, @reader, @error = popen3("ssh -tt #{connection_options}")
@writer.write("cd #{directory}\n") @writer.write("cd #{directory}\n")
@writer.write(command+"\n") @writer.write(command+"\n")
end end
def close
@writer.write "exit\n"
super
end
end end
end end

View File

@ -68,7 +68,7 @@ module Hydra #:nodoc:
trace "Booting #{num_runners} Runners" trace "Booting #{num_runners} Runners"
num_runners.times do num_runners.times do
pipe = Hydra::Pipe.new pipe = Hydra::Pipe.new
child = Process.fork do child = SafeFork.fork do
pipe.identify_as_child pipe.identify_as_child
Hydra::Runner.new(:io => pipe, :verbose => @verbose) Hydra::Runner.new(:io => pipe, :verbose => @verbose)
end end
@ -98,7 +98,7 @@ module Hydra #:nodoc:
while @running while @running
begin begin
message = @io.gets message = @io.gets
if message if message and !message.class.to_s.index("Master").nil?
trace "Received Message from Master" trace "Received Message from Master"
trace "\t#{message.inspect}" trace "\t#{message.inspect}"
message.handle(self) message.handle(self)
@ -120,7 +120,7 @@ module Hydra #:nodoc:
while @running while @running
begin begin
message = r[:io].gets message = r[:io].gets
if message if message and !message.class.to_s.index("Runner").nil?
trace "Received Message from Runner" trace "Received Message from Runner"
trace "\t#{message.inspect}" trace "\t#{message.inspect}"
message.handle(self, r) message.handle(self, r)

View File

@ -1,7 +1,7 @@
#!/usr/bin/env ruby #!/usr/bin/env ruby
# Echoes back to the sender # Echoes back to the sender
$stdout.sync = true $stdout.sync = true
while line = $stdin.gets while line = $stdin.get
$stdout.write(line) $stdout.write line
end end

View File

@ -38,16 +38,21 @@ class RunnerTest < Test::Unit::TestCase
ssh = Hydra::SSH.new( ssh = Hydra::SSH.new(
'localhost', 'localhost',
File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')), File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')),
"ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Runner.new(:io => Hydra::Stdio.new);\"" "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Runner.new(:io => Hydra::Stdio.new, :verbose => true);\""
) )
assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile) assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile)
ssh.write(Hydra::Messages::Worker::RunFile.new(:file => test_file)) ssh.write(Hydra::Messages::Worker::RunFile.new(:file => test_file))
# grab its response. This makes us wait for it to finish # grab its response. This makes us wait for it to finish
echo = ssh.gets # get the ssh echo
response = ssh.gets response = ssh.gets
assert_equal Hydra::Messages::Runner::Results, response.class
# tell it to shut down # tell it to shut down
ssh.write(Hydra::Messages::Worker::Shutdown.new) ssh.write(Hydra::Messages::Worker::Shutdown.new)
ssh.close
# ensure it ran # ensure it ran
assert File.exists?(target_file) assert File.exists?(target_file)

View File

@ -1,21 +1,15 @@
require File.join(File.dirname(__FILE__), 'test_helper') require File.join(File.dirname(__FILE__), 'test_helper')
class SSHTest < Test::Unit::TestCase class SSHTest < Test::Unit::TestCase
context "an ssh connection" do should "be able to execute a command over ssh" do
setup do ssh = Hydra::SSH.new(
@ssh = Hydra::SSH.new( 'localhost', # connect to this machine
'localhost', # connect to this machine File.expand_path(File.join(File.dirname(__FILE__))), # move to the test directory
File.expand_path(File.join(File.dirname(__FILE__))), # move to the test directory "ruby fixtures/echo_the_dolphin.rb"
"ruby fixtures/echo_the_dolphin.rb" )
) message = Hydra::Messages::TestMessage.new
@message = Hydra::Messages::TestMessage.new ssh.write message
end assert_equal message.text, ssh.gets.text
teardown do ssh.close
@ssh.close
end
should "be able to execute a command" do
@ssh.write @message
assert_equal @message.text, @ssh.gets.text
end
end end
end end