Ensure that cursor requiring getmore ops will not

be affected by replica set refresh.

Prep for sending commands to secondaries.
This commit is contained in:
Kyle Banker 2011-10-17 14:41:09 -04:00
parent 1aad8d1e14
commit 7a11bb18a9
8 changed files with 183 additions and 94 deletions

View File

@ -565,6 +565,13 @@ module Mongo
end end
alias :primary? :read_primary? alias :primary? :read_primary?
# The socket pool that this connection reads from.
#
# @return [Mongo::Pool]
def read_pool
@primary_pool
end
# The value of the read preference. Because # The value of the read preference. Because
# this is a single-node connection, the value # this is a single-node connection, the value
# is +:primary+, and the connection will read # is +:primary+, and the connection will read

View File

@ -98,6 +98,10 @@ module Mongo
else else
@command = false @command = false
end end
@checkin_read_pool = false
@checkin_connection = false
@read_pool = nil
end end
# Guess whether the cursor is alive on the server. # Guess whether the cursor is alive on the server.
@ -460,10 +464,15 @@ module Mongo
def send_initial_query def send_initial_query
message = construct_query_message message = construct_query_message
payload = instrument_payload if @logger payload = instrument_payload if @logger
sock = @socket || checkout_socket_from_connection
instrument(:find, payload) do instrument(:find, payload) do
begin
results, @n_received, @cursor_id = @connection.receive_message( results, @n_received, @cursor_id = @connection.receive_message(
Mongo::Constants::OP_QUERY, message, nil, @socket, @command, Mongo::Constants::OP_QUERY, message, nil, sock, @command,
@read_preference, @options & OP_QUERY_EXHAUST != 0) nil, @options & OP_QUERY_EXHAUST != 0)
ensure
checkin_socket(sock) unless @socket
end
@returned += @n_received @returned += @n_received
@cache += results @cache += results
@query_run = true @query_run = true
@ -491,13 +500,63 @@ module Mongo
# Cursor id. # Cursor id.
message.put_long(@cursor_id) message.put_long(@cursor_id)
log(:debug, "cursor.refresh() for cursor #{@cursor_id}") if @logger log(:debug, "cursor.refresh() for cursor #{@cursor_id}") if @logger
sock = @socket || checkout_socket_for_op_get_more
begin
results, @n_received, @cursor_id = @connection.receive_message( results, @n_received, @cursor_id = @connection.receive_message(
Mongo::Constants::OP_GET_MORE, message, nil, @socket, @command, @read_preference) Mongo::Constants::OP_GET_MORE, message, nil, sock, @command, nil)
ensure
checkin_socket(sock) unless @socket
end
@returned += @n_received @returned += @n_received
@cache += results @cache += results
close_cursor_if_query_complete close_cursor_if_query_complete
end end
def checkout_socket_from_connection
@checkin_connection = true
if @read_preference == :primary
@connection.checkout_writer
else
@read_pool = @connection.read_pool
@connection.checkout_reader
end
end
def checkout_socket_for_op_get_more
if @read_pool && (@read_pool != @connection.read_pool)
checkout_socket_from_read_pool
else
checkout_socket_from_connection
end
end
def checkout_socket_from_read_pool
new_pool = @connection.secondary_pools.detect do |pool|
pool.host == @read_pool.host && pool.port == @read_pool.port
end
if new_pool
@read_pool = new_pool
sock = new_pool.checkout
@checkin_read_pool = true
return sock
else
raise Mongo::OperationFailure, "Failure to continue iterating " +
"cursor because the the replica set member persisting this " +
" cursor cannot be found."
end
end
def checkin_socket(sock)
if @checkin_read_pool
@read_pool.checkin(sock)
@checkin_read_pool = false
elsif @checkin_connection
@connection.checkin(sock)
@checkin_connection = false
end
end
def construct_query_message def construct_query_message
message = BSON::ByteBuffer.new message = BSON::ByteBuffer.new
message.put_int(@options) message.put_int(@options)

View File

