Skip to content

Latest commit

 

History

History
600 lines (447 loc) · 20.1 KB

File metadata and controls

600 lines (447 loc) · 20.1 KB
synopsis Learn details about using messaging services and outbox for asynchronous communications.
status released

Messaging

{{$frontmatter?.synopsis}}

[[toc]]

Overview

Messaging enables decoupled communication between services using events. CAP distinguishes between the logical and technical messaging layers, separating business concerns from technical infrastructure.

The logical layer consists of three primary components:

Modeled Events: Events are defined in CDS models with typed schemas, providing compile-time validation and IDE support. These events represent business occurrences like 'orderProcessed', or 'stockUpdated'.

Event Topics: Topics organize events into logical channels and are responsible for event routing. Topics can be explicitly defined as annotation or derived from service and event names.

CAP Services: Services act as event producers or consumers, using simple APIs like srv.emit('reviewed', data) or srv.on('orderProcessed', handler). Services communicate using logical event names without needing to know the underlying infrastructure details.

The technical layer handles the actual message transport and delivery:

CAP Messaging Service: The translation layer between logical events and technical infrastructure. It manages topic resolution, message serialization, and routing logic. For topic resolution the logical events are delegated to the messaging service, the corresponding event name on the technical service is either the fully qualified event name or the value of the @topic annotation if given.

Message Brokers: The core of the technical infrastructure, handling message persistence, delivery guarantees, and cross-service communication. Examples include SAP Event Mesh, Apache Kafka, or Redis Streams.

The message flow follows a clear path through both layers:

Outbound Flow (Publisher): A CAP service calls srv.emit('reviewed', data) → CAP Messaging Service resolves the event name to a fully qualified topic (for example, OrderSrv.reviewed) → Message is serialized and sent to the Event Broker → Broker stores and distributes the message to all subscribers.

Inbound Flow (Subscriber): Event Broker delivers message from subscribed topic → CAP Messaging Service receives the message → Service name and event name are resolved from the topic → Message is routed to the appropriate CAP service handler via srv.on('reviewed', handler). Registering a modeled srv.on(...) event handler causes the broker to listen to those events, for example, creates a subscription for Event Mesh.

Alternatively custom handlers can bypass the service layer and work directly with the messaging service.

Summary Table

CDS Event Declaration Emitting via srv.emit Emitting via messaging.emit Broker Topic Receiving via srv.on Receiving via messaging.on
No @topic 'reviewed' 'OrderSrv.reviewed' OrderSrv.reviewed 'reviewed' 'OrderSrv.reviewed'
With @topic: 'foo.bar' 'reviewed' 'foo.bar' foo.bar 'reviewed' 'foo.bar'

cds.MessagingService class

Class cds.MessagingService and subclasses thereof are technical services representing asynchronous messaging channels. They can be used directly/low-level, or behind the scenes on higher-level service-to-service eventing.

class cds.MessagingService extends cds.Service

Declaring Events

In your CDS model, you can model events using the event keyword inside services. Once you created the messaging section in cds.requires, all modeled events are automatically enabled for messaging.

You can then use the services to emit events (for your own service) or receive events (for external services).

Example:

In your package.json:

{
  "cds": {
    "requires": {
      "ExternalService": {
        "kind": "odata",
        "model": "srv/external/external.cds"
      },
      "messaging": {
        "kind": "enterprise-messaging"
      }
    }
  }
}

In srv/external/external.cds:

service ExternalService {
    event ExternalEvent {
        ID: UUID;
        rating: Decimal;
    }
}

In srv/own.cds:

service OwnService {
    event OwnEvent {
        ID: UUID;
        rating: Decimal;
    }
}

The implementation can use CAP application services or bypass them and work directly with the messaging service.

In srv/own.js (CAP Application Services):

module.exports = async srv => {
  const externalService = await cds.connect.to('ExternalService')
  externalService.on('ExternalEvent', async msg => {
    await srv.emit('OwnEvent', msg.data)
  })
}

In srv/own.js (CAP Messaging Services):

module.exports = async srv => {
  const externalService = await cds.connect.to('messaging')
  messaging.on('ExternalService.ExternalEvent', async msg => {
    await srv.emit('OwnService.OwnEvent', msg.data)
  })
}

Custom Topics with Declared Events

You can specify topics to modeled events using the @topic annotation. ::: tip If no annotation is provided, the topic will be set to the fully qualified event name. :::

Example:

service OwnService {
    @topic: 'my.custom.topic'
    event OwnEvent { ID: UUID; rating: Decimal; }
}

CloudEvents Protocol

CloudEvents is a commonly used specification for describing event data.

An example event looks like this:

{
  "type": "sap.s4.beh.salesorder.v1.SalesOrder.Created.v1",
  "specversion": "1.0",
  "source": "/default/sap.s4.beh/ER9CLNT001",
  "id": "0894ef45-7741-1eea-b7be-ce30f48e9a1d",
  "time": "2020-08-14T06:21:52Z",
  "datacontenttype": "application/json",
  "data": {
    "SalesOrder":"3016329"
  }
}

