Remove thread-local map and socket map (complexity creep).

This commit is contained in:
Kyle Banker 2011-11-18 15:47:06 -05:00
parent 177fad34ff
commit fa10508f07
11 changed files with 142 additions and 298 deletions

View File

@ -481,68 +481,6 @@ module Mongo
@max_bson_size
end
def get_local_reader
self.connections ||= {}
if !connected? && self.connections[self.object_id]
self.connections[self.object_id]
else
self.connections[self.object_id] = {}
end
self.connections[self.object_id][:reader] ||= checkout_reader
end
def get_local_writer
self.connections ||= {}
if !connected? && self.connections[self.object_id]
self.connections[self.object_id]
else
self.connections[self.object_id] = {}
end
self.connections[self.object_id][:writer] ||= checkout_writer
end
# Allow the current threads connection to return to the pool.
#
# Calling this method allows the socket that has been reserved
# for this thread to be returned to the pool. Other threads will
# then be able to re-use that socket. If your application uses many
# threads, or has long-running threads that infrequently perform MongoDB
# operations, then judicious use of this method can lead to performance gains.
# Care should be taken, however, to make sure that end_request is not called
# in the middle of a sequence of operations in which ordering is important. This
# could lead to unexpected results.
#
# One important case is when a thread is dying permanently. It is best to call
# end_request when you know a thread is finished, as otherwise its socket will
# not be reclaimed.
def end_request
if socket = self.connections[self.object_id][:reader]
checkin(socket)
end
if socket = self.connections[self.object_id][:writer]
checkin(socket)
end
end
# Used to close, check in, or refresh sockets held
# in thread-local variables.
def local_socket_done(socket)
if self.connections[self.object_id][:reader] == socket
if self.read_pool.sockets_low?
checkin(socket)
self.connections[self.object_id][:reader] = nil
end
end
if self.connections[self.object_id][:writer] == socket
if self.primary_pool && self.primary_pool.sockets_low?
checkin(socket)
self.connections[self.object_id][:writer] = nil
end
end
end
# Checkout a socket for reading (i.e., a secondary node).
# Note: this is overridden in ReplSetConnection.
def checkout_reader
@ -560,16 +498,12 @@ module Mongo
# Checkin a socket used for reading.
# Note: this is overridden in ReplSetConnection.
def checkin_reader(socket)
warn "Connection#checkin_writer is not deprecated and will be removed " +
"in driver v2.0. Use Connection#checkin instead."
checkin(socket)
end
# Checkin a socket used for writing.
# Note: this is overridden in ReplSetConnection.
def checkin_writer(socket)
warn "Connection#checkin_writer is not deprecated and will be removed " +
"in driver v2.0. Use Connection#checkin instead."
checkin(socket)
end

View File

@ -520,10 +520,10 @@ module Mongo
def checkout_socket_from_connection
@checkin_connection = true
if @command || @read_preference == :primary
@connection.get_local_writer
@connection.checkout_writer
else
@read_pool = @connection.read_pool
@connection.get_local_reader
@connection.checkout_reader
end
end
@ -556,7 +556,11 @@ module Mongo
@read_pool.checkin(sock)
@checkin_read_pool = false
elsif @checkin_connection
@connection.local_socket_done(sock)
if @command || @read_preference == :primary
@connection.checkin_writer(sock)
else
@connection.checkin_reader(sock)
end
@checkin_connection = false
end
end
@ -566,7 +570,11 @@ module Mongo
@read_pool.checkin(sock)
@checkin_read_pool = false
else
@connection.checkin(sock)
if @command || @read_preference == :primary
@connection.checkin_writer(sock)
else
@connection.checkin_reader(sock)
end
@checkin_connection = false
end
end

View File