@ -119,6 +119,7 @@ module Mongo
# Refresh # Refresh
@refresh_mode = opts.fetch(:refresh_mode, :sync) @refresh_mode = opts.fetch(:refresh_mode, :sync)
@refresh_interval = opts[:refresh_interval] || 90 @refresh_interval = opts[:refresh_interval] || 90
@last_refresh = Time.now
if ![:sync, :async, false].include?(@refresh_mode) if ![:sync, :async, false].include?(@refresh_mode)
raise MongoArgumentError, raise MongoArgumentError,
@ -339,6 +340,80 @@ module Mongo
end end
end end
# Checkout a socket for reading (i.e., a secondary node).
# Note that @read_pool might point to the primary pool
# if no read pool has been defined.
def checkout_reader
connect unless connected?
socket = get_socket_from_pool(@read_pool)
if !socket
refresh
socket = get_socket_from_pool(@primary_pool)
end
if socket
socket
else
raise ConnectionFailure.new("Could not connect to a node for reading.")
end
end
# Checkout a socket for writing (i.e., a primary node).
def checkout_writer
connect unless connected?
socket = get_socket_from_pool(@primary_pool)
if !socket
refresh
socket = get_socket_from_pool(@primary_pool)
end
if socket
socket
else
raise ConnectionFailure.new("Could not connect to primary node.")
end
end
def checkin(socket)
sync_synchronize(:SH) do
if pool = @sockets_to_pools[socket]
pool.checkin(socket)
elsif socket
begin
socket.close
rescue IOError
log(:info, "Tried to close socket #{socket} but already closed.")
end
end
end
# Refresh synchronously every @refresh_interval seconds
# if synchronous refresh mode is enabled.
if @refresh_mode == :sync &&
((Time.now - @last_refresh) > @refresh_interval)
refresh
@last_refresh = Time.now
end
end
def get_socket_from_pool(pool)
begin
sync_synchronize(:SH) do
if pool
socket = pool.checkout
@sockets_to_pools[socket] = pool
socket
end
end
rescue ConnectionFailure => ex
log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}")
return nil
end
end
private private
# Given a pool manager, update this connection's # Given a pool manager, update this connection's
@ -377,25 +452,6 @@ module Mongo
@last_refresh = Time.now @last_refresh = Time.now
end end
# Checkout a socket for reading (i.e., a secondary node).
# Note that @read_pool might point to the primary pool
# if no read pool has been defined.
def checkout_reader
connect unless connected?
socket = get_socket_from_pool(@read_pool)
if !socket
refresh
socket = get_socket_from_pool(@primary_pool)
end
if socket
socket
else
raise ConnectionFailure.new("Could not connect to a node for reading.")
end
end
# Checkout a socket connected to a node with one of # Checkout a socket connected to a node with one of
# the provided tags. If no such node exists, raise # the provided tags. If no such node exists, raise
# an exception. # an exception.
@ -417,39 +473,6 @@ module Mongo
"Could not find a connection tagged with #{tags}." "Could not find a connection tagged with #{tags}."
end end
# Checkout a socket for writing (i.e., a primary node).
def checkout_writer
connect unless connected?
socket = get_socket_from_pool(@primary_pool)
if !socket
refresh
socket = get_socket_from_pool(@primary_pool)
end
if socket
socket
else
raise ConnectionFailure.new("Could not connect to primary node.")
end
end
def get_socket_from_pool(pool)
begin
sync_synchronize(:SH) do
if pool
socket = pool.checkout
@sockets_to_pools[socket] = pool
socket
end
end
rescue ConnectionFailure => ex
log(:info, "Failed to checkout from #{pool} with #{ex.class}; #{ex.message}")
return nil
end
end
# Checkin a socket used for reading. # Checkin a socket used for reading.
def checkin_reader(socket) def checkin_reader(socket)
warn "ReplSetConnection#checkin_writer is deprecated and will be removed " + warn "ReplSetConnection#checkin_writer is deprecated and will be removed " +
@ -463,27 +486,5 @@ module Mongo
"in driver v2.0. Use ReplSetConnection#checkin instead." "in driver v2.0. Use ReplSetConnection#checkin instead."
checkin(socket) checkin(socket)
end end
def checkin(socket)
sync_synchronize(:SH) do
if pool = @sockets_to_pools[socket]
pool.checkin(socket)
elsif socket
begin
socket.close
rescue IOError
log(:info, "Tried to close socket #{socket} but already closed.")
end
end
end
# Refresh synchronously every @refresh_interval seconds
# if synchronous refresh mode is enabled.
if @refresh_mode == :sync &&
((Time.now - @last_refresh) > @refresh_interval)
refresh
@last_refresh = Time.now
end
end
end end
end end

View File

