Add :collect_on_error option to collect invalid documents in bulk-insert
This commit is contained in:
parent
881c73ac82
commit
5b1e09a6ce
@ -321,6 +321,10 @@ module Mongo
|
|||||||
#
|
#
|
||||||
# @return [ObjectId, Array]
|
# @return [ObjectId, Array]
|
||||||
# The _id of the inserted document or a list of _ids of all inserted documents.
|
# The _id of the inserted document or a list of _ids of all inserted documents.
|
||||||
|
# @return [[ObjectId, Array], [Hash, Array]]
|
||||||
|
# 1st, the _id of the inserted document or a list of _ids of all inserted documents.
|
||||||
|
# 2nd, a list of invalid documents as a BSON.
|
||||||
|
# Return this result format when :collect_on_error is true.
|
||||||
#
|
#
|
||||||
# @option opts [Boolean, Hash] :safe (+false+)
|
# @option opts [Boolean, Hash] :safe (+false+)
|
||||||
# run the operation in safe mode, which run a getlasterror command on the
|
# run the operation in safe mode, which run a getlasterror command on the
|
||||||
@ -338,6 +342,8 @@ module Mongo
|
|||||||
# if some are rejected on error. When safe mode is
|
# if some are rejected on error. When safe mode is
|
||||||
# enabled, any error will raise an OperationFailure exception.
|
# enabled, any error will raise an OperationFailure exception.
|
||||||
# MongoDB v2.0+.
|
# MongoDB v2.0+.
|
||||||
|
# @option opts [Boolean] :collect_on_error (+false+) if true, then
|
||||||
|
# collects invalid documents as a BSON. This option changes result format.
|
||||||
#
|
#
|
||||||
# @core insert insert-instance_method
|
# @core insert insert-instance_method
|
||||||
def insert(doc_or_docs, opts={})
|
def insert(doc_or_docs, opts={})
|
||||||
@ -944,10 +950,28 @@ module Mongo
|
|||||||
else
|
else
|
||||||
message = BSON::ByteBuffer.new("\0\0\0\0")
|
message = BSON::ByteBuffer.new("\0\0\0\0")
|
||||||
end
|
end
|
||||||
|
|
||||||
|
collect_on_error = !!flags[:collect_on_error]
|
||||||
|
error_docs = [] if collect_on_error
|
||||||
|
|
||||||
BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{collection_name}")
|
BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{collection_name}")
|
||||||
documents.each do |doc|
|
documents =
|
||||||
message.put_binary(BSON::BSON_CODER.serialize(doc, check_keys, true, @connection.max_bson_size).to_s)
|
if collect_on_error
|
||||||
end
|
documents.select do |doc|
|
||||||
|
begin
|
||||||
|
message.put_binary(BSON::BSON_CODER.serialize(doc, check_keys, true, @connection.max_bson_size).to_s)
|
||||||
|
true
|
||||||
|
rescue StandardError => e # StandardError will be replaced with BSONError
|
||||||
|
doc.delete(:_id)
|
||||||
|
error_docs << doc
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
else
|
||||||
|
documents.each do |doc|
|
||||||
|
message.put_binary(BSON::BSON_CODER.serialize(doc, check_keys, true, @connection.max_bson_size).to_s)
|
||||||
|
end
|
||||||
|
end
|
||||||
raise InvalidOperation, "Exceded maximum insert size of 16,000,000 bytes" if message.size > 16_000_000
|
raise InvalidOperation, "Exceded maximum insert size of 16,000,000 bytes" if message.size > 16_000_000
|
||||||
|
|
||||||
instrument(:insert, :database => @db.name, :collection => collection_name, :documents => documents) do
|
instrument(:insert, :database => @db.name, :collection => collection_name, :documents => documents) do
|
||||||
@ -957,7 +981,13 @@ module Mongo
|
|||||||
@connection.send_message(Mongo::Constants::OP_INSERT, message)
|
@connection.send_message(Mongo::Constants::OP_INSERT, message)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
documents.collect { |o| o[:_id] || o['_id'] }
|
|
||||||
|
doc_ids = documents.collect { |o| o[:_id] || o['_id'] }
|
||||||
|
if collect_on_error
|
||||||
|
return doc_ids, error_docs
|
||||||
|
else
|
||||||
|
doc_ids
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def generate_index_name(spec)
|
def generate_index_name(spec)
|
||||||
|
@ -193,6 +193,41 @@ class TestCollection < Test::Unit::TestCase
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_bson_invalid_key_serialize_error_with_collect_on_error
|
||||||
|
docs = []
|
||||||
|
docs << {:foo => 1}
|
||||||
|
docs << {:bar => 1}
|
||||||
|
invalid_docs = []
|
||||||
|
invalid_docs << {'$invalid-key' => 1}
|
||||||
|
invalid_docs << {'invalid.key' => 1}
|
||||||
|
docs += invalid_docs
|
||||||
|
assert_raise BSON::InvalidKeyName do
|
||||||
|
@@test.insert(docs, :collect_on_error => false)
|
||||||
|
end
|
||||||
|
assert_equal 0, @@test.count
|
||||||
|
|
||||||
|
doc_ids, error_docs = @@test.insert(docs, :collect_on_error => true)
|
||||||
|
assert_equal 2, @@test.count
|
||||||
|
assert_equal error_docs, invalid_docs
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_bson_invalid_encoding_serialize_error_with_collect_on_error
|
||||||
|
docs = []
|
||||||
|
docs << {:foo => 1}
|
||||||
|
docs << {:bar => 1}
|
||||||
|
invalid_docs = []
|
||||||
|
invalid_docs << {"\223\372\226{" => 1} # non utf8 encoding
|
||||||
|
docs += invalid_docs
|
||||||
|
assert_raise BSON::InvalidStringEncoding do
|
||||||
|
@@test.insert(docs, :collect_on_error => false)
|
||||||
|
end
|
||||||
|
assert_equal 0, @@test.count
|
||||||
|
|
||||||
|
doc_ids, error_docs = @@test.insert(docs, :collect_on_error => true)
|
||||||
|
assert_equal 2, @@test.count
|
||||||
|
assert_equal error_docs, invalid_docs
|
||||||
|
end
|
||||||
|
|
||||||
def test_maximum_insert_size
|
def test_maximum_insert_size
|
||||||
docs = []
|
docs = []
|
||||||
16.times do
|
16.times do
|
||||||
|
Loading…
Reference in New Issue
Block a user