@ -25,21 +25,23 @@ module Mongo
connection = opts.fetch(:connection, :writer)
add_message_headers(message, operation)
packed_message = message.to_s
if connection == :writer
sock = checkout_writer
else
sock = checkout_reader
end
begin
add_message_headers(message, operation)
packed_message = message.to_s
if connection == :writer
sock = get_local_writer
else
sock = get_local_reader
end
send_message_on_socket(packed_message, sock)
local_socket_done(sock)
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
checkin(sock)
raise ex
ensure
if connection == :writer
checkin_writer(sock)
else
checkin_reader(sock)
end
end
end
@ -64,13 +66,13 @@ module Mongo
last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY)
packed_message = message.append!(last_error_message).to_s
sock = checkout_writer
begin
sock = get_local_writer
send_message_on_socket(packed_message, sock)
docs, num_received, cursor_id = receive(sock, last_error_id)
local_socket_done(sock)
checkin_writer(sock)
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
checkin(sock)
checkin_writer(sock)
raise ex
end
@ -101,30 +103,34 @@ module Mongo
read=:primary, exhaust=false)
request_id = add_message_headers(message, operation)
packed_message = message.to_s
begin
if socket
sock = socket
should_checkin = false
if socket
sock = socket
should_checkin = false
else
if command || read == :primary
sock = checkout_writer
elsif read == :secondary
sock = checkout_reader
else
if command
sock = get_local_writer
elsif read == :primary
sock = get_local_writer
elsif read == :secondary
sock = get_local_reader
else
sock = checkout_tagged(read)
end
should_checkin = true
sock = checkout_tagged(read)
end
should_checkin = true
end
result = ''
result = ''
begin
send_message_on_socket(packed_message, sock)
result = receive(sock, request_id, exhaust)
local_socket_done(sock) if should_checkin
rescue ConnectionFailure, OperationFailure, OperationTimeout => ex
checkin(sock) if should_checkin
raise ex
ensure
if should_checkin
if command || read == :primary
checkin_writer(sock)
elsif read == :secondary
checkin_reader(sock)
else
# TODO: sock = checkout_tagged(read)
end
end
end
result
end
@ -281,7 +287,7 @@ module Mongo
total_bytes_sent
rescue => ex
close
raise ConnectionFailure, "Operation failed with the following exception: #{ex}"
raise ConnectionFailure, "Operation failed with the following exception: #{ex}:#{ex.message}"
end
end

View File

