Fixes for replica pairs.

This commit is contained in:
Kyle Banker 2009-11-23 13:13:14 -05:00
parent 12454d04ad
commit 7890d6e146
12 changed files with 176 additions and 131 deletions

View File

@ -255,7 +255,21 @@ If you have the source code, you can run the tests.
$ rake test $ rake test
The tests now require shoulda and mocha. You can install these gems as This will run both unit and functional tests. If you want to run these
individually:
$ rake test:unit
$ rake test:functional
If you want to test replica pairs, you can run the following tests
individually:
$ rake test:pair_count
$ rake test:pair_insert
$ rake test:pair_query
All tests now require shoulda and mocha. You can install these gems as
follows: follows:
$ gem install shoulda $ gem install shoulda

View File

@ -32,8 +32,13 @@ namespace :test do
t.verbose = true t.verbose = true
end end
Rake::TestTask.new(:pair_count) do |t|
t.test_files = FileList['test/replica/count_test.rb']
t.verbose = true
end
Rake::TestTask.new(:pair_insert) do |t| Rake::TestTask.new(:pair_insert) do |t|
t.test_files = FileList['test/replica/pair_test.rb'] t.test_files = FileList['test/replica/insert_test.rb']
t.verbose = true t.verbose = true
end end

View File

@ -39,12 +39,10 @@ module Mongo
# see if the server is the master. If it is not, an error # see if the server is the master. If it is not, an error
# is thrown. # is thrown.
# #
# :auto_reconnect :: If a DB connection gets closed (for example, we # :auto_reconnect :: DEPRECATED. When an operation fails, a
# have a server pair and saw the "not master" # ConnectionFailure will be raised. The client is encouraged to retry the
# error, which closes the connection), then # operation as necessary.
# automatically try to reconnect to the master or #
# to the single server we have been given. Defaults
# to +false+.
# :logger :: Optional Logger instance to which driver usage information # :logger :: Optional Logger instance to which driver usage information
# will be logged. # will be logged.
# #

View File

@ -63,9 +63,12 @@ module Mongo
# pair but it has died or something like that) then we close that # pair but it has died or something like that) then we close that
# connection. If the db has auto connect option and a pair of # connection. If the db has auto connect option and a pair of
# servers, next request will re-open on master server. # servers, next request will re-open on master server.
@db.close if err == "not master" if err == "not master"
raise ConnectionFailure, err
@db.close
end
raise err raise OperationFailure, err
end end
o o

View File

