Skip to content

Commit c10691f

Browse files
committed
Blue Language Specification 1.0, Part II, complete implementation
1 parent 17dec4c commit c10691f

88 files changed

Lines changed: 8125 additions & 76 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/main/java/blue/language/Blue.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
import blue.language.merge.NodeResolver;
77
import blue.language.merge.processor.*;
88
import blue.language.model.Node;
9+
import blue.language.processor.DocumentProcessingResult;
10+
import blue.language.processor.ContractProcessor;
11+
import blue.language.processor.DocumentProcessor;
12+
import blue.language.processor.model.Contract;
913
import blue.language.preprocess.Preprocessor;
1014
import blue.language.utils.*;
1115
import blue.language.utils.limits.CompositeLimits;
@@ -27,6 +31,7 @@ public class Blue implements NodeResolver {
2731
private TypeClassResolver typeClassResolver;
2832
private Map<String, String> preprocessingAliases = new HashMap<>();
2933
private Limits globalLimits = NO_LIMITS;
34+
private DocumentProcessor documentProcessor;
3035

3136

3237

@@ -37,6 +42,7 @@ public Blue() {
3742
public Blue(NodeProvider nodeProvider) {
3843
this.nodeProvider = NodeProviderWrapper.wrap(nodeProvider);
3944
this.mergingProcessor = createDefaultNodeProcessor();
45+
this.documentProcessor = createDefaultDocumentProcessor();
4046
}
4147

4248
public Blue(NodeProvider nodeProvider, MergingProcessor mergingProcessor) {
@@ -52,6 +58,7 @@ public Blue(NodeProvider nodeProvider, MergingProcessor mergingProcessor, TypeCl
5258
this.nodeProvider = NodeProviderWrapper.wrap(nodeProvider);
5359
this.mergingProcessor = mergingProcessor;
5460
this.typeClassResolver = typeClassResolver;
61+
this.documentProcessor = createDefaultDocumentProcessor();
5562
}
5663

5764
public Node resolve(Node node) {
@@ -166,6 +173,41 @@ public void addPreprocessingAliases(Map<String, String> aliases) {
166173
preprocessingAliases.putAll(aliases);
167174
}
168175

176+
public Blue registerContractProcessor(ContractProcessor<? extends Contract> processor) {
177+
if (processor == null) {
178+
throw new IllegalArgumentException("processor must not be null");
179+
}
180+
if (documentProcessor == null) {
181+
documentProcessor = createDefaultDocumentProcessor();
182+
}
183+
documentProcessor.registerContractProcessor(processor);
184+
return this;
185+
}
186+
187+
public DocumentProcessingResult processDocument(Node document, Node event) {
188+
return ensureDocumentProcessor().processDocument(document, event);
189+
}
190+
191+
public DocumentProcessor getDocumentProcessor() {
192+
return ensureDocumentProcessor();
193+
}
194+
195+
public Blue documentProcessor(DocumentProcessor documentProcessor) {
196+
if (documentProcessor == null) {
197+
throw new IllegalArgumentException("documentProcessor must not be null");
198+
}
199+
this.documentProcessor = documentProcessor;
200+
return this;
201+
}
202+
203+
public DocumentProcessingResult initializeDocument(Node document) {
204+
return ensureDocumentProcessor().initializeDocument(document);
205+
}
206+
207+
public boolean isInitialized(Node document) {
208+
return ensureDocumentProcessor().isInitialized(document);
209+
}
210+
169211
public Node preprocess(Node node) {
170212
if (node.getBlue() != null && node.getBlue().getValue() instanceof String) {
171213
String blueValue = (String) node.getBlue().getValue();
@@ -239,6 +281,17 @@ public Blue preprocessingAliases(Map<String, String> preprocessingAliases) {
239281
return this;
240282
}
241283

284+
private DocumentProcessor ensureDocumentProcessor() {
285+
if (documentProcessor == null) {
286+
documentProcessor = createDefaultDocumentProcessor();
287+
}
288+
return documentProcessor;
289+
}
290+
291+
private DocumentProcessor createDefaultDocumentProcessor() {
292+
return new DocumentProcessor();
293+
}
294+
242295
private Limits combineWithGlobalLimits(Limits methodLimits) {
243296
if (globalLimits == NO_LIMITS) {
244297
return methodLimits;
@@ -265,4 +318,4 @@ private MergingProcessor createDefaultNodeProcessor() {
265318
);
266319
}
267320

268-
}
321+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package blue.language.processor;
2+
3+
import blue.language.model.Node;
4+
import blue.language.processor.model.MarkerContract;
5+
6+
import java.util.Collections;
7+
import java.util.LinkedHashMap;
8+
import java.util.Map;
9+
import java.util.Objects;
10+
11+
/**
12+
* Snapshot of the data passed to a channel processor during matching.
13+
*
14+
* <p>The event node supplied here is a fresh clone of the inbound event.
15+
* Channel processors MAY mutate it (for example, to normalise or enrich the
16+
* payload); any changes are confined to this invocation and the adapted node
17+
* becomes the one delivered to downstream handlers and persisted in the
18+
* checkpoint.</p>
19+
*/
20+
public final class ChannelEvaluationContext {
21+
22+
private final String scopePath;
23+
private final Node event;
24+
private final Object eventObject;
25+
private final Map<String, MarkerContract> markers;
26+
27+
ChannelEvaluationContext(String scopePath,
28+
Node event,
29+
Object eventObject,
30+
Map<String, MarkerContract> markers) {
31+
this.scopePath = Objects.requireNonNull(scopePath, "scopePath");
32+
this.event = event;
33+
this.eventObject = eventObject;
34+
this.markers = markers == null
35+
? Collections.emptyMap()
36+
: Collections.unmodifiableMap(new LinkedHashMap<>(markers));
37+
}
38+
39+
public String scopePath() {
40+
return scopePath;
41+
}
42+
43+
public Node event() {
44+
// Mutable clone scoped to this invocation; safe to adapt.
45+
return event;
46+
}
47+
48+
public Object eventObject() {
49+
return eventObject;
50+
}
51+
52+
public Map<String, MarkerContract> markers() {
53+
return markers;
54+
}
55+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package blue.language.processor;
2+
3+
import blue.language.processor.model.ChannelContract;
4+
5+
/**
6+
* Processor specialization for channel contracts.
7+
*/
8+
public interface ChannelProcessor<T extends ChannelContract> extends ContractProcessor<T> {
9+
10+
boolean matches(T contract, ChannelEvaluationContext context);
11+
12+
default String eventId(T contract, ChannelEvaluationContext context) {
13+
return null;
14+
}
15+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package blue.language.processor;
2+
3+
import blue.language.model.Node;
4+
import blue.language.processor.model.ChannelContract;
5+
6+
import java.util.List;
7+
import java.util.Objects;
8+
9+
/**
10+
* Executes channel matching and handler invocation for a scope.
11+
*
12+
* <p>Applies checkpoint gating for external channels and feeds successful
13+
* matches into the registered handler processors.</p>
14+
*/
15+
final class ChannelRunner {
16+
17+
private final DocumentProcessor owner;
18+
private final ProcessorEngine.Execution execution;
19+
private final DocumentProcessingRuntime runtime;
20+
private final CheckpointManager checkpointManager;
21+
22+
ChannelRunner(DocumentProcessor owner,
23+
ProcessorEngine.Execution execution,
24+
DocumentProcessingRuntime runtime,
25+
CheckpointManager checkpointManager) {
26+
this.owner = Objects.requireNonNull(owner, "owner");
27+
this.execution = Objects.requireNonNull(execution, "execution");
28+
this.runtime = Objects.requireNonNull(runtime, "runtime");
29+
this.checkpointManager = Objects.requireNonNull(checkpointManager, "checkpointManager");
30+
}
31+
32+
void runExternalChannel(String scopePath,
33+
ContractBundle bundle,
34+
ContractBundle.ChannelBinding channel,
35+
Node event) {
36+
if (execution.isScopeInactive(scopePath)) {
37+
return;
38+
}
39+
runtime.chargeChannelMatchAttempt();
40+
ChannelContract contract = channel.contract();
41+
ProcessorEngine.ChannelMatch match = ProcessorEngine.evaluateChannel(owner, contract, bundle, scopePath, event);
42+
if (!match.matches) {
43+
return;
44+
}
45+
Node eventForHandlers = match.eventNode() != null ? match.eventNode() : event;
46+
checkpointManager.ensureCheckpointMarker(scopePath, bundle);
47+
CheckpointManager.CheckpointRecord checkpoint = checkpointManager.findCheckpoint(bundle, channel.key());
48+
String eventSignature = match.eventId != null
49+
? match.eventId
50+
: ProcessorEngine.canonicalSignature(eventForHandlers);
51+
if (checkpointManager.isDuplicate(checkpoint, eventSignature)) {
52+
return;
53+
}
54+
runHandlers(scopePath, bundle, channel.key(), eventForHandlers, false);
55+
if (execution.isScopeInactive(scopePath)) {
56+
return;
57+
}
58+
checkpointManager.persist(scopePath, bundle, checkpoint, eventSignature, eventForHandlers);
59+
}
60+
61+
void runHandlers(String scopePath,
62+
ContractBundle bundle,
63+
String channelKey,
64+
Node event,
65+
boolean allowTerminatedWork) {
66+
List<ContractBundle.HandlerBinding> handlers = bundle.handlersFor(channelKey);
67+
if (handlers.isEmpty()) {
68+
return;
69+
}
70+
for (ContractBundle.HandlerBinding handler : handlers) {
71+
if (!allowTerminatedWork && execution.isScopeInactive(scopePath)) {
72+
break;
73+
}
74+
runtime.chargeHandlerOverhead();
75+
ProcessorExecutionContext context = execution.createContext(scopePath, bundle, event, allowTerminatedWork);
76+
ProcessorEngine.executeHandler(owner, handler.contract(), context);
77+
if (execution.isScopeInactive(scopePath) && !allowTerminatedWork) {
78+
break;
79+
}
80+
}
81+
}
82+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package blue.language.processor;
2+
3+
import blue.language.model.Node;
4+
import blue.language.processor.model.ChannelEventCheckpoint;
5+
import blue.language.processor.model.MarkerContract;
6+
import blue.language.processor.util.PointerUtils;
7+
import blue.language.processor.util.ProcessorContractConstants;
8+
import blue.language.processor.util.ProcessorPointerConstants;
9+
10+
import java.util.LinkedHashMap;
11+
import java.util.Map;
12+
import java.util.Objects;
13+
import java.util.function.Function;
14+
15+
/**
16+
* Handles per-scope checkpoint lifecycle: lazy creation, gating, and persistence.
17+
*/
18+
final class CheckpointManager {
19+
20+
private final DocumentProcessingRuntime runtime;
21+
private final Function<Node, String> signatureFn;
22+
23+
CheckpointManager(DocumentProcessingRuntime runtime, Function<Node, String> signatureFn) {
24+
this.runtime = Objects.requireNonNull(runtime, "runtime");
25+
this.signatureFn = Objects.requireNonNull(signatureFn, "signatureFn");
26+
}
27+
28+
void ensureCheckpointMarker(String scopePath, ContractBundle bundle) {
29+
MarkerContract marker = bundle.marker(ProcessorContractConstants.KEY_CHECKPOINT);
30+
String pointer = PointerUtils.resolvePointer(scopePath, ProcessorPointerConstants.RELATIVE_CHECKPOINT);
31+
if (marker == null) {
32+
Node markerNode = new Node()
33+
.type(new Node().blueId("ChannelEventCheckpoint"))
34+
.properties("lastEvents", new Node().properties(new LinkedHashMap<>()))
35+
.properties("lastSignatures", new Node().properties(new LinkedHashMap<>()));
36+
runtime.directWrite(pointer, markerNode);
37+
bundle.registerCheckpointMarker(new ChannelEventCheckpoint());
38+
return;
39+
}
40+
if (!(marker instanceof ChannelEventCheckpoint)) {
41+
throw new IllegalStateException(
42+
"Reserved key 'checkpoint' must contain a Channel Event Checkpoint at " + pointer);
43+
}
44+
}
45+
46+
CheckpointRecord findCheckpoint(ContractBundle bundle, String channelKey) {
47+
for (Map.Entry<String, MarkerContract> entry : bundle.markerEntries()) {
48+
if (entry.getValue() instanceof ChannelEventCheckpoint) {
49+
ChannelEventCheckpoint checkpoint = (ChannelEventCheckpoint) entry.getValue();
50+
Node stored = checkpoint.lastEvent(channelKey);
51+
CheckpointRecord record = new CheckpointRecord(entry.getKey(), checkpoint, channelKey, stored);
52+
String storedSignature = checkpoint.lastSignature(channelKey);
53+
record.lastEventSignature = storedSignature != null ? storedSignature : signatureFn.apply(stored);
54+
return record;
55+
}
56+
}
57+
return null;
58+
}
59+
60+
boolean isDuplicate(CheckpointRecord record, String signature) {
61+
return record != null && record.matches(signature);
62+
}
63+
64+
void persist(String scopePath,
65+
ContractBundle bundle,
66+
CheckpointRecord record,
67+
String eventSignature,
68+
Node eventNode) {
69+
if (record == null) {
70+
return;
71+
}
72+
String pointer = PointerUtils.resolvePointer(scopePath,
73+
ProcessorPointerConstants.relativeCheckpointLastEvent(record.markerKey, record.channelKey));
74+
Node stored = eventNode != null ? eventNode.clone() : null;
75+
runtime.chargeCheckpointUpdate();
76+
runtime.directWrite(pointer, stored);
77+
record.checkpoint.updateEvent(record.channelKey, stored);
78+
record.lastEventNode = stored != null ? stored.clone() : null;
79+
String signaturePointer = PointerUtils.resolvePointer(scopePath,
80+
ProcessorPointerConstants.relativeCheckpointLastSignature(record.markerKey, record.channelKey));
81+
Node signatureNode = eventSignature != null ? new Node().value(eventSignature) : null;
82+
runtime.directWrite(signaturePointer, signatureNode);
83+
record.checkpoint.updateSignature(record.channelKey, eventSignature);
84+
record.lastEventSignature = eventSignature;
85+
}
86+
87+
static final class CheckpointRecord {
88+
final String markerKey;
89+
final ChannelEventCheckpoint checkpoint;
90+
final String channelKey;
91+
Node lastEventNode;
92+
String lastEventSignature;
93+
94+
CheckpointRecord(String markerKey,
95+
ChannelEventCheckpoint checkpoint,
96+
String channelKey,
97+
Node lastEventNode) {
98+
this.markerKey = markerKey;
99+
this.checkpoint = checkpoint;
100+
this.channelKey = channelKey;
101+
this.lastEventNode = lastEventNode != null ? lastEventNode.clone() : null;
102+
}
103+
104+
boolean matches(String signature) {
105+
return signature != null && signature.equals(lastEventSignature);
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)