Skip to content

Commit 2b0b58a

Browse files
authored
feat: Emit for Connectivity Plan and Transactional Event Queues
1 parent f856100 commit 2b0b58a

15 files changed

Lines changed: 198 additions & 92 deletions

File tree

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
.classpath
88
.settings/
99

10+
## VS Code
11+
java-formatter.xml
12+
settings.json
13+
1014
## Java
1115
target/
1216

cds-feature-event-hub/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
<dependency>
6969
<groupId>com.fasterxml.jackson.core</groupId>
7070
<artifactId>jackson-databind</artifactId>
71-
<version>2.19.2</version>
71+
<version>2.21.0</version>
7272
</dependency>
7373

7474
<!-- TEST DEPENDENCIES -->

cds-feature-event-hub/src/main/java/com/sap/cds/feature/messaging/eventhub/adapter/EventHubWebhookAdapter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ public class EventHubWebhookAdapter extends HttpServlet {
4242
public EventHubWebhookAdapter(CdsRuntime runtime) {
4343
this.runtime = runtime;
4444
this.messagingServices = runtime.getServiceCatalog().getServices(MessagingService.class)
45-
.map(OutboxService::unboxed)
46-
.filter(EventHubMessagingService.class::isInstance)
47-
.map(EventHubMessagingService.class::cast)
48-
.toList();
45+
.map(OutboxService::unboxed)
46+
.filter(EventHubMessagingService.class::isInstance)
47+
.map(EventHubMessagingService.class::cast)
48+
.toList();
4949
ServiceBinding binding = EventHubBindingUtils.getServiceBinding(runtime).get();
5050
this.clientId = EventHubBindingUtils.getClientId(binding);
5151
this.isMultitenant = EventHubBindingUtils.isBindingMultitenant(binding);

cds-feature-event-hub/src/main/java/com/sap/cds/feature/messaging/eventhub/client/EventHubClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ public class EventHubClient extends RestClient {
2222

2323
public EventHubClient(ServiceBinding binding) {
2424
super(ServiceBindingDestinationOptions
25-
.forService(binding)
26-
.onBehalfOf(OnBehalfOf.TECHNICAL_USER_PROVIDER)
27-
.build());
25+
.forService(binding)
26+
.onBehalfOf(OnBehalfOf.TECHNICAL_USER_PROVIDER)
27+
.build());
2828
}
2929

3030
public void sendMessage(Map<String, Object> message, Map<String, Object> headers) throws IOException {

cds-feature-event-hub/src/main/java/com/sap/cds/feature/messaging/eventhub/service/EventHubMessagingService.java

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
public class EventHubMessagingService extends AbstractMessagingService {
3131

3232
private static final Logger logger = LoggerFactory.getLogger(EventHubMessagingService.class);
33-
public static final String CE_SOURCE = "ceSource";
33+
public static final String CE_SOURCE = "ceSource";
34+
public static final String SYSTEM_ID = "systemId";
3435

3536
private final String ceSource;
37+
private final String systemId;
3638
private final boolean isMultitenant;
3739
private final MessagingBrokerQueueListener queueListener;
3840
private final EventHubClient eventHubClient;
@@ -48,12 +50,25 @@ public EventHubMessagingService(ServiceBinding binding, MessagingServiceConfig s
4850
this.ceSource = ((List<String>) binding.getCredentials().get(CE_SOURCE)).get(0) + '/';
4951
} else {
5052
this.ceSource = null;
53+
logger.error("Missing ceSource in binding credentials, emit() will be deactivated");
54+
}
55+
56+
if (binding.getCredentials().containsKey(SYSTEM_ID)) {
57+
this.systemId = (String) binding.getCredentials().get(SYSTEM_ID);
58+
} else {
59+
this.systemId = null;
60+
logger.error("Missing systemId in binding credentials, emit() will be deactivated");
5161
}
5262

5363
this.isMultitenant = EventHubBindingUtils.isBindingMultitenant(binding);
54-
this.queueListener = new MessagingBrokerQueueListener(this, toFullyQualifiedQueueName(queue), queue, runtime, true);
55-
// emitting messages is only supported in multitenant mode
56-
this.eventHubClient = this.isMultitenant ? new EventHubClient(binding) : null;
64+
this.queueListener = new MessagingBrokerQueueListener(this, serviceConfig, toFullyQualifiedQueueName(queue), queue, cdsRuntime);
65+
66+
if (EventHubBindingUtils.bindingHasEndpoints(binding)) {
67+
this.eventHubClient = new EventHubClient(binding);
68+
} else {
69+
this.eventHubClient = null;
70+
logger.error("No endpoints found in service binding, emit() will be deactivated");
71+
}
5772
}
5873

5974
private static MessagingServiceConfig ensureMandatoryConfig(MessagingServiceConfig serviceConfig) {
@@ -67,7 +82,7 @@ public void init() {
6782
super.init();
6883

6984
String queueName = toFullyQualifiedQueueName(queue);
70-
for(MessageTopic topic : queue.getTopics()) {
85+
for (MessageTopic topic : queue.getTopics()) {
7186
String topicName = topic.getBrokerName();
7287
cacheQueueTopicSubscription(queueName, topicName);
7388
}
@@ -104,7 +119,6 @@ public boolean isRegisteredBrokerTopic(String event) {
104119
return queue.getTopics().stream().anyMatch(t -> t.getBrokerName().equals(event));
105120
}
106121

107-
108122
@Override
109123
protected void removeQueue(String name) throws IOException {
110124
// not used
@@ -127,19 +141,30 @@ protected void registerQueueListener(String queue, MessagingBrokerQueueListener
127141

128142
@Override
129143
protected void emitTopicMessage(String topic, TopicMessageEventContext context) {
130-
// emitting messages is only supported in multitenant mode
131-
if (!this.isMultitenant) {
132-
throw new ErrorStatusException(EventHubErrorStatuses.EVENT_HUB_EMIT_FAILED);
144+
if (eventHubClient == null) {
145+
throw new ErrorStatusException(EventHubErrorStatuses.EVENT_HUB_EMIT_MISSING_ENDPOINTS);
133146
}
134147

135148
String tenant = getTenant(context);
136-
137-
try {
138-
Map<String, Object> headers = context.getHeadersMap();
139-
if (ceSource != null) {
140-
headers.put(CloudEventUtils.KEY_SOURCE, ceSource + tenant);
149+
Map<String, Object> headers = context.getHeadersMap();
150+
if (isMultitenant) {
151+
if (ceSource == null) {
152+
throw new ErrorStatusException(EventHubErrorStatuses.EVENT_HUB_EMIT_MISSING_CE_SOURCE);
153+
}
154+
155+
headers.put(CloudEventUtils.KEY_SOURCE, ceSource + tenant);
156+
} else {
157+
if (systemId == null) {
158+
throw new ErrorStatusException(EventHubErrorStatuses.EVENT_HUB_EMIT_MISSING_SYSTEM_ID);
159+
}
160+
if (ceSource == null) {
161+
throw new ErrorStatusException(EventHubErrorStatuses.EVENT_HUB_EMIT_MISSING_CE_SOURCE);
141162
}
163+
164+
headers.put(CloudEventUtils.KEY_SOURCE, ceSource + systemId);
165+
}
142166

167+
try {
143168
logger.debug("Sending message for Event Hub '{}' to type '{}'", getName(), headers.get(CloudEventUtils.KEY_TYPE));
144169
eventHubClient.sendMessage(context.getDataMap(), headers);
145170
} catch (IOException e) {
@@ -148,7 +173,6 @@ protected void emitTopicMessage(String topic, TopicMessageEventContext context)
148173
}
149174

150175
private String getTenant(EventContext context) {
151-
152176
String tenant = context.getUserInfo().getTenant();
153177

154178
if (tenant != null) {

cds-feature-event-hub/src/main/java/com/sap/cds/feature/messaging/eventhub/service/EventHubMessagingServiceConfiguration.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public void services(CdsRuntimeConfigurer configurer) {
3333
List<MessagingServiceConfig> serviceConfigs = config.getServicesByBinding(binding.getName().get());
3434

3535
if (!serviceConfigs.isEmpty()) {
36+
logger.debug("Initialization of the Event Hub based on service binding '{}'", binding.getName().get());
3637
createDefaultService = false;
3738
serviceConfigs.forEach(serviceConfig -> {
3839
if (Boolean.TRUE.equals(serviceConfig.isEnabled())) {
@@ -50,13 +51,16 @@ public void services(CdsRuntimeConfigurer configurer) {
5051
logger.debug("Initialization of the Event Hub based on service binding '{}' and kind '{}'", binding.getName().get(), KIND_LABEL);
5152
createDefaultService = false;
5253
serviceConfigsByKind.forEach(serviceConfig -> {
53-
// check that the service is enabled and whether not already found by name or binding
54-
if (Boolean.TRUE.equals(serviceConfig.isEnabled())
55-
&& serviceConfigs.stream().noneMatch(c -> c.getName().equals(serviceConfig.getName()))) {
56-
configureService(configurer, binding, serviceConfig);
57-
} else {
58-
logger.info("The messaging service '{}' is explicitly disabled via configuration", serviceConfig.getName());
54+
// check whether the service config is not already found by name or binding
55+
if (serviceConfigs.stream().noneMatch(c -> c.getName().equals(serviceConfig.getName()))) {
56+
// check that the service is enabled
57+
if (Boolean.TRUE.equals(serviceConfig.isEnabled())) {
58+
configureService(configurer, binding, serviceConfig);
59+
} else {
60+
logger.info("The messaging service '{}' is explicitly disabled via configuration", serviceConfig.getName());
61+
}
5962
}
63+
6064
});
6165
}
6266

@@ -69,18 +73,18 @@ public void services(CdsRuntimeConfigurer configurer) {
6973
configureService(configurer, binding, defConfig);
7074
} else {
7175
logger.warn(
72-
"Could not create service for binding '{}': A configuration with the same name is already defined for another kind or binding.",
73-
binding.getName().get());
76+
"Could not create service for binding '{}': A configuration with the same name is already defined for another kind or binding.",
77+
binding.getName().get());
7478
}
7579
}
7680

77-
logger.debug("Finished the initialization of the Event Hub service binding '{}'", binding.getName().get());
81+
logger.info("Finished the initialization of the Event Hub service binding '{}'", binding.getName().get());
7882
});
7983
}
8084

8185
private void configureService(CdsRuntimeConfigurer configurer, ServiceBinding binding, MessagingServiceConfig serviceConfig) {
86+
logger.debug("Loading config '{}' for service binding '{}'", serviceConfig.getName(), binding.getName().get());
8287
EventHubMessagingService messagingService = new EventHubMessagingService(binding, serviceConfig, configurer.getCdsRuntime());
8388
configurer.service(outboxed(messagingService, serviceConfig, configurer.getCdsRuntime()));
8489
}
85-
8690
}

cds-feature-event-hub/src/main/java/com/sap/cds/feature/messaging/eventhub/utils/EventHubBindingUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,10 @@ public static boolean isBindingMultitenant(ServiceBinding binding) {
4646
return ServiceBindingUtils.matches(binding, MT_BINDING_LABEL);
4747
}
4848

49+
@SuppressWarnings("unchecked")
50+
public static boolean bindingHasEndpoints(ServiceBinding binding) {
51+
Map<String, Object> credentials = binding.getCredentials();
52+
Map<String, Object> endpoints = (Map<String, Object>) credentials.getOrDefault("endpoints", Map.of());
53+
return !endpoints.isEmpty();
54+
}
4955
}

cds-feature-event-hub/src/main/java/com/sap/cds/feature/messaging/eventhub/utils/EventHubErrorStatuses.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55

66
public enum EventHubErrorStatuses implements ErrorStatus {
77

8-
EVENT_HUB_EMIT_FAILED(50007026, "Event Hub service in single tenant plan does not support to emit events.", ErrorStatuses.SERVER_ERROR),
98
MULTIPLE_EVENT_HUB_BINDINGS(50007027, "Multiple event-hub service bindings found: Only a single service binding for Event Hub is supported.", ErrorStatuses.SERVER_ERROR),
10-
EVENT_HUB_TENANT_CONTEXT_MISSING(50007028, "Missing tenant context to emit a message to Event Hub.", ErrorStatuses.SERVER_ERROR);
9+
EVENT_HUB_TENANT_CONTEXT_MISSING(50007028, "Missing tenant context to emit a message to Event Hub.", ErrorStatuses.SERVER_ERROR),
10+
EVENT_HUB_EMIT_MISSING_CE_SOURCE(50007029, "Event Hub service failed to emit, due to ceSource missing in the service binding.", ErrorStatuses.SERVER_ERROR),
11+
EVENT_HUB_EMIT_MISSING_SYSTEM_ID(50007030, "Event Hub service failed to emit, due to systemId missing in the service binding.", ErrorStatuses.SERVER_ERROR),
12+
EVENT_HUB_EMIT_MISSING_ENDPOINTS(50007031, "Event Hub service failed to emit, due to missing endpoints in the service binding.", ErrorStatuses.SERVER_ERROR);
1113

1214
private final int code;
1315
private final String description;

cds-feature-event-hub/src/test/java/com/sap/cds/feature/messaging/eventhub/service/EventHubMessagingServiceConfigurationTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ void testDefaultServConfiguration() {
3333
CdsRuntimeConfigurer configurer = CdsRuntimeConfigurer.create();
3434
configurer.environment(() -> {
3535
return Stream.of(new DefaultServiceBindingBuilder()
36-
.withName("eb-mt-tests-eb").withServicePlan("event-connectivity")
37-
.withServiceName("event-broker").build());
36+
.withName("eb-mt-tests-eb")
37+
.withServiceName("event-broker")
38+
.withServicePlan("event-connectivity")
39+
.build());
3840
});
3941

4042
configurer.serviceConfigurations();

cds-feature-event-hub/src/test/java/com/sap/cds/feature/messaging/eventhub/service/EventHubMessagingServiceTest.java

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import java.util.HashMap;
77
import java.util.Map;
8-
import java.util.stream.Stream;
98

109
import org.junit.jupiter.api.Assertions;
1110
import org.junit.jupiter.api.Test;
@@ -15,36 +14,9 @@
1514
import com.sap.cds.services.impl.environment.SimplePropertiesProvider;
1615
import com.sap.cds.services.runtime.CdsRuntime;
1716
import com.sap.cds.services.runtime.CdsRuntimeConfigurer;
18-
import com.sap.cloud.environment.servicebinding.api.DefaultServiceBindingBuilder;
1917

2018
class EventHubMessagingServiceTest {
2119

22-
@Test
23-
void testEmit_SingleTenantNotSupported() {
24-
CdsProperties properties = new CdsProperties();
25-
CdsProperties.Messaging.MessagingServiceConfig config = new CdsProperties.Messaging.MessagingServiceConfig("cfg");
26-
config.setBinding("eb-mt-tests-eb");
27-
config.getOutbox().setEnabled(false);
28-
properties.getMessaging().getServices().put(config.getName(), config);
29-
30-
CdsRuntimeConfigurer configurer = CdsRuntimeConfigurer.create(new SimplePropertiesProvider(properties));
31-
configurer.environment(() -> {
32-
return Stream.of(new DefaultServiceBindingBuilder()
33-
.withName("eb-mt-tests-eb").withServicePlan("event-connectivity")
34-
.withServiceName("event-broker").build());
35-
});
36-
37-
configurer.environmentConfigurations();
38-
configurer.serviceConfigurations();
39-
configurer.eventHandlerConfigurations();
40-
CdsRuntime runtime = configurer.complete();
41-
42-
43-
ContextualizedServiceException e = Assertions.assertThrows(ContextualizedServiceException.class, () -> emitMessage(runtime));
44-
assertEquals(EventHubErrorStatuses.EVENT_HUB_EMIT_FAILED, e.getErrorStatus());
45-
46-
}
47-
4820
@Test
4921
void testEmit_TenantNotSupported() {
5022
CdsProperties properties = new CdsProperties();

0 commit comments

Comments
 (0)