RUBY-429 non-blocking IO for socket timeouts
Should greatly improve performance for highly threaded applications using connection and operation timeouts.
This commit is contained in:
parent
aab3cf7b74
commit
01f28b47ff
@ -78,6 +78,3 @@ if RUBY_PLATFORM =~ /java/
|
|||||||
require 'mongo/gridfs/grid_io_fix'
|
require 'mongo/gridfs/grid_io_fix'
|
||||||
end
|
end
|
||||||
require 'mongo/gridfs/grid_file_system'
|
require 'mongo/gridfs/grid_file_system'
|
||||||
|
|
||||||
require 'timeout'
|
|
||||||
Mongo::TimeoutHandler = Timeout
|
|
||||||
|
@ -26,7 +26,7 @@ module Mongo
|
|||||||
include Mongo::Logging
|
include Mongo::Logging
|
||||||
include Mongo::Networking
|
include Mongo::Networking
|
||||||
|
|
||||||
TCPSocket = ::TCPSocket
|
TCPSocket = Mongo::TCPSocket
|
||||||
Mutex = ::Mutex
|
Mutex = ::Mutex
|
||||||
ConditionVariable = ::ConditionVariable
|
ConditionVariable = ::ConditionVariable
|
||||||
|
|
||||||
@ -67,7 +67,7 @@ module Mongo
|
|||||||
# logging negatively impacts performance; therefore, it should not be used for high-performance apps.
|
# logging negatively impacts performance; therefore, it should not be used for high-performance apps.
|
||||||
# @option opts [Integer] :pool_size (1) The maximum number of socket self.connections allowed per
|
# @option opts [Integer] :pool_size (1) The maximum number of socket self.connections allowed per
|
||||||
# connection pool. Note: this setting is relevant only for multi-threaded applications.
|
# connection pool. Note: this setting is relevant only for multi-threaded applications.
|
||||||
# @option opts [Float] :pool_timeout (5.0) When all of the self.connections a pool are checked out,
|
# @option opts [Float] :timeout (5.0) When all of the self.connections a pool are checked out,
|
||||||
# this is the number of seconds to wait for a new connection to be released before throwing an exception.
|
# this is the number of seconds to wait for a new connection to be released before throwing an exception.
|
||||||
# Note: this setting is relevant only for multi-threaded applications (which in Ruby are rare).
|
# Note: this setting is relevant only for multi-threaded applications (which in Ruby are rare).
|
||||||
# @option opts [Float] :op_timeout (nil) The number of seconds to wait for a read operation to time out.
|
# @option opts [Float] :op_timeout (nil) The number of seconds to wait for a read operation to time out.
|
||||||
@ -622,23 +622,10 @@ module Mongo
|
|||||||
socket = nil
|
socket = nil
|
||||||
config = nil
|
config = nil
|
||||||
|
|
||||||
if @connect_timeout
|
socket = @socket_class.new(host, port, @op_timeout, @connect_timeout)
|
||||||
Mongo::TimeoutHandler.timeout(@connect_timeout, OperationTimeout) do
|
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
||||||
socket = @socket_class.new(host, port)
|
|
||||||
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
|
||||||
end
|
|
||||||
else
|
|
||||||
socket = @socket_class.new(host, port)
|
|
||||||
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
|
||||||
end
|
|
||||||
|
|
||||||
if @connect_timeout
|
config = self['admin'].command({:ismaster => 1}, :socket => socket)
|
||||||
Mongo::TimeoutHandler.timeout(@connect_timeout, OperationTimeout) do
|
|
||||||
config = self['admin'].command({:ismaster => 1}, :socket => socket)
|
|
||||||
end
|
|
||||||
else
|
|
||||||
config = self['admin'].command({:ismaster => 1}, :socket => socket)
|
|
||||||
end
|
|
||||||
rescue OperationFailure, SocketError, SystemCallError, IOError
|
rescue OperationFailure, SocketError, SystemCallError, IOError
|
||||||
close
|
close
|
||||||
ensure
|
ensure
|
||||||
|
@ -17,7 +17,6 @@
|
|||||||
# ++
|
# ++
|
||||||
|
|
||||||
require 'socket'
|
require 'socket'
|
||||||
require 'timeout'
|
|
||||||
require 'thread'
|
require 'thread'
|
||||||
|
|
||||||
module Mongo
|
module Mongo
|
||||||
|
@ -71,7 +71,7 @@ module Mongo
|
|||||||
class OperationFailure < MongoDBError; end
|
class OperationFailure < MongoDBError; end
|
||||||
|
|
||||||
# Raised when a socket read operation times out.
|
# Raised when a socket read operation times out.
|
||||||
class OperationTimeout < ::Timeout::Error; end
|
class OperationTimeout < SocketError; end
|
||||||
|
|
||||||
# Raised when a client attempts to perform an invalid operation.
|
# Raised when a client attempts to perform an invalid operation.
|
||||||
class InvalidOperation < MongoDBError; end
|
class InvalidOperation < MongoDBError; end
|
||||||
|
@ -300,11 +300,11 @@ module Mongo
|
|||||||
# @return [Integer] number of bytes sent
|
# @return [Integer] number of bytes sent
|
||||||
def send_message_on_socket(packed_message, socket)
|
def send_message_on_socket(packed_message, socket)
|
||||||
begin
|
begin
|
||||||
total_bytes_sent = socket.send(packed_message, 0)
|
total_bytes_sent = socket.send(packed_message)
|
||||||
if total_bytes_sent != packed_message.size
|
if total_bytes_sent != packed_message.size
|
||||||
packed_message.slice!(0, total_bytes_sent)
|
packed_message.slice!(0, total_bytes_sent)
|
||||||
while packed_message.size > 0
|
while packed_message.size > 0
|
||||||
byte_sent = socket.send(packed_message, 0)
|
byte_sent = socket.send(packed_message)
|
||||||
total_bytes_sent += byte_sent
|
total_bytes_sent += byte_sent
|
||||||
packed_message.slice!(0, byte_sent)
|
packed_message.slice!(0, byte_sent)
|
||||||
end
|
end
|
||||||
@ -320,22 +320,15 @@ module Mongo
|
|||||||
# Requires length and an available socket.
|
# Requires length and an available socket.
|
||||||
def receive_message_on_socket(length, socket)
|
def receive_message_on_socket(length, socket)
|
||||||
begin
|
begin
|
||||||
if @op_timeout
|
|
||||||
message = nil
|
|
||||||
Mongo::TimeoutHandler.timeout(@op_timeout, OperationTimeout) do
|
|
||||||
message = receive_data(length, socket)
|
|
||||||
end
|
|
||||||
else
|
|
||||||
message = receive_data(length, socket)
|
message = receive_data(length, socket)
|
||||||
end
|
rescue OperationTimeout, ConnectionFailure => ex
|
||||||
rescue => ex
|
close
|
||||||
close
|
|
||||||
|
|
||||||
if ex.class == OperationTimeout
|
if ex.class == OperationTimeout
|
||||||
raise OperationTimeout, "Timed out waiting on socket read."
|
raise OperationTimeout, "Timed out waiting on socket read."
|
||||||
else
|
else
|
||||||
raise ConnectionFailure, "Operation failed with the following exception: #{ex}"
|
raise ConnectionFailure, "Operation failed with the following exception: #{ex}"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
message
|
message
|
||||||
end
|
end
|
||||||
@ -343,6 +336,7 @@ module Mongo
|
|||||||
def receive_data(length, socket)
|
def receive_data(length, socket)
|
||||||
message = new_binary_string
|
message = new_binary_string
|
||||||
socket.read(length, message)
|
socket.read(length, message)
|
||||||
|
|
||||||
raise ConnectionFailure, "connection closed" unless message && message.length > 0
|
raise ConnectionFailure, "connection closed" unless message && message.length > 0
|
||||||
if message.length < length
|
if message.length < length
|
||||||
chunk = new_binary_string
|
chunk = new_binary_string
|
||||||
|
@ -36,13 +36,9 @@ module Mongo
|
|||||||
def connect
|
def connect
|
||||||
begin
|
begin
|
||||||
socket = nil
|
socket = nil
|
||||||
if @connection.connect_timeout
|
socket = @connection.socket_class.new(@host, @port,
|
||||||
Mongo::TimeoutHandler.timeout(@connection.connect_timeout, OperationTimeout) do
|
@connection.op_timeout, @connection.connect_timeout
|
||||||
socket = @connection.socket_class.new(@host, @port)
|
)
|
||||||
end
|
|
||||||
else
|
|
||||||
socket = @connection.socket_class.new(@host, @port)
|
|
||||||
end
|
|
||||||
|
|
||||||
if socket.nil?
|
if socket.nil?
|
||||||
return nil
|
return nil
|
||||||
@ -84,13 +80,7 @@ module Mongo
|
|||||||
# matches with the name provided.
|
# matches with the name provided.
|
||||||
def set_config
|
def set_config
|
||||||
begin
|
begin
|
||||||
if @connection.connect_timeout
|
@config = @connection['admin'].command({:ismaster => 1}, :socket => @socket)
|
||||||
Mongo::TimeoutHandler.timeout(@connection.connect_timeout, OperationTimeout) do
|
|
||||||
@config = @connection['admin'].command({:ismaster => 1}, :socket => @socket)
|
|
||||||
end
|
|
||||||
else
|
|
||||||
@config = @connection['admin'].command({:ismaster => 1}, :socket => @socket)
|
|
||||||
end
|
|
||||||
|
|
||||||
if @config['msg'] && @logger
|
if @config['msg'] && @logger
|
||||||
@connection.log(:warn, "#{config['msg']}")
|
@connection.log(:warn, "#{config['msg']}")
|
||||||
|
@ -156,7 +156,7 @@ module Mongo
|
|||||||
# therefore, it runs within a mutex.
|
# therefore, it runs within a mutex.
|
||||||
def checkout_new_socket
|
def checkout_new_socket
|
||||||
begin
|
begin
|
||||||
socket = self.connection.socket_class.new(@host, @port)
|
socket = @connection.socket_class.new(@host, @port, @connection.op_timeout)
|
||||||
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
||||||
socket.pool = self
|
socket.pool = self
|
||||||
rescue => ex
|
rescue => ex
|
||||||
|
@ -1,4 +1,6 @@
|
|||||||
|
require 'socket'
|
||||||
require 'openssl'
|
require 'openssl'
|
||||||
|
require 'timeout'
|
||||||
|
|
||||||
module Mongo
|
module Mongo
|
||||||
|
|
||||||
@ -9,31 +11,51 @@ module Mongo
|
|||||||
|
|
||||||
attr_accessor :pool
|
attr_accessor :pool
|
||||||
|
|
||||||
def initialize(host, port)
|
def initialize(host, port, op_timeout=nil, connect_timeout=nil)
|
||||||
@socket = ::TCPSocket.new(host, port)
|
@op_timeout = op_timeout
|
||||||
|
@connect_timeout = connect_timeout
|
||||||
|
|
||||||
|
@socket = ::TCPSocket.new(host, port)
|
||||||
@ssl = OpenSSL::SSL::SSLSocket.new(@socket)
|
@ssl = OpenSSL::SSL::SSLSocket.new(@socket)
|
||||||
@ssl.sync_close = true
|
@ssl.sync_close = true
|
||||||
@ssl.connect
|
|
||||||
|
connect
|
||||||
end
|
end
|
||||||
|
|
||||||
def setsockopt(key, value, n)
|
def connect
|
||||||
@socket.setsockopt(key, value, n)
|
if @connect_timeout
|
||||||
|
Timeout::timeout(@connect_timeout, ConnectionTimeoutError) do
|
||||||
|
@ssl.connect
|
||||||
|
end
|
||||||
|
else
|
||||||
|
@ssl.connect
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Write to the SSL socket.
|
def send(data)
|
||||||
#
|
@ssl.syswrite(data)
|
||||||
# @param buffer a buffer to send.
|
|
||||||
# @param flags socket flags. Because Ruby's SSL
|
|
||||||
def send(buffer, flags=0)
|
|
||||||
@ssl.syswrite(buffer)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def read(length, buffer)
|
def read(length, buffer)
|
||||||
@ssl.sysread(length, buffer)
|
if @op_timeout
|
||||||
|
Timeout::timeout(@op_timeout, OperationTimeout) do
|
||||||
|
@ssl.sysread(length, buffer)
|
||||||
|
end
|
||||||
|
else
|
||||||
|
@ssl.sysread(length, buffer)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def setsockopt(key, value, n)
|
||||||
|
@ssl.setsockopt(key, value, n)
|
||||||
end
|
end
|
||||||
|
|
||||||
def close
|
def close
|
||||||
@ssl.close
|
@ssl.close
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def closed?
|
||||||
|
@ssl.closed?
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -1,6 +1,82 @@
|
|||||||
module Mongo
|
require 'socket'
|
||||||
class TCPSocket < ::TCPSocket
|
|
||||||
attr_accessor :pool
|
|
||||||
|
|
||||||
end
|
module Mongo
|
||||||
|
# Wrapper class for Socket
|
||||||
|
#
|
||||||
|
# Emulates TCPSocket with operation and connection timeout
|
||||||
|
# sans Timeout::timeout
|
||||||
|
#
|
||||||
|
class TCPSocket
|
||||||
|
attr_accessor :pool
|
||||||
|
|
||||||
|
def initialize(host, port, op_timeout=nil, connect_timeout=nil)
|
||||||
|
@op_timeout = op_timeout
|
||||||
|
@connect_timeout = connect_timeout
|
||||||
|
|
||||||
|
# TODO: Prefer ipv6 if server is ipv6 enabled
|
||||||
|
@host = Socket.getaddrinfo(host, nil, Socket::AF_INET).first[3]
|
||||||
|
@port = port
|
||||||
|
@socket_address = Socket.pack_sockaddr_in(@port, @host)
|
||||||
|
@socket = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
|
||||||
|
|
||||||
|
connect
|
||||||
|
end
|
||||||
|
|
||||||
|
def connect
|
||||||
|
# Connect nonblock is broken in current versions of JRuby
|
||||||
|
if RUBY_PLATFORM == 'java'
|
||||||
|
require 'timeout'
|
||||||
|
if @connect_timeout
|
||||||
|
Timeout::timeout(@connect_timeout, OperationTimeout) do
|
||||||
|
@socket.connect(@socket_address)
|
||||||
|
end
|
||||||
|
else
|
||||||
|
@socket.connect(@socket_address)
|
||||||
|
end
|
||||||
|
else
|
||||||
|
# Try to connect for @connect_timeout seconds
|
||||||
|
begin
|
||||||
|
@socket.connect_nonblock(@socket_address)
|
||||||
|
rescue Errno::EINPROGRESS
|
||||||
|
# Block until there is a response or error
|
||||||
|
resp = IO.select([@socket], [@socket], [@socket], @connect_timeout)
|
||||||
|
if resp.nil?
|
||||||
|
raise ConnectionTimeoutError
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# If there was a failure this will raise an Error
|
||||||
|
begin
|
||||||
|
@socket.connect_nonblock(@socket_address)
|
||||||
|
rescue Errno::EISCONN
|
||||||
|
# Successfully connected
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def send(data)
|
||||||
|
@socket.write(data)
|
||||||
|
end
|
||||||
|
|
||||||
|
def read(maxlen, buffer)
|
||||||
|
# Block on data to read for @op_timeout seconds
|
||||||
|
if IO.select([@socket], nil, nil, @op_timeout)
|
||||||
|
@socket.readpartial(maxlen, buffer)
|
||||||
|
else
|
||||||
|
raise OperationTimeout
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def setsockopt(key, value, n)
|
||||||
|
@socket.setsockopt(key, value, n)
|
||||||
|
end
|
||||||
|
|
||||||
|
def close
|
||||||
|
@socket.close
|
||||||
|
end
|
||||||
|
|
||||||
|
def closed?
|
||||||
|
@socket.closed?
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
38
test/timeout_test.rb
Normal file
38
test/timeout_test.rb
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
require './test/test_helper'
|
||||||
|
|
||||||
|
class TestTimeout < Test::Unit::TestCase
|
||||||
|
def test_op_timeout
|
||||||
|
connection = standard_connection(:op_timeout => 1)
|
||||||
|
|
||||||
|
coll = connection.db(MONGO_TEST_DB).collection("test")
|
||||||
|
coll.insert({:a => 1})
|
||||||
|
|
||||||
|
# Should not timeout
|
||||||
|
assert coll.find_one({"$where" => "sleep(100); return true;"})
|
||||||
|
|
||||||
|
# Should timeout
|
||||||
|
assert_raise Mongo::OperationTimeout do
|
||||||
|
coll.find_one({"$where" => "sleep(3 * 1000); return true;"})
|
||||||
|
end
|
||||||
|
|
||||||
|
coll.remove
|
||||||
|
end
|
||||||
|
=begin
|
||||||
|
def test_ssl_op_timeout
|
||||||
|
connection = standard_connection(:op_timeout => 1, :ssl => true)
|
||||||
|
|
||||||
|
coll = connection.db(MONGO_TEST_DB).collection("test")
|
||||||
|
coll.insert({:a => 1})
|
||||||
|
|
||||||
|
# Should not timeout
|
||||||
|
assert coll.find_one({"$where" => "sleep(100); return true;"})
|
||||||
|
|
||||||
|
# Should timeout
|
||||||
|
assert_raise Mongo::OperationTimeout do
|
||||||
|
coll.find_one({"$where" => "sleep(5 * 1000); return true;"})
|
||||||
|
end
|
||||||
|
|
||||||
|
coll.remove
|
||||||
|
end
|
||||||
|
=end
|
||||||
|
end
|
@ -15,6 +15,7 @@ class NodeTest < Test::Unit::TestCase
|
|||||||
admin_db = new_mock_db
|
admin_db = new_mock_db
|
||||||
admin_db.stubs(:command).returns({'ok' => 1, 'ismaster' => 1})
|
admin_db.stubs(:command).returns({'ok' => 1, 'ismaster' => 1})
|
||||||
@connection.stubs(:[]).with('admin').returns(admin_db)
|
@connection.stubs(:[]).with('admin').returns(admin_db)
|
||||||
|
@connection.stubs(:op_timeout).returns(nil)
|
||||||
@connection.stubs(:connect_timeout).returns(nil)
|
@connection.stubs(:connect_timeout).returns(nil)
|
||||||
@connection.expects(:log)
|
@connection.expects(:log)
|
||||||
|
|
||||||
|
@ -10,7 +10,8 @@ class PoolManagerTest < Test::Unit::TestCase
|
|||||||
@db = new_mock_db
|
@db = new_mock_db
|
||||||
|
|
||||||
@connection = stub("Connection")
|
@connection = stub("Connection")
|
||||||
@connection.stubs(:connect_timeout).returns(5000)
|
@connection.stubs(:connect_timeout).returns(5)
|
||||||
|
@connection.stubs(:op_timeout).returns(5)
|
||||||
@connection.stubs(:pool_size).returns(2)
|
@connection.stubs(:pool_size).returns(2)
|
||||||
@connection.stubs(:pool_timeout).returns(100)
|
@connection.stubs(:pool_timeout).returns(100)
|
||||||
@connection.stubs(:seeds).returns(['localhost:30000'])
|
@connection.stubs(:seeds).returns(['localhost:30000'])
|
||||||
|
Loading…
Reference in New Issue
Block a user