From 5b1e09a6ce9f4291a215ddc1dea2bdfdb7d32ad0 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Sat, 7 Jan 2012 08:19:38 +0900 Subject: [PATCH] Add :collect_on_error option to collect invalid documents in bulk-insert --- lib/mongo/collection.rb | 38 ++++++++++++++++++++++++++++++++++---- test/collection_test.rb | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/lib/mongo/collection.rb b/lib/mongo/collection.rb index c65fcee..1a9c632 100644 --- a/lib/mongo/collection.rb +++ b/lib/mongo/collection.rb @@ -321,6 +321,10 @@ module Mongo # # @return [ObjectId, Array] # The _id of the inserted document or a list of _ids of all inserted documents. + # @return [[ObjectId, Array], [Hash, Array]] + # 1st, the _id of the inserted document or a list of _ids of all inserted documents. + # 2nd, a list of invalid documents as a BSON. + # Return this result format when :collect_on_error is true. # # @option opts [Boolean, Hash] :safe (+false+) # run the operation in safe mode, which run a getlasterror command on the @@ -338,6 +342,8 @@ module Mongo # if some are rejected on error. When safe mode is # enabled, any error will raise an OperationFailure exception. # MongoDB v2.0+. + # @option opts [Boolean] :collect_on_error (+false+) if true, then + # collects invalid documents as a BSON. This option changes result format. # # @core insert insert-instance_method def insert(doc_or_docs, opts={}) @@ -944,10 +950,28 @@ module Mongo else message = BSON::ByteBuffer.new("\0\0\0\0") end + + collect_on_error = !!flags[:collect_on_error] + error_docs = [] if collect_on_error + BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{collection_name}") - documents.each do |doc| - message.put_binary(BSON::BSON_CODER.serialize(doc, check_keys, true, @connection.max_bson_size).to_s) - end + documents = + if collect_on_error + documents.select do |doc| + begin + message.put_binary(BSON::BSON_CODER.serialize(doc, check_keys, true, @connection.max_bson_size).to_s) + true + rescue StandardError => e # StandardError will be replaced with BSONError + doc.delete(:_id) + error_docs << doc + false + end + end + else + documents.each do |doc| + message.put_binary(BSON::BSON_CODER.serialize(doc, check_keys, true, @connection.max_bson_size).to_s) + end + end raise InvalidOperation, "Exceded maximum insert size of 16,000,000 bytes" if message.size > 16_000_000 instrument(:insert, :database => @db.name, :collection => collection_name, :documents => documents) do @@ -957,7 +981,13 @@ module Mongo @connection.send_message(Mongo::Constants::OP_INSERT, message) end end - documents.collect { |o| o[:_id] || o['_id'] } + + doc_ids = documents.collect { |o| o[:_id] || o['_id'] } + if collect_on_error + return doc_ids, error_docs + else + doc_ids + end end def generate_index_name(spec) diff --git a/test/collection_test.rb b/test/collection_test.rb index ed90ca7..948131d 100644 --- a/test/collection_test.rb +++ b/test/collection_test.rb @@ -193,6 +193,41 @@ class TestCollection < Test::Unit::TestCase end end + def test_bson_invalid_key_serialize_error_with_collect_on_error + docs = [] + docs << {:foo => 1} + docs << {:bar => 1} + invalid_docs = [] + invalid_docs << {'$invalid-key' => 1} + invalid_docs << {'invalid.key' => 1} + docs += invalid_docs + assert_raise BSON::InvalidKeyName do + @@test.insert(docs, :collect_on_error => false) + end + assert_equal 0, @@test.count + + doc_ids, error_docs = @@test.insert(docs, :collect_on_error => true) + assert_equal 2, @@test.count + assert_equal error_docs, invalid_docs + end + + def test_bson_invalid_encoding_serialize_error_with_collect_on_error + docs = [] + docs << {:foo => 1} + docs << {:bar => 1} + invalid_docs = [] + invalid_docs << {"\223\372\226{" => 1} # non utf8 encoding + docs += invalid_docs + assert_raise BSON::InvalidStringEncoding do + @@test.insert(docs, :collect_on_error => false) + end + assert_equal 0, @@test.count + + doc_ids, error_docs = @@test.insert(docs, :collect_on_error => true) + assert_equal 2, @@test.count + assert_equal error_docs, invalid_docs + end + def test_maximum_insert_size docs = [] 16.times do