diff --git a/README.rdoc b/README.rdoc index 609c96f..ef30c4d 100644 --- a/README.rdoc +++ b/README.rdoc @@ -74,6 +74,10 @@ Use _mongo_ type in match. # eg: my_id: "507f1f77bcf86cd799439011" object_id_keys my_id + # Specify whether the operations should be executed in order (default: true). + # If false, continues to insert remaining documents even if some inserts fail. + ordered false + # Other buffer configurations here diff --git a/fluent-plugin-mongo.gemspec b/fluent-plugin-mongo.gemspec index c14382d..988823b 100644 --- a/fluent-plugin-mongo.gemspec +++ b/fluent-plugin-mongo.gemspec @@ -23,6 +23,7 @@ Gem::Specification.new do |gem| gem.add_development_dependency "simplecov", ">= 0.5.4" gem.add_development_dependency "rr", ">= 1.0.0" gem.add_development_dependency "test-unit", ">= 3.0.0" + gem.add_development_dependency "test-unit-rr", ">= 1.0.0" gem.add_development_dependency "timecop", "~> 0.9.4" gem.add_development_dependency "webrick", ">= 1.7.0" end diff --git a/lib/fluent/plugin/out_mongo.rb b/lib/fluent/plugin/out_mongo.rb index e2b6e47..059c155 100644 --- a/lib/fluent/plugin/out_mongo.rb +++ b/lib/fluent/plugin/out_mongo.rb @@ -65,6 +65,8 @@ class MongoOutput < Output config_param :ssl_verify, :bool, default: false config_param :ssl_ca_cert, :string, default: nil + desc "Whether the operations should be executed in order. If false, continues to insert remaining documents even if some inserts fail." + config_param :ordered, :bool, default: true config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE @@ -355,7 +357,7 @@ def operate(database, collection, records) replace_value_of_hash(r) end - get_collection(database, collection, @collection_options).insert_many(records) + get_collection(database, collection, @collection_options).insert_many(records, ordered: @ordered) rescue Mongo::Error::BulkWriteError => e log.warn "#{records.size - e.result["n_inserted"]} documents are not inserted. Maybe these documents are invalid as a BSON." forget_collection(collection) diff --git a/test/helper.rb b/test/helper.rb index f170e8c..0fd346a 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -4,3 +4,5 @@ require 'fluent/plugin/out_mongo' require 'fluent/plugin/out_mongo_replset' require 'fluent/plugin/in_mongo_tail' +require 'rr' +require 'test/unit/rr' diff --git a/test/plugin/test_out_mongo.rb b/test/plugin/test_out_mongo.rb index b4c63a0..e59f8dd 100644 --- a/test/plugin/test_out_mongo.rb +++ b/test/plugin/test_out_mongo.rb @@ -67,6 +67,23 @@ def test_configure assert_equal({capped: true, size: 100}, d.instance.collection_options) assert_equal({ssl: false, write: {j: false}}, d.instance.client_options) assert_nil d.instance.connection_string + assert_true d.instance.ordered + end + + def test_configure_with_disabled_ordered + d = create_driver(%[ + @type mongo + database fluent_test + collection test_collection + + ordered false + ]) + + assert_equal('fluent_test', d.instance.database) + assert_equal('test_collection', d.instance.collection) + assert_equal('localhost', d.instance.host) + assert_equal(port, d.instance.port) + assert_false d.instance.ordered end def test_configure_with_connection_string @@ -247,6 +264,24 @@ def test_write_with_expire_index assert_equal({"expireAfterSeconds"=>120.0}, expire_after_hash) end + def test_write_with_disabled_ordered + d = create_driver(%[ + @type mongo + connection_string mongodb://localhost:#{port}/#{database_name} + collection #{collection_name} + ordered false + ]) + + mock_collection = Object.new + # Check the expected value is given as ordered parameter + mock(mock_collection).insert_many(anything, ordered: false) + stub(d.instance).get_collection { mock_collection } + + d.run(default_tag: 'test') do + emit_documents(d) + end + end + def test_overflow_integer_value d = create_driver d.run(default_tag: 'test') do