Initial replica set support

This commit is contained in:
Kyle Banker 2010-07-19 12:07:46 -04:00
parent c9573f05ad
commit 0afa5aa412
12 changed files with 424 additions and 25 deletions

View File

@ -58,23 +58,43 @@ namespace :test do
t.verbose = true t.verbose = true
end end
Rake::TestTask.new(:pair_count) do |t| Rake::TestTask.new(:replica_pair_count) do |t|
t.test_files = FileList['test/replica/count_test.rb'] t.test_files = FileList['test/replica_pairs/count_test.rb']
t.verbose = true t.verbose = true
end end
Rake::TestTask.new(:pair_insert) do |t| Rake::TestTask.new(:replica_pair_insert) do |t|
t.test_files = FileList['test/replica/insert_test.rb'] t.test_files = FileList['test/replica_pairs/insert_test.rb']
t.verbose = true t.verbose = true
end end
Rake::TestTask.new(:pooled_pair_insert) do |t| Rake::TestTask.new(:pooled_replica_pair_insert) do |t|
t.test_files = FileList['test/replica/pooled_insert_test.rb'] t.test_files = FileList['test/replica_pairs/pooled_insert_test.rb']
t.verbose = true t.verbose = true
end end
Rake::TestTask.new(:pair_query) do |t| Rake::TestTask.new(:replica_pair_query) do |t|
t.test_files = FileList['test/replica/query_test.rb'] t.test_files = FileList['test/replica_pairs/query_test.rb']
t.verbose = true
end
Rake::TestTask.new(:replica_set_count) do |t|
t.test_files = FileList['test/replica_sets/count_test.rb']
t.verbose = true
end
Rake::TestTask.new(:replica_set_insert) do |t|
t.test_files = FileList['test/replica_sets/insert_test.rb']
t.verbose = true
end
Rake::TestTask.new(:pooled_replica_set_insert) do |t|
t.test_files = FileList['test/replica_sets/pooled_insert_test.rb']
t.verbose = true
end
Rake::TestTask.new(:replica_set_query) do |t|
t.test_files = FileList['test/replica_sets/query_test.rb']
t.verbose = true t.verbose = true
end end

View File