@ -105,6 +105,7 @@ module Mongo
# No connection manager by default.
@manager = nil
@pool_mutex = Mutex.new
if ![:sync, :async, false].include?(@refresh_mode)
raise MongoArgumentError,
@ -129,7 +130,6 @@ module Mongo
@refresh_version = 0
# Maps
@sockets_to_pools = {}
@threads_to_sockets = Hash.new { |h, k| h[k] = Hash.new }
@tag_map = nil
@ -264,13 +264,7 @@ module Mongo
# Close the connection to the database.
def close
@connected = false
if @refresh_thread
@refresh_thread = nil
end
@manager.close if @manager
@sockets_to_pools.clear
@manager.close(:soft => true) if @manager
@threads_to_sockets.clear
end
@ -308,95 +302,6 @@ module Mongo
end
end
def get_local_reader
self.connections ||= {}
if !connected? && self.connections[self.object_id]
self.connections[self.object_id]
else
self.connections[self.object_id] = {}
end
socket = self.connections[self.object_id][:reader] ||= checkout_reader
if self.read_pool != @sockets_to_pools[socket]
checkin(socket)
socket = self.connections[self.object_id][:reader] = checkout_reader
end
@threads_to_sockets[Thread.current][:reader] = socket
end
def get_local_writer
self.connections ||= {}
if !connected? && self.connections[self.object_id]
self.connections[self.object_id]
else
self.connections[self.object_id] = {}
end
socket = self.connections[self.object_id][:writer] ||= checkout_writer
if self.primary_pool != @sockets_to_pools[socket]
checkin(socket)
socket = self.connections[self.object_id][:writer] = checkout_writer
end
@threads_to_sockets[Thread.current][:writer] = socket
end
# Allow the current threads connection to return to the pool.
#
# Calling this method allows the socket that has been reserved
# for this thread to be returned to the pool. Other threads will
# then be able to re-use that socket. If your application uses many
# threads, or has long-running threads that infrequently perform MongoDB
# operations, then judicious use of this method can lead to performance gains.
# Care should be taken, however, to make sure that end_request is not called
# in the middle of a sequence of operations in which ordering is important. This
# could lead to unexpected results.
#
# One important case is when a thread is dying permanently. It is best to call
# end_request when you know a thread is finished, as otherwise its socket will
# not be reclaimed.
def end_request
if socket = self.connections[self.object_id][:reader]
checkin(socket)
end
if socket = self.connections[self.object_id][:writer]
checkin(socket)
end
end
# Used to close, check in, or refresh sockets held
# in thread-local variables.
def local_socket_done(socket)
if self.connections[self.object_id][:reader] == socket
if self.read_pool.sockets_low? ||
self.read_pool != @sockets_to_pools[socket]
checkin(socket)
self.connections[self.object_id][:reader] = nil
end
end
if self.connections[self.object_id][:writer] == socket
if self.primary_pool &&
(self.primary_pool.sockets_low? ||
self.primary_pool != @sockets_to_pools[socket])
checkin(socket)
self.connections[self.object_id][:writer] = nil
end
end
if (Time.now - @last_cleanup) > CLEANUP_INTERVAL &&
@cleanup_lock.try_lock
@threads_to_sockets.each do |thread, sockets|
if !thread.alive?
checkin(sockets[:reader])
checkin(sockets[:writer])
@threads_to_sockets.delete(thread)
end
end
@cleanup_lock.unlock
end
end
# Checkout a socket for reading (i.e., a secondary node).
# Note that @read_pool might point to the primary pool
# if no read pool has been defined.
@ -433,25 +338,21 @@ module Mongo
end
end
def checkin(socket)
if pool = @sockets_to_pools[socket]
pool.checkin(socket)
if !@sockets_to_pools[socket]
close_socket(socket)
end
elsif socket
# Checkin a socket used for reading.
def checkin_reader(socket)
if !self.read_pool.checkin(socket) &&
!self.primary_pool.checkin(socket)
close_socket(socket)
end
sync_refresh
end
@sockets_to_pools.delete(socket)
# Refresh synchronously every @refresh_interval seconds
# if synchronous refresh mode is enabled.
if @refresh_mode == :sync &&
((Time.now - @last_refresh) > @refresh_interval)
refresh
@last_refresh = Time.now
# Checkin a socket used for writing.
def checkin_writer(socket)
if !self.primary_pool.checkin(socket)
close_socket(socket)
end
sync_refresh
end
def close_socket(socket)
@ -466,7 +367,6 @@ module Mongo
begin
if pool
socket = pool.checkout
@sockets_to_pools[socket] = pool
socket
end
rescue ConnectionFailure => ex
@ -577,10 +477,8 @@ module Mongo
# Given a pool manager, update this connection's
# view of the replica set.
def update_config(new_manager)
old_manager = @manager
@manager = new_manager
@seeds = @manager.seeds.dup
@sockets_to_pools.clear
@refresh_version += 1
end
@ -591,11 +489,12 @@ module Mongo
if @refresh_mode == :async
return if @refresh_thread && @refresh_thread.alive?
@refresh_thread = Thread.new do
while true && @connected do
while true do
sleep(@refresh_interval)
refresh
end
end
@refresh_thread.priority = 1000
end
@last_refresh = Time.now
@ -611,7 +510,6 @@ module Mongo
pool = self.tag_map[{k.to_s => v}]
if pool
socket = pool.checkout
@sockets_to_pools[socket] = pool
return socket
end
end
@ -620,18 +518,12 @@ module Mongo
"Could not find a connection tagged with #{tags}."
end
# Checkin a socket used for reading.
def checkin_reader(socket)
warn "ReplSetConnection#checkin_writer is deprecated and will be removed " +
"in driver v2.0. Use ReplSetConnection#checkin instead."
checkin(socket)
end
# Checkin a socket used for writing.
def checkin_writer(socket)
warn "ReplSetConnection#checkin_writer is deprecated and will be removed " +
"in driver v2.0. Use ReplSetConnection#checkin instead."
checkin(socket)
def sync_refresh
if @refresh_mode == :sync &&
((Time.now - @last_refresh) > @refresh_interval)
@last_refresh = Time.now
refresh
end
end
end
end

View File

@ -36,7 +36,6 @@ module Mongo
def connect
begin
socket = nil
puts "Creating connection in node to #{@host}:#{@port}"
if @connection.connect_timeout
Mongo::TimeoutHandler.timeout(@connection.connect_timeout, OperationTimeout) do
socket = @connection.socket_class.new(@host, @port)

View File

