|
7 | 7 |
|
8 | 8 | import com.google.common.base.Preconditions; |
9 | 9 | import org.opensearch.dataprepper.acknowledgements.InactiveAcknowledgementSetManager; |
| 10 | +import org.opensearch.dataprepper.model.CheckpointState; |
10 | 11 | import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; |
11 | 12 | import org.opensearch.dataprepper.model.buffer.Buffer; |
| 13 | +import org.opensearch.dataprepper.model.event.DefaultEventHandle; |
| 14 | +import org.opensearch.dataprepper.model.event.Event; |
12 | 15 | import org.opensearch.dataprepper.model.event.EventFactory; |
| 16 | +import org.opensearch.dataprepper.model.event.EventHandle; |
| 17 | +import org.opensearch.dataprepper.model.event.InternalEventHandle; |
13 | 18 | import org.opensearch.dataprepper.model.processor.Processor; |
14 | 19 | import org.opensearch.dataprepper.model.record.Record; |
15 | 20 | import org.opensearch.dataprepper.model.sink.Sink; |
|
21 | 26 | import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; |
22 | 27 | import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination; |
23 | 28 | import org.opensearch.dataprepper.parser.DataFlowComponent; |
| 29 | +import org.opensearch.dataprepper.pipeline.common.FutureHelper; |
| 30 | +import org.opensearch.dataprepper.pipeline.common.FutureHelperResult; |
24 | 31 | import org.opensearch.dataprepper.pipeline.common.PipelineThreadFactory; |
25 | 32 | import org.opensearch.dataprepper.pipeline.common.PipelineThreadPoolExecutor; |
| 33 | +import org.opensearch.dataprepper.pipeline.exceptions.InvalidEventHandleException; |
26 | 34 | import org.opensearch.dataprepper.pipeline.router.Router; |
27 | 35 | import org.opensearch.dataprepper.pipeline.router.RouterCopyRecordStrategy; |
28 | 36 | import org.opensearch.dataprepper.pipeline.router.RouterGetRecordStrategy; |
|
37 | 45 | import java.util.Collections; |
38 | 46 | import java.util.LinkedList; |
39 | 47 | import java.util.List; |
| 48 | +import java.util.Map; |
40 | 49 | import java.util.Objects; |
| 50 | +import java.util.Set; |
41 | 51 | import java.util.concurrent.ExecutorService; |
42 | 52 | import java.util.concurrent.Future; |
43 | 53 | import java.util.concurrent.TimeUnit; |
|
54 | 64 | @SuppressWarnings({"rawtypes", "unchecked"}) |
55 | 65 | public class Pipeline { |
56 | 66 | private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class); |
| 67 | + private final boolean acknowledgementsEnabled; |
57 | 68 | private volatile AtomicBoolean stopRequested; |
| 69 | + private boolean isEmptyRecordsLogged = false; |
58 | 70 |
|
59 | 71 | private final String name; |
60 | 72 | private final Source source; |
@@ -137,6 +149,7 @@ public Pipeline( |
137 | 149 | new PipelineThreadFactory(format("%s-sink-worker", name)), this); |
138 | 150 |
|
139 | 151 | stopRequested = new AtomicBoolean(false); |
| 152 | + this.acknowledgementsEnabled = source.areAcknowledgementsEnabled() || buffer.areAcknowledgementsEnabled(); |
140 | 153 | } |
141 | 154 |
|
142 | 155 | AcknowledgementSetManager getAcknowledgementSetManager() { |
@@ -310,6 +323,83 @@ public void removeShutdownObserver(final PipelineObserver pipelineObserver) { |
310 | 323 | observers.remove(pipelineObserver); |
311 | 324 | } |
312 | 325 |
|
| 326 | + public void runProcessorsAndSinks(final List<Processor> processors) { |
| 327 | + final Map.Entry<Collection, CheckpointState> readResult = buffer.read(getReadBatchTimeoutInMillis()); |
| 328 | + Collection records = readResult.getKey(); |
| 329 | + final CheckpointState checkpointState = readResult.getValue(); |
| 330 | + //TODO Hacky way to avoid logging continuously - Will be removed as part of metrics implementation |
| 331 | + if (records.isEmpty()) { |
| 332 | + if(!isEmptyRecordsLogged) { |
| 333 | + LOG.debug(" {} Worker: No records received from buffer", getName()); |
| 334 | + isEmptyRecordsLogged = true; |
| 335 | + } |
| 336 | + } else { |
| 337 | + LOG.debug(" {} Worker: Processing {} records from buffer", getName(), records.size()); |
| 338 | + } |
| 339 | + |
| 340 | + //Should Empty list from buffer should be sent to the processors? For now sending as the Stateful processors expects it. |
| 341 | + for (final Processor processor : processors) { |
| 342 | + |
| 343 | + List<Event> inputEvents = null; |
| 344 | + if (acknowledgementsEnabled) { |
| 345 | + inputEvents = ((List<Record<Event>>) records).stream().map(Record::getData).collect(Collectors.toList()); |
| 346 | + } |
| 347 | + |
| 348 | + try { |
| 349 | + records = processor.execute(records); |
| 350 | + if (inputEvents != null) { |
| 351 | + processAcknowledgements(inputEvents, records); |
| 352 | + } |
| 353 | + } catch (final Exception e) { |
| 354 | + LOG.error("A processor threw an exception. This batch of Events will be dropped, and their EventHandles will be released: ", e); |
| 355 | + if (inputEvents != null) { |
| 356 | + processAcknowledgements(inputEvents, Collections.emptyList()); |
| 357 | + } |
| 358 | + |
| 359 | + records = Collections.emptyList(); |
| 360 | + break; |
| 361 | + } |
| 362 | + } |
| 363 | + |
| 364 | + postToSink(records); |
| 365 | + // Checkpoint the current batch read from the buffer after being processed by processors and sinks. |
| 366 | + buffer.checkpoint(checkpointState); |
| 367 | + |
| 368 | + } |
| 369 | + |
| 370 | + public void executeAllProcessorsAndSinks() { |
| 371 | + List<Processor> processors = this.processorSets.stream().flatMap(Collection::stream).collect(Collectors.toList()); |
| 372 | + runProcessorsAndSinks(processors); |
| 373 | + } |
| 374 | + |
| 375 | + private void processAcknowledgements(List<Event> inputEvents, Collection<Record<Event>> outputRecords) { |
| 376 | + Set<Event> outputEventsSet = outputRecords.stream().map(Record::getData).collect(Collectors.toSet()); |
| 377 | + // For each event in the input events list that is not present in the output events, send positive acknowledgement, if acknowledgements are enabled for it |
| 378 | + inputEvents.forEach(event -> { |
| 379 | + EventHandle eventHandle = event.getEventHandle(); |
| 380 | + if (eventHandle != null && eventHandle instanceof DefaultEventHandle) { |
| 381 | + InternalEventHandle internalEventHandle = (InternalEventHandle)(DefaultEventHandle)eventHandle; |
| 382 | + if (internalEventHandle.getAcknowledgementSet() != null && !outputEventsSet.contains(event)) { |
| 383 | + eventHandle.release(true); |
| 384 | + } |
| 385 | + } else if (eventHandle != null) { |
| 386 | + throw new InvalidEventHandleException("Unexpected EventHandle"); |
| 387 | + } |
| 388 | + }); |
| 389 | + } |
| 390 | + |
| 391 | + /** |
| 392 | + * TODO Add isolator pattern - Fail if one of the Sink fails [isolator Pattern] |
| 393 | + * Uses the pipeline method to publish to sinks, waits for each of the sink result to be true before attempting to |
| 394 | + * process more records from buffer. |
| 395 | + */ |
| 396 | + private boolean postToSink(final Collection<Record> records) { |
| 397 | + LOG.debug("Pipeline Worker: Submitting {} processed records to sinks", records.size()); |
| 398 | + final List<Future<Void>> sinkFutures = publishToSinks(records); |
| 399 | + final FutureHelperResult<Void> futureResults = FutureHelper.awaitFuturesIndefinitely(sinkFutures); |
| 400 | + return futureResults.getFailedReasons().isEmpty(); |
| 401 | + } |
| 402 | + |
313 | 403 | private void shutdownExecutorService(final ExecutorService executorService, final long timeoutForTerminationInMillis, final String workerName) { |
314 | 404 | LOG.info("Pipeline [{}] - Shutting down {} process workers.", name, workerName); |
315 | 405 |
|
|
0 commit comments