@ -40,16 +40,15 @@ module Mongo
# Counter for generating unique request ids. # Counter for generating unique request ids.
@@current_request_id = 0 @@current_request_id = 0
# Create a connection to MongoDB. Specify either one or a pair of servers, # Create a connection to MongoDB.
# along with a maximum connection pool size and timeout.
# #
# If connecting to just one server, you may specify whether connection to slave is permitted. # If connecting to just one server, you may specify whether connection to slave is permitted.
# In all cases, the default host is "localhost" and the default port is 27017. # In all cases, the default host is "localhost" and the default port is 27017.
# #
# To specify a pair, use Connection.paired. # To specify more than one host pair to be used as seeds in a replica set
# # or replica pair, use Connection.multi. If you're only specifying one node in the
# Note that there are a few issues when using connection pooling with Ruby 1.9 on Windows. These # replica set, you can use Connection.new, as any other host known to the set will be
# should be resolved in the next release. # cached.
# #
# @param [String, Hash] host. # @param [String, Hash] host.
# @param [Integer] port specify a port number here if only one host is being specified. # @param [Integer] port specify a port number here if only one host is being specified.
@ -57,7 +56,8 @@ module Mongo
# @option options [Boolean] :slave_ok (false) Must be set to +true+ when connecting # @option options [Boolean] :slave_ok (false) Must be set to +true+ when connecting
# to a single, slave node. # to a single, slave node.
# @option options [Logger, #debug] :logger (nil) Logger instance to receive driver operation log. # @option options [Logger, #debug] :logger (nil) Logger instance to receive driver operation log.
# @option options [Integer] :pool_size (1) The maximum number of socket connections that can be opened to the database. # @option options [Integer] :pool_size (1) The maximum number of socket connections that can be
# opened to the database.
# @option options [Float] :timeout (5.0) When all of the connections to the pool are checked out, # @option options [Float] :timeout (5.0) When all of the connections to the pool are checked out,
# this is the number of seconds to wait for a new connection to be released before throwing an exception. # this is the number of seconds to wait for a new connection to be released before throwing an exception.
# #
@ -110,14 +110,49 @@ module Mongo
@checked_out = [] @checked_out = []
# slave_ok can be true only if one node is specified # slave_ok can be true only if one node is specified
@slave_ok = options[:slave_ok] && @nodes.length == 1 if @nodes.length > 1 && options[:slave_ok]
raise MongoArgumentError, "Can't specify more than one node when :slave_ok is true."
else
@slave_ok = options[:slave_ok]
end
@logger = options[:logger] || nil @logger = options[:logger] || nil
@options = options @options = options
should_connect = options[:connect].nil? ? true : options[:connect] should_connect = options[:connect].nil? ? true : options[:connect]
connect_to_master if should_connect connect if should_connect
end end
# Initialize a paired connection to MongoDB.
#
# @param nodes [Array] An array of arrays, each of which specified a host and port.
# @param opts Takes the same options as Connection.new
#
# @example
# Connection.paired([["db1.example.com", 27017],
# ["db2.example.com", 27017]])
#
# @example
# Connection.paired([["db1.example.com", 27017],
# ["db2.example.com", 27017]],
# :pool_size => 20, :timeout => 5)
#
# @return [Mongo::Connection]
def self.multi(nodes, opts={})
unless nodes.length > 0 && nodes.all? {|n| n.is_a? Array}
raise MongoArgumentError, "Connection.paired requires at least one node to be specified."
end
# Block returns an array, the first element being an array of nodes and the second an array
# of authorizations for the database.
new(nil, nil, opts) do |con|
nodes.map do |node|
con.pair_val_to_connection(node)
end
end
end
# @deprecated
#
# Initialize a paired connection to MongoDB. # Initialize a paired connection to MongoDB.
# #
# @param nodes [Array] An array of arrays, each of which specified a host and port. # @param nodes [Array] An array of arrays, each of which specified a host and port.
@ -134,6 +169,7 @@ module Mongo
# #
# @return [Mongo::Connection] # @return [Mongo::Connection]
def self.paired(nodes, opts={}) def self.paired(nodes, opts={})
warn "Connection.paired is deprecated. Please use Connection.multi instead."
unless nodes.length == 2 && nodes.all? {|n| n.is_a? Array} unless nodes.length == 2 && nodes.all? {|n| n.is_a? Array}
raise MongoArgumentError, "Connection.paired requires that exactly two nodes be specified." raise MongoArgumentError, "Connection.paired requires that exactly two nodes be specified."
end end
@ -412,11 +448,109 @@ module Mongo
result result
end end
# Create a new socket and attempt to connect to master.
# If successful, sets host and port to master and returns the socket.
#
# If connecting to a replica set, this method will update the
# initially-provided seed list with any nodes known to the set.
#
# @raise [ConnectionFailure] if unable to connect to any host or port.
def connect
reset_connection
first_node = @nodes.first
if is_primary?(check_is_master(first_node))
set_primary(first_node)
else
while !connected? && !(nodes_to_try = @nodes - @nodes_tried).empty?
nodes_to_try.each do |node|
if is_primary?(check_is_master(node))
set_primary(node)
break
end
end
end
end
raise ConnectionFailure, "failed to connect to any given host:port" unless connected?
end
# @deprecated
#
# Create a new socket and attempt to connect to master.
# If successful, sets host and port to master and returns the socket.
def connect_to_master
warn "Connection#connect_to_master is deprecated. Use Connection#connect instead."
connect
end
def reset_connection
close
@host = nil
@port = nil
@nodes_tried = []
end
def connected?
@host && @port
end
# Primary is defined as either a master node or a slave if
# :slave_ok has been set to +true+.
#
# If a primary node is discovered, we set the the @host and @port and
# apply any saved authentication.
#
# TODO: use the 'primary', and 'seconday' fields if we're in a replica set
def is_primary?(config)
config && (config['ismaster'] == 1 || config['ismaster'] == true) || @slave_ok
end
# @return
def check_is_master(node)
begin
host, port = *node
socket = TCPSocket.new(host, port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
config = self['admin'].command({:ismaster => 1}, :sock => socket)
rescue OperationFailure, SocketError, SystemCallError, IOError => ex
close
ensure
@nodes_tried << node
update_node_list(config['hosts']) if config && config['hosts']
socket.close if socket
end
config
end
def set_primary(node)
@host, @port = *node
apply_saved_authentication
end
def update_node_list(hosts)
new_nodes = hosts.map do |host|
if !host.respond_to?(:split)
warn "Could not parse host #{host.inspect}."
next
end
host, port = host.split(':')
[host, port.to_i]
end
@nodes |= new_nodes
end
# Create a new socket and attempt to connect to master. # Create a new socket and attempt to connect to master.
# If successful, sets host and port to master and returns the socket. # If successful, sets host and port to master and returns the socket.
# #
# @raise [ConnectionFailure] if unable to connect to any host or port. # @raise [ConnectionFailure] if unable to connect to any host or port.
def connect_to_master def connect_to_master_old
close close
@host = @port = nil @host = @port = nil
for node_pair in @nodes for node_pair in @nodes
@ -450,6 +584,21 @@ module Mongo
raise ConnectionFailure, "failed to connect to any given host:port" unless socket raise ConnectionFailure, "failed to connect to any given host:port" unless socket
end end
def attempt_node(node)
host, port = *node
socket = TCPSocket.new(host, port)
socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
# If we're connected to master, set the @host and @port
result = self['admin'].command({:ismaster => 1}, :check_response => false, :sock => socket)
if Mongo::Support.ok?(result)
if ((is_master = result['ismaster'] == 1) || @slave_ok)
@host, @port = host, port
end
apply_saved_authentication
end
end
# Are we connected to MongoDB? This is determined by checking whether # Are we connected to MongoDB? This is determined by checking whether
# host and port have values, since they're set to nil on calls to #close. # host and port have values, since they're set to nil on calls to #close.
def connected? def connected?
@ -581,7 +730,7 @@ module Mongo
# pool size has not been exceeded. Otherwise, wait for the next # pool size has not been exceeded. Otherwise, wait for the next
# available socket. # available socket.
def checkout def checkout
connect_to_master if !connected? connect if !connected?
start_time = Time.now start_time = Time.now
loop do loop do
if (Time.now - start_time) > @timeout if (Time.now - start_time) > @timeout

View File

@ -18,6 +18,10 @@ class TestConnection < Test::Unit::TestCase
@mongo.db(MONGO_TEST_DB).error @mongo.db(MONGO_TEST_DB).error
end end
def test_slave_ok_with_multiple_nodes
end
def test_server_info def test_server_info
server_info = @mongo.server_info server_info = @mongo.server_info
assert server_info.keys.include?("version") assert server_info.keys.include?("version")
@ -133,6 +137,12 @@ class TestConnection < Test::Unit::TestCase
assert_equal ['bar', 27018], nodes[1] assert_equal ['bar', 27018], nodes[1]
end end
def test_slave_ok_with_multiple_nodes
assert_raise MongoArgumentError do
Connection.paired([['foo', 27017], ['bar', 27018]], :connect => false, :slave_ok => true)
end
end
context "Saved authentications" do context "Saved authentications" do
setup do setup do
@conn = Mongo::Connection.new @conn = Mongo::Connection.new

View File

@ -0,0 +1,33 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require 'mongo'
require 'test/unit'
require 'test/test_helper'
# NOTE: This test expects a replica set of three nodes to be running
# on the local host.
class ReplicaSetCountTest < Test::Unit::TestCase
include Mongo
def setup
@conn = Mongo::Connection.multi([['localhost', 27017], ['localhost', 27018], ['localhost', 27019]])
@db = @conn.db(MONGO_TEST_DB)
@db.drop_collection("test-sets")
@coll = @db.collection("test-sets")
end
def test_correct_count_after_insertion_reconnect
@coll.insert({:a => 20})#, :safe => {:w => 3, :wtimeout => 10000})
assert_equal 1, @coll.count
puts "Please disconnect the current master."
gets
rescue_connection_failure do
@coll.insert({:a => 30}, :safe => true)
end
@coll.insert({:a => 40}, :safe => true)
assert_equal 3, @coll.count, "Second count failed"
end
end

View File

@ -0,0 +1,50 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require 'mongo'
require 'test/unit'
require 'test/test_helper'
# NOTE: this test should be run only if a replica pair is running.
class ReplicaPairInsertTest < Test::Unit::TestCase
include Mongo
def setup
@conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil)
@db = @conn.db('mongo-ruby-test')
@db.drop_collection("test-pairs")
@coll = @db.collection("test-pairs")
end
def test_insert
@coll.save({:a => 20}, :safe => true)
puts "Please disconnect the current master."
gets
rescue_connection_failure do
@coll.save({:a => 30}, :safe => true)
end
@coll.save({:a => 40}, :safe => true)
@coll.save({:a => 50}, :safe => true)
@coll.save({:a => 60}, :safe => true)
@coll.save({:a => 70}, :safe => true)
puts "Please reconnect the old master to make sure that the new master " +
"has synced with the previous master. Note: this may have happened already."
gets
results = []
rescue_connection_failure do
@coll.find.each {|r| results << r}
[20, 30, 40, 50, 60, 70].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end
end
@coll.save({:a => 80}, :safe => true)
@coll.find.each {|r| results << r}
[20, 30, 40, 50, 60, 70, 80].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a} on second find"
end
end
end