@ -17,13 +17,11 @@
module Mongo
class Pool
PRUNE_INTERVAL = 300
PING_ATTEMPTS = 6
MAX_PING_TIME = 1_000_000
attr_accessor :host, :port, :address,
:size, :timeout, :safe, :checked_out, :connection,
:sockets_low
:size, :timeout, :safe, :checked_out, :connection
# Create a new pool of connections.
def initialize(connection, host, port, opts={})
@ -50,7 +48,6 @@ module Mongo
# Operations to perform on a socket
@socket_ops = Hash.new { |h, k| h[k] = [] }
@sockets_low = true
@sockets = []
@pids = {}
@checked_out = []
@ -74,7 +71,7 @@ module Mongo
end
sockets_to_close.each do |sock|
begin
sock.close
sock.close unless sock.closed?
rescue IOError => ex
warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}"
end
@ -90,10 +87,6 @@ module Mongo
@closed
end
def sockets_low?
@sockets_low
end
def inspect
"#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} " +
"@ping_time=#{@ping_time} #{@checked_out.size}/#{@size} sockets available.>"
@ -156,8 +149,11 @@ module Mongo
# Return a socket to the pool.
def checkin(socket)
@connection_mutex.synchronize do
@checked_out.delete(socket)
@queue.signal
if @checked_out.delete(socket)
@queue.signal
else
return false
end
end
true
end
@ -263,18 +259,7 @@ module Mongo
end
@connection_mutex.synchronize do
if @size > 1000
if @sockets.size > 0.7 * @size
@sockets_low = true
else
@sockets_low = false
end
if (Time.now - @last_pruning) > PRUNE_INTERVAL
prune
@last_pruning = Time.now
end
end
#prune
socket = if @checked_out.size < @sockets.size
checkout_existing_socket

View File

@ -18,13 +18,17 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
Benchmark.bm do |x|
x.report("Connect") do
10.times do
ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]], :refresh_mode => false)
ReplSetConnection.new([self.rs.host, self.rs.ports[0]],
[self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]],
:refresh_mode => false)
end
end
@con = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]], :refresh_mode => false)
@con = ReplSetConnection.new([self.rs.host, self.rs.ports[0]],
[self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]],
:refresh_mode => false)
x.report("manager") do
man = Mongo::PoolManager.new(@con, @con.seeds)
@ -39,8 +43,10 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
self.rs.kill_all_secondaries
rescue_connection_failure do
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]], :refresh_mode => false)
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]],
[self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]],
:refresh_mode => false)
end
assert_equal [], @conn.secondaries
@ -68,33 +74,51 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
self.rs.kill_all_secondaries
rescue_connection_failure do
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]],
[self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]],
:refresh_interval => 2,
:refresh_mode => :sync)
end
assert_equal [], @conn.secondaries
assert @conn.connected?
assert_equal @conn.read_pool, @conn.primary_pool
old_refresh_version = @conn.refresh_version
self.rs.restart_killed_nodes
sleep(4)
@conn['foo']['bar'].find_one
@conn['foo']['bar'].insert({:a => 1})
puts "Old: #{old_refresh_version} New: #{@conn.refresh_version}"
assert @conn.read_pool != @conn.primary_pool, "Read pool and primary pool are identical."
assert @conn.secondaries.length > 0, "No secondaries have been added."
assert @conn.refresh_version > old_refresh_version,
"Refresh version hasn't changed."
assert @conn.secondaries.length > 0,
"No secondaries have been added."
assert @conn.read_pool != @conn.primary_pool,
"Read pool and primary pool are identical."
end
def test_automated_refresh_with_removed_node
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]], :refresh_interval => 2, :refresh_mode => :async)
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]],
[self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]],
:refresh_interval => 2,
:refresh_mode => :sync)
assert_equal 2, @conn.secondary_pools.length
assert_equal 2, @conn.secondaries.length
num_secondaries = @conn.secondary_pools.length
old_refresh_version = @conn.refresh_version
n = self.rs.remove_secondary_node
sleep(4)
@conn['foo']['bar'].find_one
puts "Old: #{old_refresh_version} New: #{@conn.refresh_version}"
assert_equal 1, @conn.secondaries.length
assert_equal 1, @conn.secondary_pools.length
assert @conn.refresh_version > old_refresh_version,
"Refresh version hasn't changed."
assert_equal num_secondaries - 1, @conn.secondaries.length
assert_equal num_secondaries - 1, @conn.secondary_pools.length
self.rs.add_node(n)
end
@ -103,17 +127,19 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]],
[self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]],
:refresh_interval => 2, :refresh_mode => :async)
:refresh_interval => 2, :refresh_mode => :sync)
self.rs.add_node
sleep(4)
@conn['foo']['bar'].find_one
@conn2 = ReplSetConnection.new([self.rs.host, self.rs.ports[0]],
[self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]],
:refresh_interval => 2, :refresh_mode => :async)
:refresh_interval => 2, :refresh_mode => :sync)
assert @conn2.secondaries == @conn.secondaries
assert @conn2.secondaries.sort == @conn.secondaries.sort,
"Second connection secondaries not equal to first."
assert_equal 3, @conn.secondary_pools.length
assert_equal 3, @conn.secondaries.length

