diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index b6f005c..eb9f49d 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -565,6 +565,13 @@ module Mongo end alias :primary? :read_primary? + # The socket pool that this connection reads from. + # + # @return [Mongo::Pool] + def read_pool + @primary_pool + end + # The value of the read preference. Because # this is a single-node connection, the value # is +:primary+, and the connection will read diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index a0fbd15..60b593f 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -98,6 +98,10 @@ module Mongo else @command = false end + + @checkin_read_pool = false + @checkin_connection = false + @read_pool = nil end # Guess whether the cursor is alive on the server. @@ -460,10 +464,15 @@ module Mongo def send_initial_query message = construct_query_message payload = instrument_payload if @logger + sock = @socket || checkout_socket_from_connection instrument(:find, payload) do + begin results, @n_received, @cursor_id = @connection.receive_message( - Mongo::Constants::OP_QUERY, message, nil, @socket, @command, - @read_preference, @options & OP_QUERY_EXHAUST != 0) + Mongo::Constants::OP_QUERY, message, nil, sock, @command, + nil, @options & OP_QUERY_EXHAUST != 0) + ensure + checkin_socket(sock) unless @socket + end @returned += @n_received @cache += results @query_run = true @@ -491,13 +500,63 @@ module Mongo # Cursor id. message.put_long(@cursor_id) log(:debug, "cursor.refresh() for cursor #{@cursor_id}") if @logger + sock = @socket || checkout_socket_for_op_get_more + + begin results, @n_received, @cursor_id = @connection.receive_message( - Mongo::Constants::OP_GET_MORE, message, nil, @socket, @command, @read_preference) + Mongo::Constants::OP_GET_MORE, message, nil, sock, @command, nil) + ensure + checkin_socket(sock) unless @socket + end @returned += @n_received @cache += results close_cursor_if_query_complete end + def checkout_socket_from_connection + @checkin_connection = true + if @read_preference == :primary + @connection.checkout_writer + else + @read_pool = @connection.read_pool + @connection.checkout_reader + end + end + + def checkout_socket_for_op_get_more + if @read_pool && (@read_pool != @connection.read_pool) + checkout_socket_from_read_pool + else + checkout_socket_from_connection + end + end + + def checkout_socket_from_read_pool + new_pool = @connection.secondary_pools.detect do |pool| + pool.host == @read_pool.host && pool.port == @read_pool.port + end + if new_pool + @read_pool = new_pool + sock = new_pool.checkout + @checkin_read_pool = true + return sock + else + raise Mongo::OperationFailure, "Failure to continue iterating " + + "cursor because the the replica set member persisting this " + + " cursor cannot be found." + end + end + + def checkin_socket(sock) + if @checkin_read_pool + @read_pool.checkin(sock) + @checkin_read_pool = false + elsif @checkin_connection + @connection.checkin(sock) + @checkin_connection = false + end + end + def construct_query_message message = BSON::ByteBuffer.new message.put_int(@options) diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index c0381a7..c1402dd 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -119,6 +119,7 @@ module Mongo # Refresh @refresh_mode = opts.fetch(:refresh_mode, :sync) @refresh_interval = opts[:refresh_interval] || 90 + @last_refresh = Time.now if ![:sync, :async, false].include?(@refresh_mode) raise MongoArgumentError, @@ -339,6 +340,80 @@ module Mongo end end + # Checkout a socket for reading (i.e., a secondary node). + # Note that @read_pool might point to the primary pool + # if no read pool has been defined. + def checkout_reader + connect unless connected? + socket = get_socket_from_pool(@read_pool) + + if !socket + refresh + socket = get_socket_from_pool(@primary_pool) + end + + if socket + socket + else + raise ConnectionFailure.new("Could not connect to a node for reading.") + end + end + + # Checkout a socket for writing (i.e., a primary node). + def checkout_writer + connect unless connected? + socket = get_socket_from_pool(@primary_pool) + + if !socket + refresh + socket = get_socket_from_pool(@primary_pool) + end + + if socket + socket + else + raise ConnectionFailure.new("Could not connect to primary node.") + end + end + + def checkin(socket) + sync_synchronize(:SH) do + if pool = @sockets_to_pools[socket] + pool.checkin(socket) + elsif socket + begin + socket.close + rescue IOError + log(:info, "Tried to close socket #{socket} but already closed.") + end + end + end + + # Refresh synchronously every @refresh_interval seconds + # if synchronous refresh mode is enabled. + if @refresh_mode == :sync && + ((Time.now - @last_refresh) > @refresh_interval) + refresh + @last_refresh = Time.now + end + end + + def get_socket_from_pool(pool) + begin + sync_synchronize(:SH) do + if pool + socket = pool.checkout + @sockets_to_pools[socket] = pool + socket + end + end + + rescue ConnectionFailure => ex + log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}") + return nil + end + end + private # Given a pool manager, update this connection's @@ -377,25 +452,6 @@ module Mongo @last_refresh = Time.now end - # Checkout a socket for reading (i.e., a secondary node). - # Note that @read_pool might point to the primary pool - # if no read pool has been defined. - def checkout_reader - connect unless connected? - socket = get_socket_from_pool(@read_pool) - - if !socket - refresh - socket = get_socket_from_pool(@primary_pool) - end - - if socket - socket - else - raise ConnectionFailure.new("Could not connect to a node for reading.") - end - end - # Checkout a socket connected to a node with one of # the provided tags. If no such node exists, raise # an exception. @@ -417,39 +473,6 @@ module Mongo "Could not find a connection tagged with #{tags}." end - # Checkout a socket for writing (i.e., a primary node). - def checkout_writer - connect unless connected? - socket = get_socket_from_pool(@primary_pool) - - if !socket - refresh - socket = get_socket_from_pool(@primary_pool) - end - - if socket - socket - else - raise ConnectionFailure.new("Could not connect to primary node.") - end - end - - def get_socket_from_pool(pool) - begin - sync_synchronize(:SH) do - if pool - socket = pool.checkout - @sockets_to_pools[socket] = pool - socket - end - end - - rescue ConnectionFailure => ex - log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}") - return nil - end - end - # Checkin a socket used for reading. def checkin_reader(socket) warn "ReplSetConnection#checkin_writer is deprecated and will be removed " + @@ -463,27 +486,5 @@ module Mongo "in driver v2.0. Use ReplSetConnection#checkin instead." checkin(socket) end - - def checkin(socket) - sync_synchronize(:SH) do - if pool = @sockets_to_pools[socket] - pool.checkin(socket) - elsif socket - begin - socket.close - rescue IOError - log(:info, "Tried to close socket #{socket} but already closed.") - end - end - end - - # Refresh synchronously every @refresh_interval seconds - # if synchronous refresh mode is enabled. - if @refresh_mode == :sync && - ((Time.now - @last_refresh) > @refresh_interval) - refresh - @last_refresh = Time.now - end - end end end diff --git a/lib/mongo/util/pool.rb b/lib/mongo/util/pool.rb index 4b1d2fc..c70ab31 100644 --- a/lib/mongo/util/pool.rb +++ b/lib/mongo/util/pool.rb @@ -18,6 +18,7 @@ module Mongo class Pool PING_ATTEMPTS = 6 + MAX_PING_TIME = 1_000_000 attr_accessor :host, :port, :address, :size, :timeout, :safe, :checked_out, :connection @@ -35,7 +36,7 @@ module Mongo @address = "#{@host}:#{@port}" # Pool size and timeout. - @size = opts[:size] || 1 + @size = opts[:size] || 10 @timeout = opts[:timeout] || 5.0 # Mutex for synchronizing pool access @@ -52,6 +53,7 @@ module Mongo @checked_out = [] @ping_time = nil @last_ping = nil + @closed = false end def close @@ -67,9 +69,14 @@ module Mongo @sockets.clear @pids.clear @checked_out.clear + @closed = true end end + def closed? + @closed + end + def inspect "#" @@ -101,14 +108,12 @@ module Mongo # to do a round-trip against this node. def refresh_ping_time trials = [] - begin - PING_ATTEMPTS.times do - t1 = Time.now - self.connection['admin'].command({:ping => 1}, :socket => @node.socket) - trials << (Time.now - t1) * 1000 - end - rescue OperationFailure, SocketError, SystemCallError, IOError => ex - return nil + PING_ATTEMPTS.times do + t1 = Time.now + if !self.ping + return MAX_PING_TIME + end + trials << (Time.now - t1) * 1000 end trials.sort! @@ -123,6 +128,14 @@ module Mongo (total / trials.length).ceil end + def ping + begin + return self.connection['admin'].command({:ping => 1}, :socket => @node.socket) + rescue OperationFailure, SocketError, SystemCallError, IOError => ex + return false + end + end + # Return a socket to the pool. def checkin(socket) @connection_mutex.synchronize do diff --git a/test/connection_test.rb b/test/connection_test.rb index 9b3599a..dd7caae 100644 --- a/test/connection_test.rb +++ b/test/connection_test.rb @@ -318,7 +318,7 @@ class TestConnection < Test::Unit::TestCase TCPSocket.stubs(:new).returns(fake_socket) @con.primary_pool.checkout_new_socket - assert_equal [], @con.primary_pool.close + assert @con.primary_pool.close end end end diff --git a/test/cursor_test.rb b/test/cursor_test.rb index 65a95aa..9ade4fb 100644 --- a/test/cursor_test.rb +++ b/test/cursor_test.rb @@ -54,7 +54,7 @@ class CursorTest < Test::Unit::TestCase if @@version >= "2.0" @@coll.remove data = "1" * 100_000 - 10_000.times do |n| + 5000.times do |n| @@coll.insert({:n => n, :data => data}) end @@ -65,7 +65,7 @@ class CursorTest < Test::Unit::TestCase c = Cursor.new(@@coll) c.add_option(OP_QUERY_EXHAUST) - 9999.times do + 4999.times do c.next end assert c.has_next? diff --git a/test/unit/collection_test.rb b/test/unit/collection_test.rb index 88ac6a2..b49b58d 100644 --- a/test/unit/collection_test.rb +++ b/test/unit/collection_test.rb @@ -36,6 +36,7 @@ class CollectionTest < Test::Unit::TestCase @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false) @db = @conn['testing'] @coll = @db.collection('books') + @conn.expects(:checkout_writer).returns(mock()) @conn.expects(:receive_message).with do |op, msg, log, sock| op == 2004 end.returns([[], 0, 0]) diff --git a/test/unit/read_test.rb b/test/unit/read_test.rb index 0a34668..b27f2ef 100644 --- a/test/unit/read_test.rb +++ b/test/unit/read_test.rb @@ -10,7 +10,7 @@ class ReadTest < Test::Unit::TestCase end - context "Read mode on connection: " do + context "Read mode on replica set connection: " do setup do @read_preference = :secondary @con = Mongo::ReplSetConnection.new(['localhost', 27017], :read => @read_preference, :connect => false) @@ -71,19 +71,27 @@ class ReadTest < Test::Unit::TestCase end should "use default value on query" do + @cursor = @col.find({:a => 1}) + sock = mock() + sock.expects(:close) + @con.expects(:checkout_reader).returns(sock) @con.expects(:receive_message).with do |o, m, l, s, c, r| - r == :secondary + r == nil end.returns([[], 0, 0]) - @col.find_one({:a => 1}) + @cursor.next end should "allow override default value on query" do + @cursor = @col.find({:a => 1}, :read => :primary) + sock = mock() + sock.expects(:close) + @con.expects(:checkout_writer).returns(sock) @con.expects(:receive_message).with do |o, m, l, s, c, r| - r == :primary + r == nil end.returns([[], 0, 0]) - @col.find_one({:a => 1}, :read => :primary) + @cursor.next end should "allow override alternate value on query" do