To help you adhere to this standard, CAP prefills these header fields automatically. To enable this, you need to set the option format: 'cloudevents' in your message broker.

Example:

{
  cds: {
    requires: {
      messaging: {
        kind: 'enterprise-messaging-shared',
        format: 'cloudevents'
      }
    }
  }
}

You can always overwrite the default values.

Topic Prefixes

If you want the topics to start with a certain string, you can set a publish and/or a subscribe prefix in your message broker.

Example:

{
  cds: {
    requires: {
      messaging: {
        kind: 'enterprise-messaging-shared',
        publishPrefix: 'default/sap.cap/books/',
        subscribePrefix: 'default/sap.cap/reviews/'
      }
    }
  }
}

Topic Manipulations

If you specify your format to be cloudevents, the following default prefixes are set:

{
  publishPrefix: '$namespace/ce/',
  subscribePrefix: '+/+/+/ce/'
}

In addition to that, slashes in the event name are replaced by dots and the source header field is derived based on publishPrefix.

Examples:

publishPrefix derived source
my/own/namespace/ce/ /my/own/namespace
my/own.namespace/-/ce/ /my/own.namespace

Emitting Events

To send a message to the message broker, you can use the emit method on a transaction for the connected service.

Example:

const messaging = await cds.connect.to('messaging')

this.after(['CREATE', 'UPDATE', 'DELETE'], 'Reviews', async (_, req) => {
  const { ID } = req.data
  const { rating } = await cds.run(
    SELECT.one(['round(avg(rating),2) as rating'])
    .from(Reviews)
    .where({ ID }))

  // send to a topic
  await messaging.emit('my/custom/topic',
   { ID, rating })

  // alternative if you want to send custom headers
  await messaging.emit('my/custom/topic',
    { ID, rating },
    { 'X-Correlation-ID': req.headers['X-Correlation-ID'] })

  // or use the object parameter
  await messaging.emit({ event: 'my/custom/topic',
    data: { ID, rating },
    headers: { 'X-Correlation-ID': req.headers['X-Correlation-ID'] }})
})

::: tip The messages are sent once the transaction is successful. Per default, a persistent queue is used. See Messaging - Queue for more information. :::

Receiving Events

To listen to messages from a message broker, you can use the on method on the connected service. The necessary topic subscriptions are automatically created.

Example:

const messaging = await cds.connect.to('messaging')

// listen to a topic
messaging.on('my/custom/topic', msg => {
  const { ID, rating } = msg.data
  return cds.run(UPDATE(Books, ID).with({ rating }))
})

Once all handlers are executed successfully, the message is acknowledged. If one handler throws an error, the message broker is informed that the message couldn't be consumed properly. In this case, the broker sends the message again. To avoid endless cycles, consider catching all errors.

If you want to receive all messages without creating topic subscriptions, you can register on '*'. This feature is useful when consuming messages from a dead letter queue.

messaging.on('*', async msg => { /*...*/ })

::: tip In general, messages don't contain user information but operate with a technical user. As a consequence, the user of the message processing context (cds.context.user) is set to cds.User.privileged and, hence, any necessary authorization checks must be done in custom handlers. :::

Inbox

You can store received messages in an inbox before they're processed. Under the hood, it uses the task queue for reliable asynchronous processing. Enable it by setting the inboxed option to true, for example:

{
  cds: {
    requires: {
      messaging: {
        kind: 'enterprise-messaging',
        inboxed: true
      }
    }
  }
}

Message Brokers

To safely send and receive messages between applications, you need a message broker in-between where you can create queues that listen to topics. All relevant incoming messages are first stored in those queues before they're consumed. This way messages aren't lost when the consuming application isn't available.

In CDS, you can configure one of the available broker services in your requires section.

According to our grow as you go principle, it makes sense to first test your application logic without a message broker and enable it later. Therefore, we provide support for local messaging (if everything is inside one Node.js process) as well as file-based messaging.

Configuring Message Brokers

You must provide all necessary credentials by binding the message broker to your app.

For local environments, use cds bind in a hybrid setup.

::: tip For local testing use kind: enterprise-messaging-shared to avoid the complexity of HTTP-based messaging. :::

