diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index e84b7ba..8da2b00 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -35,7 +35,7 @@ module Mongo STANDARD_HEADER_SIZE = 16 RESPONSE_HEADER_SIZE = 20 - attr_reader :logger, :size, :auths, :primary, :safe, :primary_pool, :host_to_try + attr_reader :logger, :size, :auths, :primary, :safe, :primary_pool, :host_to_try, :pool_size # Counter for generating unique request ids. @@current_request_id = 0 @@ -188,10 +188,11 @@ module Mongo # # @raise [AuthenticationError] raises an exception if any one # authentication fails. - def apply_saved_authentication + def apply_saved_authentication(opts={}) return false if @auths.empty? @auths.each do |auth| - self[auth['db_name']].authenticate(auth['username'], auth['password'], false) + self[auth['db_name']].issue_authentication(auth['username'], auth['password'], false, + :socket => opts[:socket]) end true end @@ -241,6 +242,14 @@ module Mongo true end + def authenticate_pools + @primary_pool.authenticate_existing + end + + def logout_pools(db) + @primary_pool.logout_existing(db) + end + # Return a hash with all database names # and their respective sizes on disk. # @@ -411,7 +420,13 @@ module Mongo request_id = add_message_headers(message, operation) packed_message = message.to_s begin - sock = socket || (command ? checkout_writer : checkout_reader) + if socket + sock = socket + checkin = false + else + sock = (command ? checkout_writer : checkout_reader) + checkin = true + end result = '' @safe_mutexes[sock].synchronize do @@ -419,7 +434,9 @@ module Mongo result = receive(sock, request_id) end ensure - command ? checkin_writer(sock) : checkin_reader(sock) + if checkin + command ? checkin_writer(sock) : checkin_reader(sock) + end end result end @@ -588,7 +605,7 @@ module Mongo socket = TCPSocket.new(host, port) socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - config = self['admin'].command({:ismaster => 1}, :sock => socket) + config = self['admin'].command({:ismaster => 1}, :socket => socket) rescue OperationFailure, SocketError, SystemCallError, IOError => ex close ensure @@ -598,16 +615,13 @@ module Mongo config end - # Set the specified node as primary, and - # apply any saved authentication credentials. + # Set the specified node as primary. def set_primary(node) host, port = *node @primary = [host, port] @primary_pool = Pool.new(self, host, port, :size => @pool_size, :timeout => @timeout) - apply_saved_authentication end - ## Low-level connection methods. def receive(sock, expected_response) diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index 325f565..c43fb4c 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -92,7 +92,8 @@ module Mongo # @param [String] password # @param [Boolean] save_auth # Save this authentication to the connection object using Connection#add_auth. This - # will ensure that the authentication will be applied on database reconnect. + # will ensure that the authentication will be applied on database reconnect. Note + # that this value must be true when using connection pooling. # # @return [Boolean] # @@ -100,8 +101,19 @@ module Mongo # # @core authenticate authenticate-instance_method def authenticate(username, password, save_auth=true) - doc = command({:getnonce => 1}, :check_response => false) - raise "error retrieving nonce: #{doc}" unless ok?(doc) + if @connection.pool_size > 1 + if !save_auth + raise MongoArgumentError, "If using connection pooling, :save_auth must be set to true." + end + @connection.authenticate_pools + end + + issue_authentication(username, password, save_auth) + end + + def issue_authentication(username, password, save_auth=true, opts={}) + doc = command({:getnonce => 1}, :check_response => false, :socket => opts[:socket]) + raise MongoDBError, "Error retrieving nonce: #{doc}" unless ok?(doc) nonce = doc['nonce'] auth = BSON::OrderedHash.new @@ -109,7 +121,7 @@ module Mongo auth['user'] = username auth['nonce'] = nonce auth['key'] = Mongo::Support.auth_key(username, password, nonce) - if ok?(self.command(auth, :check_response => false)) + if ok?(self.command(auth, :check_response => false, :socket => opts[:socket])) if save_auth @connection.add_auth(@name, username, password) end @@ -121,7 +133,7 @@ module Mongo # Adds a stored Javascript function to the database which can executed # server-side in map_reduce, db.eval and $where clauses. - # + # # @param [String] function_name # @param [String] code # @@ -179,14 +191,22 @@ module Mongo end # Deauthorizes use for this database for this connection. Also removes - # any saved authorization in the connection class associated with this + # any saved authentication in the connection class associated with this # database. # # @raise [MongoDBError] if logging out fails. # # @return [Boolean] - def logout - doc = command(:logout => 1) + def logout(opts={}) + if @connection.pool_size > 1 + @connection.logout_pools(@name) + end + + issue_logout(opts) + end + + def issue_logout(opts={}) + doc = command({:logout => 1}, :socket => opts[:socket]) if ok?(doc) @connection.remove_auth(@name) true @@ -455,14 +475,14 @@ module Mongo # # @option opts [Boolean] :check_response (true) If +true+, raises an exception if the # command fails. - # @option opts [Socket] :sock a socket to use for sending the command. This is mainly for internal use. + # @option opts [Socket] :socket a socket to use for sending the command. This is mainly for internal use. # # @return [Hash] # # @core commands command_instance-method def command(selector, opts={}) check_response = opts.fetch(:check_response, true) - sock = opts[:sock] + socket = opts[:socket] raise MongoArgumentError, "command must be given a selector" unless selector.is_a?(Hash) && !selector.empty? if selector.keys.length > 1 && RUBY_VERSION < '1.9' && selector.class != BSON::OrderedHash raise MongoArgumentError, "DB#command requires an OrderedHash when hash contains multiple keys" @@ -470,7 +490,7 @@ module Mongo begin result = Cursor.new(system_command_collection, - :limit => -1, :selector => selector, :socket => sock).next_document + :limit => -1, :selector => selector, :socket => socket).next_document rescue OperationFailure => ex raise OperationFailure, "Database command '#{selector.keys.first}' failed: #{ex.message}" end diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index 4f4fa00..4203a34 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -167,6 +167,20 @@ module Mongo @read_secondary || @slave_ok end + def authenticate_pools + super + @secondary_pools.each do |pool| + pool.authenticate_existing + end + end + + def logout_pools(db) + super + @secondary_pools.each do |pool| + pool.logout_existing(db) + end + end + private def check_is_master(node) diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index f96d6c1..fbcb522 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -37,6 +37,9 @@ module Mongo # Condition variable for signal and wait @queue = ConditionVariable.new + # Operations to perform on a socket + @socket_ops = Hash.new { |h, k| h[k] = [] } + @sockets = [] @checked_out = [] end @@ -75,11 +78,42 @@ module Mongo rescue => ex raise ConnectionFailure, "Failed to connect socket: #{ex}" end + + # If any saved authentications exist, we want to apply those + # when creating new sockets. + @connection.apply_saved_authentication(:socket => socket) + @sockets << socket @checked_out << socket socket end + # If a use calls DB#authentication, and several sockets exist, + # then we need a way to apply the authentication on each socket. + # So we store the apply_authentication method, and this will be + # applied right before the next use of each socket. + def authenticate_existing + @connection_mutex.synchronize do + @sockets.each do |socket| + @socket_ops[socket] << Proc.new do + @connection.apply_saved_authentication(:socket => socket) + end + end + end + end + + # Store the logout op for each existing socket to be applied before + # the next use of each socket. + def logout_existing(db) + @connection_mutex.synchronize do + @sockets.each do |socket| + @socket_ops[socket] << Proc.new do + @connection.db(db).issue_logout(:socket => socket) + end + end + end + end + # Checks out the first available socket from the pool. # # This method is called exclusively from #checkout; @@ -110,14 +144,24 @@ module Mongo checkout_new_socket end - return socket if socket + if socket - # Otherwise, wait - if @logger - @logger.warn "MONGODB Waiting for available connection; " + - "#{@checked_out.size} of #{@size} connections checked out." + # This call all procs, in order, scoped to existing sockets. + # At the moment, we use this to lazily authenticate and + # logout existing socket connections. + @socket_ops[socket].reject! do |op| + op.call + end + + return socket + else + # Otherwise, wait + if @logger + @logger.warn "MONGODB Waiting for available connection; " + + "#{@checked_out.size} of #{@size} connections checked out." + end + @queue.wait(@connection_mutex) end - @queue.wait(@connection_mutex) end end end diff --git a/test/auxillary/threaded_authentication_test.rb b/test/auxillary/threaded_authentication_test.rb new file mode 100644 index 0000000..d60200a --- /dev/null +++ b/test/auxillary/threaded_authentication_test.rb @@ -0,0 +1,101 @@ +$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) +require 'mongo' +require 'thread' +require 'test/unit' +require './test/test_helper' + +# NOTE: This test requires bouncing the server. +# It also requires that a user exists on the admin database. +class AuthenticationTest < Test::Unit::TestCase + include Mongo + + def setup + @conn = standard_connection(:pool_size => 10) + @db1 = @conn.db('mongo-ruby-test-auth1') + @db2 = @conn.db('mongo-ruby-test-auth2') + @admin = @conn.db('admin') + end + + def teardown + @db1.authenticate('user1', 'secret') + @db2.authenticate('user2', 'secret') + @conn.drop_database('mongo-ruby-test-auth1') + @conn.drop_database('mongo-ruby-test-auth2') + end + + def threaded_exec + threads = [] + + 100.times do + threads << Thread.new do + yield + end + end + + 100.times do |n| + threads[n].join + end + end + + def test_authenticate + @admin.authenticate('bob', 'secret') + @db1.add_user('user1', 'secret') + @db2.add_user('user2', 'secret') + @admin.logout + + threaded_exec do + assert_raise Mongo::OperationFailure do + @db1['stuff'].insert({:a => 2}, :safe => true) + end + end + + threaded_exec do + assert_raise Mongo::OperationFailure do + @db2['stuff'].insert({:a => 2}, :safe => true) + end + end + + @db1.authenticate('user1', 'secret') + @db2.authenticate('user2', 'secret') + + threaded_exec do + assert @db1['stuff'].insert({:a => 2}, :safe => true) + end + + threaded_exec do + assert @db2['stuff'].insert({:a => 2}, :safe => true) + end + + puts "Please bounce the server." + gets + + # Here we reconnect. + begin + @db1['stuff'].find.to_a + rescue Mongo::ConnectionFailure + end + + threaded_exec do + assert @db1['stuff'].insert({:a => 2}, :safe => true) + end + + threaded_exec do + assert @db2['stuff'].insert({:a => 2}, :safe => true) + end + + @db1.logout + threaded_exec do + assert_raise Mongo::OperationFailure do + @db1['stuff'].insert({:a => 2}, :safe => true) + end + end + + @db2.logout + threaded_exec do + assert_raise Mongo::OperationFailure do + assert @db2['stuff'].insert({:a => 2}, :safe => true) + end + end + end + +end diff --git a/test/unit/connection_test.rb b/test/unit/connection_test.rb index a23977f..0491f40 100644 --- a/test/unit/connection_test.rb +++ b/test/unit/connection_test.rb @@ -67,7 +67,6 @@ class ConnectionTest < Test::Unit::TestCase admin_db = new_mock_db admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}).twice @conn.expects(:[]).with('admin').returns(admin_db).twice - @conn.expects(:apply_saved_authentication) @conn.connect end diff --git a/test/unit/db_test.rb b/test/unit/db_test.rb index 9e60102..e84086b 100644 --- a/test/unit/db_test.rb +++ b/test/unit/db_test.rb @@ -57,6 +57,7 @@ class DBTest < Test::Unit::TestCase should "raise an error if logging out fails" do @db.expects(:command).returns({}) + @conn.expects(:pool_size).returns(1) assert_raise Mongo::MongoDBError do @db.logout end