Fixes for replica pairs (unstable)

This commit is contained in:
Kyle Banker 2009-11-20 17:48:41 -05:00
parent 25cf35ecbb
commit 12454d04ad
6 changed files with 144 additions and 28 deletions

View File

@ -31,6 +31,16 @@ namespace :test do
t.test_files = FileList['test/test*.rb'] t.test_files = FileList['test/test*.rb']
t.verbose = true t.verbose = true
end end
Rake::TestTask.new(:pair_insert) do |t|
t.test_files = FileList['test/replica/pair_test.rb']
t.verbose = true
end
Rake::TestTask.new(:pair_query) do |t|
t.test_files = FileList['test/replica/query_test.rb']
t.verbose = true
end
end end
desc "Generate documentation" desc "Generate documentation"

View File

@ -31,5 +31,5 @@ module Mongo
ASCENDING = 1 ASCENDING = 1
DESCENDING = -1 DESCENDING = -1
VERSION = "0.17.1" VERSION = "0.17.2"
end end

View File

@ -15,6 +15,7 @@
# ++ # ++
require 'socket' require 'socket'
require 'timeout'
require 'digest/md5' require 'digest/md5'
require 'thread' require 'thread'
require 'mongo/collection' require 'mongo/collection'
@ -147,6 +148,7 @@ module Mongo
@semaphore = Mutex.new @semaphore = Mutex.new
@socket = nil @socket = nil
@logger = options[:logger] @logger = options[:logger]
@network_timeout = 20
connect_to_master connect_to_master
end end
@ -166,10 +168,10 @@ module Mongo
is_master = master? is_master = master?
@semaphore.lock if semaphore_is_locked @semaphore.lock if semaphore_is_locked
if !@slave_ok && !is_master if @nodes.length == 1 && !is_master && !@slave_ok
raise ConfigurationError, "Trying to connect directly to slave; if this is what you want, specify :slave_ok => true." raise ConfigurationError, "Trying to connect directly to slave; if this is what you want, specify :slave_ok => true."
end end
@slave_ok || is_master is_master || @slave_ok
rescue SocketError, SystemCallError, IOError => ex rescue SocketError, SystemCallError, IOError => ex
close if @socket close if @socket
false false
@ -352,16 +354,6 @@ module Mongo
@socket != nil @socket != nil
end end
def receive_full(length)
message = ""
while message.length < length do
chunk = @socket.recv(length - message.length)
raise "connection closed" unless chunk.length > 0
message += chunk
end
message
end
# Returns a Cursor over the query results. # Returns a Cursor over the query results.
# #
# Note that the query gets sent lazily; the cursor calls # Note that the query gets sent lazily; the cursor calls
@ -465,10 +457,14 @@ module Mongo
message_with_check = last_error_message message_with_check = last_error_message
@logger.debug(" MONGODB #{log_message || message}") if @logger @logger.debug(" MONGODB #{log_message || message}") if @logger
@semaphore.synchronize do @semaphore.synchronize do
safe_send do
send_message_on_socket(message_with_headers.append!(message_with_check).to_s) send_message_on_socket(message_with_headers.append!(message_with_check).to_s)
docs, num_received, cursor_id = receive docs, num_received, cursor_id = receive
if num_received == 1 && error = docs[0]['err'] if num_received == 1 && error = docs[0]['err']
raise Mongo::OperationFailure, error raise Mongo::OperationFailure, error
else
true
end
end end
end end
end end
@ -478,9 +474,32 @@ module Mongo
message_with_headers = add_message_headers(operation, message).to_s message_with_headers = add_message_headers(operation, message).to_s
@logger.debug(" MONGODB #{log_message || message}") if @logger @logger.debug(" MONGODB #{log_message || message}") if @logger
@semaphore.synchronize do @semaphore.synchronize do
response = ""
safe_send do
send_message_on_socket(message_with_headers) send_message_on_socket(message_with_headers)
receive response = receive
end end
response
end
end
# Capture errors and try to reconnect
def safe_send
sent = false
while(!sent) do
begin
response = yield
sent = true
rescue Timeout::Error, SocketError, SystemCallError, IOError, ConnectionFailure => ex
if @auto_reconnect
connect_to_master
else
raise ex
close
end
end
end
response
end end
# Return +true+ if +doc+ contains an 'ok' field with the value 1. # Return +true+ if +doc+ contains an 'ok' field with the value 1.
@ -592,22 +611,37 @@ module Mongo
# Sending a message on socket. # Sending a message on socket.
def send_message_on_socket(packed_message) def send_message_on_socket(packed_message)
connect_to_master if !connected? && @auto_reconnect connect_to_master if !connected? && @auto_reconnect
sent = false
while !sent do
begin begin
@socket.print(packed_message) @socket.print(packed_message)
@socket.flush @socket.flush
sent = true
rescue => ex rescue => ex
sent = false
if @auto_reconnect
connect_to_master
else
close close
raise ex raise ex
end end
end end
end
end
# Receive data of specified length on socket. # Receive data of specified length on socket.
def receive_data_on_socket(length) def receive_data_on_socket(length)
connect_to_master if !connected? && @auto_reconnect
message = "" message = ""
chunk = ""
while message.length < length do while message.length < length do
chunk = @socket.recv(length - message.length) begin
raise "connection closed" unless chunk.length > 0 chunk = @socket.read(length - message.length)
raise ConnectionFailure, "connection closed" unless chunk && chunk.length > 0
message += chunk message += chunk
rescue Timeout => ex
raise OperationFailure, "Database command timed out."
end
end end
message message
end end

