RUBY-232 handle authentication with connection pooling
This commit is contained in:
parent
285752a7ad
commit
7c4740c47c
|
@ -35,7 +35,7 @@ module Mongo
|
|||
STANDARD_HEADER_SIZE = 16
|
||||
RESPONSE_HEADER_SIZE = 20
|
||||
|
||||
attr_reader :logger, :size, :auths, :primary, :safe, :primary_pool, :host_to_try
|
||||
attr_reader :logger, :size, :auths, :primary, :safe, :primary_pool, :host_to_try, :pool_size
|
||||
|
||||
# Counter for generating unique request ids.
|
||||
@@current_request_id = 0
|
||||
|
@ -188,10 +188,11 @@ module Mongo
|
|||
#
|
||||
# @raise [AuthenticationError] raises an exception if any one
|
||||
# authentication fails.
|
||||
def apply_saved_authentication
|
||||
def apply_saved_authentication(opts={})
|
||||
return false if @auths.empty?
|
||||
@auths.each do |auth|
|
||||
self[auth['db_name']].authenticate(auth['username'], auth['password'], false)
|
||||
self[auth['db_name']].issue_authentication(auth['username'], auth['password'], false,
|
||||
:socket => opts[:socket])
|
||||
end
|
||||
true
|
||||
end
|
||||
|
@ -241,6 +242,14 @@ module Mongo
|
|||
true
|
||||
end
|
||||
|
||||
def authenticate_pools
|
||||
@primary_pool.authenticate_existing
|
||||
end
|
||||
|
||||
def logout_pools(db)
|
||||
@primary_pool.logout_existing(db)
|
||||
end
|
||||
|
||||
# Return a hash with all database names
|
||||
# and their respective sizes on disk.
|
||||
#
|
||||
|
@ -411,7 +420,13 @@ module Mongo
|
|||
request_id = add_message_headers(message, operation)
|
||||
packed_message = message.to_s
|
||||
begin
|
||||
sock = socket || (command ? checkout_writer : checkout_reader)
|
||||
if socket
|
||||
sock = socket
|
||||
checkin = false
|
||||
else
|
||||
sock = (command ? checkout_writer : checkout_reader)
|
||||
checkin = true
|
||||
end
|
||||
|
||||
result = ''
|
||||
@safe_mutexes[sock].synchronize do
|
||||
|
@ -419,7 +434,9 @@ module Mongo
|
|||
result = receive(sock, request_id)
|
||||
end
|
||||
ensure
|
||||
command ? checkin_writer(sock) : checkin_reader(sock)
|
||||
if checkin
|
||||
command ? checkin_writer(sock) : checkin_reader(sock)
|
||||
end
|
||||
end
|
||||
result
|
||||
end
|
||||
|
@ -588,7 +605,7 @@ module Mongo
|
|||
socket = TCPSocket.new(host, port)
|
||||
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
|
||||
|
||||
config = self['admin'].command({:ismaster => 1}, :sock => socket)
|
||||
config = self['admin'].command({:ismaster => 1}, :socket => socket)
|
||||
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
|
||||
close
|
||||
ensure
|
||||
|
@ -598,16 +615,13 @@ module Mongo
|
|||
config
|
||||
end
|
||||
|
||||
# Set the specified node as primary, and
|
||||
# apply any saved authentication credentials.
|
||||
# Set the specified node as primary.
|
||||
def set_primary(node)
|
||||
host, port = *node
|
||||
@primary = [host, port]
|
||||
@primary_pool = Pool.new(self, host, port, :size => @pool_size, :timeout => @timeout)
|
||||
apply_saved_authentication
|
||||
end
|
||||
|
||||
|
||||
## Low-level connection methods.
|
||||
|
||||
def receive(sock, expected_response)
|
||||
|
|
|
@ -92,7 +92,8 @@ module Mongo
|
|||
# @param [String] password
|
||||
# @param [Boolean] save_auth
|
||||
# Save this authentication to the connection object using Connection#add_auth. This
|
||||
# will ensure that the authentication will be applied on database reconnect.
|
||||
# will ensure that the authentication will be applied on database reconnect. Note
|
||||
# that this value must be true when using connection pooling.
|
||||
#
|
||||
# @return [Boolean]
|
||||
#
|
||||
|
@ -100,8 +101,19 @@ module Mongo
|
|||
#
|
||||
# @core authenticate authenticate-instance_method
|
||||
def authenticate(username, password, save_auth=true)
|
||||
doc = command({:getnonce => 1}, :check_response => false)
|
||||
raise "error retrieving nonce: #{doc}" unless ok?(doc)
|
||||
if @connection.pool_size > 1
|
||||
if !save_auth
|
||||
raise MongoArgumentError, "If using connection pooling, :save_auth must be set to true."
|
||||
end
|
||||
@connection.authenticate_pools
|
||||
end
|
||||
|
||||
issue_authentication(username, password, save_auth)
|
||||
end
|
||||
|
||||
def issue_authentication(username, password, save_auth=true, opts={})
|
||||
doc = command({:getnonce => 1}, :check_response => false, :socket => opts[:socket])
|
||||
raise MongoDBError, "Error retrieving nonce: #{doc}" unless ok?(doc)
|
||||
nonce = doc['nonce']
|
||||
|
||||
auth = BSON::OrderedHash.new
|
||||
|
@ -109,7 +121,7 @@ module Mongo
|
|||
auth['user'] = username
|
||||
auth['nonce'] = nonce
|
||||
auth['key'] = Mongo::Support.auth_key(username, password, nonce)
|
||||
if ok?(self.command(auth, :check_response => false))
|
||||
if ok?(self.command(auth, :check_response => false, :socket => opts[:socket]))
|
||||
if save_auth
|
||||
@connection.add_auth(@name, username, password)
|
||||
end
|
||||
|
@ -179,14 +191,22 @@ module Mongo
|
|||
end
|
||||
|
||||
# Deauthorizes use for this database for this connection. Also removes
|
||||
# any saved authorization in the connection class associated with this
|
||||
# any saved authentication in the connection class associated with this
|
||||
# database.
|
||||
#
|
||||
# @raise [MongoDBError] if logging out fails.
|
||||
#
|
||||
# @return [Boolean]
|
||||
def logout
|
||||
doc = command(:logout => 1)
|
||||
def logout(opts={})
|
||||
if @connection.pool_size > 1
|
||||
@connection.logout_pools(@name)
|
||||
end
|
||||
|
||||
issue_logout(opts)
|
||||
end
|
||||
|
||||
def issue_logout(opts={})
|
||||
doc = command({:logout => 1}, :socket => opts[:socket])
|
||||
if ok?(doc)
|
||||
@connection.remove_auth(@name)
|
||||
true
|
||||
|
@ -455,14 +475,14 @@ module Mongo
|
|||
#
|
||||
# @option opts [Boolean] :check_response (true) If +true+, raises an exception if the
|
||||
# command fails.
|
||||
# @option opts [Socket] :sock a socket to use for sending the command. This is mainly for internal use.
|
||||
# @option opts [Socket] :socket a socket to use for sending the command. This is mainly for internal use.
|
||||
#
|
||||
# @return [Hash]
|
||||
#
|
||||
# @core commands command_instance-method
|
||||
def command(selector, opts={})
|
||||
check_response = opts.fetch(:check_response, true)
|
||||
sock = opts[:sock]
|
||||
socket = opts[:socket]
|
||||
raise MongoArgumentError, "command must be given a selector" unless selector.is_a?(Hash) && !selector.empty?
|
||||
if selector.keys.length > 1 && RUBY_VERSION < '1.9' && selector.class != BSON::OrderedHash
|
||||
raise MongoArgumentError, "DB#command requires an OrderedHash when hash contains multiple keys"
|
||||
|
@ -470,7 +490,7 @@ module Mongo
|
|||
|
||||
begin
|
||||
result = Cursor.new(system_command_collection,
|
||||
:limit => -1, :selector => selector, :socket => sock).next_document
|
||||
:limit => -1, :selector => selector, :socket => socket).next_document
|
||||
rescue OperationFailure => ex
|
||||
raise OperationFailure, "Database command '#{selector.keys.first}' failed: #{ex.message}"
|
||||
end
|
||||
|
|
|
@ -167,6 +167,20 @@ module Mongo
|
|||
@read_secondary || @slave_ok
|
||||
end
|
||||
|
||||
def authenticate_pools
|
||||
super
|
||||
@secondary_pools.each do |pool|
|
||||
pool.authenticate_existing
|
||||
end
|
||||
end
|
||||
|
||||
def logout_pools(db)
|
||||
super
|
||||
@secondary_pools.each do |pool|
|
||||
pool.logout_existing(db)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def check_is_master(node)
|
||||
|
|
|
@ -37,6 +37,9 @@ module Mongo
|
|||
# Condition variable for signal and wait
|
||||
@queue = ConditionVariable.new
|
||||
|
||||
# Operations to perform on a socket
|
||||
@socket_ops = Hash.new { |h, k| h[k] = [] }
|
||||
|
||||
@sockets = []
|
||||
@checked_out = []
|
||||
end
|
||||
|
@ -75,11 +78,42 @@ module Mongo
|
|||
rescue => ex
|
||||
raise ConnectionFailure, "Failed to connect socket: #{ex}"
|
||||
end
|
||||
|
||||
# If any saved authentications exist, we want to apply those
|
||||
# when creating new sockets.
|
||||
@connection.apply_saved_authentication(:socket => socket)
|
||||
|
||||
@sockets << socket
|
||||
@checked_out << socket
|
||||
socket
|
||||
end
|
||||
|
||||
# If a use calls DB#authentication, and several sockets exist,
|
||||
# then we need a way to apply the authentication on each socket.
|
||||
# So we store the apply_authentication method, and this will be
|
||||
# applied right before the next use of each socket.
|
||||
def authenticate_existing
|
||||
@connection_mutex.synchronize do
|
||||
@sockets.each do |socket|
|
||||
@socket_ops[socket] << Proc.new do
|
||||
@connection.apply_saved_authentication(:socket => socket)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Store the logout op for each existing socket to be applied before
|
||||
# the next use of each socket.
|
||||
def logout_existing(db)
|
||||
@connection_mutex.synchronize do
|
||||
@sockets.each do |socket|
|
||||
@socket_ops[socket] << Proc.new do
|
||||
@connection.db(db).issue_logout(:socket => socket)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Checks out the first available socket from the pool.
|
||||
#
|
||||
# This method is called exclusively from #checkout;
|
||||
|
@ -110,14 +144,24 @@ module Mongo
|
|||
checkout_new_socket
|
||||
end
|
||||
|
||||
return socket if socket
|
||||
if socket
|
||||
|
||||
# Otherwise, wait
|
||||
if @logger
|
||||
@logger.warn "MONGODB Waiting for available connection; " +
|
||||
"#{@checked_out.size} of #{@size} connections checked out."
|
||||
# This call all procs, in order, scoped to existing sockets.
|
||||
# At the moment, we use this to lazily authenticate and
|
||||
# logout existing socket connections.
|
||||
@socket_ops[socket].reject! do |op|
|
||||
op.call
|
||||
end
|
||||
|
||||
return socket
|
||||
else
|
||||
# Otherwise, wait
|
||||
if @logger
|
||||
@logger.warn "MONGODB Waiting for available connection; " +
|
||||
"#{@checked_out.size} of #{@size} connections checked out."
|
||||
end
|
||||
@queue.wait(@connection_mutex)
|
||||
end
|
||||
@queue.wait(@connection_mutex)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
|
||||
require 'mongo'
|
||||
require 'thread'
|
||||
require 'test/unit'
|
||||
require './test/test_helper'
|
||||
|
||||
# NOTE: This test requires bouncing the server.
|
||||
# It also requires that a user exists on the admin database.
|
||||
class AuthenticationTest < Test::Unit::TestCase
|
||||
include Mongo
|
||||
|
||||
def setup
|
||||
@conn = standard_connection(:pool_size => 10)
|
||||
@db1 = @conn.db('mongo-ruby-test-auth1')
|
||||
@db2 = @conn.db('mongo-ruby-test-auth2')
|
||||
@admin = @conn.db('admin')
|
||||
end
|
||||
|
||||
def teardown
|
||||
@db1.authenticate('user1', 'secret')
|
||||
@db2.authenticate('user2', 'secret')
|
||||
@conn.drop_database('mongo-ruby-test-auth1')
|
||||
@conn.drop_database('mongo-ruby-test-auth2')
|
||||
end
|
||||
|
||||
def threaded_exec
|
||||
threads = []
|
||||
|
||||
100.times do
|
||||
threads << Thread.new do
|
||||
yield
|
||||
end
|
||||
end
|
||||
|
||||
100.times do |n|
|
||||
threads[n].join
|
||||
end
|
||||
end
|
||||
|
||||
def test_authenticate
|
||||
@admin.authenticate('bob', 'secret')
|
||||
@db1.add_user('user1', 'secret')
|
||||
@db2.add_user('user2', 'secret')
|
||||
@admin.logout
|
||||
|
||||
threaded_exec do
|
||||
assert_raise Mongo::OperationFailure do
|
||||
@db1['stuff'].insert({:a => 2}, :safe => true)
|
||||
end
|
||||
end
|
||||
|
||||
threaded_exec do
|
||||
assert_raise Mongo::OperationFailure do
|
||||
@db2['stuff'].insert({:a => 2}, :safe => true)
|
||||
end
|
||||
end
|
||||
|
||||
@db1.authenticate('user1', 'secret')
|
||||
@db2.authenticate('user2', 'secret')
|
||||
|
||||
threaded_exec do
|
||||
assert @db1['stuff'].insert({:a => 2}, :safe => true)
|
||||
end
|
||||
|
||||
threaded_exec do
|
||||
assert @db2['stuff'].insert({:a => 2}, :safe => true)
|
||||
end
|
||||
|
||||
puts "Please bounce the server."
|
||||
gets
|
||||
|
||||
# Here we reconnect.
|
||||
begin
|
||||
@db1['stuff'].find.to_a
|
||||
rescue Mongo::ConnectionFailure
|
||||
end
|
||||
|
||||
threaded_exec do
|
||||
assert @db1['stuff'].insert({:a => 2}, :safe => true)
|
||||
end
|
||||
|
||||
threaded_exec do
|
||||
assert @db2['stuff'].insert({:a => 2}, :safe => true)
|
||||
end
|
||||
|
||||
@db1.logout
|
||||
threaded_exec do
|
||||
assert_raise Mongo::OperationFailure do
|
||||
@db1['stuff'].insert({:a => 2}, :safe => true)
|
||||
end
|
||||
end
|
||||
|
||||
@db2.logout
|
||||
threaded_exec do
|
||||
assert_raise Mongo::OperationFailure do
|
||||
assert @db2['stuff'].insert({:a => 2}, :safe => true)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
|
@ -67,7 +67,6 @@ class ConnectionTest < Test::Unit::TestCase
|
|||
admin_db = new_mock_db
|
||||
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}).twice
|
||||
@conn.expects(:[]).with('admin').returns(admin_db).twice
|
||||
@conn.expects(:apply_saved_authentication)
|
||||
@conn.connect
|
||||
end
|
||||
|
||||
|
|
|
@ -57,6 +57,7 @@ class DBTest < Test::Unit::TestCase
|
|||
|
||||
should "raise an error if logging out fails" do
|
||||
@db.expects(:command).returns({})
|
||||
@conn.expects(:pool_size).returns(1)
|
||||
assert_raise Mongo::MongoDBError do
|
||||
@db.logout
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue