From f668678fd1ecf5e7bad80a98117139cb24421bc4 Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Thu, 3 Nov 2011 18:37:23 -0400 Subject: [PATCH] Store sockets in thread-local variables when possible. Allow connection pools to grow if needed. All this minimizes the number of locks required and reduces the waiting time for these locks. --- lib/mongo/connection.rb | 66 +++++++++++++++++++++++++++----- lib/mongo/cursor.rb | 26 ++++++++++--- lib/mongo/networking.rb | 38 +++++++++--------- lib/mongo/repl_set_connection.rb | 37 +++++++++++++++++- lib/mongo/util/core_ext.rb | 25 ++++++++++++ lib/mongo/util/pool.rb | 63 +++++++++++++++++++++++++----- test/threading_test.rb | 19 ++++++--- 7 files changed, 222 insertions(+), 52 deletions(-) diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 3d98bc4..eee2397 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -19,10 +19,9 @@ require 'set' require 'socket' require 'thread' - module Mongo - # Instantiates and manages connections to MongoDB. + # Instantiates and manages self.connections to MongoDB. class Connection include Mongo::Logging include Mongo::Networking @@ -35,11 +34,12 @@ module Mongo DEFAULT_PORT = 27017 + mongo_thread_local_accessor :connections + attr_reader :logger, :size, :auths, :primary, :safe, :host_to_try, :pool_size, :connect_timeout, :pool_timeout, :primary_pool, :socket_class - # Create a connection to single MongoDB instance. # # You may specify whether connection to slave is permitted. @@ -63,9 +63,9 @@ module Mongo # to a single, slave node. # @option opts [Logger, #debug] :logger (nil) A Logger instance for debugging driver ops. Note that # logging negatively impacts performance; therefore, it should not be used for high-performance apps. - # @option opts [Integer] :pool_size (1) The maximum number of socket connections allowed per + # @option opts [Integer] :pool_size (1) The maximum number of socket self.connections allowed per # connection pool. Note: this setting is relevant only for multi-threaded applications. - # @option opts [Float] :pool_timeout (5.0) When all of the connections a pool are checked out, + # @option opts [Float] :pool_timeout (5.0) When all of the self.connections a pool are checked out, # this is the number of seconds to wait for a new connection to be released before throwing an exception. # Note: this setting is relevant only for multi-threaded applications (which in Ruby are rare). # @option opts [Float] :op_timeout (nil) The number of seconds to wait for a read operation to time out. @@ -80,7 +80,7 @@ module Mongo # @example localhost, 27017 # Connection.new("localhost") # - # @example localhost, 3000, max 5 connections, with max 5 seconds of wait time. + # @example localhost, 3000, max 5 self.connections, with max 5 seconds of wait time. # Connection.new("localhost", 3000, :pool_size => 5, :timeout => 5) # # @example localhost, 3000, where this node may be a slave @@ -91,7 +91,7 @@ module Mongo # @raise [ReplicaSetConnectionError] This is raised if a replica set name is specified and the # driver fails to connect to a replica set with that name. # - # @core connections + # @core self.connections def initialize(host=nil, port=nil, opts={}) @host_to_try = format_pair(host, port) @@ -481,6 +481,55 @@ module Mongo @max_bson_size end + + def get_local_reader + self.connections ||= {} + self.connections[self.object_id] ||= {} + self.connections[self.object_id][:reader] ||= checkout_reader + + # Thread.current[:connections] ||= {} + # Thread.current[:connections][self.object_id] ||= {} + # Thread.current[:connections][self.object_id][:reader] ||= checkout_reader + end + + def get_local_writer + self.connections ||= {} + self.connections[self.object_id] ||= {} + self.connections[self.object_id][:writer] ||= checkout_writer + # Thread.current[:connections] ||= {} + # Thread.current[:connections][self.object_id] ||= {} + # Thread.current[:connections][self.object_id][:writer] ||= checkout_writer + end + + # Used to close, check in, or refresh sockets held + # in thread-local variables. + def local_socket_done(socket) + #checkin(socket) + puts "Done. Threads: #{Thread.list.size}, pool_size: #{self.pool_size}" + if self.connections[self.object_id][:reader] == socket + if self.read_pool.sockets_low? + puts "***SOCKETS ARE LOW! READER****" + checkin(socket) + self.connections[self.object_id][:reader] = nil + end + end + + if self.connections[self.object_id][:writer] == socket + if self.primary_pool && self.primary_pool.sockets_low? + puts "***SOCKETS ARE LOW! WRITER****" + checkin(socket) + self.connections[self.object_id][:writer] = nil + end + end + + # if Thread.current[:connections][self.object_id][:reader] == socket + # Thread.current[:connections][self.object_id][:reader] = nil + # end + # if Thread.current[:connections][self.object_id][:writer] == socket + # Thread.current[:connections][self.object_id][:writer] = nil + # end + end + # Checkout a socket for reading (i.e., a secondary node). # Note: this is overridden in ReplSetConnection. def checkout_reader @@ -525,9 +574,6 @@ module Mongo # Default maximum BSON object size @max_bson_size = Mongo::DEFAULT_MAX_BSON_SIZE - @safe_mutex_lock = Mutex.new - @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new} - # Determine whether to use SSL. @ssl = opts.fetch(:ssl, false) if @ssl diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index cf642fd..058c27d 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -470,9 +470,11 @@ module Mongo results, @n_received, @cursor_id = @connection.receive_message( Mongo::Constants::OP_QUERY, message, nil, sock, @command, nil, @options & OP_QUERY_EXHAUST != 0) - ensure - checkin_socket(sock) unless @socket + rescue ConnectionFailure, OperationFailure, OperationTimeout => ex + force_checkin_socket(sock) + raise ex end + checkin_socket(sock) unless @socket @returned += @n_received @cache += results @query_run = true @@ -505,9 +507,11 @@ module Mongo begin results, @n_received, @cursor_id = @connection.receive_message( Mongo::Constants::OP_GET_MORE, message, nil, sock, @command, nil) - ensure - checkin_socket(sock) unless @socket + rescue ConnectionFailure, OperationFailure, OperationTimeout => ex + force_checkin_socket(sock) + raise ex end + checkin_socket(sock) unless @socket @returned += @n_received @cache += results close_cursor_if_query_complete @@ -516,10 +520,10 @@ module Mongo def checkout_socket_from_connection @checkin_connection = true if @command || @read_preference == :primary - @connection.checkout_writer + @connection.get_local_writer else @read_pool = @connection.read_pool - @connection.checkout_reader + @connection.get_local_reader end end @@ -552,6 +556,16 @@ module Mongo @read_pool.checkin(sock) @checkin_read_pool = false elsif @checkin_connection + @connection.local_socket_done(sock) + @checkin_connection = false + end + end + + def force_checkin_socket(sock) + if @checkin_read_pool + @read_pool.checkin(sock) + @checkin_read_pool = false + else @connection.checkin(sock) @checkin_connection = false end diff --git a/lib/mongo/networking.rb b/lib/mongo/networking.rb index 0b4afff..5aebffc 100644 --- a/lib/mongo/networking.rb +++ b/lib/mongo/networking.rb @@ -30,14 +30,16 @@ module Mongo packed_message = message.to_s if connection == :writer - socket = checkout_writer + sock = get_local_writer else - socket = checkout_reader + sock = get_local_reader end - send_message_on_socket(packed_message, socket) - ensure - checkin(socket) + send_message_on_socket(packed_message, sock) + local_socket_done(sock) + rescue ConnectionFailure, OperationFailure, OperationTimeout => ex + checkin(sock) + raise ex end end @@ -63,11 +65,13 @@ module Mongo packed_message = message.append!(last_error_message).to_s begin - sock = checkout_writer + sock = get_local_writer send_message_on_socket(packed_message, sock) docs, num_received, cursor_id = receive(sock, last_error_id) - ensure + local_socket_done(sock) + rescue ConnectionFailure, OperationFailure, OperationTimeout => ex checkin(sock) + raise ex end if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg']) @@ -103,11 +107,11 @@ module Mongo should_checkin = false else if command - sock = checkout_writer + sock = get_local_writer elsif read == :primary - sock = checkout_writer + sock = get_local_writer elsif read == :secondary - sock = checkout_reader + sock = get_local_reader else sock = checkout_tagged(read) end @@ -115,14 +119,12 @@ module Mongo end result = '' - @safe_mutexes[sock].synchronize do - send_message_on_socket(packed_message, sock) - result = receive(sock, request_id, exhaust) - end - ensure - if should_checkin - checkin(sock) - end + send_message_on_socket(packed_message, sock) + result = receive(sock, request_id, exhaust) + local_socket_done(sock) if should_checkin + rescue ConnectionFailure, OperationFailure, OperationTimeout => ex + checkin(sock) if should_checkin + raise ex end result end diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index f6779fa..3081aaa 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -20,7 +20,8 @@ module Mongo # Instantiates and manages connections to a MongoDB replica set. class ReplSetConnection < Connection - attr_reader :replica_set_name, :seeds, :refresh_interval, :refresh_mode + attr_reader :replica_set_name, :seeds, :refresh_interval, :refresh_mode, + :refresh_version # Create a connection to a MongoDB replica set. # @@ -122,7 +123,8 @@ module Mongo @connected = false # Store the refresher thread - @refresh_thread = nil + @refresh_thread = nil + @refresh_version = 0 # Maps @sockets_to_pools = {} @@ -301,6 +303,36 @@ module Mongo end end + def get_local_reader + Thread.current[:connections] ||= {} + Thread.current[:connections][self.object_id] ||= {} + Thread.current[:connections][self.object_id][:version] ||= self.refresh_version + Thread.current[:connections][self.object_id][:reader] ||= checkout_reader + end + + def get_local_writer + Thread.current[:connections] ||= {} + Thread.current[:connections][self.object_id] ||= {} + Thread.current[:connections][self.object_id][:version] ||= self.refresh_version + Thread.current[:connections][self.object_id][:writer] ||= checkout_writer + end + + # Used to close, check in, or refresh sockets held + # in thread-local variables. + def local_socket_done + if Thread.current[:connections][self.object_id][:version] != self.refresh_version + checkin(Thread.current[:connections][self.object_id][:reader]) + Thread.current[:connections][self.object_id][:reader] ||= checkout_reader + end + end + + def checkin_sockets + checkin(Thread.current[:connections][self.object_id][:reader]) + checkin(Thread.current[:connections][self.object_id][:writer]) + Thread.current[:connections][self.object_id][:writer] = nil + Thread.current[:connections][self.object_id][:reader] = nil + end + # Checkout a socket for reading (i.e., a secondary node). # Note that @read_pool might point to the primary pool # if no read pool has been defined. @@ -480,6 +512,7 @@ module Mongo @manager = new_manager @seeds = @manager.seeds.dup @sockets_to_pools.clear + @refresh_version += 1 old_manager.close if old_manager end diff --git a/lib/mongo/util/core_ext.rb b/lib/mongo/util/core_ext.rb index 49067b2..4899d0a 100644 --- a/lib/mongo/util/core_ext.rb +++ b/lib/mongo/util/core_ext.rb @@ -58,3 +58,28 @@ class String end end + +#:nodoc: +class Class + def mongo_thread_local_accessor name, options = {} + m = Module.new + m.module_eval do + class_variable_set :"@@#{name}", Hash.new {|h,k| h[k] = options[:default] } + end + m.module_eval %{ + + def #{name} + @@#{name}[Thread.current.object_id] + end + + def #{name}=(val) + @@#{name}[Thread.current.object_id] = val + end + } + + class_eval do + include m + extend m + end + end +end diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index 74ceffe..29aee92 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -21,7 +21,8 @@ module Mongo MAX_PING_TIME = 1_000_000 attr_accessor :host, :port, :address, - :size, :timeout, :safe, :checked_out, :connection + :size, :timeout, :safe, :checked_out, :connection, + :sockets_low # Create a new pool of connections. def initialize(connection, host, port, opts={}) @@ -36,7 +37,7 @@ module Mongo @address = "#{@host}:#{@port}" # Pool size and timeout. - @size = opts[:size] || 10 + @size = opts[:size] || 10000 @timeout = opts[:timeout] || 5.0 # Mutex for synchronizing pool access @@ -48,9 +49,11 @@ module Mongo # Operations to perform on a socket @socket_ops = Hash.new { |h, k| h[k] = [] } + @sockets_low = true @sockets = [] @pids = {} @checked_out = [] + @threads = {} @ping_time = nil @last_ping = nil @closed = false @@ -76,6 +79,10 @@ module Mongo @closed end + def sockets_low? + @sockets_low + end + def inspect "#" @@ -138,7 +145,9 @@ module Mongo # Return a socket to the pool. def checkin(socket) @connection_mutex.synchronize do + puts "deleting #{socket}, size: #{@checked_out.size}" @checked_out.delete(socket) + puts "size now: #{@checked_out.size}" @queue.signal end true @@ -166,6 +175,7 @@ module Mongo @sockets << socket @pids[socket] = Process.pid @checked_out << socket + @threads[socket] = Thread.current.object_id socket end @@ -211,10 +221,36 @@ module Mongo checkout_new_socket else @checked_out << socket + @threads[socket] = Thread.current.object_id socket end end + def cleanup + return unless @sockets.size > @size + puts "-----CLEANUP*****" + alive = {} + Thread.list.each do |t| + if t.alive? + alive[t.object_id] = true + end + end + + @checked_out.each do |socket| + if !alive[@threads[socket]] + @checked_out.delete(socket) + if @sockets.size > @size + puts "CLEANING: #{socket}" + socket.close + @sockets.delete(socket) + end + end + end + + + + end + # Check out an existing socket or create a new socket if the maximum # pool size has not been exceeded. Otherwise, wait for the next # available socket. @@ -228,21 +264,28 @@ module Mongo "consider increasing the pool size or timeout." end + puts "CHECKING OUT" + #@sockets_low = @checked_out.size > @size / 2 @connection_mutex.synchronize do + if @sockets.size > 0.7 * @size + @sockets_low = true + else + @sockets_low = false + end socket = if @checked_out.size < @sockets.size + p "checkout existing from size #{@sockets.size}" checkout_existing_socket - elsif @sockets.size < @size + else checkout_new_socket end if socket - - # This calls all procs, in order, scoped to existing sockets. - # At the moment, we use this to lazily authenticate and - # logout existing socket connections. - @socket_ops[socket].reject! do |op| - op.call - end + # This calls all procs, in order, scoped to existing sockets. + # At the moment, we use this to lazily authenticate and + # logout existing socket connections. + @socket_ops[socket].reject! do |op| + op.call + end return socket else diff --git a/test/threading_test.rb b/test/threading_test.rb index 113b2ea..c7914d9 100644 --- a/test/threading_test.rb +++ b/test/threading_test.rb @@ -4,7 +4,7 @@ class TestThreading < Test::Unit::TestCase include Mongo - @@db = standard_connection(:pool_size => 1, :timeout => 30).db(MONGO_TEST_DB) + @@db = standard_connection(:pool_size => 10, :timeout => 30).db(MONGO_TEST_DB) @@coll = @@db.collection('thread-test-collection') def set_up_safe_data @@ -21,16 +21,23 @@ class TestThreading < Test::Unit::TestCase end def test_safe_update + times = [] set_up_safe_data threads = [] 100.times do |i| threads[i] = Thread.new do - if i % 2 == 0 - assert_raise Mongo::OperationFailure do - @unique.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true) + 10.times do + if i % 2 == 0 + assert_raise Mongo::OperationFailure do + t1 = Time.now + @unique.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true) + times << Time.now - t1 + end + else + t1 = Time.now + @duplicate.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true) + times << Time.now - t1 end - else - @duplicate.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true) end end end