Skip to content

Latest commit

 

History

History
850 lines (650 loc) · 39.5 KB

File metadata and controls

850 lines (650 loc) · 39.5 KB

Event-Driven Architecture: Using Events to Integrate Microservices

事件驱动架构:使用事件来集成微服务

In the preceding chapter, we never actually spoke about how we would receive the "batch quantity changed" events, or indeed, how we might notify the outside world about reallocations.

在前一章中,我们实际上从未谈及 如何 接收“批次数量已更改”事件,或者我们如何通知外界关于重新分配的情况。

We have a microservice with a web API, but what about other ways of talking to other systems? How will we know if, say, a shipment is delayed or the quantity is amended? How will we tell the warehouse system that an order has been allocated and needs to be sent to a customer?

我们有一个带有 Web API 的微服务,但与其他系统交互的其他方式呢?比如说,如果一个货运被延迟或数量被修改, 我们怎么得知?我们又如何告诉仓储系统,一个订单已经被分配,需要发送给客户呢?

In this chapter, we’d like to show how the events metaphor can be extended to encompass the way that we handle incoming and outgoing messages from the system. Internally, the core of our application is now a message processor. Let’s follow through on that so it becomes a message processor externally as well. As shown in Our application is a message processor(我们的应用程序是一个消息处理器), our application will receive events from external sources via an external message bus (we’ll use Redis pub/sub queues as an example) and publish its outputs, in the form of events, back there as well.

在本章中,我们希望展示如何扩展事件这一比喻,使其涵盖我们处理系统中传入和传出消息的方式。在内部,我们应用程序的核心现在是一个消息处理器。 让我们继续深化这个思路,使其也能够在 外部 成为一个消息处理器。如 Our application is a message processor(我们的应用程序是一个消息处理器) 所示, 我们的应用程序将通过外部消息总线(这里以 Redis 的发布/订阅队列为例)接收来自外部来源的事件,并以事件的形式将其输出发布回外部消息总线。

apwp 1101
Figure 1. Our application is a message processor(我们的应用程序是一个消息处理器)
Tip

The code for this chapter is in the chapter_11_external_events branch on GitHub:

本章的代码在 GitHub 上 的 chapter_11_external_events 分支中:

git clone https://github.com/cosmicpython/code.git
cd code
git checkout chapter_11_external_events
# or to code along, checkout the previous chapter:
git checkout chapter_10_commands

Distributed Ball of Mud, and Thinking in Nouns

分布式泥球,与基于名词的思考方式

Before we get into that, let’s talk about the alternatives. We regularly talk to engineers who are trying to build out a microservices architecture. Often they are migrating from an existing application, and their first instinct is to split their system into nouns.

在深入探讨之前,让我们先来谈谈其他选择。我们经常与正尝试构建微服务架构的工程师交流。 他们通常在从现有应用程序迁移时,第一反应是将系统按 名词 拆分。

What nouns have we introduced so far in our system? Well, we have batches of stock, orders, products, and customers. So a naive attempt at breaking up the system might have looked like Context diagram with noun-based services(基于名词的服务的上下文图) (notice that we’ve named our system after a noun, Batches, instead of Allocation).

在我们的系统中到目前为止我们引入了哪些名词?嗯,我们有库存批次、订单、产品和客户。因此, 一种天真的尝试是将系统拆分成类似 Context diagram with noun-based services(基于名词的服务的上下文图) 的形式(注意, 我们用一个名词 Batches 来给我们的系统命名,而不是 Allocation)。

apwp 1102
Figure 2. Context diagram with noun-based services(基于名词的服务的上下文图)
[plantuml, apwp_1102, config=plantuml.cfg]
@startuml Batches Context Diagram
!include images/C4_Context.puml

System(batches, "Batches", "Knows about available stock")
Person(customer, "Customer", "Wants to buy furniture")
System(orders, "Orders", "Knows about customer orders")
System(warehouse, "Warehouse", "Knows about shipping instructions")

Rel_R(customer, orders, "Places order with")
Rel_D(orders, batches, "Reserves stock with")
Rel_D(batches, warehouse, "Sends instructions to")

@enduml

Each "thing" in our system has an associated service, which exposes an HTTP API.

