diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 97a5dc7..b6f005c 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -30,6 +30,8 @@ module Mongo Mutex = ::Mutex ConditionVariable = ::ConditionVariable + Thread.abort_on_exception = true + DEFAULT_PORT = 27017 STANDARD_HEADER_SIZE = 16 RESPONSE_HEADER_SIZE = 20 @@ -487,8 +489,10 @@ module Mongo end result = '' - send_message_on_socket(packed_message, sock) - result = receive(sock, request_id, exhaust) + @safe_mutexes[sock].synchronize do + send_message_on_socket(packed_message, sock) + result = receive(sock, request_id, exhaust) + end ensure if should_checkin checkin(sock) @@ -628,6 +632,9 @@ module Mongo # Default maximum BSON object size @max_bson_size = Mongo::DEFAULT_MAX_BSON_SIZE + @safe_mutex_lock = Mutex.new + @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new} + # Determine whether to use SSL. @ssl = opts.fetch(:ssl, false) if @ssl diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index 10617fe..3328bec 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -23,7 +23,7 @@ module Mongo # Instantiates and manages connections to a MongoDB replica set. class ReplSetConnection < Connection attr_reader :secondaries, :arbiters, :secondary_pools, - :replica_set_name, :read_pool, :seeds, :primary_tag_pool, + :replica_set_name, :read_pool, :seeds, :tag_map, :refresh_interval, :refresh_mode # Create a connection to a MongoDB replica set. @@ -143,7 +143,7 @@ module Mongo # Maps @sockets_to_pools = {} - @primary_tag_pool = nil + @tag_map = nil # Replica set name if opts[:rs_name] @@ -197,7 +197,10 @@ module Mongo # to get the refresh lock. def refresh(opts={}) if !connected? - @logger.warn("Not connected") + log(:info, "Trying to refresh but not connected..." + + "skipping replica set health check.") + hard_refresh! + return true end log(:info, "Checking replica set connection health...") @@ -286,13 +289,6 @@ module Mongo @refresh_thread = nil end - if @nodes - @nodes.each do |member| - member.close - end - end - - @nodes = [] @read_pool = nil if @secondary_pools @@ -304,7 +300,7 @@ module Mongo @secondaries = [] @secondary_pools = [] @arbiters = [] - @primary_tag_pool = nil + @tag_map = nil @sockets_to_pools.clear end end @@ -359,7 +355,7 @@ module Mongo @primary_pool = manager.primary_pool @read_pool = manager.read_pool @secondary_pools = manager.secondary_pools - @primary_tag_pool = manager.primary_tag_pool + @tag_map = manager.tag_map @seeds = manager.seeds @manager = manager @nodes = manager.nodes @@ -407,7 +403,7 @@ module Mongo def checkout_tagged(tags) sync_synchronize(:SH) do tags.each do |k, v| - pool = @primary_tag_pool[{k.to_s => v}] + pool = @tag_map[{k.to_s => v}] if pool socket = pool.checkout @sockets_to_pools[socket] = pool @@ -472,7 +468,11 @@ module Mongo if pool = @sockets_to_pools[socket] pool.checkin(socket) elsif socket + begin socket.close + rescue IOError + log(:info, "Tried to close socket #{socket} but already closed.") + end end end diff --git a/lib/mongo/util/node.rb b/lib/mongo/util/node.rb index de639b9..7407a67 100644 --- a/lib/mongo/util/node.rb +++ b/lib/mongo/util/node.rb @@ -14,6 +14,7 @@ module Mongo end @address = "#{host}:#{port}" @config = nil + @socket = nil end def eql?(other) diff --git a/lib/mongo/util/pool_manager.rb b/lib/mongo/util/pool_manager.rb index a273d1c..b1a87d1 100644 --- a/lib/mongo/util/pool_manager.rb +++ b/lib/mongo/util/pool_manager.rb @@ -3,12 +3,13 @@ module Mongo attr_reader :connection, :seeds, :arbiters, :primary, :secondaries, :primary_pool, :read_pool, :secondary_pools, :hosts, :nodes, :max_bson_size, - :tags_to_pools, :primary_tag_pool, :members + :tags_to_pools, :tag_map, :members def initialize(connection, seeds) @connection = connection @seeds = seeds @previously_connected = false + @refresh_required = false end def inspect @@ -25,7 +26,7 @@ module Mongo initialize_pools(members) update_seed_list(members) set_read_pool - set_primary_tag_pools + set_tag_mappings @members = members @previously_connected = true @@ -36,8 +37,6 @@ module Mongo # to our view. If any of these isn't the case, # set @refresh_require to true, and return. def check_connection_health - @refresh_required = false - begin seed = get_valid_seed_node rescue ConnectionFailure @@ -108,6 +107,8 @@ module Mongo elsif member.last_state == :secondary && member.secondary? return true + else # This node isn't what it used to be. + return false end end end @@ -122,7 +123,7 @@ module Mongo @hosts = Set.new @members = Set.new @tags_to_pools = {} - @primary_tag_pool = {} + @tag_map = {} end # Connect to each member of the replica set @@ -195,12 +196,12 @@ module Mongo # If there's more than one pool associated with # a given tag, choose a close one using the bucket method. - def set_primary_tag_pools + def set_tag_mappings @tags_to_pools.each do |key, pool_list| if pool_list.length == 1 - @primary_tag_pool[key] = pool_list.first + @tag_map[key] = pool_list.first else - @primary_tag_pool[key] = nearby_pool_from_set(pool_list) + @tag_map[key] = nearby_pool_from_set(pool_list) end end end diff --git a/test/replica_sets/basic_test.rb b/test/replica_sets/basic_test.rb index d79cf9c..c590350 100644 --- a/test/replica_sets/basic_test.rb +++ b/test/replica_sets/basic_test.rb @@ -40,7 +40,7 @@ class ConnectTest < Test::Unit::TestCase assert @conn.secondary_pools.include?(@conn.read_pool) assert_equal seeds.sort {|a,b| a[1] <=> b[1]}, @conn.seeds.sort {|a,b| a[1] <=> b[1]} - assert_equal 5, @conn.tags_to_pools.keys.length + assert_equal 5, @conn.tag_map.keys.length assert_equal 90, @conn.refresh_interval assert_equal @conn.refresh_mode, :sync end diff --git a/test/replica_sets/pooled_insert_test.rb b/test/replica_sets/pooled_insert_test.rb deleted file mode 100644 index 90ed2cf..0000000 --- a/test/replica_sets/pooled_insert_test.rb +++ /dev/null @@ -1,58 +0,0 @@ -$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) -require './test/replica_sets/rs_test_helper' - -# NOTE: This test expects a replica set of three nodes to be running -# on the local host. -class ReplicaSetPooledInsertTest < Test::Unit::TestCase - include Mongo - - def setup - @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], - [RS.host, RS.ports[2]], :pool_size => 5, :timeout => 5) - @db = @conn.db(MONGO_TEST_DB) - @db.drop_collection("test-sets") - @coll = @db.collection("test-sets") - end - - def teardown - RS.restart_killed_nodes - @conn.close if @conn - end - - def test_insert - expected_results = [-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] - @coll.save({:a => -1}, :safe => true) - - RS.kill_primary - - threads = [] - 10.times do |i| - threads[i] = Thread.new do - rescue_connection_failure do - @coll.save({:a => i}, :safe => true) - end - end - end - - threads.each {|t| t.join} - - # Restart the old master and wait for sync - RS.restart_killed_nodes - sleep(1) - results = [] - - rescue_connection_failure do - @coll.find.each {|r| results << r} - expected_results.each do |a| - assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}" - end - end - - @coll.save({:a => 10}, :safe => true) - @coll.find.each {|r| results << r} - (expected_results + [10]).each do |a| - assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a} on second find" - end - end - -end diff --git a/test/replica_sets/refresh_test.rb b/test/replica_sets/refresh_test.rb index ecad881..9d7174f 100644 --- a/test/replica_sets/refresh_test.rb +++ b/test/replica_sets/refresh_test.rb @@ -20,12 +20,12 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase x.report("Connect") do 10.times do ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], - [RS.host, RS.ports[2]], :background_refresh => false) + [RS.host, RS.ports[2]], :refresh_mode => false) end end @con = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], - [RS.host, RS.ports[2]], :background_refresh => false) + [RS.host, RS.ports[2]], :refresh_mode => false) x.report("manager") do man = Mongo::PoolManager.new(@con, @con.seeds) @@ -41,7 +41,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase rescue_connection_failure do @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], - [RS.host, RS.ports[2]], :background_refresh => false) + [RS.host, RS.ports[2]], :refresh_mode => false) end assert_equal [], @conn.secondaries @@ -70,7 +70,7 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase rescue_connection_failure do @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], - [RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true) + [RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :sync) end assert_equal [], @conn.secondaries @@ -87,8 +87,9 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase def test_automated_refresh_with_removed_node @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], - [RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true) + [RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :sync) + p @conn.secondary_pools assert_equal 2, @conn.secondary_pools.length assert_equal 2, @conn.secondaries.length @@ -103,13 +104,13 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase def test_adding_and_removing_nodes @conn = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], - [RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true) + [RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async) RS.add_node sleep(5) @conn2 = ReplSetConnection.new([RS.host, RS.ports[0]], [RS.host, RS.ports[1]], - [RS.host, RS.ports[2]], :refresh_interval => 2, :background_refresh => true) + [RS.host, RS.ports[2]], :refresh_interval => 2, :refresh_mode => :async) assert @conn2.secondaries == @conn.secondaries assert_equal 3, @conn.secondary_pools.length diff --git a/test/tools/repl_set_manager.rb b/test/tools/repl_set_manager.rb index 2978ead..b916599 100644 --- a/test/tools/repl_set_manager.rb +++ b/test/tools/repl_set_manager.rb @@ -46,6 +46,7 @@ class ReplSetManager begin con = Mongo::Connection.new(@host, @start_port) rescue Mongo::ConnectionFailure + con = false end if con && ensure_up(1, con) @@ -187,10 +188,7 @@ class ReplSetManager def kill(node, signal=2) pid = @mongods[node]['pid'] puts "** Killing node with pid #{pid} at port #{@mongods[node]['port']}" - begin - get_connection(node)['admin'].command({'shutdown' => 1}) - rescue Mongo::ConnectionFailure - end + system("kill -2 #{pid}") @mongods[node]['up'] = false end @@ -300,10 +298,11 @@ class ReplSetManager private def initiate + puts "Initiating replica set..." con = get_connection attempt do - con['admin'].command({'replSetInitiate' => @config}) + p con['admin'].command({'replSetInitiate' => @config}) end con.close