@ -69,7 +69,6 @@ module Mongo
attr_reader :logger attr_reader :logger
def slave_ok?; @slave_ok; end def slave_ok?; @slave_ok; end
def auto_reconnect?; @auto_reconnect; end
# A primary key factory object (or +nil+). See the README.doc file or # A primary key factory object (or +nil+). See the README.doc file or
# DB#new for details. # DB#new for details.
@ -110,15 +109,13 @@ module Mongo
# see if the server is the master. If it is not, an error # see if the server is the master. If it is not, an error
# is thrown. # is thrown.
# #
# :auto_reconnect :: If the connection gets closed (for example, we
# have a server pair and saw the "not master"
# error, which closes the connection), then
# automatically try to reconnect to the master or
# to the single server we have been given. Defaults
# to +false+.
# :logger :: Optional Logger instance to which driver usage information # :logger :: Optional Logger instance to which driver usage information
# will be logged. # will be logged.
# #
# :auto_reconnect :: DEPRECATED. When an operation fails, a
# ConnectionFailure will be raised. The client is encouraged to retry the
# operation as necessary.
#
# When a DB object first connects to a pair, it will find the master # When a DB object first connects to a pair, it will find the master
# instance and connect to that one. On socket error or if we recieve a # instance and connect to that one. On socket error or if we recieve a
# "not master" error, we again find the master of the pair. # "not master" error, we again find the master of the pair.
@ -144,7 +141,10 @@ module Mongo
@strict = options[:strict] @strict = options[:strict]
@pk_factory = options[:pk] @pk_factory = options[:pk]
@slave_ok = options[:slave_ok] && @nodes.length == 1 # only OK if one node @slave_ok = options[:slave_ok] && @nodes.length == 1 # only OK if one node
@auto_reconnect = options[:auto_reconnect] if options[:auto_reconnect]
warn(":auto_reconnect is deprecated. henceforth, any time an operation fails, " +
"the driver will attempt to reconnect master on subsequent operations.")
end
@semaphore = Mutex.new @semaphore = Mutex.new
@socket = nil @socket = nil
@logger = options[:logger] @logger = options[:logger]
@ -177,7 +177,7 @@ module Mongo
false false
end end
} }
raise "error: failed to connect to any given host:port" unless @socket raise ConnectionFailure, "error: failed to connect to any given host:port" unless @socket
end end
# Returns true if +username+ has +password+ in # Returns true if +username+ has +password+ in
@ -457,49 +457,28 @@ module Mongo
message_with_check = last_error_message message_with_check = last_error_message
@logger.debug(" MONGODB #{log_message || message}") if @logger @logger.debug(" MONGODB #{log_message || message}") if @logger
@semaphore.synchronize do @semaphore.synchronize do
safe_send do
send_message_on_socket(message_with_headers.append!(message_with_check).to_s) send_message_on_socket(message_with_headers.append!(message_with_check).to_s)
docs, num_received, cursor_id = receive docs, num_received, cursor_id = receive
if num_received == 1 && error = docs[0]['err'] if num_received == 1 && error = docs[0]['err']
if docs[0]['err'] == 'not master'
raise ConnectionFailure
else
raise Mongo::OperationFailure, error raise Mongo::OperationFailure, error
end
else else
true true
end end
end end
end end
end
# Send a message to the database and waits for the response. # Send a message to the database and waits for the response.
def receive_message_with_operation(operation, message, log_message=nil) def receive_message_with_operation(operation, message, log_message=nil)
message_with_headers = add_message_headers(operation, message).to_s message_with_headers = add_message_headers(operation, message).to_s
@logger.debug(" MONGODB #{log_message || message}") if @logger @logger.debug(" MONGODB #{log_message || message}") if @logger
@semaphore.synchronize do @semaphore.synchronize do
response = ""
safe_send do
send_message_on_socket(message_with_headers) send_message_on_socket(message_with_headers)
response = receive receive
end end
response
end
end
# Capture errors and try to reconnect
def safe_send
sent = false
while(!sent) do
begin
response = yield
sent = true
rescue Timeout::Error, SocketError, SystemCallError, IOError, ConnectionFailure => ex
if @auto_reconnect
connect_to_master
else
raise ex
close
end
end
end
response
end end
# Return +true+ if +doc+ contains an 'ok' field with the value 1. # Return +true+ if +doc+ contains an 'ok' field with the value 1.
@ -610,28 +589,19 @@ module Mongo
# Sending a message on socket. # Sending a message on socket.
def send_message_on_socket(packed_message) def send_message_on_socket(packed_message)
connect_to_master if !connected? && @auto_reconnect connect_to_master if !connected?
sent = false
while !sent do
begin begin
@socket.print(packed_message) @socket.print(packed_message)
@socket.flush @socket.flush
sent = true
rescue => ex rescue => ex
sent = false
if @auto_reconnect
connect_to_master
else
close close
raise ex raise ConnectionFailure, "Operation failed with the following exception: #{ex}."
end
end
end end
end end
# Receive data of specified length on socket. # Receive data of specified length on socket.
def receive_data_on_socket(length) def receive_data_on_socket(length)
connect_to_master if !connected? && @auto_reconnect connect_to_master if !connected?
message = "" message = ""
chunk = "" chunk = ""
while message.length < length do while message.length < length do
@ -639,8 +609,8 @@ module Mongo
chunk = @socket.read(length - message.length) chunk = @socket.read(length - message.length)
raise ConnectionFailure, "connection closed" unless chunk && chunk.length > 0 raise ConnectionFailure, "connection closed" unless chunk && chunk.length > 0
message += chunk message += chunk
rescue Timeout => ex rescue => ex
raise OperationFailure, "Database command timed out." raise ConnectionFailure, "Operation failed with the following exception: #{ex}"
end end
end end
message message

View File

@ -0,0 +1,34 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require 'mongo'
require 'test/unit'
require 'test/test_helper'
# NOTE: this test should be run only if a replica pair is running.
class CountTest < Test::Unit::TestCase
include Mongo
def setup
@conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil)
@db = @conn.db('mongo-ruby-test')
@db.drop_collection("test-pairs")
@coll = @db.collection("test-pairs")
end
def test_correct_count_after_insertion_reconnect
@coll.insert({:a => 20}, :safe => true)
assert_equal 1, @coll.count
# Sleep to allow resync
sleep(3)
puts "Please disconnect the current master."
gets
rescue_connection_failure do
@coll.insert({:a => 30}, :safe => true)
end
@coll.insert({:a => 40}, :safe => true)
assert_equal 3, @coll.count, "Second count failed"
end
end

View File

@ -0,0 +1,51 @@
$:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require 'mongo'
require 'test/unit'
require 'test/test_helper'
# NOTE: this test should be run only if a replica pair is running.
class ReplicaPairTest < Test::Unit::TestCase
include Mongo
def setup
@conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil)
@db = @conn.db('mongo-ruby-test')
@db.drop_collection("test-pairs")
@coll = @db.collection("test-pairs")
end
def test_insert
@coll.save({:a => 20}, :safe => true)
puts "Please disconnect the current master."
gets
rescue_connection_failure do
@coll.save({:a => 30}, :safe => true)
end
@coll.save({:a => 40}, :safe => true)
@coll.save({:a => 50}, :safe => true)
@coll.save({:a => 60}, :safe => true)
@coll.save({:a => 70}, :safe => true)
puts "Please reconnect the old master to make sure that the new master " +
"has synced with the previous master. Note: this may have happened already."
gets
results = []
# Note: need to figure out what to do here since this first find will fail.
rescue_connection_failure do
@coll.find.each {|r| results << r}
[20, 30, 40, 50, 60, 70].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end
end
@coll.save({:a => 80}, :safe => true)
@coll.find.each {|r| results << r}
[20, 30, 40, 50, 60, 70, 80].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a} on second find"
end
end
end