我们系统中的每个“事物”都有一个相关的服务,并通过一个 HTTP API 暴露出来。

Let’s work through an example happy-path flow in Command flow 1(命令流程 1): our users visit a website and can choose from products that are in stock. When they add an item to their basket, we will reserve some stock for them. When an order is complete, we confirm the reservation, which causes us to send dispatch instructions to the warehouse. Let’s also say, if this is the customer’s third order, we want to update the customer record to flag them as a VIP.

让我们通过 Command flow 1(命令流程 1) 中的一个示例“理想路径”流程来深入了解:我们的用户访问网站,可以选择有库存的产品。 当他们将商品添加到购物车中时,我们会为他们保留一些库存。当订单完成时,我们确认这一预留操作,这会促使我们向仓储发送发货指令。 我们还假设,如果这是客户的第三个订单,我们希望更新客户记录,以标记他们为 VIP。

apwp 1103
Figure 3. Command flow 1(命令流程 1)
[plantuml, apwp_1103, config=plantuml.cfg]
@startuml
scale 4

actor Customer
entity Orders
entity Batches
entity Warehouse
database CRM


== Reservation ==

  Customer -> Orders: Add product to basket
  Orders -> Batches: Reserve stock

== Purchase ==

  Customer -> Orders: Place order
  activate Orders
  Orders -> Batches: Confirm reservation
  Batches -> Warehouse: Dispatch goods
  Orders -> CRM: Update customer record
  deactivate Orders


@enduml

We can think of each of these steps as a command in our system: ReserveStock, ConfirmReservation, DispatchGoods, MakeCustomerVIP, and so forth.

我们可以将这些步骤中的每一步视为系统中的一个命令:ReserveStockConfirmReservationDispatchGoodsMakeCustomerVIP,等等。

This style of architecture, where we create a microservice per database table and treat our HTTP APIs as CRUD interfaces to anemic models, is the most common initial way for people to approach service-oriented design.

这种架构风格是最常见的服务化设计初始方式,其中我们为每个数据库表创建一个微服务,并将 HTTP API 视为贫血模型的 CRUD 接口。

This works fine for systems that are very simple, but it can quickly degrade into a distributed ball of mud.

对于非常简单的系统来说,这种方式运转得 还算可以,但它很快就可能演变成一个分布式的泥球。

To see why, let’s consider another case. Sometimes, when stock arrives at the warehouse, we discover that items have been water damaged during transit. We can’t sell water-damaged sofas, so we have to throw them away and request more stock from our partners. We also need to update our stock model, and that might mean we need to reallocate a customer’s order.

要了解原因,让我们考虑另一个情况。有时候,当库存到达仓库时,我们会发现商品在运输过程中受到了水损。我们无法出售受水损的沙发, 因此我们不得不将其丢弃,并向合作伙伴请求更多库存。同时,我们需要更新我们的库存模型,而这可能意味着我们需要重新分配客户的订单。

Where does this logic go?

这种逻辑该放在哪里呢?

Well, the Warehouse system knows that the stock has been damaged, so maybe it should own this process, as shown in Command flow 2(命令流程 2).

嗯,仓储系统知道库存受损了,所以也许它应该负责这个流程,如 Command flow 2(命令流程 2) 所示。

apwp 1104
Figure 4. Command flow 2(命令流程 2)
[plantuml, apwp_1104, config=plantuml.cfg]
@startuml
scale 4

actor w as "Warehouse worker"
entity Warehouse
entity Batches
entity Orders
database CRM


  w -> Warehouse: Report stock damage
  activate Warehouse
  Warehouse -> Batches: Decrease available stock
  Batches -> Batches: Reallocate orders
  Batches -> Orders: Update order status
  Orders -> CRM: Update order history
  deactivate Warehouse

@enduml

This sort of works too, but now our dependency graph is a mess. To allocate stock, the Orders service drives the Batches system, which drives Warehouse; but in order to handle problems at the warehouse, our Warehouse system drives Batches, which drives Orders.

这种方式也 勉强可行,但现在我们的依赖关系图变得一团糟。为了分配库存,订单服务驱动了批次系统,而批次系统又驱动了仓储系统; 但为了处理仓储中的问题,我们的仓储系统又驱动了批次系统,而批次系统又驱动了订单服务。

