diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index fe7ef57..aeb2663 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -84,6 +84,8 @@ module Mongo unless pk_factory @safe = opts.fetch(:safe, @db.safe) end + read = opts.fetch(:read, @db.read_preference) + @read_preference = read.is_a?(Hash) ? read.dup : read @pk_factory = pk_factory || opts[:pk] || BSON::ObjectId @hint = nil end @@ -200,6 +202,7 @@ module Mongo return_key = opts.delete(:return_key) transformer = opts.delete(:transformer) show_disk_loc = opts.delete(:max_scan) + read = opts.delete(:read) || @read_preference if timeout == false && !block_given? raise ArgumentError, "Collection#find must be invoked with a block when timeout is disabled." @@ -214,19 +217,20 @@ module Mongo raise RuntimeError, "Unknown options [#{opts.inspect}]" unless opts.empty? cursor = Cursor.new(self, { - :selector => selector, - :fields => fields, - :skip => skip, + :selector => selector, + :fields => fields, + :skip => skip, :limit => limit, - :order => sort, - :hint => hint, - :snapshot => snapshot, - :timeout => timeout, + :order => sort, + :hint => hint, + :snapshot => snapshot, + :timeout => timeout, :batch_size => batch_size, :transformer => transformer, :max_scan => max_scan, :show_disk_loc => show_disk_loc, - :return_key => return_key + :return_key => return_key, + :read => read }) if block_given? @@ -681,6 +685,13 @@ module Mongo end end + # The value of the read preference. This will be + # either +:primary+, +:secondary+, or an object + # representing the tags to be read from. + def read_preference + @read_preference + end + private def new_group(opts={}) diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 0682b5f..25f021a 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -310,7 +310,7 @@ module Mongo # # @core databases []-instance_method def [](db_name) - DB.new(db_name, self, :safe => @safe) + DB.new(db_name, self) end # Drop a database. @@ -410,11 +410,7 @@ module Mongo send_message_on_socket(packed_message, socket) ensure - if connection == :writer - checkin_writer(socket) - else - checkin_reader(socket) - end + checkin(socket) end end @@ -446,7 +442,7 @@ module Mongo docs, num_received, cursor_id = receive(sock, last_error_id) end ensure - checkin_writer(sock) + checkin(sock) end if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg']) @@ -470,16 +466,24 @@ module Mongo # @return [Array] # An array whose indexes include [0] documents returned, [1] number of document received, # and [3] a cursor_id. - def receive_message(operation, message, log_message=nil, socket=nil, command=false) + def receive_message(operation, message, log_message=nil, socket=nil, command=false, read=:primary) request_id = add_message_headers(message, operation) packed_message = message.to_s begin if socket sock = socket - checkin = false + should_checkin = false else - sock = (command ? checkout_writer : checkout_reader) - checkin = true + if command + sock = checkout_writer + elsif read == :primary + sock = checkout_writer + elsif read == :secondary + sock = checkout_reader + else + sock = checkout_tagged(read) + end + should_checkin = true end result = '' @@ -488,8 +492,8 @@ module Mongo result = receive(sock, request_id) end ensure - if checkin - command ? checkin_writer(sock) : checkin_reader(sock) + if should_checkin + checkin(sock) end end result @@ -559,6 +563,14 @@ module Mongo end alias :primary? :read_primary? + # The value of the read preference. Because + # this is a single-node connection, the value + # is +:primary+, and the connection will read + # from whichever type of node it's connected to. + def read_preference + :primary + end + # Close the connection to the database. def close @primary_pool.close if @primary_pool @@ -591,14 +603,21 @@ module Mongo # Checkin a socket used for reading. # Note: this is overridden in ReplSetConnection. def checkin_reader(socket) - if @primary_pool - @primary_pool.checkin(socket) - end + warn "Connection#checkin_writer is not deprecated and will be remove " + + "in driver v2.0. Use Connection#checkin instead." + checkin(socket) end # Checkin a socket used for writing. # Note: this is overridden in ReplSetConnection. def checkin_writer(socket) + warn "Connection#checkin_writer is not deprecated and will be remove " + + "in driver v2.0. Use Connection#checkin instead." + checkin(socket) + end + + # Check a socket back into its pool. + def checkin(socket) if @primary_pool @primary_pool.checkin(socket) end @@ -671,7 +690,7 @@ module Mongo # Global safe option. This is false by default. @safe = opts[:safe] || false - # Create a mutex when a new key, in this case a socket, + # Create a mutex when a new key, in this case a socket, # is added to the hash. @safe_mutexes = Hash.new { |h, k| h[k] = Mutex.new } diff --git a/lib/mongo/cursor.rb b/lib/mongo/cursor.rb index 9b54172..1f321e6 100644 --- a/lib/mongo/cursor.rb +++ b/lib/mongo/cursor.rb @@ -70,6 +70,8 @@ module Mongo @query_run = false @transformer = opts[:transformer] + read = opts[:read] || collection.read_preference + @read_preference = read.is_a?(Hash) ? read.dup : read batch_size(opts[:batch_size] || 0) @full_collection_name = "#{@collection.db.name}.#{@collection.name}" @@ -448,7 +450,7 @@ module Mongo message.put_long(@cursor_id) @logger.debug("MONGODB cursor.refresh() for cursor #{@cursor_id}") if @logger results, @n_received, @cursor_id = @connection.receive_message( - Mongo::Constants::OP_GET_MORE, message, nil, @socket, @command) + Mongo::Constants::OP_GET_MORE, message, nil, @socket, @command, @read_preference) @returned += @n_received @cache += results close_cursor_if_query_complete @@ -464,7 +466,7 @@ module Mongo message = construct_query_message @connection.instrument(:find, instrument_payload) do results, @n_received, @cursor_id = @connection.receive_message( - Mongo::Constants::OP_QUERY, message, nil, @socket, @command) + Mongo::Constants::OP_QUERY, message, nil, @socket, @command, @read_preference) @returned += @n_received @cache += results @query_run = true diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index 69cb363..a9e82f4 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -82,6 +82,8 @@ module Mongo @strict = opts[:strict] @pk_factory = opts[:pk] @safe = opts.fetch(:safe, @connection.safe) + read = opts.fetch(:read, @connection.read_preference) + @read_preference = read.is_a?(Hash) ? read.dup : read @cache_time = opts[:cache_time] || 300 #5 minutes. end @@ -609,6 +611,13 @@ module Mongo doc end + # The value of the read preference. This will be + # either +:primary+, +:secondary+, or an object + # representing the tags to be read from. + def read_preference + @read_preference + end + private def system_command_collection diff --git a/lib/mongo/exceptions.rb b/lib/mongo/exceptions.rb index 0989635..875eeeb 100644 --- a/lib/mongo/exceptions.rb +++ b/lib/mongo/exceptions.rb @@ -48,6 +48,9 @@ module Mongo # Raised on failures in connection to the database server. class ConnectionTimeoutError < MongoRubyError; end + # Raised when no tags in a read preference maps to a given connection. + class NodeWithTagsNotFound < MongoRubyError; end + # Raised when a connection operation fails. class ConnectionFailure < MongoDBError; end diff --git a/lib/mongo/repl_set_connection.rb b/lib/mongo/repl_set_connection.rb index f249bd5..7fa9d2a 100644 --- a/lib/mongo/repl_set_connection.rb +++ b/lib/mongo/repl_set_connection.rb @@ -112,7 +112,14 @@ module Mongo @refresh_interval = opts[:refresh_interval] || 90 # Are we allowing reads from secondaries? - @read_secondary = opts.fetch(:read_secondary, false) + if opts[:read_secondary] + warn ":read_secondary options has now been deprecated and will " + + "be removed in driver v2.0. Use the :read option instead." + @read_secondary = opts.fetch(:read_secondary, false) + @read = :secondary + else + @read = opts.fetch(:read, :primary) + end # Lock around changes to the global config @connection_lock = Mutex.new @@ -121,6 +128,10 @@ module Mongo # Store the refresher thread @refresh_thread = nil + # Maps + @sockets_to_pools = {} + @tags_to_pools = {} + # Replica set name if opts[:rs_name] warn ":rs_name option has been deprecated and will be removed in 2.0. " + @@ -226,6 +237,10 @@ module Mongo end alias :primary? :read_primary? + def read_preference + @read + end + # Close the connection to the database. def close super @@ -254,6 +269,8 @@ module Mongo @secondaries = [] @secondary_pools = [] @arbiters = [] + @tags_to_pools.clear + @sockets_to_pools.clear end # If a ConnectionFailure is raised, this method will be called @@ -265,11 +282,15 @@ module Mongo "Use ReplSetConnection#close instead." end - # Is it okay to connect to a slave? + # Returns +true+ if it's okay to read from a secondary node. + # Since this is a replica set, this must always be true. # - # @return [Boolean] + # This method exist primarily so that Cursor objects will + # generate query messages with a slaveOkay value of +true+. + # + # @return [Boolean] +true+ def slave_ok? - @read_secondary + true end def authenticate_pools @@ -299,25 +320,29 @@ module Mongo # Checkout a socket for reading (i.e., a secondary node). # Note that @read_pool might point to the primary pool - # if no read pool has been defined. That's okay; we don't - # want to have to check for the existence of the @read_pool - # because that introduces concurrency issues. + # if no read pool has been defined. def checkout_reader connect unless connected? - if @read_secondary && @read_pool - begin - return @read_pool.checkout - rescue NoMethodError - warn "Read pool was not available." + socket = @read_pool.checkout + @sockets_to_pools[socket] = @read_pool + return socket + end + + # Checkout a socket connected to a node with one of + # the provided tags. If no such node exists, raise + # an exception. + def checkout_tagged(tags) + tags.each do |k, v| + if pool = @tags_to_pools[{k.to_s => v}] + socket = pool.checkout + @sockets_to_pools[socket] = pool + return socket end end - begin - return @primary_pool.checkout - rescue NoMethodError - raise ConnectionFailure, "Not connected to any nodes." - end + raise NodeWithTagsNotFound, + "Could not find a connection tagged with #{tags}." end # Checkout a socket for writing (i.e., a primary node). @@ -326,7 +351,9 @@ module Mongo if @primary_pool begin - return @primary_pool.checkout + socket = @primary_pool.checkout + @sockets_to_pools[socket] = @primary + return socket rescue NoMethodError end end @@ -336,17 +363,21 @@ module Mongo # Checkin a socket used for reading. def checkin_reader(socket) - if @read_secondary - @read_pool.checkin(socket) - else - checkin_writer(socket) - end + warn "ReplSetConnection#checkin_writer is not deprecated and will be remove " + + "in driver v2.0. Use ReplSetConnection#checkin instead." + checkin(socket) end # Checkin a socket used for writing. def checkin_writer(socket) - if @primary_pool - @primary_pool.checkin(socket) + warn "ReplSetConnection#checkin_writer is not deprecated and will be remove " + + "in driver v2.0. Use ReplSetConnection#checkin instead." + checkin(socket) + end + + def checkin(socket) + if pool = @sockets_to_pools[socket] + pool.checkin(socket) end end end diff --git a/test/unit/cursor_test.rb b/test/unit/cursor_test.rb index 469be6b..622e630 100644 --- a/test/unit/cursor_test.rb +++ b/test/unit/cursor_test.rb @@ -5,9 +5,9 @@ class CursorTest < Test::Unit::TestCase setup do @logger = mock() @logger.stubs(:debug) - @connection = stub(:class => Connection, :logger => @logger, :slave_ok? => false) - @db = stub(:name => "testing", :slave_ok? => false, :connection => @connection) - @collection = stub(:db => @db, :name => "items") + @connection = stub(:class => Connection, :logger => @logger, :slave_ok? => false, :read_preference => :primary) + @db = stub(:name => "testing", :slave_ok? => false, :connection => @connection, :read_preference => :primary) + @collection = stub(:db => @db, :name => "items", :read_preference => :primary) @cursor = Cursor.new(@collection) end @@ -102,7 +102,7 @@ class CursorTest < Test::Unit::TestCase @logger.stubs(:debug) @connection = stub(:class => Connection, :logger => @logger, :slave_ok? => false) @db = stub(:slave_ok? => true, :name => "testing", :connection => @connection) - @collection = stub(:db => @db, :name => "items") + @collection = stub(:db => @db, :name => "items", :read_preference => :primary) end should "when an array should return a hash with each key" do diff --git a/test/unit/db_test.rb b/test/unit/db_test.rb index 5b41808..e8d2926 100644 --- a/test/unit/db_test.rb +++ b/test/unit/db_test.rb @@ -15,8 +15,10 @@ class DBTest < Test::Unit::TestCase setup do @conn = stub() @conn.stubs(:safe) + @conn.stubs(:read_preference) @db = DB.new("testing", @conn) @db.stubs(:safe) + @db.stubs(:read_preference) @collection = mock() @db.stubs(:system_command_collection).returns(@collection) end diff --git a/test/unit/grid_test.rb b/test/unit/grid_test.rb index 8e8ba3f..066d0e9 100644 --- a/test/unit/grid_test.rb +++ b/test/unit/grid_test.rb @@ -6,6 +6,7 @@ class GridTest < Test::Unit::TestCase setup do @conn = stub() @conn.stubs(:safe) + @conn.stubs(:read_preference) @db = DB.new("testing", @conn) @files = mock() @chunks = mock() @@ -13,6 +14,7 @@ class GridTest < Test::Unit::TestCase @db.expects(:[]).with('fs.files').returns(@files) @db.expects(:[]).with('fs.chunks').returns(@chunks) @db.stubs(:safe) + @db.stubs(:read_preference) end context "Grid classe with standard connections" do diff --git a/test/unit/node_test.rb b/test/unit/node_test.rb index 777e307..c3b2be7 100644 --- a/test/unit/node_test.rb +++ b/test/unit/node_test.rb @@ -65,7 +65,7 @@ class NodeTest < Test::Unit::TestCase Node.new(@connection, ['192.168.0.1', Connection::DEFAULT_PORT]) end - should "two nodes with the same address should have the same hash" do + should "two nodes with the same address should have the same hash negate" do assert_not_equal Node.new(@connection, '192.168.0.1').hash, Node.new(@connection, '1239.33.4.2393:29949').hash end diff --git a/test/unit/read_test.rb b/test/unit/read_test.rb new file mode 100644 index 0000000..40ce098 --- /dev/null +++ b/test/unit/read_test.rb @@ -0,0 +1,98 @@ +require './test/test_helper' + +class ReadTest < Test::Unit::TestCase + + context "Read mode on standard connection: " do + setup do + @read_preference = :secondary + @con = Mongo::Connection.new('localhost', 27017, :read => @read_preference, :connect => false) + end + + end + + context "Read mode on connection: " do + setup do + @read_preference = :secondary + @con = Mongo::ReplSetConnection.new(['localhost', 27017], :read => @read_preference, :connect => false) + end + + should "store read preference on Connection" do + assert_equal @read_preference, @con.read_preference + end + + should "propogate to DB" do + db = @con['foo'] + assert_equal @read_preference, db.read_preference + + db = @con.db('foo') + assert_equal @read_preference, db.read_preference + + db = DB.new('foo', @con) + assert_equal @read_preference, db.read_preference + end + + should "allow db override" do + db = DB.new('foo', @con, :read => :primary) + assert_equal :primary, db.read_preference + + db = @con.db('foo', :read => :primary) + assert_equal :primary, db.read_preference + end + + context "on DB: " do + setup do + @db = @con['foo'] + end + + should "propogate to collection" do + col = @db.collection('bar') + assert_equal @read_preference, col.read_preference + + col = @db['bar'] + assert_equal @read_preference, col.read_preference + + col = Collection.new('bar', @db) + assert_equal @read_preference, col.read_preference + end + + should "allow override on collection" do + col = @db.collection('bar', :read => :primary) + assert_equal :primary, col.read_preference + + col = Collection.new('bar', @db, :read => :primary) + assert_equal :primary, col.read_preference + end + end + + context "on read mode ops" do + setup do + @col = @con['foo']['bar'] + @mock_socket = stub() + end + + should "use default value on query" do + @con.expects(:receive_message).with do |o, m, l, s, c, r| + r == :secondary + end.returns([[], 0, 0]) + + @col.find_one({:a => 1}) + end + + should "allow override default value on query" do + @con.expects(:receive_message).with do |o, m, l, s, c, r| + r == :primary + end.returns([[], 0, 0]) + + @col.find_one({:a => 1}, :read => :primary) + end + + should "allow override alternate value on query" do + @con.expects(:receive_message).with do |o, m, l, s, c, r| + tags = {:dc => "ny"} + end.returns([[], 0, 0]) + + @col.find_one({:a => 1}, :read => {:dc => "ny"}) + end + end + end +end