View File

@ -1,36 +0,0 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
require 'test/unit'
# NOTE: this test should be run only if
class ReplicaPairTest < Test::Unit::TestCase
include Mongo
def setup
@conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil, :auto_reconnect => true)
@db = @conn.db('mongo-ruby-test')
@db.drop_collection("test-pairs")
@coll = @db.collection("test-pairs")
end
def test_insert
@coll.save({:a => 20}, :safe => true)
puts "Please disconnect the current master. Test will resume in 15 seconds."
sleep(15)
@coll.save({:a => 30}, :safe => true)
@coll.save({:a => 40}, :safe => true)
@coll.save({:a => 50}, :safe => true)
@coll.save({:a => 60}, :safe => true)
@coll.save({:a => 70}, :safe => true)
puts "Please reconnect the old master. Test will resume in 15 seconds."
sleep(15)
results = []
# Note: need to figure out what to do here.
@coll.find.each {|r| r}
@coll.find.each {|r| results << r}
[20, 30, 40, 50, 60, 70].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end
end
end

View File

@ -1,13 +1,14 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') $:.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
require 'mongo' require 'mongo'
require 'test/unit' require 'test/unit'
require 'test/test_helper'
# NOTE: this test should be run only if # NOTE: this test should be run only if a replica pair is running.
class ReplicaPairTest < Test::Unit::TestCase class ReplicaPairTest < Test::Unit::TestCase
include Mongo include Mongo
def setup def setup
@conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil, :auto_reconnect => true) @conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil)
@db = @conn.db('mongo-ruby-test') @db = @conn.db('mongo-ruby-test')
@db.drop_collection("test-pairs") @db.drop_collection("test-pairs")
@coll = @db.collection("test-pairs") @coll = @db.collection("test-pairs")
@ -18,16 +19,21 @@ class ReplicaPairTest < Test::Unit::TestCase
@coll.save({:a => 30}) @coll.save({:a => 30})
@coll.save({:a => 40}) @coll.save({:a => 40})
results = [] results = []
@coll.find.each {|r| p results << r} @coll.find.each {|r| results << r}
[20, 30, 40].each do |a| [20, 30, 40].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}" assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end end
puts "Please disconnect the current master. Test will resume in 15 seconds."
sleep(15) puts "Please disconnect the current master."
@coll.find.each {|r| p results << r} gets
results = []
rescue_connection_failure do
@coll.find.each {|r| results << r}
[20, 30, 40].each do |a| [20, 30, 40].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}" assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end end
end end
end
end end

View File

@ -144,21 +144,6 @@ class DBTest < Test::Unit::TestCase
@@db.logout # only testing that we don't throw exception @@db.logout # only testing that we don't throw exception
end end
def test_auto_connect
@@db.close
db = Connection.new(@@host, @@port, :auto_reconnect => true).db('ruby-mongo-test')
assert db.connected?
assert db.auto_reconnect?
db.close
assert !db.connected?
assert db.auto_reconnect?
db.collection('test').insert('a' => 1)
assert db.connected?
ensure
@@db = Connection.new(@@host, @@port).db('ruby-mongo-test')
@@users = @@db.collection('system.users')
end
def test_error def test_error
@@db.reset_error_history @@db.reset_error_history
assert_nil @@db.error assert_nil @@db.error

View File

@ -776,7 +776,7 @@ class DBAPITest < Test::Unit::TestCase
# doesn't really test functionality, just that the option is set correctly # doesn't really test functionality, just that the option is set correctly
def test_snapshot def test_snapshot
@@db.collection("test").find({}, :snapshot => true).to_a @@db.collection("test").find({}, :snapshot => true).to_a
assert_raise RuntimeError do assert_raise OperationFailure do
@@db.collection("test").find({}, :snapshot => true, :sort => 'a').to_a @@db.collection("test").find({}, :snapshot => true, :sort => 'a').to_a
end end
end end

View File

@ -22,4 +22,19 @@ require 'mongo'
# NOTE: most tests assume that MongoDB is running. # NOTE: most tests assume that MongoDB is running.
class Test::Unit::TestCase class Test::Unit::TestCase
include Mongo include Mongo
# Generic code for rescuing connection failures and retrying operations.
# This could be combined with some timeout functionality.
def rescue_connection_failure
success = false
while !success
begin
yield
success = true
rescue Mongo::ConnectionFailure
puts "Rescuing"
sleep(1)
end
end
end
end end