View File

@ -0,0 +1,54 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require 'mongo'
require 'test/unit'
require 'test/test_helper'
# NOTE: this test should be run only if a replica pair is running.
class ReplicaPairPooledInsertTest < Test::Unit::TestCase
include Mongo
def setup
@conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil, :pool_size => 10, :timeout => 5)
@db = @conn.db('mongo-ruby-test')
@db.drop_collection("test-pairs")
@coll = @db.collection("test-pairs")
end
def test_insert
expected_results = [-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
@coll.save({:a => -1}, :safe => true)
puts "Please disconnect the current master."
gets
threads = []
10.times do |i|
threads[i] = Thread.new do
rescue_connection_failure do
@coll.save({:a => i}, :safe => true)
end
end
end
puts "Please reconnect the old master to make sure that the new master " +
"has synced with the previous master. Note: this may have happened already." +
"Note also that when connection with multiple threads, you may need to wait a few seconds" +
"after restarting the old master so that all the data has had a chance to sync." +
"This is a case of eventual consistency."
gets
results = []
rescue_connection_failure do
@coll.find.each {|r| results << r}
expected_results.each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end
end
@coll.save({:a => 10}, :safe => true)
@coll.find.each {|r| results << r}
(expected_results + [10]).each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a} on second find"
end
end
end