View File

@ -30,6 +30,9 @@ module Mongo
# Raised when a database operation fails. # Raised when a database operation fails.
class OperationFailure < MongoDBError; end class OperationFailure < MongoDBError; end
# Raised when a database operation fails.
class ConnectionFailure < MongoDBError; end
# Raised when a client attempts to perform an invalid operation. # Raised when a client attempts to perform an invalid operation.
class InvalidOperation < MongoDBError; end class InvalidOperation < MongoDBError; end

36
test/replica/pair_test.rb Normal file
View File

@ -0,0 +1,36 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
require 'test/unit'
# NOTE: this test should be run only if
class ReplicaPairTest < Test::Unit::TestCase
include Mongo
def setup
@conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil, :auto_reconnect => true)
@db = @conn.db('mongo-ruby-test')
@db.drop_collection("test-pairs")
@coll = @db.collection("test-pairs")
end
def test_insert
@coll.save({:a => 20}, :safe => true)
puts "Please disconnect the current master. Test will resume in 15 seconds."
sleep(15)
@coll.save({:a => 30}, :safe => true)
@coll.save({:a => 40}, :safe => true)
@coll.save({:a => 50}, :safe => true)
@coll.save({:a => 60}, :safe => true)
@coll.save({:a => 70}, :safe => true)
puts "Please reconnect the old master. Test will resume in 15 seconds."
sleep(15)
results = []
# Note: need to figure out what to do here.
@coll.find.each {|r| r}
@coll.find.each {|r| results << r}
[20, 30, 40, 50, 60, 70].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end
end
end

View File

@ -0,0 +1,33 @@
$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib')
require 'mongo'
require 'test/unit'
# NOTE: this test should be run only if
class ReplicaPairTest < Test::Unit::TestCase
include Mongo
def setup
@conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil, :auto_reconnect => true)
@db = @conn.db('mongo-ruby-test')
@db.drop_collection("test-pairs")
@coll = @db.collection("test-pairs")
end
def test_query
@coll.save({:a => 20})
@coll.save({:a => 30})
@coll.save({:a => 40})
results = []
@coll.find.each {|r| p results << r}
[20, 30, 40].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end
puts "Please disconnect the current master. Test will resume in 15 seconds."
sleep(15)
@coll.find.each {|r| p results << r}
[20, 30, 40].each do |a|
assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}"
end
end
end