SAP Event Mesh (Shared) { #event-mesh-shared}

kind: enterprise-messaging-shared

Use this if you want to communicate using SAP Event Mesh in a shared way.

If you register at least one handler, a queue will automatically be created if not yet existent. Keep in mind that unused queues aren't automatically deleted, this has to be done manually.

You have the following configuration options:

If the queue name isn't specified, it's derived from application_name and the first four characters of application_id of your VCAP_APPLICATION environmental variable, as well as the namespace property of your SAP Event Mesh binding in VCAP_SERVICES: {namespace}/{application_name}/{truncated_application_id}. This makes sure that every application has its own queue.

Example:

{
    "requires": {
        "messaging": {
            "kind": "enterprise-messaging-shared",
            "queue": {
               "name": "my/enterprise/messaging/queue",
               "accessType": "EXCLUSIVE",
               "maxMessageSizeInBytes": 19000000
            },
            "amqp": {
              "incomingSessionWindow": 100
            }
        }
    }
}

::: warning ❗ Warning When using enterprise-messaging-shared in a multitenant scenario, only the provider account will have an event bus. There is no tenant isolation. :::

::: tip You need to install the latest version of the npm package @sap/xb-msg-amqp-v100. :::

::: tip For optimal performance, you should set the correct access type. To make sure your server is not flooded with messages, you should set the incoming session window. :::

SAP Event Mesh

kind: enterprise-messaging

This is the same as enterprise-messaging-shared except that messages are transferred through HTTP. For incoming messages, a webhook is used.

Compared to enterprise-messaging-shared you have the additional configuration option:

  • webhook: An object containing the waitingPeriod property as the time in milliseconds until a webhook is created after the application is listening to incoming HTTP requests (default: 5000). Additional properties are described in the Subscription object in SAP Event Mesh - REST APIs Messaging.

Example:

{
    "requires": {
        "messaging": {
            "kind": "enterprise-messaging",
            "queue": {
               "name": "my/enterprise/messaging/queue",
               "accessType": "EXCLUSIVE",
               "maxMessageSizeInBytes": 19000000
            },
            "webhook": {
              "waitingPeriod": 7000
            }
        }
    }
}

If your server is authenticated using XSUAA, you need to grant the scope $XSAPPNAME.emcallback to SAP Event Mesh for it to be able to trigger the handshake and send messages.

::: code-group

{
  ...,
  "scopes": [
    ...,
    {
      "name": "$XSAPPNAME.emcallback",
      "description": "Event Mesh Callback Access",
      "grant-as-authority-to-apps": [
        "$XSSERVICENAME(<SERVICE_NAME_OF_YOUR_EVENT_MESH_INSTANCE>)"
      ]
    }
  ]
}

:::

Make sure to add this to the service descriptor of your SAP Event Mesh instance:

{
  ...,
  "authorities": [
    "$ACCEPT_GRANTED_AUTHORITIES"
  ]
}

::: warning This will not work in the dev plan of SAP Event Mesh. :::

::: warning If you enable the cors middleware, handshake requests from SAP Event Mesh might be intercepted. :::

SAP Cloud Application Event Hub { #event-broker }

kind: event-broker

Use this if you want to communicate using SAP Cloud Application Event Hub.

The integration with SAP Cloud Application Event Hub is provided using the plugin @cap-js/event-broker. Please see the plugin's setup guide for more details.

SAP Integration Suite, Advanced Event Mesh { #advanced-event-mesh }

kind: advanced-event-mesh

Use this if you want to communicate using SAP Integration Suite, advanced event mesh.

The integration with SAP Integration Suite, advanced event mesh is provided using the plugin @cap-js/advanced-event-mesh. Please see the plugin's setup guide for more details.

Redis PubSub

::: warning This is a beta feature. Beta features aren't part of the officially delivered scope that SAP guarantees for future releases. :::

kind: redis-messaging

Use Redis PubSub as a message broker.

There are no queues:

  • Messages are lost when consumers are not available.
  • All instances receive the messages independently.

::: warning No tenant isolation in multitenant scenario When using redis-messaging in a multitenant scenario, only the provider account will have an event bus. There is no tenant isolation. :::

::: tip You need to install the latest version of the npm package redis. :::

File Based

kind: file-based-messaging

Don't use this in production, only if you want to test your application locally. It creates a file and uses it as a simple message broker.

You can have at most one consuming app per emitted event.

You have the following configuration options:

  • file: You can set the file path (default is ~/.cds-msg-box).

Example:

{
    "requires": {
        "messaging": {
            "kind": "file-based-messaging",
            "file": "../msg-box"
        }
    }
}

::: warning No tenant isolation in multitenant scenario When using file-based-messaging in a multitenant scenario, only the provider account will have an event bus. There is no tenant isolation. :::

Local Messaging

kind: local-messaging

You can use local messaging to communicate inside one Node.js process. It's especially useful in your automated tests.

Composite-Messaging

kind: composite-messaging

If you have several messaging services and don't want to mention them explicitly in your code, you can create a composite-messaging service where you can define routes for incoming and outgoing messages. In those routes, you can use glob patterns to match topics (** for any number of any character, * for any number of any character except / and ., ? for a single character).

Example:

{
  "requires": {
    "messaging": {
      "kind": "composite-messaging",
      "routes": {
        "myEnterpriseMessagingReview": ["cap/msg/system/review/*"],
        "myEnterpriseMessagingBook": ["**/book/*"]
      }
    },
    "myEnterpriseMessagingReview": {
      "kind": "enterprise-messaging",
      "queue": {
        "name": "cap/msg/system/review"
      }
    },
    "myEnterpriseMessagingBook": {
      "kind": "enterprise-messaging",
      "queue": {
        "name": "cap/msg/system/book"
      }
    }
  }
}
module.exports = async srv => {
  const messaging = await cds.connect.to('messaging')

  messaging.on('book/repository/book/modified', msg => {
    // comes from myEnterpriseMessagingBook
  })

  messaging.on('cap/msg/system/review/reviewed', msg => {
    // comes from myEnterpriseMessagingReview
  })
}