From 8c8f92b52c12e8b2108b7760bb9c2180a3d49f3d Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Fri, 24 Apr 2026 11:43:32 +0900 Subject: [PATCH 1/3] out_mongo: Add `ordered` parameter This commit adds the `ordered` configuration parameter to the `out_mongo`plugin (default: true). By setting `ordered false`, the plugin passes the `ordered: false` option to the MongoDB driver's `insert_many` method. This allows the MongoDB server to continue processing remaining documents in a bulk insert operation even if some documents fail, which improves throughput. Signed-off-by: Shizuo Fujita --- README.rdoc | 3 +++ fluent-plugin-mongo.gemspec | 1 + lib/fluent/plugin/out_mongo.rb | 4 +++- test/helper.rb | 2 ++ test/plugin/test_out_mongo.rb | 35 ++++++++++++++++++++++++++++++++++ 5 files changed, 44 insertions(+), 1 deletion(-) diff --git a/README.rdoc b/README.rdoc index 609c96f..3c800e9 100644 --- a/README.rdoc +++ b/README.rdoc @@ -74,6 +74,9 @@ Use _mongo_ type in match. # eg: my_id: "507f1f77bcf86cd799439011" object_id_keys my_id + # Specify whether the operations should be executed in order + ordered false + # Other buffer configurations here diff --git a/fluent-plugin-mongo.gemspec b/fluent-plugin-mongo.gemspec index c14382d..988823b 100644 --- a/fluent-plugin-mongo.gemspec +++ b/fluent-plugin-mongo.gemspec @@ -23,6 +23,7 @@ Gem::Specification.new do |gem| gem.add_development_dependency "simplecov", ">= 0.5.4" gem.add_development_dependency "rr", ">= 1.0.0" gem.add_development_dependency "test-unit", ">= 3.0.0" + gem.add_development_dependency "test-unit-rr", ">= 1.0.0" gem.add_development_dependency "timecop", "~> 0.9.4" gem.add_development_dependency "webrick", ">= 1.7.0" end diff --git a/lib/fluent/plugin/out_mongo.rb b/lib/fluent/plugin/out_mongo.rb index e2b6e47..292bbaa 100644 --- a/lib/fluent/plugin/out_mongo.rb +++ b/lib/fluent/plugin/out_mongo.rb @@ -65,6 +65,8 @@ class MongoOutput < Output config_param :ssl_verify, :bool, default: false config_param :ssl_ca_cert, :string, default: nil + desc "Whether the operations should be executed in order" + config_param :ordered, :bool, default: true config_section :buffer do config_set_default :@type, DEFAULT_BUFFER_TYPE @@ -355,7 +357,7 @@ def operate(database, collection, records) replace_value_of_hash(r) end - get_collection(database, collection, @collection_options).insert_many(records) + get_collection(database, collection, @collection_options).insert_many(records, ordered: @ordered) rescue Mongo::Error::BulkWriteError => e log.warn "#{records.size - e.result["n_inserted"]} documents are not inserted. Maybe these documents are invalid as a BSON." forget_collection(collection) diff --git a/test/helper.rb b/test/helper.rb index f170e8c..0fd346a 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -4,3 +4,5 @@ require 'fluent/plugin/out_mongo' require 'fluent/plugin/out_mongo_replset' require 'fluent/plugin/in_mongo_tail' +require 'rr' +require 'test/unit/rr' diff --git a/test/plugin/test_out_mongo.rb b/test/plugin/test_out_mongo.rb index b4c63a0..e59f8dd 100644 --- a/test/plugin/test_out_mongo.rb +++ b/test/plugin/test_out_mongo.rb @@ -67,6 +67,23 @@ def test_configure assert_equal({capped: true, size: 100}, d.instance.collection_options) assert_equal({ssl: false, write: {j: false}}, d.instance.client_options) assert_nil d.instance.connection_string + assert_true d.instance.ordered + end + + def test_configure_with_disabled_ordered + d = create_driver(%[ + @type mongo + database fluent_test + collection test_collection + + ordered false + ]) + + assert_equal('fluent_test', d.instance.database) + assert_equal('test_collection', d.instance.collection) + assert_equal('localhost', d.instance.host) + assert_equal(port, d.instance.port) + assert_false d.instance.ordered end def test_configure_with_connection_string @@ -247,6 +264,24 @@ def test_write_with_expire_index assert_equal({"expireAfterSeconds"=>120.0}, expire_after_hash) end + def test_write_with_disabled_ordered + d = create_driver(%[ + @type mongo + connection_string mongodb://localhost:#{port}/#{database_name} + collection #{collection_name} + ordered false + ]) + + mock_collection = Object.new + # Check the expected value is given as ordered parameter + mock(mock_collection).insert_many(anything, ordered: false) + stub(d.instance).get_collection { mock_collection } + + d.run(default_tag: 'test') do + emit_documents(d) + end + end + def test_overflow_integer_value d = create_driver d.run(default_tag: 'test') do From 984fe8f2fecd349c5aeadc90daac3fe1c9683be1 Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Fri, 24 Apr 2026 15:30:18 +0900 Subject: [PATCH 2/3] out_mongo: Update desc for ordered parameter Signed-off-by: Shizuo Fujita --- lib/fluent/plugin/out_mongo.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/out_mongo.rb b/lib/fluent/plugin/out_mongo.rb index 292bbaa..059c155 100644 --- a/lib/fluent/plugin/out_mongo.rb +++ b/lib/fluent/plugin/out_mongo.rb @@ -65,7 +65,7 @@ class MongoOutput < Output config_param :ssl_verify, :bool, default: false config_param :ssl_ca_cert, :string, default: nil - desc "Whether the operations should be executed in order" + desc "Whether the operations should be executed in order. If false, continues to insert remaining documents even if some inserts fail." config_param :ordered, :bool, default: true config_section :buffer do From 867752e1fbdfa36621745405718281726cd89e99 Mon Sep 17 00:00:00 2001 From: Shizuo Fujita Date: Fri, 24 Apr 2026 15:32:32 +0900 Subject: [PATCH 3/3] README: update explanation Signed-off-by: Shizuo Fujita --- README.rdoc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.rdoc b/README.rdoc index 3c800e9..ef30c4d 100644 --- a/README.rdoc +++ b/README.rdoc @@ -74,7 +74,8 @@ Use _mongo_ type in match. # eg: my_id: "507f1f77bcf86cd799439011" object_id_keys my_id - # Specify whether the operations should be executed in order + # Specify whether the operations should be executed in order (default: true). + # If false, continues to insert remaining documents even if some inserts fail. ordered false # Other buffer configurations here