View File

@ -18,7 +18,8 @@ class ReplicaSetRefreshWithThreadsTest < Test::Unit::TestCase
@conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]],
[self.rs.host, self.rs.ports[1]],
[self.rs.host, self.rs.ports[2]],
:refresh_interval => 2, :refresh_mode => :async,
:refresh_interval => 5,
:refresh_mode => :sync,
:read => :secondary)
@duplicate = @conn[MONGO_TEST_DB]['duplicate']
@unique = @conn[MONGO_TEST_DB]['unique']
@ -29,9 +30,9 @@ class ReplicaSetRefreshWithThreadsTest < Test::Unit::TestCase
@unique.create_index("test", :unique => true)
threads = []
100.times do
10.times do
threads << Thread.new do
100.times do |i|
1000.times do |i|
if i % 2 == 0
assert_raise Mongo::OperationFailure do
@unique.insert({"test" => "insert"}, :safe => true)
@ -40,17 +41,15 @@ class ReplicaSetRefreshWithThreadsTest < Test::Unit::TestCase
@duplicate.insert({"test" => "insert"}, :safe => true)
end
end
@conn.end_request
end
end
self.rs.add_node
sleep(4)
threads.each {|t| t.join }
config = @conn['admin'].command({:ismaster => 1})
assert_equal 3, @conn.secondary_pools.length
assert_equal 3, @conn.secondaries.length
threads.each {|t| t.join }
end
end

View File

@ -39,7 +39,6 @@ class TestThreading < Test::Unit::TestCase
@duplicate.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true)
times << Time.now - t1
end
@@con.end_request
end
end
end
@ -61,7 +60,6 @@ class TestThreading < Test::Unit::TestCase
else
@duplicate.insert({"test" => "insert"}, :safe => true)
end
@@con.end_request
end
end
@ -87,7 +85,6 @@ class TestThreading < Test::Unit::TestCase
sum += document["x"]
end
assert_equal 499500, sum
@@con.end_request
end
end

View File

@ -248,8 +248,8 @@ class ReplSetManager
raise ex
end
if status['members'].all? { |m| m['health'] == 1 &&
[1, 2, 7].include?(m['state']) } &&
status['members'].any? { |m| m['state'] == 1 }
[1, 2, 7].include?(m['state']) } &&
status['members'].any? { |m| m['state'] == 1 }
connections = []
states = []
@ -281,7 +281,7 @@ class ReplSetManager
con.close
raise Mongo::OperationFailure
end
end
end
return false
end

View File

@ -73,12 +73,11 @@ class ReadTest < Test::Unit::TestCase
should "use default value on query" do
@cursor = @col.find({:a => 1})
sock = mock()
sock.expects(:close).twice
read_pool = stub(:sockets_low? => false)
read_pool = stub(:checkin => true)
@con.stubs(:read_pool).returns(read_pool)
primary_pool = stub(:sockets_low? => false)
primary_pool = stub(:checkin => true)
@con.stubs(:primary_pool).returns(primary_pool)
@con.expects(:checkout_reader).twice.returns(sock)
@con.expects(:checkout_reader).returns(sock)
@con.expects(:receive_message).with do |o, m, l, s, c, r|
r == nil
end.returns([[], 0, 0])
@ -89,10 +88,9 @@ class ReadTest < Test::Unit::TestCase
should "allow override default value on query" do
@cursor = @col.find({:a => 1}, :read => :primary)
sock = mock()
sock.expects(:close).twice
primary_pool = stub(:sockets_low? => false)
primary_pool = stub(:checkin => true)
@con.stubs(:primary_pool).returns(primary_pool)
@con.expects(:checkout_writer).twice.returns(sock)
@con.expects(:checkout_writer).returns(sock)
@con.expects(:receive_message).with do |o, m, l, s, c, r|
r == nil
end.returns([[], 0, 0])