Replication acknowledgment RUBY-126

This commit is contained in:
Kyle Banker 2010-05-04 16:00:05 -04:00
parent 797fada848
commit 065d97ca1c
6 changed files with 119 additions and 26 deletions

View File

@ -203,17 +203,22 @@ module Mongo
#
# @return [ObjectID] the _id of the saved document.
#
# @option opts [Boolean] :safe (+false+)
# If true, check that the save succeeded. OperationFailure
# will be raised on an error. Note that a safe check requires an extra
# round-trip to the database.
def save(doc, options={})
# @option opts [Boolean, Hash] :safe (+false+)
# run the operation in safe mode, which run a getlasterror command on the
# database to report any assertion. In addition, a hash can be provided to
# run an fsync and/or wait for replication of the save (>= 1.5.1). See the options
# for DB#error.
#
# @raises [OperationFailure] when :safe mode fails.
#
# @see DB#remove for options that can be passed to :safe.
def save(doc, opts={})
if doc.has_key?(:_id) || doc.has_key?('_id')
id = doc[:_id] || doc['_id']
update({:_id => id}, doc, :upsert => true, :safe => options.delete(:safe))
update({:_id => id}, doc, :upsert => true, :safe => opts[:safe])
id
else
insert(doc, :safe => options.delete(:safe))
insert(doc, :safe => opts[:safe])
end
end
@ -226,10 +231,13 @@ module Mongo
# the _id of the inserted document or a list of _ids of all inserted documents.
# Note: the object may have been modified by the database's PK factory, if it has one.
#
# @option opts [Boolean] :safe (+false+)
# If true, check that the save succeeded. OperationFailure
# will be raised on an error. Note that a safe check requires an extra
# round-trip to the database.
# @option opts [Boolean, Hash] :safe (+false+)
# run the operation in safe mode, which run a getlasterror command on the
# database to report any assertion. In addition, a hash can be provided to
# run an fsync and/or wait for replication of the insert (>= 1.5.1). See the options
# for DB#error.
#
# @see DB#remove for options that can be passed to :safe.
#
# @core insert insert-instance_method
def insert(doc_or_docs, options={})
@ -245,8 +253,11 @@ module Mongo
# @param [Hash] selector
# If specified, only matching documents will be removed.
#
# @option opts [Boolean] :safe [false] run the operation in safe mode, which
# will call :getlasterror on the database and report any assertions.
# @option opts [Boolean, Hash] :safe (+false+)
# run the operation in safe mode, which run a getlasterror command on the
# database to report any assertion. In addition, a hash can be provided to
# run an fsync and/or wait for replication of the remove (>= 1.5.1). See the options
# for DB#error.
#
# @example remove all documents from the 'users' collection:
# users.remove
@ -260,6 +271,8 @@ module Mongo
# @raise [Mongo::OperationFailure] an exception will be raised iff safe mode is enabled
# and the operation fails.
#
# @see DB#remove for options that can be passed to :safe.
#
# @core remove remove-instance_method
def remove(selector={}, opts={})
# Initial byte is 0.
@ -270,7 +283,7 @@ module Mongo
if opts[:safe]
@connection.send_message_with_safe_check(Mongo::Constants::OP_DELETE, message, @db.name,
"#{@db.name}['#{@name}'].remove(#{selector.inspect})")
"#{@db.name}['#{@name}'].remove(#{selector.inspect})", opts[:safe])
# the return value of send_message_with_safe_check isn't actually meaningful --
# only the fact that it didn't raise an error is -- so just return true
true
@ -312,7 +325,7 @@ module Mongo
message.put_array(BSON::BSON_CODER.serialize(document, false, true).to_a)
if options[:safe]
@connection.send_message_with_safe_check(Mongo::Constants::OP_UPDATE, message, @db.name,
"#{@db.name}['#{@name}'].update(#{selector.inspect}, #{document.inspect})")
"#{@db.name}['#{@name}'].update(#{selector.inspect}, #{document.inspect})", options[:safe])
else
@connection.send_message(Mongo::Constants::OP_UPDATE, message,
"#{@db.name}['#{@name}'].update(#{selector.inspect}, #{document.inspect})")
@ -657,7 +670,7 @@ module Mongo
documents.each { |doc| message.put_array(BSON::BSON_CODER.serialize(doc, check_keys, true).to_a) }
if safe
@connection.send_message_with_safe_check(Mongo::Constants::OP_INSERT, message, @db.name,
"#{@db.name}['#{collection_name}'].insert(#{documents.inspect})")
"#{@db.name}['#{collection_name}'].insert(#{documents.inspect})", safe)
else
@connection.send_message(Mongo::Constants::OP_INSERT, message,
"#{@db.name}['#{collection_name}'].insert(#{documents.inspect})")

View File