Multiply this by all the other workflows we need to provide, and you can see how services quickly get tangled up.

将这个例子乘以我们需要支持的所有其他工作流,你就能看到服务如何迅速纠缠在一起。

Error Handling in Distributed Systems

分布式系统中的错误处理

"Things break" is a universal law of software engineering. What happens in our system when one of our requests fails? Let’s say that a network error happens right after we take a user’s order for three MISBEGOTTEN-RUG, as shown in Command flow with error(带有错误的命令流程).

“事情会出错”是软件工程的一条普遍规律。当我们的系统中某个请求失败时会发生什么?假设在我们接收到用户订购三个 MISBEGOTTEN-RUG 后, 立即发生了网络错误,如 Command flow with error(带有错误的命令流程) 所示。

We have two options here: we can place the order anyway and leave it unallocated, or we can refuse to take the order because the allocation can’t be guaranteed. The failure state of our batches service has bubbled up and is affecting the reliability of our order service.

在这里,我们有两个选项:我们可以继续下单,但让订单保持未分配状态,或者我们可以拒绝接受订单,因为无法保证分配成功。 批次服务的故障状态已经冒泡上来了,并且正在影响我们订单服务的可靠性。

When two things have to be changed together, we say that they are coupled. We can think of this failure cascade as a kind of temporal coupling: every part of the system has to work at the same time for any part of it to work. As the system gets bigger, there is an exponentially increasing probability that some part is degraded.

当两个事物必须一起被更改时,我们称它们是 耦合的。我们可以将这种故障级联视为一种 时间耦合:系统的每个部分都必须同时工作, 任何部分才能正常运行。随着系统规模的增大,某些部分出现性能下降的概率会以指数级增长。

apwp 1105
Figure 5. Command flow with error(带有错误的命令流程)
[plantuml, apwp_1105, config=plantuml.cfg]
@startuml
scale 4

actor Customer
entity Orders
entity Batches

