diff --git a/Rakefile b/Rakefile index 360ed75..4bd561a 100644 --- a/Rakefile +++ b/Rakefile @@ -31,6 +31,16 @@ namespace :test do t.test_files = FileList['test/test*.rb'] t.verbose = true end + + Rake::TestTask.new(:pair_insert) do |t| + t.test_files = FileList['test/replica/pair_test.rb'] + t.verbose = true + end + + Rake::TestTask.new(:pair_query) do |t| + t.test_files = FileList['test/replica/query_test.rb'] + t.verbose = true + end end desc "Generate documentation" diff --git a/lib/mongo.rb b/lib/mongo.rb index 84c60a4..787de49 100644 --- a/lib/mongo.rb +++ b/lib/mongo.rb @@ -31,5 +31,5 @@ module Mongo ASCENDING = 1 DESCENDING = -1 - VERSION = "0.17.1" + VERSION = "0.17.2" end diff --git a/lib/mongo/db.rb b/lib/mongo/db.rb index 6bf606d..5bac890 100644 --- a/lib/mongo/db.rb +++ b/lib/mongo/db.rb @@ -15,6 +15,7 @@ # ++ require 'socket' +require 'timeout' require 'digest/md5' require 'thread' require 'mongo/collection' @@ -147,6 +148,7 @@ module Mongo @semaphore = Mutex.new @socket = nil @logger = options[:logger] + @network_timeout = 20 connect_to_master end @@ -166,10 +168,10 @@ module Mongo is_master = master? @semaphore.lock if semaphore_is_locked - if !@slave_ok && !is_master + if @nodes.length == 1 && !is_master && !@slave_ok raise ConfigurationError, "Trying to connect directly to slave; if this is what you want, specify :slave_ok => true." end - @slave_ok || is_master + is_master || @slave_ok rescue SocketError, SystemCallError, IOError => ex close if @socket false @@ -352,16 +354,6 @@ module Mongo @socket != nil end - def receive_full(length) - message = "" - while message.length < length do - chunk = @socket.recv(length - message.length) - raise "connection closed" unless chunk.length > 0 - message += chunk - end - message - end - # Returns a Cursor over the query results. # # Note that the query gets sent lazily; the cursor calls @@ -465,10 +457,14 @@ module Mongo message_with_check = last_error_message @logger.debug(" MONGODB #{log_message || message}") if @logger @semaphore.synchronize do - send_message_on_socket(message_with_headers.append!(message_with_check).to_s) - docs, num_received, cursor_id = receive - if num_received == 1 && error = docs[0]['err'] - raise Mongo::OperationFailure, error + safe_send do + send_message_on_socket(message_with_headers.append!(message_with_check).to_s) + docs, num_received, cursor_id = receive + if num_received == 1 && error = docs[0]['err'] + raise Mongo::OperationFailure, error + else + true + end end end end @@ -478,11 +474,34 @@ module Mongo message_with_headers = add_message_headers(operation, message).to_s @logger.debug(" MONGODB #{log_message || message}") if @logger @semaphore.synchronize do - send_message_on_socket(message_with_headers) - receive + response = "" + safe_send do + send_message_on_socket(message_with_headers) + response = receive + end + response end end + # Capture errors and try to reconnect + def safe_send + sent = false + while(!sent) do + begin + response = yield + sent = true + rescue Timeout::Error, SocketError, SystemCallError, IOError, ConnectionFailure => ex + if @auto_reconnect + connect_to_master + else + raise ex + close + end + end + end + response + end + # Return +true+ if +doc+ contains an 'ok' field with the value 1. def ok?(doc) ok = doc['ok'] @@ -592,22 +611,37 @@ module Mongo # Sending a message on socket. def send_message_on_socket(packed_message) connect_to_master if !connected? && @auto_reconnect - begin - @socket.print(packed_message) - @socket.flush - rescue => ex - close - raise ex + sent = false + while !sent do + begin + @socket.print(packed_message) + @socket.flush + sent = true + rescue => ex + sent = false + if @auto_reconnect + connect_to_master + else + close + raise ex + end + end end end # Receive data of specified length on socket. def receive_data_on_socket(length) + connect_to_master if !connected? && @auto_reconnect message = "" + chunk = "" while message.length < length do - chunk = @socket.recv(length - message.length) - raise "connection closed" unless chunk.length > 0 - message += chunk + begin + chunk = @socket.read(length - message.length) + raise ConnectionFailure, "connection closed" unless chunk && chunk.length > 0 + message += chunk + rescue Timeout => ex + raise OperationFailure, "Database command timed out." + end end message end diff --git a/lib/mongo/errors.rb b/lib/mongo/errors.rb index 78a5502..326f9d2 100644 --- a/lib/mongo/errors.rb +++ b/lib/mongo/errors.rb @@ -29,6 +29,9 @@ module Mongo # Raised when a database operation fails. class OperationFailure < MongoDBError; end + + # Raised when a database operation fails. + class ConnectionFailure < MongoDBError; end # Raised when a client attempts to perform an invalid operation. class InvalidOperation < MongoDBError; end diff --git a/test/replica/pair_test.rb b/test/replica/pair_test.rb new file mode 100644 index 0000000..e8d30c1 --- /dev/null +++ b/test/replica/pair_test.rb @@ -0,0 +1,36 @@ +$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') +require 'mongo' +require 'test/unit' + +# NOTE: this test should be run only if +class ReplicaPairTest < Test::Unit::TestCase + include Mongo + + def setup + @conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil, :auto_reconnect => true) + @db = @conn.db('mongo-ruby-test') + @db.drop_collection("test-pairs") + @coll = @db.collection("test-pairs") + end + + def test_insert + @coll.save({:a => 20}, :safe => true) + puts "Please disconnect the current master. Test will resume in 15 seconds." + sleep(15) + @coll.save({:a => 30}, :safe => true) + @coll.save({:a => 40}, :safe => true) + @coll.save({:a => 50}, :safe => true) + @coll.save({:a => 60}, :safe => true) + @coll.save({:a => 70}, :safe => true) + puts "Please reconnect the old master. Test will resume in 15 seconds." + sleep(15) + results = [] + # Note: need to figure out what to do here. + @coll.find.each {|r| r} + @coll.find.each {|r| results << r} + [20, 30, 40, 50, 60, 70].each do |a| + assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}" + end + end + +end diff --git a/test/replica/query_test.rb b/test/replica/query_test.rb new file mode 100644 index 0000000..e07c6f5 --- /dev/null +++ b/test/replica/query_test.rb @@ -0,0 +1,33 @@ +$LOAD_PATH[0,0] = File.join(File.dirname(__FILE__), '..', 'lib') +require 'mongo' +require 'test/unit' + +# NOTE: this test should be run only if +class ReplicaPairTest < Test::Unit::TestCase + include Mongo + + def setup + @conn = Mongo::Connection.new({:left => ["localhost", 27017], :right => ["localhost", 27018]}, nil, :auto_reconnect => true) + @db = @conn.db('mongo-ruby-test') + @db.drop_collection("test-pairs") + @coll = @db.collection("test-pairs") + end + + def test_query + @coll.save({:a => 20}) + @coll.save({:a => 30}) + @coll.save({:a => 40}) + results = [] + @coll.find.each {|r| p results << r} + [20, 30, 40].each do |a| + assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}" + end + puts "Please disconnect the current master. Test will resume in 15 seconds." + sleep(15) + @coll.find.each {|r| p results << r} + [20, 30, 40].each do |a| + assert results.any? {|r| r['a'] == a}, "Could not find record for a => #{a}" + end + end + +end