From 0afa5aa412e69f1210428fdf86725f1cc1f61051 Mon Sep 17 00:00:00 2001 From: Kyle Banker Date: Mon, 19 Jul 2010 12:07:46 -0400 Subject: [PATCH] Initial replica set support --- Rakefile | 36 +++- lib/mongo/connection.rb | 171 ++++++++++++++++-- test/connection_test.rb | 10 + test/{replica => replica_pairs}/count_test.rb | 0 .../{replica => replica_pairs}/insert_test.rb | 0 .../pooled_insert_test.rb | 0 test/{replica => replica_pairs}/query_test.rb | 0 test/replica_sets/count_test.rb | 33 ++++ test/replica_sets/insert_test.rb | 50 +++++ test/replica_sets/pooled_insert_test.rb | 54 ++++++ test/replica_sets/query_test.rb | 39 ++++ test/unit/connection_test.rb | 56 +++++- 12 files changed, 424 insertions(+), 25 deletions(-) rename test/{replica => replica_pairs}/count_test.rb (100%) rename test/{replica => replica_pairs}/insert_test.rb (100%) rename test/{replica => replica_pairs}/pooled_insert_test.rb (100%) rename test/{replica => replica_pairs}/query_test.rb (100%) create mode 100644 test/replica_sets/count_test.rb create mode 100644 test/replica_sets/insert_test.rb create mode 100644 test/replica_sets/pooled_insert_test.rb create mode 100644 test/replica_sets/query_test.rb diff --git a/Rakefile b/Rakefile index 5a83010..78ae0a9 100644 --- a/Rakefile +++ b/Rakefile @@ -58,23 +58,43 @@ namespace :test do t.verbose = true end - Rake::TestTask.new(:pair_count) do |t| - t.test_files = FileList['test/replica/count_test.rb'] + Rake::TestTask.new(:replica_pair_count) do |t| + t.test_files = FileList['test/replica_pairs/count_test.rb'] t.verbose = true end - Rake::TestTask.new(:pair_insert) do |t| - t.test_files = FileList['test/replica/insert_test.rb'] + Rake::TestTask.new(:replica_pair_insert) do |t| + t.test_files = FileList['test/replica_pairs/insert_test.rb'] t.verbose = true end - Rake::TestTask.new(:pooled_pair_insert) do |t| - t.test_files = FileList['test/replica/pooled_insert_test.rb'] + Rake::TestTask.new(:pooled_replica_pair_insert) do |t| + t.test_files = FileList['test/replica_pairs/pooled_insert_test.rb'] t.verbose = true end - Rake::TestTask.new(:pair_query) do |t| - t.test_files = FileList['test/replica/query_test.rb'] + Rake::TestTask.new(:replica_pair_query) do |t| + t.test_files = FileList['test/replica_pairs/query_test.rb'] + t.verbose = true + end + + Rake::TestTask.new(:replica_set_count) do |t| + t.test_files = FileList['test/replica_sets/count_test.rb'] + t.verbose = true + end + + Rake::TestTask.new(:replica_set_insert) do |t| + t.test_files = FileList['test/replica_sets/insert_test.rb'] + t.verbose = true + end + + Rake::TestTask.new(:pooled_replica_set_insert) do |t| + t.test_files = FileList['test/replica_sets/pooled_insert_test.rb'] + t.verbose = true + end + + Rake::TestTask.new(:replica_set_query) do |t| + t.test_files = FileList['test/replica_sets/query_test.rb'] t.verbose = true end diff --git a/lib/mongo/connection.rb b/lib/mongo/connection.rb index 27ba1ca..871366b 100644 --- a/lib/mongo/connection.rb +++ b/lib/mongo/connection.rb @@ -40,16 +40,15 @@ module Mongo # Counter for generating unique request ids. @@current_request_id = 0 - # Create a connection to MongoDB. Specify either one or a pair of servers, - # along with a maximum connection pool size and timeout. + # Create a connection to MongoDB. # # If connecting to just one server, you may specify whether connection to slave is permitted. # In all cases, the default host is "localhost" and the default port is 27017. # - # To specify a pair, use Connection.paired. - # - # Note that there are a few issues when using connection pooling with Ruby 1.9 on Windows. These - # should be resolved in the next release. + # To specify more than one host pair to be used as seeds in a replica set + # or replica pair, use Connection.multi. If you're only specifying one node in the + # replica set, you can use Connection.new, as any other host known to the set will be + # cached. # # @param [String, Hash] host. # @param [Integer] port specify a port number here if only one host is being specified. @@ -57,7 +56,8 @@ module Mongo # @option options [Boolean] :slave_ok (false) Must be set to +true+ when connecting # to a single, slave node. # @option options [Logger, #debug] :logger (nil) Logger instance to receive driver operation log. - # @option options [Integer] :pool_size (1) The maximum number of socket connections that can be opened to the database. + # @option options [Integer] :pool_size (1) The maximum number of socket connections that can be + # opened to the database. # @option options [Float] :timeout (5.0) When all of the connections to the pool are checked out, # this is the number of seconds to wait for a new connection to be released before throwing an exception. # @@ -110,14 +110,49 @@ module Mongo @checked_out = [] # slave_ok can be true only if one node is specified - @slave_ok = options[:slave_ok] && @nodes.length == 1 + if @nodes.length > 1 && options[:slave_ok] + raise MongoArgumentError, "Can't specify more than one node when :slave_ok is true." + else + @slave_ok = options[:slave_ok] + end + @logger = options[:logger] || nil @options = options should_connect = options[:connect].nil? ? true : options[:connect] - connect_to_master if should_connect + connect if should_connect end + # Initialize a paired connection to MongoDB. + # + # @param nodes [Array] An array of arrays, each of which specified a host and port. + # @param opts Takes the same options as Connection.new + # + # @example + # Connection.paired([["db1.example.com", 27017], + # ["db2.example.com", 27017]]) + # + # @example + # Connection.paired([["db1.example.com", 27017], + # ["db2.example.com", 27017]], + # :pool_size => 20, :timeout => 5) + # + # @return [Mongo::Connection] + def self.multi(nodes, opts={}) + unless nodes.length > 0 && nodes.all? {|n| n.is_a? Array} + raise MongoArgumentError, "Connection.paired requires at least one node to be specified." + end + # Block returns an array, the first element being an array of nodes and the second an array + # of authorizations for the database. + new(nil, nil, opts) do |con| + nodes.map do |node| + con.pair_val_to_connection(node) + end + end + end + + # @deprecated + # # Initialize a paired connection to MongoDB. # # @param nodes [Array] An array of arrays, each of which specified a host and port. @@ -134,6 +169,7 @@ module Mongo # # @return [Mongo::Connection] def self.paired(nodes, opts={}) + warn "Connection.paired is deprecated. Please use Connection.multi instead." unless nodes.length == 2 && nodes.all? {|n| n.is_a? Array} raise MongoArgumentError, "Connection.paired requires that exactly two nodes be specified." end @@ -412,11 +448,109 @@ module Mongo result end + + # Create a new socket and attempt to connect to master. + # If successful, sets host and port to master and returns the socket. + # + # If connecting to a replica set, this method will update the + # initially-provided seed list with any nodes known to the set. + # + # @raise [ConnectionFailure] if unable to connect to any host or port. + def connect + reset_connection + + first_node = @nodes.first + if is_primary?(check_is_master(first_node)) + set_primary(first_node) + else + while !connected? && !(nodes_to_try = @nodes - @nodes_tried).empty? + nodes_to_try.each do |node| + if is_primary?(check_is_master(node)) + set_primary(node) + break + end + end + end + end + + raise ConnectionFailure, "failed to connect to any given host:port" unless connected? + end + + # @deprecated + # + # Create a new socket and attempt to connect to master. + # If successful, sets host and port to master and returns the socket. + def connect_to_master + warn "Connection#connect_to_master is deprecated. Use Connection#connect instead." + connect + end + + def reset_connection + close + @host = nil + @port = nil + @nodes_tried = [] + end + + def connected? + @host && @port + end + + # Primary is defined as either a master node or a slave if + # :slave_ok has been set to +true+. + # + # If a primary node is discovered, we set the the @host and @port and + # apply any saved authentication. + # + # TODO: use the 'primary', and 'seconday' fields if we're in a replica set + def is_primary?(config) + config && (config['ismaster'] == 1 || config['ismaster'] == true) || @slave_ok + end + + # @return + def check_is_master(node) + begin + host, port = *node + socket = TCPSocket.new(host, port) + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + + config = self['admin'].command({:ismaster => 1}, :sock => socket) + + rescue OperationFailure, SocketError, SystemCallError, IOError => ex + close + ensure + @nodes_tried << node + update_node_list(config['hosts']) if config && config['hosts'] + socket.close if socket + end + + config + end + + def set_primary(node) + @host, @port = *node + apply_saved_authentication + end + + def update_node_list(hosts) + new_nodes = hosts.map do |host| + if !host.respond_to?(:split) + warn "Could not parse host #{host.inspect}." + next + end + + host, port = host.split(':') + [host, port.to_i] + end + + @nodes |= new_nodes + end + # Create a new socket and attempt to connect to master. # If successful, sets host and port to master and returns the socket. # # @raise [ConnectionFailure] if unable to connect to any host or port. - def connect_to_master + def connect_to_master_old close @host = @port = nil for node_pair in @nodes @@ -450,6 +584,21 @@ module Mongo raise ConnectionFailure, "failed to connect to any given host:port" unless socket end + def attempt_node(node) + host, port = *node + socket = TCPSocket.new(host, port) + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + + # If we're connected to master, set the @host and @port + result = self['admin'].command({:ismaster => 1}, :check_response => false, :sock => socket) + if Mongo::Support.ok?(result) + if ((is_master = result['ismaster'] == 1) || @slave_ok) + @host, @port = host, port + end + apply_saved_authentication + end + end + # Are we connected to MongoDB? This is determined by checking whether # host and port have values, since they're set to nil on calls to #close. def connected? @@ -581,7 +730,7 @@ module Mongo # pool size has not been exceeded. Otherwise, wait for the next # available socket. def checkout - connect_to_master if !connected? + connect if !connected? start_time = Time.now loop do if (Time.now - start_time) > @timeout diff --git a/test/connection_test.rb b/test/connection_test.rb index 1dbb790..46cefb5 100644 --- a/test/connection_test.rb +++ b/test/connection_test.rb @@ -18,6 +18,10 @@ class TestConnection < Test::Unit::TestCase @mongo.db(MONGO_TEST_DB).error end + def test_slave_ok_with_multiple_nodes + + end + def test_server_info server_info = @mongo.server_info assert server_info.keys.include?("version") @@ -133,6 +137,12 @@ class TestConnection < Test::Unit::TestCase assert_equal ['bar', 27018], nodes[1] end + def test_slave_ok_with_multiple_nodes + assert_raise MongoArgumentError do + Connection.paired([['foo', 27017], ['bar', 27018]], :connect => false, :slave_ok => true) + end + end + context "Saved authentications" do setup do @conn = Mongo::Connection.new diff --git a/test/replica/count_test.rb b/test/replica_pairs/count_test.rb similarity index 100% rename from test/replica/count_test.rb rename to test/replica_pairs/count_test.rb diff --git a/test/replica/insert_test.rb b/test/replica_pairs/insert_test.rb similarity index 100% rename from test/replica/insert_test.rb rename to test/replica_pairs/insert_test.rb diff --git a/test/replica/pooled_insert_test.rb b/test/replica_pairs/pooled_insert_test.rb similarity index 100% rename from test/replica/pooled_insert_test.rb rename to test/replica_pairs/pooled_insert_test.rb diff --git a/test/replica/query_test.rb b/test/replica_pairs/query_test.rb similarity index 100% rename from test/replica/query_test.rb rename to test/replica_pairs/query_test.rb diff --git a/test/replica_sets/count_test.rb b/test/replica_sets/count_test.rb new file mode 100644 index 0000000..3f4eacc --- /dev/null +++ b/test/replica_sets/count_test.rb @@ -0,0 +1,33 @@ +$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) +require 'mongo' +require 'test/unit' +require 'test/test_helper' + +# NOTE: This test expects a replica set of three nodes to be running +# on the local host. +class ReplicaSetCountTest < Test::Unit::TestCase + include Mongo + + def setup + @conn = Mongo::Connection.multi([['localhost', 27017], ['localhost', 27018], ['localhost', 27019]]) + @db = @conn.db(MONGO_TEST_DB) + @db.drop_collection("test-sets") + @coll = @db.collection("test-sets") + end + + def test_correct_count_after_insertion_reconnect + @coll.insert({:a => 20})#, :safe => {:w => 3, :wtimeout => 10000}) + assert_equal 1, @coll.count + + puts "Please disconnect the current master." + gets + + rescue_connection_failure do + @coll.insert({:a => 30}, :safe => true) + end + + @coll.insert({:a => 40}, :safe => true) + assert_equal 3, @coll.count, "Second count failed" + end + +end diff --git a/test/replica_sets/insert_test.rb b/test/replica_sets/insert_test.rb new file mode 100644 index 0000000..8eef365 --- /dev/null +++ b/test/replica_sets/insert_test.rb @@ -0,0 +1,50 @@ +$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) +require 'mongo' +require 'test/unit' +require 'test/test_helper' + +# NOTE: this test should be run only if a replica pair is running. +class ReplicaPairInsertTest < Test::Unit::TestCase + include Mongo + + def setup + @conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil) + @db = @conn.db('mongo-ruby-test') + @db.drop_collection("test-pairs") + @coll = @db.collection("test-pairs") + end + + def test_insert + @coll.save({:a => 20}, :safe => true) + puts "Please disconnect the current master." + gets + + rescue_connection_failure do + @coll.save({:a => 30}, :safe => true) + end + + @coll.save({:a => 40}, :safe => true) + @coll.save({:a => 50}, :safe => true) + @coll.save({:a => 60}, :safe => true) + @coll.save({:a => 70}, :safe => true) + + puts "Please reconnect the old master to make sure that the new master " + + "has synced with the previous master. Note: this may have happened already." + gets + results = [] + + rescue_connection_failure do + @coll.find.each {|r| results << r} + [20, 30, 40, 50, 60, 70].each do |a| + assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}" + end + end + + @coll.save({:a => 80}, :safe => true) + @coll.find.each {|r| results << r} + [20, 30, 40, 50, 60, 70, 80].each do |a| + assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a} on second find" + end + end + +end diff --git a/test/replica_sets/pooled_insert_test.rb b/test/replica_sets/pooled_insert_test.rb new file mode 100644 index 0000000..ed8984b --- /dev/null +++ b/test/replica_sets/pooled_insert_test.rb @@ -0,0 +1,54 @@ +$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) +require 'mongo' +require 'test/unit' +require 'test/test_helper' + +# NOTE: this test should be run only if a replica pair is running. +class ReplicaPairPooledInsertTest < Test::Unit::TestCase + include Mongo + + def setup + @conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil, :pool_size => 10, :timeout => 5) + @db = @conn.db('mongo-ruby-test') + @db.drop_collection("test-pairs") + @coll = @db.collection("test-pairs") + end + + def test_insert + expected_results = [-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + @coll.save({:a => -1}, :safe => true) + puts "Please disconnect the current master." + gets + + threads = [] + 10.times do |i| + threads[i] = Thread.new do + rescue_connection_failure do + @coll.save({:a => i}, :safe => true) + end + end + end + + puts "Please reconnect the old master to make sure that the new master " + + "has synced with the previous master. Note: this may have happened already." + + "Note also that when connection with multiple threads, you may need to wait a few seconds" + + "after restarting the old master so that all the data has had a chance to sync." + + "This is a case of eventual consistency." + gets + results = [] + + rescue_connection_failure do + @coll.find.each {|r| results << r} + expected_results.each do |a| + assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}" + end + end + + @coll.save({:a => 10}, :safe => true) + @coll.find.each {|r| results << r} + (expected_results + [10]).each do |a| + assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a} on second find" + end + end + +end diff --git a/test/replica_sets/query_test.rb b/test/replica_sets/query_test.rb new file mode 100644 index 0000000..65da640 --- /dev/null +++ b/test/replica_sets/query_test.rb @@ -0,0 +1,39 @@ +$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib')) +require 'mongo' +require 'test/unit' +require 'test/test_helper' + +# NOTE: this test should be run only if a replica pair is running. +class ReplicaPairQueryTest < Test::Unit::TestCase + include Mongo + + def setup + @conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil) + @db = @conn.db('mongo-ruby-test') + @db.drop_collection("test-pairs") + @coll = @db.collection("test-pairs") + end + + def test_query + @coll.save({:a => 20}) + @coll.save({:a => 30}) + @coll.save({:a => 40}) + results = [] + @coll.find.each {|r| results << r} + [20, 30, 40].each do |a| + assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}" + end + + puts "Please disconnect the current master." + gets + + results = [] + rescue_connection_failure do + @coll.find.each {|r| results << r} + [20, 30, 40].each do |a| + assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}" + end + end + end + +end diff --git a/test/unit/connection_test.rb b/test/unit/connection_test.rb index 199b73e..8246e1e 100644 --- a/test/unit/connection_test.rb +++ b/test/unit/connection_test.rb @@ -18,13 +18,13 @@ class ConnectionTest < Test::Unit::TestCase context "given a single node" do setup do - TCPSocket.stubs(:new).returns(new_mock_socket) @conn = Connection.new('localhost', 27017, :connect => false) + TCPSocket.stubs(:new).returns(new_mock_socket) admin_db = new_mock_db admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}) @conn.expects(:[]).with('admin').returns(admin_db) - @conn.connect_to_master + @conn.connect end should "set localhost and port to master" do @@ -41,19 +41,63 @@ class ConnectionTest < Test::Unit::TestCase end end + context "connecting to a replica set" do + setup do + TCPSocket.stubs(:new).returns(new_mock_socket) + @conn = Connection.new('localhost', 27017, :connect => false) + + admin_db = new_mock_db + @hosts = ['localhost:27018', 'localhost:27019'] + admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1, 'hosts' => @hosts}) + @conn.expects(:[]).with('admin').returns(admin_db) + @conn.connect + end + + should "store the hosts returned from the ismaster command" do + @hosts.each do |host| + host, port = host.split(":") + port = port.to_i + assert @conn.nodes.include?([host, port]), "Connection doesn't include host #{host.inspect}." + end + end + end + + context "connecting to a replica set and providing seed nodes" do + setup do + TCPSocket.stubs(:new).returns(new_mock_socket) + @conn = Connection.multi([['localhost', 27017], ['localhost', 27019]], :connect => false) + + admin_db = new_mock_db + @hosts = ['localhost:27017', 'localhost:27018', 'localhost:27019'] + admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1, 'hosts' => @hosts}) + @conn.expects(:[]).with('admin').returns(admin_db) + @conn.connect + end + + should "not store any hosts redundantly" do + assert_equal 3, @conn.nodes.size + + @hosts.each do |host| + host, port = host.split(":") + port = port.to_i + assert @conn.nodes.include?([host, port]), "Connection doesn't include host #{host.inspect}." + end + end + end + context "initializing a paired connection" do should "require left and right nodes" do assert_raise MongoArgumentError do - Connection.paired(['localhost', 27018], :connect => false) + Connection.multi(['localhost', 27018], :connect => false) end assert_raise MongoArgumentError do - Connection.paired(['localhost', 27018], :connect => false) + Connection.multi(['localhost', 27018], :connect => false) end end should "store both nodes" do - @conn = Connection.paired([['localhost', 27017], ['localhost', 27018]], :connect => false) + @conn = Connection.multi([['localhost', 27017], ['localhost', 27018]], :connect => false) assert_equal ['localhost', 27017], @conn.nodes[0] assert_equal ['localhost', 27018], @conn.nodes[1] @@ -103,7 +147,7 @@ class ConnectionTest < Test::Unit::TestCase admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}) @conn.expects(:[]).with('admin').returns(admin_db) @conn.expects(:apply_saved_authentication) - @conn.connect_to_master + @conn.connect end should "raise an error on invalid uris" do