From fa10508f07a753301cab49f78c673c8adb1a3f9f Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Fri, 18 Nov 2011 15:47:06 -0500 Subject: [PATCH] Remove thread-local map and socket map (complexity creep). --- lib/mongo/connection.rb | 66 -------- lib/mongo/cursor.rb | 16 +- lib/mongo/networking.rb | 78 ++++----- lib/mongo/repl_set_connection.rb | 150 +++--------------- lib/mongo/util/node.rb | 1 - lib/mongo/util/pool.rb | 31 +--- test/replica_sets/refresh_test.rb | 64 +++++--- .../replica_sets/refresh_with_threads_test.rb | 13 +- test/threading_test.rb | 3 - test/tools/repl_set_manager.rb | 6 +- test/unit/read_test.rb | 12 +- 11 files changed, 142 insertions(+), 298 deletions(-) diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index bce3fe2..d48b7f1 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -481,68 +481,6 @@ module Mongo @max_bson_size end - def get_local_reader - self.connections ||= {} - if !connected? && self.connections[self.object_id] - self.connections[self.object_id] - else - self.connections[self.object_id] = {} - end - self.connections[self.object_id][:reader] ||= checkout_reader - end - - def get_local_writer - self.connections ||= {} - if !connected? && self.connections[self.object_id] - self.connections[self.object_id] - else - self.connections[self.object_id] = {} - end - self.connections[self.object_id][:writer] ||= checkout_writer - end - - # Allow the current thread’s connection to return to the pool. - # - # Calling this method allows the socket that has been reserved - # for this thread to be returned to the pool. Other threads will - # then be able to re-use that socket. If your application uses many - # threads, or has long-running threads that infrequently perform MongoDB - # operations, then judicious use of this method can lead to performance gains. - # Care should be taken, however, to make sure that end_request is not called - # in the middle of a sequence of operations in which ordering is important. This - # could lead to unexpected results. - # - # One important case is when a thread is dying permanently. It is best to call - # end_request when you know a thread is finished, as otherwise its socket will - # not be reclaimed. - def end_request - if socket = self.connections[self.object_id][:reader] - checkin(socket) - end - - if socket = self.connections[self.object_id][:writer] - checkin(socket) - end - end - - # Used to close, check in, or refresh sockets held - # in thread-local variables. - def local_socket_done(socket) - if self.connections[self.object_id][:reader] == socket - if self.read_pool.sockets_low? - checkin(socket) - self.connections[self.object_id][:reader] = nil - end - end - - if self.connections[self.object_id][:writer] == socket - if self.primary_pool && self.primary_pool.sockets_low? - checkin(socket) - self.connections[self.object_id][:writer] = nil - end - end - end - # Checkout a socket for reading (i.e., a secondary node). # Note: this is overridden in ReplSetConnection. def checkout_reader @@ -560,16 +498,12 @@ module Mongo # Checkin a socket used for reading. # Note: this is overridden in ReplSetConnection. def checkin_reader(socket) - warn "Connection#checkin_writer is not deprecated and will be removed " + - "in driver v2.0. Use Connection#checkin instead." checkin(socket) end # Checkin a socket used for writing. # Note: this is overridden in ReplSetConnection. def checkin_writer(socket) - warn "Connection#checkin_writer is not deprecated and will be removed " + - "in driver v2.0. Use Connection#checkin instead." checkin(socket) end diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index a4c6a84..7001a1d 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -520,10 +520,10 @@ module Mongo def checkout_socket_from_connection @checkin_connection = true if @command || @read_preference == :primary - @connection.get_local_writer + @connection.checkout_writer else @read_pool = @connection.read_pool - @connection.get_local_reader + @connection.checkout_reader end end @@ -556,7 +556,11 @@ module Mongo @read_pool.checkin(sock) @checkin_read_pool = false elsif @checkin_connection - @connection.local_socket_done(sock) + if @command || @read_preference == :primary + @connection.checkin_writer(sock) + else + @connection.checkin_reader(sock) + end @checkin_connection = false end end @@ -566,7 +570,11 @@ module Mongo @read_pool.checkin(sock) @checkin_read_pool = false else - @connection.checkin(sock) + if @command || @read_preference == :primary + @connection.checkin_writer(sock) + else + @connection.checkin_reader(sock) + end @checkin_connection = false end end diff --git a/lib/mongo/networking.rb b/lib/mongo/networking.rb index 5aebffc..c80e45a 100644 --- a/lib/mongo/networking.rb +++ b/lib/mongo/networking.rb @@ -25,21 +25,23 @@ module Mongo connection = opts.fetch(:connection, :writer) + add_message_headers(message, operation) + packed_message = message.to_s + + if connection == :writer + sock = checkout_writer + else + sock = checkout_reader + end + begin - add_message_headers(message, operation) - packed_message = message.to_s - - if connection == :writer - sock = get_local_writer - else - sock = get_local_reader - end - send_message_on_socket(packed_message, sock) - local_socket_done(sock) - rescue ConnectionFailure, OperationFailure, OperationTimeout => ex - checkin(sock) - raise ex + ensure + if connection == :writer + checkin_writer(sock) + else + checkin_reader(sock) + end end end @@ -64,13 +66,13 @@ module Mongo last_error_id = add_message_headers(last_error_message, Mongo::Constants::OP_QUERY) packed_message = message.append!(last_error_message).to_s + sock = checkout_writer begin - sock = get_local_writer send_message_on_socket(packed_message, sock) docs, num_received, cursor_id = receive(sock, last_error_id) - local_socket_done(sock) + checkin_writer(sock) rescue ConnectionFailure, OperationFailure, OperationTimeout => ex - checkin(sock) + checkin_writer(sock) raise ex end @@ -101,30 +103,34 @@ module Mongo read=:primary, exhaust=false) request_id = add_message_headers(message, operation) packed_message = message.to_s - begin - if socket - sock = socket - should_checkin = false + if socket + sock = socket + should_checkin = false + else + if command || read == :primary + sock = checkout_writer + elsif read == :secondary + sock = checkout_reader else - if command - sock = get_local_writer - elsif read == :primary - sock = get_local_writer - elsif read == :secondary - sock = get_local_reader - else - sock = checkout_tagged(read) - end - should_checkin = true + sock = checkout_tagged(read) end + should_checkin = true + end - result = '' + result = '' + begin send_message_on_socket(packed_message, sock) result = receive(sock, request_id, exhaust) - local_socket_done(sock) if should_checkin - rescue ConnectionFailure, OperationFailure, OperationTimeout => ex - checkin(sock) if should_checkin - raise ex + ensure + if should_checkin + if command || read == :primary + checkin_writer(sock) + elsif read == :secondary + checkin_reader(sock) + else + # TODO: sock = checkout_tagged(read) + end + end end result end @@ -281,7 +287,7 @@ module Mongo total_bytes_sent rescue => ex close - raise ConnectionFailure, "Operation failed with the following exception: #{ex}" + raise ConnectionFailure, "Operation failed with the following exception: #{ex}:#{ex.message}" end end diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index 1e36ace..f005c3f 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -105,6 +105,7 @@ module Mongo # No connection manager by default. @manager = nil + @pool_mutex = Mutex.new if ![:sync, :async, false].include?(@refresh_mode) raise MongoArgumentError, @@ -129,7 +130,6 @@ module Mongo @refresh_version = 0 # Maps - @sockets_to_pools = {} @threads_to_sockets = Hash.new { |h, k| h[k] = Hash.new } @tag_map = nil @@ -264,13 +264,7 @@ module Mongo # Close the connection to the database. def close @connected = false - - if @refresh_thread - @refresh_thread = nil - end - - @manager.close if @manager - @sockets_to_pools.clear + @manager.close(:soft => true) if @manager @threads_to_sockets.clear end @@ -308,95 +302,6 @@ module Mongo end end - def get_local_reader - self.connections ||= {} - if !connected? && self.connections[self.object_id] - self.connections[self.object_id] - else - self.connections[self.object_id] = {} - end - socket = self.connections[self.object_id][:reader] ||= checkout_reader - if self.read_pool != @sockets_to_pools[socket] - checkin(socket) - socket = self.connections[self.object_id][:reader] = checkout_reader - end - - @threads_to_sockets[Thread.current][:reader] = socket - end - - def get_local_writer - self.connections ||= {} - if !connected? && self.connections[self.object_id] - self.connections[self.object_id] - else - self.connections[self.object_id] = {} - end - socket = self.connections[self.object_id][:writer] ||= checkout_writer - if self.primary_pool != @sockets_to_pools[socket] - checkin(socket) - socket = self.connections[self.object_id][:writer] = checkout_writer - end - @threads_to_sockets[Thread.current][:writer] = socket - end - - # Allow the current thread’s connection to return to the pool. - # - # Calling this method allows the socket that has been reserved - # for this thread to be returned to the pool. Other threads will - # then be able to re-use that socket. If your application uses many - # threads, or has long-running threads that infrequently perform MongoDB - # operations, then judicious use of this method can lead to performance gains. - # Care should be taken, however, to make sure that end_request is not called - # in the middle of a sequence of operations in which ordering is important. This - # could lead to unexpected results. - # - # One important case is when a thread is dying permanently. It is best to call - # end_request when you know a thread is finished, as otherwise its socket will - # not be reclaimed. - def end_request - if socket = self.connections[self.object_id][:reader] - checkin(socket) - end - - if socket = self.connections[self.object_id][:writer] - checkin(socket) - end - end - - # Used to close, check in, or refresh sockets held - # in thread-local variables. - def local_socket_done(socket) - if self.connections[self.object_id][:reader] == socket - if self.read_pool.sockets_low? || - self.read_pool != @sockets_to_pools[socket] - checkin(socket) - self.connections[self.object_id][:reader] = nil - end - end - - if self.connections[self.object_id][:writer] == socket - if self.primary_pool && - (self.primary_pool.sockets_low? || - self.primary_pool != @sockets_to_pools[socket]) - checkin(socket) - self.connections[self.object_id][:writer] = nil - end - end - - if (Time.now - @last_cleanup) > CLEANUP_INTERVAL && - @cleanup_lock.try_lock - @threads_to_sockets.each do |thread, sockets| - if !thread.alive? - checkin(sockets[:reader]) - checkin(sockets[:writer]) - @threads_to_sockets.delete(thread) - end - end - - @cleanup_lock.unlock - end - end - # Checkout a socket for reading (i.e., a secondary node). # Note that @read_pool might point to the primary pool # if no read pool has been defined. @@ -433,25 +338,21 @@ module Mongo end end - def checkin(socket) - if pool = @sockets_to_pools[socket] - pool.checkin(socket) - if !@sockets_to_pools[socket] - close_socket(socket) - end - elsif socket + # Checkin a socket used for reading. + def checkin_reader(socket) + if !self.read_pool.checkin(socket) && + !self.primary_pool.checkin(socket) close_socket(socket) end + sync_refresh + end - @sockets_to_pools.delete(socket) - - # Refresh synchronously every @refresh_interval seconds - # if synchronous refresh mode is enabled. - if @refresh_mode == :sync && - ((Time.now - @last_refresh) > @refresh_interval) - refresh - @last_refresh = Time.now + # Checkin a socket used for writing. + def checkin_writer(socket) + if !self.primary_pool.checkin(socket) + close_socket(socket) end + sync_refresh end def close_socket(socket) @@ -466,7 +367,6 @@ module Mongo begin if pool socket = pool.checkout - @sockets_to_pools[socket] = pool socket end rescue ConnectionFailure => ex @@ -577,10 +477,8 @@ module Mongo # Given a pool manager, update this connection's # view of the replica set. def update_config(new_manager) - old_manager = @manager @manager = new_manager @seeds = @manager.seeds.dup - @sockets_to_pools.clear @refresh_version += 1 end @@ -591,11 +489,12 @@ module Mongo if @refresh_mode == :async return if @refresh_thread && @refresh_thread.alive? @refresh_thread = Thread.new do - while true && @connected do + while true do sleep(@refresh_interval) refresh end end + @refresh_thread.priority = 1000 end @last_refresh = Time.now @@ -611,7 +510,6 @@ module Mongo pool = self.tag_map[{k.to_s => v}] if pool socket = pool.checkout - @sockets_to_pools[socket] = pool return socket end end @@ -620,18 +518,12 @@ module Mongo "Could not find a connection tagged with #{tags}." end - # Checkin a socket used for reading. - def checkin_reader(socket) - warn "ReplSetConnection#checkin_writer is deprecated and will be removed " + - "in driver v2.0. Use ReplSetConnection#checkin instead." - checkin(socket) - end - - # Checkin a socket used for writing. - def checkin_writer(socket) - warn "ReplSetConnection#checkin_writer is deprecated and will be removed " + - "in driver v2.0. Use ReplSetConnection#checkin instead." - checkin(socket) + def sync_refresh + if @refresh_mode == :sync && + ((Time.now - @last_refresh) > @refresh_interval) + @last_refresh = Time.now + refresh + end end end end diff --git a/lib/mongo/util/node.rb b/lib/mongo/util/node.rb index db3ac78..38c2ff1 100644 --- a/lib/mongo/util/node.rb +++ b/lib/mongo/util/node.rb @@ -36,7 +36,6 @@ module Mongo def connect begin socket = nil - puts "Creating connection in node to #{@host}:#{@port}" if @connection.connect_timeout Mongo::TimeoutHandler.timeout(@connection.connect_timeout, OperationTimeout) do socket = @connection.socket_class.new(@host, @port) diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index a8dac53..3783e5c 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -17,13 +17,11 @@ module Mongo class Pool - PRUNE_INTERVAL = 300 PING_ATTEMPTS = 6 MAX_PING_TIME = 1_000_000 attr_accessor :host, :port, :address, - :size, :timeout, :safe, :checked_out, :connection, - :sockets_low + :size, :timeout, :safe, :checked_out, :connection # Create a new pool of connections. def initialize(connection, host, port, opts={}) @@ -50,7 +48,6 @@ module Mongo # Operations to perform on a socket @socket_ops = Hash.new { |h, k| h[k] = [] } - @sockets_low = true @sockets = [] @pids = {} @checked_out = [] @@ -74,7 +71,7 @@ module Mongo end sockets_to_close.each do |sock| begin - sock.close + sock.close unless sock.closed? rescue IOError => ex warn "IOError when attempting to close socket connected to #{@host}:#{@port}: #{ex.inspect}" end @@ -90,10 +87,6 @@ module Mongo @closed end - def sockets_low? - @sockets_low - end - def inspect "#" @@ -156,8 +149,11 @@ module Mongo # Return a socket to the pool. def checkin(socket) @connection_mutex.synchronize do - @checked_out.delete(socket) - @queue.signal + if @checked_out.delete(socket) + @queue.signal + else + return false + end end true end @@ -263,18 +259,7 @@ module Mongo end @connection_mutex.synchronize do - if @size > 1000 - if @sockets.size > 0.7 * @size - @sockets_low = true - else - @sockets_low = false - end - - if (Time.now - @last_pruning) > PRUNE_INTERVAL - prune - @last_pruning = Time.now - end - end + #prune socket = if @checked_out.size < @sockets.size checkout_existing_socket diff --git a/test/replica_sets/refresh_test.rb b/test/replica_sets/refresh_test.rb index fa0a215..f80c70f 100644 --- a/test/replica_sets/refresh_test.rb +++ b/test/replica_sets/refresh_test.rb @@ -18,13 +18,17 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase Benchmark.bm do |x| x.report("Connect") do 10.times do - ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]], - [self.rs.host, self.rs.ports[2]], :refresh_mode => false) + ReplSetConnection.new([self.rs.host, self.rs.ports[0]], + [self.rs.host, self.rs.ports[1]], + [self.rs.host, self.rs.ports[2]], + :refresh_mode => false) end end - @con = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]], - [self.rs.host, self.rs.ports[2]], :refresh_mode => false) + @con = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], + [self.rs.host, self.rs.ports[1]], + [self.rs.host, self.rs.ports[2]], + :refresh_mode => false) x.report("manager") do man = Mongo::PoolManager.new(@con, @con.seeds) @@ -39,8 +43,10 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase self.rs.kill_all_secondaries rescue_connection_failure do - @conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]], - [self.rs.host, self.rs.ports[2]], :refresh_mode => false) + @conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], + [self.rs.host, self.rs.ports[1]], + [self.rs.host, self.rs.ports[2]], + :refresh_mode => false) end assert_equal [], @conn.secondaries @@ -68,33 +74,51 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase self.rs.kill_all_secondaries rescue_connection_failure do - @conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]], - [self.rs.host, self.rs.ports[2]], :refresh_interval => 2, :refresh_mode => :async) + @conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], + [self.rs.host, self.rs.ports[1]], + [self.rs.host, self.rs.ports[2]], + :refresh_interval => 2, + :refresh_mode => :sync) end assert_equal [], @conn.secondaries assert @conn.connected? assert_equal @conn.read_pool, @conn.primary_pool + old_refresh_version = @conn.refresh_version self.rs.restart_killed_nodes sleep(4) + @conn['foo']['bar'].find_one + @conn['foo']['bar'].insert({:a => 1}) + puts "Old: #{old_refresh_version} New: #{@conn.refresh_version}" - assert @conn.read_pool != @conn.primary_pool, "Read pool and primary pool are identical." - assert @conn.secondaries.length > 0, "No secondaries have been added." + assert @conn.refresh_version > old_refresh_version, + "Refresh version hasn't changed." + assert @conn.secondaries.length > 0, + "No secondaries have been added." + assert @conn.read_pool != @conn.primary_pool, + "Read pool and primary pool are identical." end def test_automated_refresh_with_removed_node - @conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]], - [self.rs.host, self.rs.ports[2]], :refresh_interval => 2, :refresh_mode => :async) + @conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], + [self.rs.host, self.rs.ports[1]], + [self.rs.host, self.rs.ports[2]], + :refresh_interval => 2, + :refresh_mode => :sync) - assert_equal 2, @conn.secondary_pools.length - assert_equal 2, @conn.secondaries.length + num_secondaries = @conn.secondary_pools.length + old_refresh_version = @conn.refresh_version n = self.rs.remove_secondary_node sleep(4) + @conn['foo']['bar'].find_one + puts "Old: #{old_refresh_version} New: #{@conn.refresh_version}" - assert_equal 1, @conn.secondaries.length - assert_equal 1, @conn.secondary_pools.length + assert @conn.refresh_version > old_refresh_version, + "Refresh version hasn't changed." + assert_equal num_secondaries - 1, @conn.secondaries.length + assert_equal num_secondaries - 1, @conn.secondary_pools.length self.rs.add_node(n) end @@ -103,17 +127,19 @@ class ReplicaSetRefreshTest < Test::Unit::TestCase @conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]], [self.rs.host, self.rs.ports[2]], - :refresh_interval => 2, :refresh_mode => :async) + :refresh_interval => 2, :refresh_mode => :sync) self.rs.add_node sleep(4) + @conn['foo']['bar'].find_one @conn2 = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]], [self.rs.host, self.rs.ports[2]], - :refresh_interval => 2, :refresh_mode => :async) + :refresh_interval => 2, :refresh_mode => :sync) - assert @conn2.secondaries == @conn.secondaries + assert @conn2.secondaries.sort == @conn.secondaries.sort, + "Second connection secondaries not equal to first." assert_equal 3, @conn.secondary_pools.length assert_equal 3, @conn.secondaries.length diff --git a/test/replica_sets/refresh_with_threads_test.rb b/test/replica_sets/refresh_with_threads_test.rb index 8f19495..f86800d 100644 --- a/test/replica_sets/refresh_with_threads_test.rb +++ b/test/replica_sets/refresh_with_threads_test.rb @@ -18,7 +18,8 @@ class ReplicaSetRefreshWithThreadsTest < Test::Unit::TestCase @conn = ReplSetConnection.new([self.rs.host, self.rs.ports[0]], [self.rs.host, self.rs.ports[1]], [self.rs.host, self.rs.ports[2]], - :refresh_interval => 2, :refresh_mode => :async, + :refresh_interval => 5, + :refresh_mode => :sync, :read => :secondary) @duplicate = @conn[MONGO_TEST_DB]['duplicate'] @unique = @conn[MONGO_TEST_DB]['unique'] @@ -29,9 +30,9 @@ class ReplicaSetRefreshWithThreadsTest < Test::Unit::TestCase @unique.create_index("test", :unique => true) threads = [] - 100.times do + 10.times do threads << Thread.new do - 100.times do |i| + 1000.times do |i| if i % 2 == 0 assert_raise Mongo::OperationFailure do @unique.insert({"test" => "insert"}, :safe => true) @@ -40,17 +41,15 @@ class ReplicaSetRefreshWithThreadsTest < Test::Unit::TestCase @duplicate.insert({"test" => "insert"}, :safe => true) end end - @conn.end_request end end self.rs.add_node - sleep(4) + threads.each {|t| t.join } + config = @conn['admin'].command({:ismaster => 1}) assert_equal 3, @conn.secondary_pools.length assert_equal 3, @conn.secondaries.length - - threads.each {|t| t.join } end end diff --git a/test/threading_test.rb b/test/threading_test.rb index f4ec8d2..357432b 100644 --- a/test/threading_test.rb +++ b/test/threading_test.rb @@ -39,7 +39,6 @@ class TestThreading < Test::Unit::TestCase @duplicate.update({"test" => "insert"}, {"$set" => {"test" => "update"}}, :safe => true) times << Time.now - t1 end - @@con.end_request end end end @@ -61,7 +60,6 @@ class TestThreading < Test::Unit::TestCase else @duplicate.insert({"test" => "insert"}, :safe => true) end - @@con.end_request end end @@ -87,7 +85,6 @@ class TestThreading < Test::Unit::TestCase sum += document["x"] end assert_equal 499500, sum - @@con.end_request end end diff --git a/test/tools/repl_set_manager.rb b/test/tools/repl_set_manager.rb index 74a4916..71d6d40 100644 --- a/test/tools/repl_set_manager.rb +++ b/test/tools/repl_set_manager.rb @@ -248,8 +248,8 @@ class ReplSetManager raise ex end if status['members'].all? { |m| m['health'] == 1 && - [1, 2, 7].include?(m['state']) } && - status['members'].any? { |m| m['state'] == 1 } + [1, 2, 7].include?(m['state']) } && + status['members'].any? { |m| m['state'] == 1 } connections = [] states = [] @@ -281,7 +281,7 @@ class ReplSetManager con.close raise Mongo::OperationFailure end - end + end return false end diff --git a/test/unit/read_test.rb b/test/unit/read_test.rb index dba4d93..1d00a03 100644 --- a/test/unit/read_test.rb +++ b/test/unit/read_test.rb @@ -73,12 +73,11 @@ class ReadTest < Test::Unit::TestCase should "use default value on query" do @cursor = @col.find({:a => 1}) sock = mock() - sock.expects(:close).twice - read_pool = stub(:sockets_low? => false) + read_pool = stub(:checkin => true) @con.stubs(:read_pool).returns(read_pool) - primary_pool = stub(:sockets_low? => false) + primary_pool = stub(:checkin => true) @con.stubs(:primary_pool).returns(primary_pool) - @con.expects(:checkout_reader).twice.returns(sock) + @con.expects(:checkout_reader).returns(sock) @con.expects(:receive_message).with do |o, m, l, s, c, r| r == nil end.returns([[], 0, 0]) @@ -89,10 +88,9 @@ class ReadTest < Test::Unit::TestCase should "allow override default value on query" do @cursor = @col.find({:a => 1}, :read => :primary) sock = mock() - sock.expects(:close).twice - primary_pool = stub(:sockets_low? => false) + primary_pool = stub(:checkin => true) @con.stubs(:primary_pool).returns(primary_pool) - @con.expects(:checkout_writer).twice.returns(sock) + @con.expects(:checkout_writer).returns(sock) @con.expects(:receive_message).with do |o, m, l, s, c, r| r == nil end.returns([[], 0, 0])