@ -18,6 +18,7 @@
module Mongo module Mongo
class Pool class Pool
PING_ATTEMPTS = 6 PING_ATTEMPTS = 6
MAX_PING_TIME = 1_000_000
attr_accessor :host, :port, :address, attr_accessor :host, :port, :address,
:size, :timeout, :safe, :checked_out, :connection :size, :timeout, :safe, :checked_out, :connection
@ -35,7 +36,7 @@ module Mongo
@address = "#{@host}:#{@port}" @address = "#{@host}:#{@port}"
# Pool size and timeout. # Pool size and timeout.
@size = opts[:size] || 1 @size = opts[:size] || 10
@timeout = opts[:timeout] || 5.0 @timeout = opts[:timeout] || 5.0
# Mutex for synchronizing pool access # Mutex for synchronizing pool access
@ -52,6 +53,7 @@ module Mongo
@checked_out = [] @checked_out = []
@ping_time = nil @ping_time = nil
@last_ping = nil @last_ping = nil
@closed = false
end end
def close def close
@ -67,9 +69,14 @@ module Mongo
@sockets.clear @sockets.clear
@pids.clear @pids.clear
@checked_out.clear @checked_out.clear
@closed = true
end end
end end
def closed?
@closed
end
def inspect def inspect
"#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} " + "#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} " +
"@ping_time=#{@ping_time} #{@checked_out.size}/#{@size} sockets available.>" "@ping_time=#{@ping_time} #{@checked_out.size}/#{@size} sockets available.>"
@ -101,14 +108,12 @@ module Mongo
# to do a round-trip against this node. # to do a round-trip against this node.
def refresh_ping_time def refresh_ping_time
trials = [] trials = []
begin
PING_ATTEMPTS.times do PING_ATTEMPTS.times do
t1 = Time.now t1 = Time.now
self.connection['admin'].command({:ping => 1}, :socket => @node.socket) if !self.ping
trials << (Time.now - t1) * 1000 return MAX_PING_TIME
end end
rescue OperationFailure, SocketError, SystemCallError, IOError => ex trials << (Time.now - t1) * 1000
return nil
end end
trials.sort! trials.sort!
@ -123,6 +128,14 @@ module Mongo
(total / trials.length).ceil (total / trials.length).ceil
end end
def ping
begin
return self.connection['admin'].command({:ping => 1}, :socket => @node.socket)
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
return false
end
end
# Return a socket to the pool. # Return a socket to the pool.
def checkin(socket) def checkin(socket)
@connection_mutex.synchronize do @connection_mutex.synchronize do

View File

@ -318,7 +318,7 @@ class TestConnection < Test::Unit::TestCase
TCPSocket.stubs(:new).returns(fake_socket) TCPSocket.stubs(:new).returns(fake_socket)
@con.primary_pool.checkout_new_socket @con.primary_pool.checkout_new_socket
assert_equal [], @con.primary_pool.close assert @con.primary_pool.close
end end
end end
end end

View File

@ -54,7 +54,7 @@ class CursorTest < Test::Unit::TestCase
if @@version >= "2.0" if @@version >= "2.0"
@@coll.remove @@coll.remove
data = "1" * 100_000 data = "1" * 100_000
10_000.times do |n| 5000.times do |n|
@@coll.insert({:n => n, :data => data}) @@coll.insert({:n => n, :data => data})
end end
@ -65,7 +65,7 @@ class CursorTest < Test::Unit::TestCase
c = Cursor.new(@@coll) c = Cursor.new(@@coll)
c.add_option(OP_QUERY_EXHAUST) c.add_option(OP_QUERY_EXHAUST)
9999.times do 4999.times do
c.next c.next
end end
assert c.has_next? assert c.has_next?

View File

@ -36,6 +36,7 @@ class CollectionTest < Test::Unit::TestCase
@conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false) @conn = Connection.new('localhost', 27017, :logger => @logger, :connect => false)
@db = @conn['testing'] @db = @conn['testing']
@coll = @db.collection('books') @coll = @db.collection('books')
@conn.expects(:checkout_writer).returns(mock())
@conn.expects(:receive_message).with do |op, msg, log, sock| @conn.expects(:receive_message).with do |op, msg, log, sock|
op == 2004 op == 2004
end.returns([[], 0, 0]) end.returns([[], 0, 0])

View File

@ -10,7 +10,7 @@ class ReadTest < Test::Unit::TestCase
end end
context "Read mode on connection: " do context "Read mode on replica set connection: " do
setup do setup do
@read_preference = :secondary @read_preference = :secondary
@con = Mongo::ReplSetConnection.new(['localhost', 27017], :read => @read_preference, :connect => false) @con = Mongo::ReplSetConnection.new(['localhost', 27017], :read => @read_preference, :connect => false)
@ -71,19 +71,27 @@ class ReadTest < Test::Unit::TestCase
end end
should "use default value on query" do should "use default value on query" do
@cursor = @col.find({:a => 1})
sock = mock()
sock.expects(:close)
@con.expects(:checkout_reader).returns(sock)
@con.expects(:receive_message).with do |o, m, l, s, c, r| @con.expects(:receive_message).with do |o, m, l, s, c, r|
r == :secondary r == nil
end.returns([[], 0, 0]) end.returns([[], 0, 0])
@col.find_one({:a => 1}) @cursor.next
end end
should "allow override default value on query" do should "allow override default value on query" do
@cursor = @col.find({:a => 1}, :read => :primary)
sock = mock()
sock.expects(:close)
@con.expects(:checkout_writer).returns(sock)
@con.expects(:receive_message).with do |o, m, l, s, c, r| @con.expects(:receive_message).with do |o, m, l, s, c, r|
r == :primary r == nil
end.returns([[], 0, 0]) end.returns([[], 0, 0])
@col.find_one({:a => 1}, :read => :primary) @cursor.next
end end
should "allow override alternate value on query" do should "allow override alternate value on query" do