View File

@ -0,0 +1,39 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require 'mongo'
require 'test/unit'
require 'test/test_helper'
# NOTE: this test should be run only if a replica pair is running.
class ReplicaPairQueryTest < Test::Unit::TestCase
include Mongo
def setup
@conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil)
@db = @conn.db('mongo-ruby-test')
@db.drop_collection("test-pairs")
@coll = @db.collection("test-pairs")
end
def test_query
@coll.save({:a => 20})
@coll.save({:a => 30})
@coll.save({:a => 40})
results = []
@coll.find.each {|r| results << r}
[20, 30, 40].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end
puts "Please disconnect the current master."
gets
results = []
rescue_connection_failure do
@coll.find.each {|r| results << r}
[20, 30, 40].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end
end
end
end

View File

@ -18,13 +18,13 @@ class ConnectionTest < Test::Unit::TestCase
context "given a single node" do context "given a single node" do
setup do setup do
TCPSocket.stubs(:new).returns(new_mock_socket)
@conn = Connection.new('localhost', 27017, :connect => false) @conn = Connection.new('localhost', 27017, :connect => false)
TCPSocket.stubs(:new).returns(new_mock_socket)
admin_db = new_mock_db admin_db = new_mock_db
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}) admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1})
@conn.expects(:[]).with('admin').returns(admin_db) @conn.expects(:[]).with('admin').returns(admin_db)
@conn.connect_to_master @conn.connect
end end
should "set localhost and port to master" do should "set localhost and port to master" do
@ -41,19 +41,63 @@ class ConnectionTest < Test::Unit::TestCase
end end
end end
context "connecting to a replica set" do
setup do
TCPSocket.stubs(:new).returns(new_mock_socket)
@conn = Connection.new('localhost', 27017, :connect => false)
admin_db = new_mock_db
@hosts = ['localhost:27018', 'localhost:27019']
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1, 'hosts' => @hosts})
@conn.expects(:[]).with('admin').returns(admin_db)
@conn.connect
end
should "store the hosts returned from the ismaster command" do
@hosts.each do |host|
host, port = host.split(":")
port = port.to_i
assert @conn.nodes.include?([host, port]), "Connection doesn't include host #{host.inspect}."
end
end
end
context "connecting to a replica set and providing seed nodes" do
setup do
TCPSocket.stubs(:new).returns(new_mock_socket)
@conn = Connection.multi([['localhost', 27017], ['localhost', 27019]], :connect => false)
admin_db = new_mock_db
@hosts = ['localhost:27017', 'localhost:27018', 'localhost:27019']
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1, 'hosts' => @hosts})
@conn.expects(:[]).with('admin').returns(admin_db)
@conn.connect
end
should "not store any hosts redundantly" do
assert_equal 3, @conn.nodes.size
@hosts.each do |host|
host, port = host.split(":")
port = port.to_i
assert @conn.nodes.include?([host, port]), "Connection doesn't include host #{host.inspect}."
end
end
end
context "initializing a paired connection" do context "initializing a paired connection" do
should "require left and right nodes" do should "require left and right nodes" do
assert_raise MongoArgumentError do assert_raise MongoArgumentError do
Connection.paired(['localhost', 27018], :connect => false) Connection.multi(['localhost', 27018], :connect => false)
end end
assert_raise MongoArgumentError do assert_raise MongoArgumentError do
Connection.paired(['localhost', 27018], :connect => false) Connection.multi(['localhost', 27018], :connect => false)
end end
end end
should "store both nodes" do should "store both nodes" do
@conn = Connection.paired([['localhost', 27017], ['localhost', 27018]], :connect => false) @conn = Connection.multi([['localhost', 27017], ['localhost', 27018]], :connect => false)
assert_equal ['localhost', 27017], @conn.nodes[0] assert_equal ['localhost', 27017], @conn.nodes[0]
assert_equal ['localhost', 27018], @conn.nodes[1] assert_equal ['localhost', 27018], @conn.nodes[1]
@ -103,7 +147,7 @@ class ConnectionTest < Test::Unit::TestCase
admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1}) admin_db.expects(:command).returns({'ok' => 1, 'ismaster' => 1})
@conn.expects(:[]).with('admin').returns(admin_db) @conn.expects(:[]).with('admin').returns(admin_db)
@conn.expects(:apply_saved_authentication) @conn.expects(:apply_saved_authentication)
@conn.connect_to_master @conn.connect
end end
should "raise an error on invalid uris" do should "raise an error on invalid uris" do