From 01f28b47ff9e56aca2ce799e6a8229e72a380494 Mon Sep 17 00:00:00 2001 From: Tyler Brock Date: Tue, 3 Apr 2012 15:40:38 -0400 Subject: [PATCH] RUBY-429 non-blocking IO for socket timeouts Should greatly improve performance for highly threaded applications using connection and operation timeouts. --- lib/mongo.rb | 3 -- lib/mongo/connection.rb | 23 ++-------- lib/mongo/db.rb | 1 - lib/mongo/exceptions.rb | 2 +- lib/mongo/networking.rb | 26 ++++------- lib/mongo/util/node.rb | 18 ++------ lib/mongo/util/pool.rb | 2 +- lib/mongo/util/ssl_socket.rb | 46 ++++++++++++++----- lib/mongo/util/tcp_socket.rb | 84 ++++++++++++++++++++++++++++++++-- test/timeout_test.rb | 38 +++++++++++++++ test/unit/node_test.rb | 1 + test/unit/pool_manager_test.rb | 3 +- 12 files changed, 176 insertions(+), 71 deletions(-) create mode 100644 test/timeout_test.rb diff --git a/lib/mongo.rb b/lib/mongo.rb index 3187b49..ca31f1d 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -78,6 +78,3 @@ if RUBY_PLATFORM =~ /java/ require 'mongo/gridfs/grid_io_fix' end require 'mongo/gridfs/grid_file_system' - -require 'timeout' -Mongo::TimeoutHandler = Timeout diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 618e938..ee48648 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -26,7 +26,7 @@ module Mongo include Mongo::Logging include Mongo::Networking - TCPSocket = ::TCPSocket + TCPSocket = Mongo::TCPSocket Mutex = ::Mutex ConditionVariable = ::ConditionVariable @@ -67,7 +67,7 @@ module Mongo # logging negatively impacts performance; therefore, it should not be used for high-performance apps. # @option opts [Integer] :pool_size (1) The maximum number of socket self.connections allowed per # connection pool. Note: this setting is relevant only for multi-threaded applications. - # @option opts [Float] :pool_timeout (5.0) When all of the self.connections a pool are checked out, + # @option opts [Float] :timeout (5.0) When all of the self.connections a pool are checked out, # this is the number of seconds to wait for a new connection to be released before throwing an exception. # Note: this setting is relevant only for multi-threaded applications (which in Ruby are rare). # @option opts [Float] :op_timeout (nil) The number of seconds to wait for a read operation to time out. @@ -622,23 +622,10 @@ module Mongo socket = nil config = nil - if @connect_timeout - Mongo::TimeoutHandler.timeout(@connect_timeout, OperationTimeout) do - socket = @socket_class.new(host, port) - socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - end - else - socket = @socket_class.new(host, port) - socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - end + socket = @socket_class.new(host, port, @op_timeout, @connect_timeout) + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - if @connect_timeout - Mongo::TimeoutHandler.timeout(@connect_timeout, OperationTimeout) do - config = self['admin'].command({:ismaster => 1}, :socket => socket) - end - else - config = self['admin'].command({:ismaster => 1}, :socket => socket) - end + config = self['admin'].command({:ismaster => 1}, :socket => socket) rescue OperationFailure, SocketError, SystemCallError, IOError close ensure diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index 1711879..9dc872a 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -17,7 +17,6 @@ # ++ require 'socket' -require 'timeout' require 'thread' module Mongo diff --git a/lib/mongo/exceptions.rb b/lib/mongo/exceptions.rb index 3e39fdc..4b49614 100644 --- a/lib/mongo/exceptions.rb +++ b/lib/mongo/exceptions.rb @@ -71,7 +71,7 @@ module Mongo class OperationFailure < MongoDBError; end # Raised when a socket read operation times out. - class OperationTimeout < ::Timeout::Error; end + class OperationTimeout < SocketError; end # Raised when a client attempts to perform an invalid operation. class InvalidOperation < MongoDBError; end diff --git a/lib/mongo/networking.rb b/lib/mongo/networking.rb index ef8b3d0..62cee62 100644 --- a/lib/mongo/networking.rb +++ b/lib/mongo/networking.rb @@ -300,11 +300,11 @@ module Mongo # @return [Integer] number of bytes sent def send_message_on_socket(packed_message, socket) begin - total_bytes_sent = socket.send(packed_message, 0) + total_bytes_sent = socket.send(packed_message) if total_bytes_sent != packed_message.size packed_message.slice!(0, total_bytes_sent) while packed_message.size > 0 - byte_sent = socket.send(packed_message, 0) + byte_sent = socket.send(packed_message) total_bytes_sent += byte_sent packed_message.slice!(0, byte_sent) end @@ -320,22 +320,15 @@ module Mongo # Requires length and an available socket. def receive_message_on_socket(length, socket) begin - if @op_timeout - message = nil - Mongo::TimeoutHandler.timeout(@op_timeout, OperationTimeout) do - message = receive_data(length, socket) - end - else message = receive_data(length, socket) - end - rescue => ex - close + rescue OperationTimeout, ConnectionFailure => ex + close - if ex.class == OperationTimeout - raise OperationTimeout, "Timed out waiting on socket read." - else - raise ConnectionFailure, "Operation failed with the following exception: #{ex}" - end + if ex.class == OperationTimeout + raise OperationTimeout, "Timed out waiting on socket read." + else + raise ConnectionFailure, "Operation failed with the following exception: #{ex}" + end end message end @@ -343,6 +336,7 @@ module Mongo def receive_data(length, socket) message = new_binary_string socket.read(length, message) + raise ConnectionFailure, "connection closed" unless message && message.length > 0 if message.length < length chunk = new_binary_string diff --git a/lib/mongo/util/node.rb b/lib/mongo/util/node.rb index 9d6a49e..5915d49 100644 --- a/lib/mongo/util/node.rb +++ b/lib/mongo/util/node.rb @@ -36,13 +36,9 @@ module Mongo def connect begin socket = nil - if @connection.connect_timeout - Mongo::TimeoutHandler.timeout(@connection.connect_timeout, OperationTimeout) do - socket = @connection.socket_class.new(@host, @port) - end - else - socket = @connection.socket_class.new(@host, @port) - end + socket = @connection.socket_class.new(@host, @port, + @connection.op_timeout, @connection.connect_timeout + ) if socket.nil? return nil @@ -84,13 +80,7 @@ module Mongo # matches with the name provided. def set_config begin - if @connection.connect_timeout - Mongo::TimeoutHandler.timeout(@connection.connect_timeout, OperationTimeout) do - @config = @connection['admin'].command({:ismaster => 1}, :socket => @socket) - end - else - @config = @connection['admin'].command({:ismaster => 1}, :socket => @socket) - end + @config = @connection['admin'].command({:ismaster => 1}, :socket => @socket) if @config['msg'] && @logger @connection.log(:warn, "#{config['msg']}") diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index f28ca08..76bb723 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -156,7 +156,7 @@ module Mongo # therefore, it runs within a mutex. def checkout_new_socket begin - socket = self.connection.socket_class.new(@host, @port) + socket = @connection.socket_class.new(@host, @port, @connection.op_timeout) socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) socket.pool = self rescue => ex diff --git a/lib/mongo/util/ssl_socket.rb b/lib/mongo/util/ssl_socket.rb index 9861ae1..20d366b 100644 --- a/lib/mongo/util/ssl_socket.rb +++ b/lib/mongo/util/ssl_socket.rb @@ -1,4 +1,6 @@ +require 'socket' require 'openssl' +require 'timeout' module Mongo @@ -9,31 +11,51 @@ module Mongo attr_accessor :pool - def initialize(host, port) - @socket = ::TCPSocket.new(host, port) + def initialize(host, port, op_timeout=nil, connect_timeout=nil) + @op_timeout = op_timeout + @connect_timeout = connect_timeout + + @socket = ::TCPSocket.new(host, port) @ssl = OpenSSL::SSL::SSLSocket.new(@socket) @ssl.sync_close = true - @ssl.connect + + connect end - def setsockopt(key, value, n) - @socket.setsockopt(key, value, n) + def connect + if @connect_timeout + Timeout::timeout(@connect_timeout, ConnectionTimeoutError) do + @ssl.connect + end + else + @ssl.connect + end end - # Write to the SSL socket. - # - # @param buffer a buffer to send. - # @param flags socket flags. Because Ruby's SSL - def send(buffer, flags=0) - @ssl.syswrite(buffer) + def send(data) + @ssl.syswrite(data) end def read(length, buffer) - @ssl.sysread(length, buffer) + if @op_timeout + Timeout::timeout(@op_timeout, OperationTimeout) do + @ssl.sysread(length, buffer) + end + else + @ssl.sysread(length, buffer) + end + end + + def setsockopt(key, value, n) + @ssl.setsockopt(key, value, n) end def close @ssl.close end + + def closed? + @ssl.closed? + end end end diff --git a/lib/mongo/util/tcp_socket.rb b/lib/mongo/util/tcp_socket.rb index 7d8d97d..953eed7 100644 --- a/lib/mongo/util/tcp_socket.rb +++ b/lib/mongo/util/tcp_socket.rb @@ -1,6 +1,82 @@ -module Mongo - class TCPSocket < ::TCPSocket - attr_accessor :pool +require 'socket' - end +module Mongo + # Wrapper class for Socket + # + # Emulates TCPSocket with operation and connection timeout + # sans Timeout::timeout + # + class TCPSocket + attr_accessor :pool + + def initialize(host, port, op_timeout=nil, connect_timeout=nil) + @op_timeout = op_timeout + @connect_timeout = connect_timeout + + # TODO: Prefer ipv6 if server is ipv6 enabled + @host = Socket.getaddrinfo(host, nil, Socket::AF_INET).first[3] + @port = port + @socket_address = Socket.pack_sockaddr_in(@port, @host) + @socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0) + + connect + end + + def connect + # Connect nonblock is broken in current versions of JRuby + if RUBY_PLATFORM == 'java' + require 'timeout' + if @connect_timeout + Timeout::timeout(@connect_timeout, OperationTimeout) do + @socket.connect(@socket_address) + end + else + @socket.connect(@socket_address) + end + else + # Try to connect for @connect_timeout seconds + begin + @socket.connect_nonblock(@socket_address) + rescue Errno::EINPROGRESS + # Block until there is a response or error + resp = IO.select([@socket], [@socket], [@socket], @connect_timeout) + if resp.nil? + raise ConnectionTimeoutError + end + end + + # If there was a failure this will raise an Error + begin + @socket.connect_nonblock(@socket_address) + rescue Errno::EISCONN + # Successfully connected + end + end + end + + def send(data) + @socket.write(data) + end + + def read(maxlen, buffer) + # Block on data to read for @op_timeout seconds + if IO.select([@socket], nil, nil, @op_timeout) + @socket.readpartial(maxlen, buffer) + else + raise OperationTimeout + end + end + + def setsockopt(key, value, n) + @socket.setsockopt(key, value, n) + end + + def close + @socket.close + end + + def closed? + @socket.closed? + end + end end diff --git a/test/timeout_test.rb b/test/timeout_test.rb new file mode 100644 index 0000000..bb6fcfc --- /dev/null +++ b/test/timeout_test.rb @@ -0,0 +1,38 @@ +require './test/test_helper' + +class TestTimeout < Test::Unit::TestCase + def test_op_timeout + connection = standard_connection(:op_timeout => 1) + + coll = connection.db(MONGO_TEST_DB).collection("test") + coll.insert({:a => 1}) + + # Should not timeout + assert coll.find_one({"$where" => "sleep(100); return true;"}) + + # Should timeout + assert_raise Mongo::OperationTimeout do + coll.find_one({"$where" => "sleep(3 * 1000); return true;"}) + end + + coll.remove + end +=begin + def test_ssl_op_timeout + connection = standard_connection(:op_timeout => 1, :ssl => true) + + coll = connection.db(MONGO_TEST_DB).collection("test") + coll.insert({:a => 1}) + + # Should not timeout + assert coll.find_one({"$where" => "sleep(100); return true;"}) + + # Should timeout + assert_raise Mongo::OperationTimeout do + coll.find_one({"$where" => "sleep(5 * 1000); return true;"}) + end + + coll.remove + end +=end +end diff --git a/test/unit/node_test.rb b/test/unit/node_test.rb index c3b2be7..931396e 100644 --- a/test/unit/node_test.rb +++ b/test/unit/node_test.rb @@ -15,6 +15,7 @@ class NodeTest < Test::Unit::TestCase admin_db = new_mock_db admin_db.stubs(:command).returns({'ok' => 1, 'ismaster' => 1}) @connection.stubs(:[]).with('admin').returns(admin_db) + @connection.stubs(:op_timeout).returns(nil) @connection.stubs(:connect_timeout).returns(nil) @connection.expects(:log) diff --git a/test/unit/pool_manager_test.rb b/test/unit/pool_manager_test.rb index 4ca11ac..3673925 100644 --- a/test/unit/pool_manager_test.rb +++ b/test/unit/pool_manager_test.rb @@ -10,7 +10,8 @@ class PoolManagerTest < Test::Unit::TestCase @db = new_mock_db @connection = stub("Connection") - @connection.stubs(:connect_timeout).returns(5000) + @connection.stubs(:connect_timeout).returns(5) + @connection.stubs(:op_timeout).returns(5) @connection.stubs(:pool_size).returns(2) @connection.stubs(:pool_timeout).returns(100) @connection.stubs(:seeds).returns(['localhost:30000'])