From 249e3d77bac4d96efec7bf63925821a5b0a1d562 Mon Sep 17 00:00:00 2001 From: Nick Gauthier Date: Fri, 5 Feb 2010 15:54:48 -0500 Subject: [PATCH] added safe forking for active record. fixed ssh socket closing and tty --- hydra.gemspec | 2 +- lib/hydra.rb | 1 + lib/hydra/master.rb | 8 ++++++-- lib/hydra/messaging_io.rb | 2 +- lib/hydra/runner.rb | 9 ++++++--- lib/hydra/safe_fork.rb | 23 +++++++++++++++++++++++ lib/hydra/ssh.rb | 7 ++++++- lib/hydra/worker.rb | 6 +++--- test/fixtures/echo_the_dolphin.rb | 4 ++-- test/runner_test.rb | 7 ++++++- test/ssh_test.rb | 26 ++++++++++---------------- 11 files changed, 65 insertions(+), 30 deletions(-) create mode 100644 lib/hydra/safe_fork.rb diff --git a/hydra.gemspec b/hydra.gemspec index 4ea67e6..b5f0f92 100644 --- a/hydra.gemspec +++ b/hydra.gemspec @@ -9,7 +9,7 @@ Gem::Specification.new do |s| s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version= s.authors = ["Nick Gauthier"] - s.date = %q{2010-02-04} + s.date = %q{2010-02-05} s.description = %q{Spread your tests over multiple machines to test your code faster.} s.email = %q{nick@smartlogicsolutions.com} s.extra_rdoc_files = [ diff --git a/lib/hydra.rb b/lib/hydra.rb index bcd4f1f..d659346 100644 --- a/lib/hydra.rb +++ b/lib/hydra.rb @@ -3,6 +3,7 @@ require 'hydra/pipe' require 'hydra/ssh' require 'hydra/stdio' require 'hydra/message' +require 'hydra/safe_fork' require 'hydra/runner' require 'hydra/worker' require 'hydra/master' diff --git a/lib/hydra/master.rb b/lib/hydra/master.rb index 87d9c8e..ec883a4 100644 --- a/lib/hydra/master.rb +++ b/lib/hydra/master.rb @@ -83,7 +83,7 @@ module Hydra #:nodoc: runners = worker.fetch('runners') { raise "You must specify the number of runners" } trace "Booting local worker" pipe = Hydra::Pipe.new - child = Process.fork do + child = SafeFork.fork do pipe.identify_as_child Hydra::Worker.new(:io => pipe, :runners => runners, :verbose => @verbose) end @@ -129,7 +129,11 @@ module Hydra #:nodoc: begin message = worker[:io].gets trace "got message: #{message}" - message.handle(self, worker) if message + # if it exists and its for me. + # SSH gives us back echoes, so we need to ignore our own messages + if message and !message.class.to_s.index("Worker").nil? + message.handle(self, worker) + end rescue IOError trace "lost Worker [#{worker.inspect}]" Thread.exit diff --git a/lib/hydra/messaging_io.rb b/lib/hydra/messaging_io.rb index 4b5a468..0534d25 100644 --- a/lib/hydra/messaging_io.rb +++ b/lib/hydra/messaging_io.rb @@ -13,7 +13,7 @@ module Hydra #:nodoc: return nil unless message return Message.build(eval(message.chomp)) rescue SyntaxError, NameError - $stderr.write "Not a message: [#{message.inspect}]\n" + #$stderr.write "Not a message: [#{message.inspect}]\n" return gets end diff --git a/lib/hydra/runner.rb b/lib/hydra/runner.rb index 3fc50af..0a0c72c 100644 --- a/lib/hydra/runner.rb +++ b/lib/hydra/runner.rb @@ -66,7 +66,7 @@ module Hydra #:nodoc: while @running begin message = @io.gets - if message + if message and !message.class.to_s.index("Worker").nil? trace "Received message from worker" trace "\t#{message.inspect}" message.handle(self) @@ -92,9 +92,12 @@ module Hydra #:nodoc: eval(c.first) end rescue NameError - trace "Could not load [#{c.first}] from [#{f}]" + # means we could not load [c.first], but thats ok, its just not + # one of the classes we want to test + nil rescue SyntaxError - trace "Could not load [#{c.first}] from [#{f}]" + # see above + nil end end return klasses.select{|k| k.respond_to? 'suite'} diff --git a/lib/hydra/safe_fork.rb b/lib/hydra/safe_fork.rb new file mode 100644 index 0000000..f98aecb --- /dev/null +++ b/lib/hydra/safe_fork.rb @@ -0,0 +1,23 @@ +class SafeFork + def self.fork + begin + # remove our connection so it doesn't get cloned + ActiveRecord::Base.remove_connection if defined?(ActiveRecord) + # fork a process + child = Process.fork do + begin + # create a new connection and perform the action + ActiveRecord::Base.establish_connection if defined?(ActiveRecord) + yield + ensure + # make sure we remove the connection before we're done + ActiveRecord::Base.remove_connection if defined?(ActiveRecord) + end + end + ensure + # make sure we re-establish the connection before returning to the main instance + ActiveRecord::Base.establish_connection if defined?(ActiveRecord) + end + return child + end +end diff --git a/lib/hydra/ssh.rb b/lib/hydra/ssh.rb index 3985939..f0e1340 100644 --- a/lib/hydra/ssh.rb +++ b/lib/hydra/ssh.rb @@ -24,9 +24,14 @@ module Hydra #:nodoc: # Hydra::SSH.new('-p 3022 user@server.com') # etc.. def initialize(connection_options, directory, command) - @writer, @reader, @error = popen3("ssh #{connection_options}") + @writer, @reader, @error = popen3("ssh -tt #{connection_options}") @writer.write("cd #{directory}\n") @writer.write(command+"\n") end + + def close + @writer.write "exit\n" + super + end end end diff --git a/lib/hydra/worker.rb b/lib/hydra/worker.rb index 6375d70..5e3b23e 100644 --- a/lib/hydra/worker.rb +++ b/lib/hydra/worker.rb @@ -68,7 +68,7 @@ module Hydra #:nodoc: trace "Booting #{num_runners} Runners" num_runners.times do pipe = Hydra::Pipe.new - child = Process.fork do + child = SafeFork.fork do pipe.identify_as_child Hydra::Runner.new(:io => pipe, :verbose => @verbose) end @@ -98,7 +98,7 @@ module Hydra #:nodoc: while @running begin message = @io.gets - if message + if message and !message.class.to_s.index("Master").nil? trace "Received Message from Master" trace "\t#{message.inspect}" message.handle(self) @@ -120,7 +120,7 @@ module Hydra #:nodoc: while @running begin message = r[:io].gets - if message + if message and !message.class.to_s.index("Runner").nil? trace "Received Message from Runner" trace "\t#{message.inspect}" message.handle(self, r) diff --git a/test/fixtures/echo_the_dolphin.rb b/test/fixtures/echo_the_dolphin.rb index 9d9d12e..f4e5356 100755 --- a/test/fixtures/echo_the_dolphin.rb +++ b/test/fixtures/echo_the_dolphin.rb @@ -1,7 +1,7 @@ #!/usr/bin/env ruby # Echoes back to the sender $stdout.sync = true -while line = $stdin.gets - $stdout.write(line) +while line = $stdin.get + $stdout.write line end diff --git a/test/runner_test.rb b/test/runner_test.rb index a8bbec3..c196060 100644 --- a/test/runner_test.rb +++ b/test/runner_test.rb @@ -38,16 +38,21 @@ class RunnerTest < Test::Unit::TestCase ssh = Hydra::SSH.new( 'localhost', File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')), - "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Runner.new(:io => Hydra::Stdio.new);\"" + "ruby -e \"require 'rubygems'; require 'hydra'; Hydra::Runner.new(:io => Hydra::Stdio.new, :verbose => true);\"" ) assert ssh.gets.is_a?(Hydra::Messages::Runner::RequestFile) ssh.write(Hydra::Messages::Worker::RunFile.new(:file => test_file)) # grab its response. This makes us wait for it to finish + echo = ssh.gets # get the ssh echo response = ssh.gets + + assert_equal Hydra::Messages::Runner::Results, response.class # tell it to shut down ssh.write(Hydra::Messages::Worker::Shutdown.new) + + ssh.close # ensure it ran assert File.exists?(target_file) diff --git a/test/ssh_test.rb b/test/ssh_test.rb index e4b6011..e197254 100644 --- a/test/ssh_test.rb +++ b/test/ssh_test.rb @@ -1,21 +1,15 @@ require File.join(File.dirname(__FILE__), 'test_helper') class SSHTest < Test::Unit::TestCase - context "an ssh connection" do - setup do - @ssh = Hydra::SSH.new( - 'localhost', # connect to this machine - File.expand_path(File.join(File.dirname(__FILE__))), # move to the test directory - "ruby fixtures/echo_the_dolphin.rb" - ) - @message = Hydra::Messages::TestMessage.new - end - teardown do - @ssh.close - end - should "be able to execute a command" do - @ssh.write @message - assert_equal @message.text, @ssh.gets.text - end + should "be able to execute a command over ssh" do + ssh = Hydra::SSH.new( + 'localhost', # connect to this machine + File.expand_path(File.join(File.dirname(__FILE__))), # move to the test directory + "ruby fixtures/echo_the_dolphin.rb" + ) + message = Hydra::Messages::TestMessage.new + ssh.write message + assert_equal message.text, ssh.gets.text + ssh.close end end