diff --git a/lib/hydra/io.rb b/lib/hydra/io.rb new file mode 100644 index 0000000..e02e211 --- /dev/null +++ b/lib/hydra/io.rb @@ -0,0 +1,29 @@ +module Hydra #:nodoc: + module MessagingIO + # Read a line from the input IO object. + def gets + raise IOError unless @reader + message = @reader.gets + return nil unless message + return Message.build(eval(message.chomp)) + end + + # Write a line to the output IO object + def write(message) + raise IOError unless @writer + raise UnprocessableMessage unless message.is_a?(Hydra::Message) + begin + @writer.write(message.serialize+"\n") + rescue Errno::EPIPE + raise IOError + end + end + + class UnprocessableMessage < RuntimeError + attr_accessor :message + def initialize(message = "Message expected") + @message = message + end + end + end +end diff --git a/lib/hydra/message.rb b/lib/hydra/message.rb index d0f38ae..a1c0d6d 100644 --- a/lib/hydra/message.rb +++ b/lib/hydra/message.rb @@ -1,7 +1,12 @@ module Hydra #:nodoc: class Message #:nodoc: - def self.build(str) - eval(str).new + def self.build(hash) + hash.delete(:class).new(hash) + end + + def serialize(opts = {}) + opts[:class] = self.class + opts.inspect end end end diff --git a/lib/hydra/message/runner_requests_file.rb b/lib/hydra/message/runner_requests_file.rb index e6100cb..4692ead 100644 --- a/lib/hydra/message/runner_requests_file.rb +++ b/lib/hydra/message/runner_requests_file.rb @@ -1,9 +1,7 @@ module Hydra #:nodoc: module Messages #:nodoc: + # Message indicating that a Runner needs a file to run class RunnerRequestsFile < Hydra::Message - def serialize - "Hydra::Messages::RunnerRequestsFile" - end end end end diff --git a/lib/hydra/pipe.rb b/lib/hydra/pipe.rb index 491949d..6bb9ff7 100644 --- a/lib/hydra/pipe.rb +++ b/lib/hydra/pipe.rb @@ -1,3 +1,4 @@ +require 'hydra/io' module Hydra #:nodoc: # Read and write between two processes via pipes. For example: # @pipe = Hydra::Pipe.new @@ -24,32 +25,13 @@ module Hydra #:nodoc: # One tube is for sending from parent to child, and the other # tube is for sending from child to parent. class Pipe + include Hydra::MessagingIO # Creates a new uninitialized pipe pair. def initialize @child_read, @parent_write = IO.pipe @parent_read, @child_write = IO.pipe end - # Read a line from a pipe. It will have a trailing newline. - def gets - force_identification - @reader.gets.chomp - end - - # Write a line to a pipe. It must have a trailing newline. - def write(str) - force_identification - unless str =~ /\n$/ - str += "\n" - end - begin - @writer.write(str) - return str - rescue Errno::EPIPE - raise Hydra::PipeError::Broken - end - end - # Identify this side of the pipe as the child. def identify_as_child @parent_write.close @@ -65,43 +47,5 @@ module Hydra #:nodoc: @reader = @parent_read @writer = @parent_write end - - # closes the pipes. Once a pipe is closed on one end, the other - # end will get a PipeError::Broken if it tries to write. - def close - done_reading - done_writing - end - - private - def done_writing #:nodoc: - @writer.close unless @writer.closed? - end - - def done_reading #:nodoc: - @reader.close unless @reader.closed? - end - - def force_identification #:nodoc: - raise PipeError::Unidentified if @reader.nil? or @writer.nil? - end - end - - module PipeError #:nodoc: - # Raised if you try to read or write to a pipe when it is unidentified. - # Use identify_as_parent and identify_as_child to identify a pipe. - class Unidentified < RuntimeError - def message #:nodoc: - "Must identify as child or parent" - end - end - # Raised when a pipe has been broken between two processes. - # This happens when a process exits, and is a signal that - # there is no more data to communicate. - class Broken < RuntimeError - def message #:nodoc: - "Other side closed the connection" - end - end end end diff --git a/lib/hydra/ssh.rb b/lib/hydra/ssh.rb index 4cc9ec3..cb4acfe 100644 --- a/lib/hydra/ssh.rb +++ b/lib/hydra/ssh.rb @@ -1,4 +1,5 @@ require 'open3' +require 'hydra/io' module Hydra #:nodoc: # Read and write with an ssh connection. For example: # @ssh = Hydra::SSH.new('nick@nite') @@ -17,6 +18,7 @@ module Hydra #:nodoc: # => "8" # the output from irb class SSH include Open3 + include Hydra::MessagingIO # Initialize new SSH connection. The single parameters is passed # directly to ssh for starting a connection. So you can do: @@ -25,24 +27,7 @@ module Hydra #:nodoc: # Hydra::SSH.new('-p 3022 user@server.com') # etc.. def initialize(connection_options) - @stdin, @stdout, @stderr = popen3("ssh #{connection_options}") - end - - # Write a string to ssh. This method returns the string passed to - # ssh. Note that if you do not add a newline at the end, it adds - # one for you, and the modified string is returned - def write(str) - unless str =~ /\n$/ - str += "\n" - end - @stdin.write(str) - return str - end - - # Read a line from ssh. This call blocks when there is nothing - # to read. - def gets - @stdout.gets.chomp + @writer, @reader, @error = popen3("ssh #{connection_options}") end end end diff --git a/test/helper.rb b/test/helper.rb index 2699a12..ebc5d09 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -8,3 +8,18 @@ require 'hydra' class Test::Unit::TestCase end + +module Hydra #:nodoc: + module Messages #:nodoc: + class TestMessage < Hydra::Message + attr_accessor :text + def initialize(opts = {}) + @text = opts.fetch(:text){ "test" } + end + def serialize + super(:text => @text) + end + end + end +end + diff --git a/test/test_pipe.rb b/test/test_pipe.rb index 6e91a41..6362e0e 100644 --- a/test/test_pipe.rb +++ b/test/test_pipe.rb @@ -6,39 +6,29 @@ class TestPipe < Test::Unit::TestCase @pipe = Hydra::Pipe.new end should "be able to write messages" do - Process.fork do + child = Process.fork do @pipe.identify_as_child - assert_equal "Test Message", @pipe.gets - @pipe.write "Message Received" - @pipe.write "Second Message" - @pipe.close + assert_equal "Test Message", @pipe.gets.text + @pipe.write Hydra::Messages::TestMessage.new(:text => "Message Received") + @pipe.write Hydra::Messages::TestMessage.new(:text => "Second Message") end @pipe.identify_as_parent - @pipe.write "Test Message" - assert_equal "Message Received", @pipe.gets - assert_equal "Second Message", @pipe.gets - assert_raise Hydra::PipeError::Broken do - @pipe.write "anybody home?" + @pipe.write Hydra::Messages::TestMessage.new(:text => "Test Message") + assert_equal "Message Received", @pipe.gets.text + assert_equal "Second Message", @pipe.gets.text + assert_raise IOError do + @pipe.write Hydra::Messages::TestMessage.new(:text => "anyone there?") end - @pipe.close end should "not allow writing if unidentified" do - assert_raise Hydra::PipeError::Unidentified do - @pipe.write "hey\n" + assert_raise IOError do + @pipe.write Hydra::Messages::TestMessage.new(:text => "Test Message") end end should "not allow reading if unidentified" do - assert_raise Hydra::PipeError::Unidentified do + assert_raise IOError do @pipe.gets end end - should "handle newlines" do - Process.fork do - @pipe.identify_as_child - @pipe.write "Message\n" - end - @pipe.identify_as_parent - assert_equal "Message", @pipe.gets - end end end diff --git a/test/test_runner.rb b/test/test_runner.rb index d89a41a..46c3c49 100644 --- a/test/test_runner.rb +++ b/test/test_runner.rb @@ -14,7 +14,9 @@ class TestRunner < Test::Unit::TestCase @message = Hydra::Message.build(@pipe.gets) assert @message.is_a?(Hydra::Messages::RunnerRequestsFile) end - should "return a result message after processing a file" + should "return a result message after processing a file" do + + end should "terminate when sent a shutdown message" end end