messaging system working for pipes, custom serialization. broken for ssh, since it runs commands.

This commit is contained in:
Nick Gauthier 2010-01-27 15:19:48 -05:00
parent 9916d7a6e5
commit 33b9885bc0
8 changed files with 72 additions and 104 deletions

29
lib/hydra/io.rb Normal file
View File

@ -0,0 +1,29 @@
module Hydra #:nodoc:
module MessagingIO
# Read a line from the input IO object.
def gets
raise IOError unless @reader
message = @reader.gets
return nil unless message
return Message.build(eval(message.chomp))
end
# Write a line to the output IO object
def write(message)
raise IOError unless @writer
raise UnprocessableMessage unless message.is_a?(Hydra::Message)
begin
@writer.write(message.serialize+"\n")
rescue Errno::EPIPE
raise IOError
end
end
class UnprocessableMessage < RuntimeError
attr_accessor :message
def initialize(message = "Message expected")
@message = message
end
end
end
end

View File

@ -1,7 +1,12 @@
module Hydra #:nodoc:
class Message #:nodoc:
def self.build(str)
eval(str).new
def self.build(hash)
hash.delete(:class).new(hash)
end
def serialize(opts = {})
opts[:class] = self.class
opts.inspect
end
end
end

View File

@ -1,9 +1,7 @@
module Hydra #:nodoc:
module Messages #:nodoc:
# Message indicating that a Runner needs a file to run
class RunnerRequestsFile < Hydra::Message
def serialize
"Hydra::Messages::RunnerRequestsFile"
end
end
end
end

View File

@ -1,3 +1,4 @@
require 'hydra/io'
module Hydra #:nodoc:
# Read and write between two processes via pipes. For example:
# @pipe = Hydra::Pipe.new
@ -24,32 +25,13 @@ module Hydra #:nodoc:
# One tube is for sending from parent to child, and the other
# tube is for sending from child to parent.
class Pipe
include Hydra::MessagingIO
# Creates a new uninitialized pipe pair.
def initialize
@child_read, @parent_write = IO.pipe
@parent_read, @child_write = IO.pipe
end
# Read a line from a pipe. It will have a trailing newline.
def gets
force_identification
@reader.gets.chomp
end
# Write a line to a pipe. It must have a trailing newline.
def write(str)
force_identification
unless str =~ /\n$/
str += "\n"
end
begin
@writer.write(str)
return str
rescue Errno::EPIPE
raise Hydra::PipeError::Broken
end
end
# Identify this side of the pipe as the child.
def identify_as_child
@parent_write.close
@ -65,43 +47,5 @@ module Hydra #:nodoc:
@reader = @parent_read
@writer = @parent_write
end
# closes the pipes. Once a pipe is closed on one end, the other
# end will get a PipeError::Broken if it tries to write.
def close
done_reading
done_writing
end
private
def done_writing #:nodoc:
@writer.close unless @writer.closed?
end
def done_reading #:nodoc:
@reader.close unless @reader.closed?
end
def force_identification #:nodoc:
raise PipeError::Unidentified if @reader.nil? or @writer.nil?
end
end
module PipeError #:nodoc:
# Raised if you try to read or write to a pipe when it is unidentified.
# Use identify_as_parent and identify_as_child to identify a pipe.
class Unidentified < RuntimeError
def message #:nodoc:
"Must identify as child or parent"
end
end
# Raised when a pipe has been broken between two processes.
# This happens when a process exits, and is a signal that
# there is no more data to communicate.
class Broken < RuntimeError
def message #:nodoc:
"Other side closed the connection"
end
end
end
end

View File

@ -1,4 +1,5 @@
require 'open3'
require 'hydra/io'
module Hydra #:nodoc:
# Read and write with an ssh connection. For example:
# @ssh = Hydra::SSH.new('nick@nite')
@ -17,6 +18,7 @@ module Hydra #:nodoc:
# => "8" # the output from irb
class SSH
include Open3
include Hydra::MessagingIO
# Initialize new SSH connection. The single parameters is passed
# directly to ssh for starting a connection. So you can do:
@ -25,24 +27,7 @@ module Hydra #:nodoc:
# Hydra::SSH.new('-p 3022 user@server.com')
# etc..
def initialize(connection_options)
@stdin, @stdout, @stderr = popen3("ssh #{connection_options}")
end
# Write a string to ssh. This method returns the string passed to
# ssh. Note that if you do not add a newline at the end, it adds
# one for you, and the modified string is returned
def write(str)
unless str =~ /\n$/
str += "\n"
end
@stdin.write(str)
return str
end
# Read a line from ssh. This call blocks when there is nothing
# to read.
def gets
@stdout.gets.chomp
@writer, @reader, @error = popen3("ssh #{connection_options}")
end
end
end

View File

@ -8,3 +8,18 @@ require 'hydra'
class Test::Unit::TestCase
end
module Hydra #:nodoc:
module Messages #:nodoc:
class TestMessage < Hydra::Message
attr_accessor :text
def initialize(opts = {})
@text = opts.fetch(:text){ "test" }
end
def serialize
super(:text => @text)
end
end
end
end

View File

@ -6,39 +6,29 @@ class TestPipe < Test::Unit::TestCase
@pipe = Hydra::Pipe.new
end
should "be able to write messages" do
Process.fork do
child = Process.fork do
@pipe.identify_as_child
assert_equal "Test Message", @pipe.gets
@pipe.write "Message Received"
@pipe.write "Second Message"
@pipe.close
assert_equal "Test Message", @pipe.gets.text
@pipe.write Hydra::Messages::TestMessage.new(:text => "Message Received")
@pipe.write Hydra::Messages::TestMessage.new(:text => "Second Message")
end
@pipe.identify_as_parent
@pipe.write "Test Message"
assert_equal "Message Received", @pipe.gets
assert_equal "Second Message", @pipe.gets
assert_raise Hydra::PipeError::Broken do
@pipe.write "anybody home?"
@pipe.write Hydra::Messages::TestMessage.new(:text => "Test Message")
assert_equal "Message Received", @pipe.gets.text
assert_equal "Second Message", @pipe.gets.text
assert_raise IOError do
@pipe.write Hydra::Messages::TestMessage.new(:text => "anyone there?")
end
@pipe.close
end
should "not allow writing if unidentified" do
assert_raise Hydra::PipeError::Unidentified do
@pipe.write "hey\n"
assert_raise IOError do
@pipe.write Hydra::Messages::TestMessage.new(:text => "Test Message")
end
end
should "not allow reading if unidentified" do
assert_raise Hydra::PipeError::Unidentified do
assert_raise IOError do
@pipe.gets
end
end
should "handle newlines" do
Process.fork do
@pipe.identify_as_child
@pipe.write "Message\n"
end
@pipe.identify_as_parent
assert_equal "Message", @pipe.gets
end
end
end

View File

@ -14,7 +14,9 @@ class TestRunner < Test::Unit::TestCase
@message = Hydra::Message.build(@pipe.gets)
assert @message.is_a?(Hydra::Messages::RunnerRequestsFile)
end
should "return a result message after processing a file"
should "return a result message after processing a file" do
end
should "terminate when sent a shutdown message"
end
end