Customer -> Orders: Place order
Orders -[#red]x Batches: Confirm reservation
hnote right: network error
Orders --> Customer: ???

@enduml
Connascence(关联性)

We’re using the term coupling here, but there’s another way to describe the relationships between our systems. Connascence is a term used by some authors to describe the different types of coupling.

我们在这里使用了术语 耦合,但描述我们系统之间关系还有另一种方式。共生关系(Connascence)是一些作者用于描述各种耦合类型的一个术语。

Connascence isn’t bad, but some types of connascence are stronger than others. We want to have strong connascence locally, as when two classes are closely related, but weak connascence at a distance.

共生关系并不是 糟糕的,但某些类型的共生关系比其他类型的 更强。我们希望在本地拥有强共生关系, 例如当两个类紧密相关时,但在远距离上保持弱共生关系。

In our first example of a distributed ball of mud, we see Connascence of Execution: multiple components need to know the correct order of work for an operation to be successful.

在我们第一个分布式泥球的例子中,我们看到了执行共生关系(Connascence of Execution):多个组件需要知道正确的工作顺序,操作才能成功。

When thinking about error conditions here, we’re talking about Connascence of Timing: multiple things have to happen, one after another, for the operation to work.

当考虑这里的错误情况时,我们讨论的是时间共生关系(Connascence of Timing):多个操作必须一个接一个地发生,才能使操作正常工作。

When we replace our RPC-style system with events, we replace both of these types of connascence with a weaker type. That’s Connascence of Name: multiple components need to agree only on the name of an event and the names of fields it carries.

当我们用事件替代基于 RPC 风格的系统时,我们用一种 更弱 的共生关系替代了以上两种。 这种关系是名称共生关系(Connascence of Name):多个组件只需要就事件的名称以及其携带的字段名称达成一致。

We can never completely avoid coupling, except by having our software not talk to any other software. What we want is to avoid inappropriate coupling. Connascence provides a mental model for understanding the strength and type of coupling inherent in different architectural styles. Read all about it at connascence.io.

我们永远无法完全避免耦合,除非让我们的软件不与任何其他软件交互。我们想要的是避免 不恰当的 耦合。 共生关系(Connascence)为理解不同架构风格中固有的耦合强度和类型提供了一种思维模型。 详情请参阅: connascence.io

The Alternative: Temporal Decoupling Using Asynchronous Messaging

另一种选择:使用异步消息实现时间解耦

How do we get appropriate coupling? We’ve already seen part of the answer, which is that we should think in terms of verbs, not nouns. Our domain model is about modeling a business process. It’s not a static data model about a thing; it’s a model of a verb.

我们如何实现适当的耦合?答案的一部分我们已经见过,那就是我们应该用动词而不是名词来思考。我们的领域模型是关于建模一个业务流程的。 它不是一个关于某个事物的静态数据模型,而是一个关于动词的模型。

So instead of thinking about a system for orders and a system for batches, we think about a system for ordering and a system for allocating, and so on.

因此,与其考虑一个订单系统和一个批次系统,不如考虑一个用于 下单 的系统和一个用于 分配 的系统,等等。

When we separate things this way, it’s a little easier to see which system should be responsible for what. When thinking about ordering, really we want to make sure that when we place an order, the order is placed. Everything else can happen later, so long as it happens.

当我们以这种方式分离时,更容易看出每个系统应该负责什么。当我们考虑 下单 时,我们真正想要的是确保当我们下了一个订单时, 订单会被成功下达。而其他的事情只要发生了,可以 稍后 再进行。

Note
If this sounds familiar, it should! Segregating responsibilities is the same process we went through when designing our aggregates and commands. 如果这听起来很熟悉,那是理所当然的!职责分离正是我们在设计聚合和命令时所经历的相同过程。

Like aggregates, microservices should be consistency boundaries. Between two services, we can accept eventual consistency, and that means we don’t need to rely on synchronous calls. Each service accepts commands from the outside world and raises events to record the result. Other services can listen to those events to trigger the next steps in the workflow.

与聚合类似,微服务也应该是 一致性边界。在两个服务之间,我们可以接受最终一致性,这意味着我们不需要依赖同步调用。 每个服务从外部世界接收命令,并通过事件来记录结果。其他服务可以监听这些事件来触发工作流中的下一步操作。

To avoid the Distributed Ball of Mud antipattern, instead of temporally coupled HTTP API calls, we want to use asynchronous messaging to integrate our systems. We want our BatchQuantityChanged messages to come in as external messages from upstream systems, and we want our system to publish Allocated events for downstream systems to listen to.

为了避免分布式泥球这种反模式,我们希望使用异步消息来集成系统,而不是使用时间耦合的 HTTP API 调用。 我们希望 BatchQuantityChanged 消息作为来自上游系统的外部消息传入,并希望我们的系统能够发布 Allocated 事件供下游系统监听。

Why is this better? First, because things can fail independently, it’s easier to handle degraded behavior: we can still take orders if the allocation system is having a bad day.

为什么这种方式更好?首先,因为各部分可以独立故障,所以更容易处理降级行为:即使分配系统出现问题,我们仍然可以接收订单。

Second, we’re reducing the strength of coupling between our systems. If we need to change the order of operations or to introduce new steps in the process, we can do that locally.

其次,我们降低了系统之间耦合的强度。如果我们需要更改操作的顺序或在流程中引入新的步骤,我们可以在本地完成这些更改。

Using a Redis Pub/Sub Channel for Integration

使用 Redis 发布/订阅通道进行集成

Let’s see how it will all work concretely. We’ll need some way of getting events out of one system and into another, like our message bus, but for services. This piece of infrastructure is often called a message broker. The role of a message broker is to take messages from publishers and deliver them to subscribers.

让我们来看一下它具体是如何工作的。我们需要某种方式将事件从一个系统传递到另一个系统,就像我们的消息总线,但这是针对服务的。 这种基础设施通常被称为 消息代理(message broker)。消息代理的作用是从发布者接收消息并将其传递给订阅者。

At MADE.com, we use Event Store; Kafka or RabbitMQ are valid alternatives. A lightweight solution based on Redis pub/sub channels can also work just fine, and because Redis is much more generally familiar to people, we thought we’d use it for this book.

在 MADE.com,我们使用 Event Store;Kafka 或 RabbitMQ 也是有效的替代方案。一个基于 Redis 的轻量级解决方案, 即 发布/订阅通道,同样可以很好地工作。由于 Redis 更为人所熟知,因此我们决定在本书中使用它。

Note
We’re glossing over the complexity involved in choosing the right messaging platform. Concerns like message ordering, failure handling, and idempotency all need to be thought through. For a few pointers, see [footguns]. 我们在这里略过了选择合适消息平台所涉及的复杂性。比如消息排序、故障处理以及幂等性等问题,都需要仔细考虑。有关一些提示,请参阅 [footguns]

Our new flow will look like Sequence diagram for reallocation flow(重新分配流程的序列图): Redis provides the BatchQuantityChanged event that kicks off the whole process, and our Allocated event is published back out to Redis again at the end.

我们的新流程将会像 Sequence diagram for reallocation flow(重新分配流程的序列图) 所示:Redis 提供了 BatchQuantityChanged 事件来启动整个流程, 而我们的 Allocated 事件在流程结束时会再次发布回 Redis。

apwp 1106
Figure 6. Sequence diagram for reallocation flow(重新分配流程的序列图)
[plantuml, apwp_1106, config=plantuml.cfg]
@startuml
scale 4

Redis -> MessageBus : BatchQuantityChanged event

group BatchQuantityChanged Handler + Unit of Work 1
    MessageBus -> Domain_Model : change batch quantity
    Domain_Model -> MessageBus : emit Allocate command(s)
end


group Allocate Handler + Unit of Work 2 (or more)
    MessageBus -> Domain_Model : allocate
    Domain_Model -> MessageBus : emit Allocated event(s)
end

MessageBus -> Redis : publish to line_allocated channel
@enduml

Test-Driving It All Using an End-to-End Test

通过端到端测试驱动整体功能测试

Here’s how we might start with an end-to-end test. We can use our existing API to create batches, and then we’ll test both inbound and outbound messages:

以下是我们如何从端到端测试开始的方式。我们可以使用现有的 API 创建批次,然后测试传入和传出的消息:

Example 1. An end-to-end test for our pub/sub model (tests/e2e/test_external_events.py)(针对我们的发布/订阅模型的端到端测试)
def test_change_batch_quantity_leading_to_reallocation():
    # start with two batches and an order allocated to one of them  #(1)
    orderid, sku = random_orderid(), random_sku()
    earlier_batch, later_batch = random_batchref("old"), random_batchref("newer")
    api_client.post_to_add_batch(earlier_batch, sku, qty=10, eta="2011-01-01")  #(2)
    api_client.post_to_add_batch(later_batch, sku, qty=10, eta="2011-01-02")
    response = api_client.post_to_allocate(orderid, sku, 10)  #(2)
    assert response.json()["batchref"] == earlier_batch

    subscription = redis_client.subscribe_to("line_allocated")  #(3)

    # change quantity on allocated batch so it's less than our order  #(1)
    redis_client.publish_message(  #(3)
        "change_batch_quantity",
        {"batchref": earlier_batch, "qty": 5},
    )

    # wait until we see a message saying the order has been reallocated  #(1)
    messages = []
    for attempt in Retrying(stop=stop_after_delay(3), reraise=True):  #(4)
        with attempt:
            message = subscription.get_message(timeout=1)
            if message:
                messages.append(message)
                print(messages)
            data = json.loads(messages[-1]["data"])
            assert data["orderid"] == orderid
            assert data["batchref"] == later_batch
  1. You can read the story of what’s going on in this test from the comments: we want to send an event into the system that causes an order line to be reallocated, and we see that reallocation come out as an event in Redis too. 你可以从注释中了解此测试中发生的事情:我们希望将一个事件发送到系统中,触发一个订单项的重新分配, 并且我们也希望看到该重新分配作为一个事件从 Redis 中发布出来。

  2. api_client is a little helper that we refactored out to share between our two test types; it wraps our calls to requests.post. api_client 是一个小助手,我们将其重构出来以在两种测试类型之间共享;它封装了我们对 requests.post 的调用。

  3. redis_client is another little test helper, the details of which don’t really matter; its job is to be able to send and receive messages from various Redis channels. We’ll use a channel called change_batch_quantity to send in our request to change the quantity for a batch, and we’ll listen to another channel called line_allocated to look out for the expected reallocation. redis_client 是另一个小测试助手,其具体实现细节并不重要;它的任务是能够在各种 Redis 通道中发送和接收消息。 我们将使用一个名为 change_batch_quantity 的通道发送请求以更改某个批次的数量,并监听另一个名为 line_allocated 的通道, 用于检查预期的重新分配事件。

  4. Because of the asynchronous nature of the system under test, we need to use the tenacity library again to add a retry loop—first, because it may take some time for our new line_allocated message to arrive, but also because it won’t be the only message on that channel. 由于被测试系统的异步特性,我们需要再次使用 tenacity 库来添加一个重试循环——一方面是因为我们的新 line_allocated 消息可能需要一些时间 才能到达;另一方面是因为这条消息不会是该通道上的唯一消息。

Redis Is Another Thin Adapter Around Our Message Bus

Redis 是围绕我们的消息总线的另一个轻量级适配器

Our Redis pub/sub listener (we call it an event consumer) is very much like Flask: it translates from the outside world to our events:

我们的 Redis 发布/订阅监听器(我们称之为 事件消费者)与 Flask 非常相似:它将外部世界的消息转化为我们的事件:

Example 2. Simple Redis message listener (src/allocation/entrypoints/redis_eventconsumer.py)(简单的 Redis 消息监听器)
r = redis.Redis(**config.get_redis_host_and_port())


def main():
    orm.start_mappers()
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe("change_batch_quantity")  #(1)

    for m in pubsub.listen():
        handle_change_batch_quantity(m)


def handle_change_batch_quantity(m):
    logging.debug("handling %s", m)
    data = json.loads(m["data"])  #(2)
    cmd = commands.ChangeBatchQuantity(ref=data["batchref"], qty=data["qty"])  #(2)
    messagebus.handle(cmd, uow=unit_of_work.SqlAlchemyUnitOfWork())
  1. main() subscribes us to the change_batch_quantity channel on load. main() 在加载时会将我们订阅到 change_batch_quantity 通道上。

  2. Our main job as an entrypoint to the system is to deserialize JSON, convert it to a Command, and pass it to the service layer—​much as the Flask adapter does. 作为系统入口的主要任务是反序列化 JSON,将其转换为一个 Command,并将其传递给服务层——这与 Flask 适配器的工作方式非常相似。

We also build a new downstream adapter to do the opposite job—converting domain events to public events:

我们还构建了一个新的下游适配器来执行相反的工作——将领域事件转换为公共事件:

Example 3. Simple Redis message publisher (src/allocation/adapters/redis_eventpublisher.py)(简单的 Redis 消息发布器)
r = redis.Redis(**config.get_redis_host_and_port())


def publish(channel, event: events.Event):  #(1)
    logging.debug("publishing: channel=%s, event=%s", channel, event)
    r.publish(channel, json.dumps(asdict(event)))
  1. We take a hardcoded channel here, but you could also store a mapping between event classes/names and the appropriate channel, allowing one or more message types to go to different channels. 我们在这里使用了一个硬编码的通道,但你也可以存储事件类/名称与相应通道之间的映射关系,从而允许一种或多种消息类型发送到不同的通道。

Our New Outgoing Event

我们新的传出事件

Here’s what the Allocated event will look like:

以下是 Allocated 事件的样子:

Example 4. New event (src/allocation/domain/events.py)(新事件)
@dataclass
class Allocated(Event):
    orderid: str
    sku: str
    qty: int
    batchref: str

It captures everything we need to know about an allocation: the details of the order line, and which batch it was allocated to.

它捕获了我们需要了解的所有有关分配的信息:订单项的详细信息以及它被分配到的批次。

We add it into our model’s allocate() method (having added a test first, naturally):

我们将其添加到模型的 allocate() 方法中(当然,首先需要先添加一个测试):

Example 5. Product.allocate() emits new event to record what happened (src/allocation/domain/model.py)(Product.allocate() 发出新事件以记录发生的事情)
class Product:
    ...
    def allocate(self, line: OrderLine) -> str:
        ...

            batch.allocate(line)
            self.version_number += 1
            self.events.append(
                events.Allocated(
                    orderid=line.orderid,
                    sku=line.sku,
                    qty=line.qty,
                    batchref=batch.reference,
                )
            )
            return batch.reference

The handler for ChangeBatchQuantity already exists, so all we need to add is a handler that publishes the outgoing event:

ChangeBatchQuantity 的处理器已经存在,所以我们只需要添加一个发布传出事件的处理器即可:

Example 6. The message bus grows (src/allocation/service_layer/messagebus.py)(消息总线的扩展)
HANDLERS = {
    events.Allocated: [handlers.publish_allocated_event],
    events.OutOfStock: [handlers.send_out_of_stock_notification],
}  # type: Dict[Type[events.Event], List[Callable]]

Publishing the event uses our helper function from the Redis wrapper:

发布事件时会使用我们从 Redis 封装中提供的小助手函数:

Example 7. Publish to Redis (src/allocation/service_layer/handlers.py)(发布到 Redis)
def publish_allocated_event(
    event: events.Allocated,
    uow: unit_of_work.AbstractUnitOfWork,
):
    redis_eventpublisher.publish("line_allocated", event)

Internal Versus External Events

内部事件与外部事件

It’s a good idea to keep the distinction between internal and external events clear. Some events may come from the outside, and some events may get upgraded and published externally, but not all of them will. This is particularly important if you get into event sourcing (very much a topic for another book, though).

明确区分内部事件与外部事件是一个好主意。一些事件可能来自外部,一些事件可能会被升级并发布到外部,但并不是所有事件都会如此。这一点特别重要, 如果你深入 事件溯源(尽管这非常适合另一本书的话题)。

Tip
Outbound events are one of the places it’s important to apply validation. See [appendix_validation] for some validation philosophy and examples. 传出事件是需要应用验证的重要场所之一。有关验证的理念和 示例,请参阅 [appendix_validation]
Exercise for the Reader(读者练习)

A nice simple one for this chapter: make it so that the main allocate() use case can also be invoked by an event on a Redis channel, as well as (or instead of) via the API.

本章的一个简单练习:使主要的 allocate() 用例既可以通过 Redis 通道上的事件调用,也可以(或替代)通过 API 调用。

You will likely want to add a new E2E test and feed through some changes into redis_eventconsumer.py.

你可能需要添加一个新的端到端(E2E)测试,并将一些更改引入 redis_eventconsumer.py

Wrap-Up

总结

Events can come from the outside, but they can also be published externally—​our publish handler converts an event to a message on a Redis channel. We use events to talk to the outside world. This kind of temporal decoupling buys us a lot of flexibility in our application integrations, but as always, it comes at a cost.

事件可以 来自 外部,也可以被发布到外部——我们的 publish 处理器将事件转换为 Redis 通道上的消息。我们使用事件与外部世界进行通信。 这种时间解耦为我们的应用程序集成带来了极大的灵活性,但正如往常一样,它也伴随着一定的代价。

Event notification is nice because it implies a low level of coupling, and is pretty simple to set up. It can become problematic, however, if there really is a logical flow that runs over various event notifications...It can be hard to see such a flow as it's not explicit in any program text....This can make it hard to debug and modify.

Martin Fowler, "What do you mean by 'Event-Driven'"

Table 1. Event-based microservices integration: the trade-offs(基于事件的微服务集成:权衡取舍)
Pros(优点) Cons(缺点)
  • Avoids the distributed big ball of mud. 避免了分布式泥球问题。

  • Services are decoupled: it’s easier to change individual services and add new ones. 服务是解耦的:更容易更改单个服务并添加新服务。

  • The overall flows of information are harder to see. 整体的信息流更难以直观查看。

  • Eventual consistency is a new concept to deal with. 最终一致性是需要应对的一个新概念。

  • Message reliability and choices around at-least-once versus at-most-once delivery need thinking through. 需要仔细考虑消息可靠性以及至少一次交付与至多一次交付的选择。

More generally, if you’re moving from a model of synchronous messaging to an async one, you also open up a whole host of problems having to do with message reliability and eventual consistency. Read on to [footguns].

更广泛地说,如果你从同步消息模型转向异步模型,也会引入一系列与消息可靠性和最终一致性相关的问题。请继续阅读 [footguns]