Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.rdoc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ Use _mongo_ type in match.
# eg: my_id: "507f1f77bcf86cd799439011"
object_id_keys my_id

# Specify whether the operations should be executed in order (default: true).
# If false, continues to insert remaining documents even if some inserts fail.
ordered false

# Other buffer configurations here
</match>

Expand Down
1 change: 1 addition & 0 deletions fluent-plugin-mongo.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Gem::Specification.new do |gem|
gem.add_development_dependency "simplecov", ">= 0.5.4"
gem.add_development_dependency "rr", ">= 1.0.0"
gem.add_development_dependency "test-unit", ">= 3.0.0"
gem.add_development_dependency "test-unit-rr", ">= 1.0.0"
gem.add_development_dependency "timecop", "~> 0.9.4"
gem.add_development_dependency "webrick", ">= 1.7.0"
end
4 changes: 3 additions & 1 deletion lib/fluent/plugin/out_mongo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class MongoOutput < Output
config_param :ssl_verify, :bool, default: false
config_param :ssl_ca_cert, :string, default: nil

desc "Whether the operations should be executed in order. If false, continues to insert remaining documents even if some inserts fail."
config_param :ordered, :bool, default: true

config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
Expand Down Expand Up @@ -355,7 +357,7 @@ def operate(database, collection, records)
replace_value_of_hash(r)
end

get_collection(database, collection, @collection_options).insert_many(records)
get_collection(database, collection, @collection_options).insert_many(records, ordered: @ordered)
rescue Mongo::Error::BulkWriteError => e
log.warn "#{records.size - e.result["n_inserted"]} documents are not inserted. Maybe these documents are invalid as a BSON."
forget_collection(collection)
Expand Down
2 changes: 2 additions & 0 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
require 'fluent/plugin/out_mongo'
require 'fluent/plugin/out_mongo_replset'
require 'fluent/plugin/in_mongo_tail'
require 'rr'
require 'test/unit/rr'
35 changes: 35 additions & 0 deletions test/plugin/test_out_mongo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ def test_configure
assert_equal({capped: true, size: 100}, d.instance.collection_options)
assert_equal({ssl: false, write: {j: false}}, d.instance.client_options)
assert_nil d.instance.connection_string
assert_true d.instance.ordered
end

def test_configure_with_disabled_ordered
d = create_driver(%[
@type mongo
database fluent_test
collection test_collection

ordered false
])

assert_equal('fluent_test', d.instance.database)
assert_equal('test_collection', d.instance.collection)
assert_equal('localhost', d.instance.host)
assert_equal(port, d.instance.port)
assert_false d.instance.ordered
end

def test_configure_with_connection_string
Expand Down Expand Up @@ -247,6 +264,24 @@ def test_write_with_expire_index
assert_equal({"expireAfterSeconds"=>120.0}, expire_after_hash)
end

def test_write_with_disabled_ordered
d = create_driver(%[
@type mongo
connection_string mongodb://localhost:#{port}/#{database_name}
collection #{collection_name}
ordered false
])

mock_collection = Object.new
# Check the expected value is given as ordered parameter
mock(mock_collection).insert_many(anything, ordered: false)
stub(d.instance).get_collection { mock_collection }

d.run(default_tag: 'test') do
emit_documents(d)
end
end

def test_overflow_integer_value
d = create_driver
d.run(default_tag: 'test') do
Expand Down