@ -347,13 +347,17 @@ module Mongo
# @param [BSON::ByteBuffer] message a message to send to the database.
# @param [String] db_name the name of the database. used on call to get_last_error.
# @param [String] log_message text version of +message+ for logging.
# @param [Hash] last_error_params parameters to be sent to getLastError. See DB#error for
# available options.
#
# @see DB#error for valid last error params.
#
# @return [Array]
# An array whose indexes include [0] documents returned, [1] number of document received,
# and [3] a cursor_id.
def send_message_with_safe_check(operation, message, db_name, log_message=nil)
def send_message_with_safe_check(operation, message, db_name, log_message=nil, last_error_params=false)
message_with_headers = add_message_headers(operation, message)
message_with_check = last_error_message(db_name)
message_with_check = last_error_message(db_name, last_error_params)
@logger.debug(" MONGODB #{log_message || message}") if @logger
begin
sock = checkout
@ -366,7 +370,7 @@ module Mongo
ensure
checkin(sock)
end
if num_received == 1 && error = docs[0]['err']
if num_received == 1 && (error = docs[0]['err'] || docs[0]['errmsg'])
raise Mongo::OperationFailure, error
end
[docs, num_received, cursor_id]
@ -644,13 +648,21 @@ module Mongo
[docs, number_received, cursor_id]
end
def last_error_message(db_name)
# Constructs a getlasterror message. This method is used exclusively by
# Connection#send_message_with_safe_check.
def last_error_message(db_name, opts)
message = BSON::ByteBuffer.new
message.put_int(0)
BSON::BSON_RUBY.serialize_cstr(message, "#{db_name}.$cmd")
message.put_int(0)
message.put_int(-1)
message.put_array(BSON::BSON_CODER.serialize({:getlasterror => 1}, false).unpack("C*"))
cmd = OrderedHash.new
cmd[:getlasterror] = 1
if opts.is_a?(Hash)
opts.assert_valid_keys(:w, :wtimeout, :fsync)
cmd.merge!(opts)
end
message.put_array(BSON::BSON_CODER.serialize(cmd, false).unpack("C*"))
add_message_headers(Mongo::Constants::OP_QUERY, message)
end

View File

@ -247,11 +247,19 @@ module Mongo
# Get the error message from the most recently executed database
# operation for this connection.
#
# @return [String, Nil] either the text describing the error or nil if no
# @option opts [Boolean] :fsync (false)
# @option opts [Integer] :w (nil)
# @option opts [Integer] :wtimeout (nil)
#
# @return [String, Nil] either the text describing an error or nil if no
# error has occurred.
def error
doc = command(:getlasterror => 1)
raise MongoDBError, "error retrieving last error: #{doc}" unless ok?(doc)
def error(opts={})
opts.assert_valid_keys(:w, :wtimeout, :fsync)
cmd = OrderedHash.new
cmd[:getlasterror] = 1
cmd.merge!(opts) unless opts.empty?
doc = command(cmd)
raise MongoDBError, "error retrieving last error: #{doc.inspect}" unless ok?(doc)
doc['err']
end

View File

@ -115,6 +115,39 @@ class TestCollection < Test::Unit::TestCase
end
end
if @@version >= "1.5.1"
def test_safe_mode_with_advanced_safe_with_invalid_options
assert_raise_error ArgumentError, "Unknown key(s): wtime" do
@@test.insert({:foo => 1}, :safe => {:w => 2, :wtime => 1, :fsync => true})
end
assert_raise_error ArgumentError, "Unknown key(s): wtime" do
@@test.update({:foo => 1}, {:foo => 2}, :safe => {:w => 2, :wtime => 1, :fsync => true})
end
assert_raise_error ArgumentError, "Unknown key(s): wtime" do
@@test.remove({:foo => 2}, :safe => {:w => 2, :wtime => 1, :fsync => true})
end
end
def test_safe_mode_with_w_failure
assert_raise_error OperationFailure, "timed out waiting for slaves" do
@@test.insert({:foo => 1}, :safe => {:w => 2, :wtimeout => 1, :fsync => true})
end
assert_raise_error OperationFailure, "timed out waiting for slaves" do
@@test.update({:foo => 1}, {:foo => 2}, :safe => {:w => 2, :wtimeout => 1, :fsync => true})
end
assert_raise_error OperationFailure, "timed out waiting for slaves" do
@@test.remove({:foo => 2}, :safe => {:w => 2, :wtimeout => 1, :fsync => true})
end
end
def test_safe_mode_with_write_and_fsync
assert @@test.insert({:foo => 1}, :safe => {:w => 1, :wtimeout => 1, :fsync => true})
assert @@test.update({:foo => 1}, {:foo => 2}, :safe => {:w => 1, :wtimeout => 1, :fsync => true})
assert @@test.remove({:foo => 2}, :safe => {:w => 1, :wtimeout => 1, :fsync => true})
end
end
def test_update
id1 = @@test.save("x" => 5)
@@test.update({}, {"$inc" => {"x" => 1}})
@ -272,7 +305,7 @@ class TestCollection < Test::Unit::TestCase
end
end
if @@version <= "1.5.1"
if @@version >= "1.5.1"
def test_fields_with_slice
@@test.save({:foo => [1, 2, 3, 4, 5, 6], :test => 'slice'})

View File

@ -183,6 +183,24 @@ class DBTest < Test::Unit::TestCase
assert_nil @@db.previous_error
end
if @@version >= "1.5.1"
def test_failing_error_params
assert_raise_error Mongo::MongoDBError, "timed out waiting for slaves" do
@@db.error(:w => 2, :wtimeout => 10, :fsync => true)
end
end
def test_passing_error_params
assert_nil @@db.error(:w => 1, :wtimeout => 10, :fsync => true)
end
def test_invalid_error_params
assert_raise_error ArgumentError, "Unknown key(s): z" do
@@db.error(:z => 1, :wtimeout => 10, :fsync => true)
end
end
end
def test_check_command_response
command = {:forceerror => 1}
assert_raise OperationFailure do

View File

@ -42,4 +42,13 @@ class Test::Unit::TestCase
end
end
end
def assert_raise_error(klass, message)
begin
yield
rescue => e
assert_equal klass, e.class
assert e.message.include?(message), "#{e.message} does not include #{message}."
end
end
end