微服务教程 第 07 部分:集成服务:使用分布式事件
微服务之间通信的另一种常见方式是消息传递。通过发布和处理消息,一个微服务可以在另一个微服务中发生事件时执行操作。
ABP 提供了两种类型的事件总线用于松耦合通信:
本地事件总线 适用于进程内消息传递。然而,它不适用于微服务,因为它无法跨不同进程通信。对于分布式系统,请考虑使用分布式事件总线。
分布式事件总线 适用于进程间消息传递,例如微服务之间的分布式事件发布和订阅。然而,除非你配置外部消息代理,否则 ABP 的分布式事件总线默认作为本地(进程内)工作(实际上,默认情况下它使用本地事件总线)。
在本教程中,我们将使用分布式事件总线在 Order 和 Catalog 微服务之间进行通信。
发布事件
在示例场景中,我们希望在创建新订单时发布一个事件。订购服务将在知道新订单创建时发布该事件。目录服务将订阅该事件,并在新订单创建时收到通知。这将减少与新订单相关的产品的库存数量。该场景非常简单,让我们来实现它。
定义事件类
在 IDE 中打开 CloudCrm.OrderingService .NET 解决方案,创建一个 Events 文件夹,并在 CloudCrm.OrderingService.Contracts 项目下创建一个名为 OrderPlacedEto 的新类:
using System;
namespace CloudCrm.OrderingService.Events;
public class OrderPlacedEto
{
public string CustomerName { get; set; }
public Guid ProductId { get; set; }
}
OrderPlacedEto 非常简单。它是一个普通的 C# 类,用于传输与事件相关的数据(ETO 是 Event Transfer Object 的缩写,是一种建议的命名约定,但不是必需的)。如果需要,你可以添加更多属性,但对于本教程来说,这已经足够了。
使用 IDistributedEventBus 服务
IDistributedEventBus 服务将事件发布到事件总线。到目前为止,订购服务仅创建一个订单并插入数据库。让我们更改这一点并发布 OrderPlacedEto 事件,为此打开 CloudCrm.OrderingService 项目,并按如下所示更新 OrderAppService 类:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using CloudCrm.CatalogService.IntegrationServices;
using CloudCrm.OrderingService.Entities;
using CloudCrm.OrderingService.Enums;
using CloudCrm.OrderingService.Events;
using CloudCrm.OrderingService.Localization;
using Volo.Abp.Application.Services;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.EventBus.Distributed;
namespace CloudCrm.OrderingService.Services;
public class OrderAppService : ApplicationService, IOrderAppService
{
private readonly IRepository<Order, Guid> _orderRepository;
private readonly IProductIntegrationService _productIntegrationService;
private readonly IDistributedEventBus _distributedEventBus;
public OrderAppService(
IRepository<Order, Guid> orderRepository,
IProductIntegrationService productIntegrationService,
IDistributedEventBus distributedEventBus)
{
LocalizationResource = typeof(OrderingServiceResource);
_orderRepository = orderRepository;
_productIntegrationService = productIntegrationService;
_distributedEventBus = distributedEventBus;
}
public async Task<List<OrderDto>> GetListAsync()
{
var orders = await _orderRepository.GetListAsync();
// 准备我们需要的产品列表
var productIds = orders.Select(o => o.ProductId).Distinct().ToList();
var products = (await _productIntegrationService
.GetProductsByIdsAsync(productIds))
.ToDictionary(p => p.Id, p => p.Name);
var orderDtos = ObjectMapper.Map<List<Order>, List<OrderDto>>(orders);
orderDtos.ForEach(orderDto =>
{
orderDto.ProductName = products[orderDto.ProductId];
});
return orderDtos;
}
public async Task CreateAsync(OrderCreationDto input)
{
// 创建一个新的 Order 实体
var order = new Order
{
CustomerName = input.CustomerName,
ProductId = input.ProductId,
State = OrderState.Placed
};
// 将其保存到数据库
await _orderRepository.InsertAsync(order);
// 发布事件,以便其他微服务可以获知
await _distributedEventBus.PublishAsync(
new OrderPlacedEto
{
ProductId = order.ProductId,
CustomerName = order.CustomerName
});
}
}
OrderAppService.CreateAsync 方法创建一个新的 Order 实体,将其保存到数据库,最后发布一个 OrderPlacedEto 事件。
订阅事件
目录服务将订阅 OrderPlacedEto 事件,并减少与新订单相关的产品的库存数量。让我们来实现它。
添加对 CloudCrm.OrderingService.Contracts 包的引用
由于 OrderPlacedEto 类在 CloudCrm.OrderingService.Contracts 项目中,我们必须将该包的引用添加到目录服务。这次,我们将使用 ABP Studio 的导入模块功能(作为我们在上一部分的“添加对 CloudCrm.CatalogService.Contracts 包的引用”部分中使用的替代方法)。
打开 ABP Studio UI,如果应用程序正在运行则停止。然后,打开解决方案资源管理器面板,右键单击 CloudCrm.CatalogService。从上下文菜单中选择导入模块:
在打开的对话框中,找到并选择 CloudCrm.OrderingService 模块,勾选安装此模块选项,单击确定按钮:
一旦你单击确定按钮,订购服务就被导入到目录服务。它会打开安装模块对话框:
在这里,在左侧选择 CloudCrm.OrderingService.Contracts 包(因为我们想添加该包引用),在中间区域选择 CloudCrm.CatalogService 包(因为我们想将包引用添加到该项目)。
你可以检查 ABP Studio 的解决方案资源管理器面板以查看模块和项目引用(依赖关系):
处理 OrderPlacedEto 事件
现在,是时候在目录服务中处理 OrderPlacedEto 事件了。在 IDE 中打开 CloudCrm.CatalogService .NET 解决方案。创建一个新的 Orders 文件夹,并在 CloudCrm.CatalogService 项目内的该文件夹中添加一个名为 OrderEventHandler 的新类:
using System;
using System.Threading.Tasks;
using CloudCrm.CatalogService.Products;
using CloudCrm.OrderingService.Events;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Repositories;
using Volo.Abp.EventBus.Distributed;
namespace CloudCrm.CatalogService.Orders;
public class OrderEventHandler :
IDistributedEventHandler<OrderPlacedEto>,
ITransientDependency
{
private readonly IProductRepository _productRepository;
public OrderEventHandler(IProductRepository productRepository)
{
_productRepository = productRepository;
}
public async Task HandleEventAsync(OrderPlacedEto eventData)
{
// 查找相关产品
var product = await _productRepository.FindAsync(eventData.ProductId);
if (product == null)
{
return;
}
// 减少库存数量
product.StockCount = product.StockCount - 1;
// 在数据库中更新实体
await _productRepository.UpdateAsync(product);
}
}
OrderEventHandler 类实现了 IDistributedEventHandler<OrderPlacedEto> 接口来处理 OrderPlacedEto 事件。当事件发布时,HandleEventAsync 方法被调用。在此方法中,我们找到相关产品,将库存数量减少一,并在数据库中更新实体。
实现 ITransientDependency 将 OrderEventHandler 类注册到依赖注入系统作为瞬态对象。
测试订单创建
为了保持本教程的简洁,我们将不会为创建订单实现用户界面。相反,我们将使用 Swagger UI 来创建订单。在 ABP Studio 中打开解决方案运行器面板,并使用启动操作来启动 CloudCrm.OrderingService 和 CloudCrm.CatalogService 应用程序。然后,使用全部启动操作来启动解决方案运行器根项目中列出的其余应用程序。
一旦应用程序运行就绪,浏览 CloudCrm.OrderingService 应用程序。使用 POST /api/ordering/order 端点创建一个新订单:
找到 Order API,单击 Try it out 按钮,在 Request body 部分输入一个示例值,然后单击 Execute 按钮:
{
"customerName": "David",
"productId": "5995897b-1de9-7272-b31c-3a165bbe7b18"
}
重要: 这里,你应该输入数据库中产品表的一个有效的产品 ID!
一旦你按下 Execute 按钮,一个新订单就被创建了。此时,你可以检查 /Orders 页面,看看新订单是否被列出。你也可以检查 /Products 页面,看看在 CloudCrm.Web 应用程序中相关产品的库存数量是否减少了一。
以下是 CloudCrm.Web 应用程序的订单和产品页面的示例截图:
我们为产品 A 创建了一个新订单。结果,产品 A 的库存数量从 53 减少到 52,并且订单页面添加了一新行。
结论
在本教程中,我们使用分布式事件总线在 Order 和 Catalog 微服务之间进行通信。我们在新订单创建时发布了一个事件,并在目录服务中处理该事件以减少相关产品的库存数量。这是一个简单的例子,但它展示了如何使用分布式事件在微服务之间进行通信。
抠丁客








