mongo-ruby-driver/lib/mongo/util/pool.rb

210 lines
5.9 KiB
Ruby

# encoding: UTF-8
# --
# Copyright (C) 2008-2011 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
class Pool
PING_ATTEMPTS = 6
attr_accessor :host, :port, :size, :timeout, :safe, :checked_out, :connection
# Create a new pool of connections.
#
def initialize(connection, host, port, opts={})
@connection = connection
@host, @port = host, port
# A Mongo::Node object.
@node = opts[:node]
# Pool size and timeout.
@size = opts[:size] || 1
@timeout = opts[:timeout] || 5.0
# Mutex for synchronizing pool access
@connection_mutex = Mutex.new
# Condition variable for signal and wait
@queue = ConditionVariable.new
# Operations to perform on a socket
@socket_ops = Hash.new { |h, k| h[k] = [] }
@sockets = []
@pids = {}
@checked_out = []
end
def close
@sockets.each do |sock|
begin
sock.close
rescue IOError => ex
warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}"
end
end
@host = @port = nil
@sockets.clear
@pids.clear
@checked_out.clear
end
def host_string
"#{@host}:#{@port}"
end
# Return the time it takes on average
# to do a round-trip against this node.
def ping_time
trials = []
begin
PING_ATTEMPTS.times do
t1 = Time.now
self.connection['admin'].command({:ping => 1}, :socket => @node.socket)
trials << (Time.now - t1) * 1000
end
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
return nil
end
trials.sort!
trials.delete_at(trials.length-1)
trials.delete_at(0)
total = 0.0
trials.each { |t| total += t }
(total / trials.length).floor
end
# Return a socket to the pool.
def checkin(socket)
@connection_mutex.synchronize do
@checked_out.delete(socket)
@queue.signal
end
true
end
# Adds a new socket to the pool and checks it out.
#
# This method is called exclusively from #checkout;
# therefore, it runs within a mutex.
def checkout_new_socket
begin
socket = TCPSocket.new(@host, @port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
rescue => ex
raise ConnectionFailure, "Failed to connect to host #{@host} and port #{@port}: #{ex}"
end
# If any saved authentications exist, we want to apply those
# when creating new sockets.
@connection.apply_saved_authentication(:socket => socket)
@sockets << socket
@pids[socket] = Process.pid
@checked_out << socket
socket
end
# If a user calls DB#authenticate, and several sockets exist,
# then we need a way to apply the authentication on each socket.
# So we store the apply_authentication method, and this will be
# applied right before the next use of each socket.
def authenticate_existing
@connection_mutex.synchronize do
@sockets.each do |socket|
@socket_ops[socket] << Proc.new do
@connection.apply_saved_authentication(:socket => socket)
end
end
end
end
# Store the logout op for each existing socket to be applied before
# the next use of each socket.
def logout_existing(db)
@connection_mutex.synchronize do
@sockets.each do |socket|
@socket_ops[socket] << Proc.new do
@connection.db(db).issue_logout(:socket => socket)
end
end
end
end
# Checks out the first available socket from the pool.
#
# If the pid has changed, remove the socket and check out
# new one.
#
# This method is called exclusively from #checkout;
# therefore, it runs within a mutex.
def checkout_existing_socket
socket = (@sockets - @checked_out).first
if @pids[socket] != Process.pid
@pids[socket] = nil
@sockets.delete(socket)
socket.close
checkout_new_socket
else
@checked_out << socket
socket
end
end
# Check out an existing socket or create a new socket if the maximum
# pool size has not been exceeded. Otherwise, wait for the next
# available socket.
def checkout
@connection.connect if !@connection.connected?
start_time = Time.now
loop do
if (Time.now - start_time) > @timeout
raise ConnectionTimeoutError, "could not obtain connection within " +
"#{@timeout} seconds. The max pool size is currently #{@size}; " +
"consider increasing the pool size or timeout."
end
@connection_mutex.synchronize do
socket = if @checked_out.size < @sockets.size
checkout_existing_socket
elsif @sockets.size < @size
checkout_new_socket
end
if socket
# This calls all procs, in order, scoped to existing sockets.
# At the moment, we use this to lazily authenticate and
# logout existing socket connections.
@socket_ops[socket].reject! do |op|
op.call
end
return socket
else
# Otherwise, wait
@queue.wait(@connection_mutex)
end
end
end
end
end
end