RUBY-320 RUBY-284 initial cascading read API. Mapping sockets to pools.
This commit is contained in:
parent
a6ea525e3d
commit
45c40e7267
@ -84,6 +84,8 @@ module Mongo
|
||||
unless pk_factory
|
||||
@safe = opts.fetch(:safe, @db.safe)
|
||||
end
|
||||
read = opts.fetch(:read, @db.read_preference)
|
||||
@read_preference = read.is_a?(Hash) ? read.dup : read
|
||||
@pk_factory = pk_factory || opts[:pk] || BSON::ObjectId
|
||||
@hint = nil
|
||||
end
|
||||
@ -200,6 +202,7 @@ module Mongo
|
||||
return_key = opts.delete(:return_key)
|
||||
transformer = opts.delete(:transformer)
|
||||
show_disk_loc = opts.delete(:max_scan)
|
||||
read = opts.delete(:read) || @read_preference
|
||||
|
||||
if timeout == false && !block_given?
|
||||
raise ArgumentError, "Collection#find must be invoked with a block when timeout is disabled."
|
||||
@ -214,19 +217,20 @@ module Mongo
|
||||
raise RuntimeError, "Unknown options [#{opts.inspect}]" unless opts.empty?
|
||||
|
||||
cursor = Cursor.new(self, {
|
||||
:selector => selector,
|
||||
:fields => fields,
|
||||
:skip => skip,
|
||||
:selector => selector,
|
||||
:fields => fields,
|
||||
:skip => skip,
|
||||
:limit => limit,
|
||||
:order => sort,
|
||||
:hint => hint,
|
||||
:snapshot => snapshot,
|
||||
:timeout => timeout,
|
||||
:order => sort,
|
||||
:hint => hint,
|
||||
:snapshot => snapshot,
|
||||
:timeout => timeout,
|
||||
:batch_size => batch_size,
|
||||
:transformer => transformer,
|
||||
:max_scan => max_scan,
|
||||
:show_disk_loc => show_disk_loc,
|
||||
:return_key => return_key
|
||||
:return_key => return_key,
|
||||
:read => read
|
||||
})
|
||||
|
||||
if block_given?
|
||||
@ -681,6 +685,13 @@ module Mongo
|
||||
end
|
||||
end
|
||||
|
||||
# The value of the read preference. This will be
|
||||
# either +:primary+, +:secondary+, or an object
|
||||
# representing the tags to be read from.
|
||||
def read_preference
|
||||
@read_preference
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def new_group(opts={})
|
||||
|
@ -310,7 +310,7 @@ module Mongo
|
||||
#
|
||||
# @core databases []-instance_method
|
||||
def [](db_name)
|
||||
DB.new(db_name, self, :safe => @safe)
|
||||
DB.new(db_name, self)
|
||||
end
|
||||
|
||||
# Drop a database.
|
||||
@ -410,11 +410,7 @@ module Mongo
|
||||
|
||||
send_message_on_socket(packed_message, socket)
|
||||
ensure
|
||||
if connection == :writer
|
||||
checkin_writer(socket)
|
||||
else
|
||||
checkin_reader(socket)
|
||||
end
|
||||
checkin(socket)
|
||||
end
|
||||
end
|
||||
|
||||
@ -446,7 +442,7 @@ module Mongo
|
||||
docs, num_received, cursor_id = receive(sock, last_error_id)
|
||||
end
|
||||
ensure
|
||||
checkin_writer(sock)
|
||||
checkin(sock)
|
||||
end
|
||||
|
||||
if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg'])
|
||||
@ -470,16 +466,24 @@ module Mongo
|
||||
# @return [Array]
|
||||
# An array whose indexes include [0] documents returned, [1] number of document received,
|
||||
# and [3] a cursor_id.
|
||||
def receive_message(operation, message, log_message=nil, socket=nil, command=false)
|
||||
def receive_message(operation, message, log_message=nil, socket=nil, command=false, read=:primary)
|
||||
request_id = add_message_headers(message, operation)
|
||||
packed_message = message.to_s
|
||||
begin
|
||||
if socket
|
||||
sock = socket
|
||||
checkin = false
|
||||
should_checkin = false
|
||||
else
|
||||
sock = (command ? checkout_writer : checkout_reader)
|
||||
checkin = true
|
||||
if command
|
||||
sock = checkout_writer
|
||||
elsif read == :primary
|
||||
sock = checkout_writer
|
||||
elsif read == :secondary
|
||||
sock = checkout_reader
|
||||
else
|
||||
sock = checkout_tagged(read)
|
||||
end
|
||||
should_checkin = true
|
||||
end
|
||||
|
||||
result = ''
|
||||
@ -488,8 +492,8 @@ module Mongo
|
||||
result = receive(sock, request_id)
|
||||
end
|
||||
ensure
|
||||
if checkin
|
||||
command ? checkin_writer(sock) : checkin_reader(sock)
|
||||
if should_checkin
|
||||
checkin(sock)
|
||||
end
|
||||
end
|
||||
result
|
||||
@ -559,6 +563,14 @@ module Mongo
|
||||
end
|
||||
alias :primary? :read_primary?
|
||||
|
||||
# The value of the read preference. Because
|
||||
# this is a single-node connection, the value
|
||||
# is +:primary+, and the connection will read
|
||||
# from whichever type of node it's connected to.
|
||||
def read_preference
|
||||
:primary
|
||||
end
|
||||
|
||||
# Close the connection to the database.
|
||||
def close
|
||||
@primary_pool.close if @primary_pool
|
||||
@ -591,14 +603,21 @@ module Mongo
|
||||
# Checkin a socket used for reading.
|
||||
# Note: this is overridden in ReplSetConnection.
|
||||
def checkin_reader(socket)
|
||||
if @primary_pool
|
||||
@primary_pool.checkin(socket)
|
||||
end
|
||||
warn "Connection#checkin_writer is not deprecated and will be remove " +
|
||||
"in driver v2.0. Use Connection#checkin instead."
|
||||
checkin(socket)
|
||||
end
|
||||
|
||||
# Checkin a socket used for writing.
|
||||
# Note: this is overridden in ReplSetConnection.
|
||||
def checkin_writer(socket)
|
||||
warn "Connection#checkin_writer is not deprecated and will be remove " +
|
||||
"in driver v2.0. Use Connection#checkin instead."
|
||||
checkin(socket)
|
||||
end
|
||||
|
||||
# Check a socket back into its pool.
|
||||
def checkin(socket)
|
||||
if @primary_pool
|
||||
@primary_pool.checkin(socket)
|
||||
end
|
||||
@ -671,7 +690,7 @@ module Mongo
|
||||
# Global safe option. This is false by default.
|
||||
@safe = opts[:safe] || false
|
||||
|
||||
# Create a mutex when a new key, in this case a socket,
|
||||
# Create a mutex when a new key, in this case a socket,
|
||||
# is added to the hash.
|
||||
@safe_mutexes = Hash.new { |h, k| h[k] = Mutex.new }
|
||||
|
||||
|
@ -70,6 +70,8 @@ module Mongo
|
||||
@query_run = false
|
||||
|
||||
@transformer = opts[:transformer]
|
||||
read = opts[:read] || collection.read_preference
|
||||
@read_preference = read.is_a?(Hash) ? read.dup : read
|
||||
batch_size(opts[:batch_size] || 0)
|
||||
|
||||
@full_collection_name = "#{@collection.db.name}.#{@collection.name}"
|
||||
@ -448,7 +450,7 @@ module Mongo
|
||||
message.put_long(@cursor_id)
|
||||
@logger.debug("MONGODB cursor.refresh() for cursor #{@cursor_id}") if @logger
|
||||
results, @n_received, @cursor_id = @connection.receive_message(
|
||||
Mongo::Constants::OP_GET_MORE, message, nil, @socket, @command)
|
||||
Mongo::Constants::OP_GET_MORE, message, nil, @socket, @command, @read_preference)
|
||||
@returned += @n_received
|
||||
@cache += results
|
||||
close_cursor_if_query_complete
|
||||
@ -464,7 +466,7 @@ module Mongo
|
||||
message = construct_query_message
|
||||
@connection.instrument(:find, instrument_payload) do
|
||||
results, @n_received, @cursor_id = @connection.receive_message(
|
||||
Mongo::Constants::OP_QUERY, message, nil, @socket, @command)
|
||||
Mongo::Constants::OP_QUERY, message, nil, @socket, @command, @read_preference)
|
||||
@returned += @n_received
|
||||
@cache += results
|
||||
@query_run = true
|
||||
|
@ -82,6 +82,8 @@ module Mongo
|
||||
@strict = opts[:strict]
|
||||
@pk_factory = opts[:pk]
|
||||
@safe = opts.fetch(:safe, @connection.safe)
|
||||
read = opts.fetch(:read, @connection.read_preference)
|
||||
@read_preference = read.is_a?(Hash) ? read.dup : read
|
||||
@cache_time = opts[:cache_time] || 300 #5 minutes.
|
||||
end
|
||||
|
||||
@ -609,6 +611,13 @@ module Mongo
|
||||
doc
|
||||
end
|
||||
|
||||
# The value of the read preference. This will be
|
||||
# either +:primary+, +:secondary+, or an object
|
||||
# representing the tags to be read from.
|
||||
def read_preference
|
||||
@read_preference
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def system_command_collection
|
||||
|
@ -48,6 +48,9 @@ module Mongo
|
||||
# Raised on failures in connection to the database server.
|
||||
class ConnectionTimeoutError < MongoRubyError; end
|
||||
|
||||
# Raised when no tags in a read preference maps to a given connection.
|
||||
class NodeWithTagsNotFound < MongoRubyError; end
|
||||
|
||||
# Raised when a connection operation fails.
|
||||
class ConnectionFailure < MongoDBError; end
|
||||
|
||||
|
@ -112,7 +112,14 @@ module Mongo
|
||||
@refresh_interval = opts[:refresh_interval] || 90
|
||||
|
||||
# Are we allowing reads from secondaries?
|
||||
@read_secondary = opts.fetch(:read_secondary, false)
|
||||
if opts[:read_secondary]
|
||||
warn ":read_secondary options has now been deprecated and will " +
|
||||
"be removed in driver v2.0. Use the :read option instead."
|
||||
@read_secondary = opts.fetch(:read_secondary, false)
|
||||
@read = :secondary
|
||||
else
|
||||
@read = opts.fetch(:read, :primary)
|
||||
end
|
||||
|
||||
# Lock around changes to the global config
|
||||
@connection_lock = Mutex.new
|
||||
@ -121,6 +128,10 @@ module Mongo
|
||||
# Store the refresher thread
|
||||
@refresh_thread = nil
|
||||
|
||||
# Maps
|
||||
@sockets_to_pools = {}
|
||||
@tags_to_pools = {}
|
||||
|
||||
# Replica set name
|
||||
if opts[:rs_name]
|
||||
warn ":rs_name option has been deprecated and will be removed in 2.0. " +
|
||||
@ -226,6 +237,10 @@ module Mongo
|
||||
end
|
||||
alias :primary? :read_primary?
|
||||
|
||||
def read_preference
|
||||
@read
|
||||
end
|
||||
|
||||
# Close the connection to the database.
|
||||
def close
|
||||
super
|
||||
@ -254,6 +269,8 @@ module Mongo
|
||||
@secondaries = []
|
||||
@secondary_pools = []
|
||||
@arbiters = []
|
||||
@tags_to_pools.clear
|
||||
@sockets_to_pools.clear
|
||||
end
|
||||
|
||||
# If a ConnectionFailure is raised, this method will be called
|
||||
@ -265,11 +282,15 @@ module Mongo
|
||||
"Use ReplSetConnection#close instead."
|
||||
end
|
||||
|
||||
# Is it okay to connect to a slave?
|
||||
# Returns +true+ if it's okay to read from a secondary node.
|
||||
# Since this is a replica set, this must always be true.
|
||||
#
|
||||
# @return [Boolean]
|
||||
# This method exist primarily so that Cursor objects will
|
||||
# generate query messages with a slaveOkay value of +true+.
|
||||
#
|
||||
# @return [Boolean] +true+
|
||||
def slave_ok?
|
||||
@read_secondary
|
||||
true
|
||||
end
|
||||
|
||||
def authenticate_pools
|
||||
@ -299,25 +320,29 @@ module Mongo
|
||||
|
||||
# Checkout a socket for reading (i.e., a secondary node).
|
||||
# Note that @read_pool might point to the primary pool
|
||||
# if no read pool has been defined. That's okay; we don't
|
||||
# want to have to check for the existence of the @read_pool
|
||||
# because that introduces concurrency issues.
|
||||
# if no read pool has been defined.
|
||||
def checkout_reader
|
||||
connect unless connected?
|
||||
|
||||
if @read_secondary && @read_pool
|
||||
begin
|
||||
return @read_pool.checkout
|
||||
rescue NoMethodError
|
||||
warn "Read pool was not available."
|
||||
socket = @read_pool.checkout
|
||||
@sockets_to_pools[socket] = @read_pool
|
||||
return socket
|
||||
end
|
||||
|
||||
# Checkout a socket connected to a node with one of
|
||||
# the provided tags. If no such node exists, raise
|
||||
# an exception.
|
||||
def checkout_tagged(tags)
|
||||
tags.each do |k, v|
|
||||
if pool = @tags_to_pools[{k.to_s => v}]
|
||||
socket = pool.checkout
|
||||
@sockets_to_pools[socket] = pool
|
||||
return socket
|
||||
end
|
||||
end
|
||||
|
||||
begin
|
||||
return @primary_pool.checkout
|
||||
rescue NoMethodError
|
||||
raise ConnectionFailure, "Not connected to any nodes."
|
||||
end
|
||||
raise NodeWithTagsNotFound,
|
||||
"Could not find a connection tagged with #{tags}."
|
||||
end
|
||||
|
||||
# Checkout a socket for writing (i.e., a primary node).
|
||||
@ -326,7 +351,9 @@ module Mongo
|
||||
|
||||
if @primary_pool
|
||||
begin
|
||||
return @primary_pool.checkout
|
||||
socket = @primary_pool.checkout
|
||||
@sockets_to_pools[socket] = @primary
|
||||
return socket
|
||||
rescue NoMethodError
|
||||
end
|
||||
end
|
||||
@ -336,17 +363,21 @@ module Mongo
|
||||
|
||||
# Checkin a socket used for reading.
|
||||
def checkin_reader(socket)
|
||||
if @read_secondary
|
||||
@read_pool.checkin(socket)
|
||||
else
|
||||
checkin_writer(socket)
|
||||
end
|
||||
warn "ReplSetConnection#checkin_writer is not deprecated and will be remove " +
|
||||
"in driver v2.0. Use ReplSetConnection#checkin instead."
|
||||
checkin(socket)
|
||||
end
|
||||
|
||||
# Checkin a socket used for writing.
|
||||
def checkin_writer(socket)
|
||||
if @primary_pool
|
||||
@primary_pool.checkin(socket)
|
||||
warn "ReplSetConnection#checkin_writer is not deprecated and will be remove " +
|
||||
"in driver v2.0. Use ReplSetConnection#checkin instead."
|
||||
checkin(socket)
|
||||
end
|
||||
|
||||
def checkin(socket)
|
||||
if pool = @sockets_to_pools[socket]
|
||||
pool.checkin(socket)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -5,9 +5,9 @@ class CursorTest < Test::Unit::TestCase
|
||||
setup do
|
||||
@logger = mock()
|
||||
@logger.stubs(:debug)
|
||||
@connection = stub(:class => Connection, :logger => @logger, :slave_ok? => false)
|
||||
@db = stub(:name => "testing", :slave_ok? => false, :connection => @connection)
|
||||
@collection = stub(:db => @db, :name => "items")
|
||||
@connection = stub(:class => Connection, :logger => @logger, :slave_ok? => false, :read_preference => :primary)
|
||||
@db = stub(:name => "testing", :slave_ok? => false, :connection => @connection, :read_preference => :primary)
|
||||
@collection = stub(:db => @db, :name => "items", :read_preference => :primary)
|
||||
@cursor = Cursor.new(@collection)
|
||||
end
|
||||
|
||||
@ -102,7 +102,7 @@ class CursorTest < Test::Unit::TestCase
|
||||
@logger.stubs(:debug)
|
||||
@connection = stub(:class => Connection, :logger => @logger, :slave_ok? => false)
|
||||
@db = stub(:slave_ok? => true, :name => "testing", :connection => @connection)
|
||||
@collection = stub(:db => @db, :name => "items")
|
||||
@collection = stub(:db => @db, :name => "items", :read_preference => :primary)
|
||||
end
|
||||
|
||||
should "when an array should return a hash with each key" do
|
||||
|
@ -15,8 +15,10 @@ class DBTest < Test::Unit::TestCase
|
||||
setup do
|
||||
@conn = stub()
|
||||
@conn.stubs(:safe)
|
||||
@conn.stubs(:read_preference)
|
||||
@db = DB.new("testing", @conn)
|
||||
@db.stubs(:safe)
|
||||
@db.stubs(:read_preference)
|
||||
@collection = mock()
|
||||
@db.stubs(:system_command_collection).returns(@collection)
|
||||
end
|
||||
|
@ -6,6 +6,7 @@ class GridTest < Test::Unit::TestCase
|
||||
setup do
|
||||
@conn = stub()
|
||||
@conn.stubs(:safe)
|
||||
@conn.stubs(:read_preference)
|
||||
@db = DB.new("testing", @conn)
|
||||
@files = mock()
|
||||
@chunks = mock()
|
||||
@ -13,6 +14,7 @@ class GridTest < Test::Unit::TestCase
|
||||
@db.expects(:[]).with('fs.files').returns(@files)
|
||||
@db.expects(:[]).with('fs.chunks').returns(@chunks)
|
||||
@db.stubs(:safe)
|
||||
@db.stubs(:read_preference)
|
||||
end
|
||||
|
||||
context "Grid classe with standard connections" do
|
||||
|
@ -65,7 +65,7 @@ class NodeTest < Test::Unit::TestCase
|
||||
Node.new(@connection, ['192.168.0.1', Connection::DEFAULT_PORT])
|
||||
end
|
||||
|
||||
should "two nodes with the same address should have the same hash" do
|
||||
should "two nodes with the same address should have the same hash negate" do
|
||||
assert_not_equal Node.new(@connection, '192.168.0.1').hash,
|
||||
Node.new(@connection, '1239.33.4.2393:29949').hash
|
||||
end
|
||||
|
98
test/unit/read_test.rb
Normal file
98
test/unit/read_test.rb
Normal file
@ -0,0 +1,98 @@
|
||||
require './test/test_helper'
|
||||
|
||||
class ReadTest < Test::Unit::TestCase
|
||||
|
||||
context "Read mode on standard connection: " do
|
||||
setup do
|
||||
@read_preference = :secondary
|
||||
@con = Mongo::Connection.new('localhost', 27017, :read => @read_preference, :connect => false)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
context "Read mode on connection: " do
|
||||
setup do
|
||||
@read_preference = :secondary
|
||||
@con = Mongo::ReplSetConnection.new(['localhost', 27017], :read => @read_preference, :connect => false)
|
||||
end
|
||||
|
||||
should "store read preference on Connection" do
|
||||
assert_equal @read_preference, @con.read_preference
|
||||
end
|
||||
|
||||
should "propogate to DB" do
|
||||
db = @con['foo']
|
||||
assert_equal @read_preference, db.read_preference
|
||||
|
||||
db = @con.db('foo')
|
||||
assert_equal @read_preference, db.read_preference
|
||||
|
||||
db = DB.new('foo', @con)
|
||||
assert_equal @read_preference, db.read_preference
|
||||
end
|
||||
|
||||
should "allow db override" do
|
||||
db = DB.new('foo', @con, :read => :primary)
|
||||
assert_equal :primary, db.read_preference
|
||||
|
||||
db = @con.db('foo', :read => :primary)
|
||||
assert_equal :primary, db.read_preference
|
||||
end
|
||||
|
||||
context "on DB: " do
|
||||
setup do
|
||||
@db = @con['foo']
|
||||
end
|
||||
|
||||
should "propogate to collection" do
|
||||
col = @db.collection('bar')
|
||||
assert_equal @read_preference, col.read_preference
|
||||
|
||||
col = @db['bar']
|
||||
assert_equal @read_preference, col.read_preference
|
||||
|
||||
col = Collection.new('bar', @db)
|
||||
assert_equal @read_preference, col.read_preference
|
||||
end
|
||||
|
||||
should "allow override on collection" do
|
||||
col = @db.collection('bar', :read => :primary)
|
||||
assert_equal :primary, col.read_preference
|
||||
|
||||
col = Collection.new('bar', @db, :read => :primary)
|
||||
assert_equal :primary, col.read_preference
|
||||
end
|
||||
end
|
||||
|
||||
context "on read mode ops" do
|
||||
setup do
|
||||
@col = @con['foo']['bar']
|
||||
@mock_socket = stub()
|
||||
end
|
||||
|
||||
should "use default value on query" do
|
||||
@con.expects(:receive_message).with do |o, m, l, s, c, r|
|
||||
r == :secondary
|
||||
end.returns([[], 0, 0])
|
||||
|
||||
@col.find_one({:a => 1})
|
||||
end
|
||||
|
||||
should "allow override default value on query" do
|
||||
@con.expects(:receive_message).with do |o, m, l, s, c, r|
|
||||
r == :primary
|
||||
end.returns([[], 0, 0])
|
||||
|
||||
@col.find_one({:a => 1}, :read => :primary)
|
||||
end
|
||||
|
||||
should "allow override alternate value on query" do
|
||||
@con.expects(:receive_message).with do |o, m, l, s, c, r|
|
||||
tags = {:dc => "ny"}
|
||||
end.returns([[], 0, 0])
|
||||
|
||||
@col.find_one({:a => 1}, :read => {:dc => "ny"})
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue
Block a user