分布式事件总线
分布式事件总线系统允许发布和订阅能够跨应用程序/服务边界传输的事件。您可以使用分布式事件总线在微服务或应用程序之间异步发送和接收消息。
提供者
分布式事件总线系统提供了一个抽象层,可以由任何供应商/提供者实现。目前有四种开箱即用的提供者:
LocalDistributedEventBus是默认实现,它将分布式事件总线作为进程内工作。是的!如果您没有配置真正的分布式提供者,**默认实现的工作方式就像本地事件总线**一样。AzureDistributedEventBus使用 Azure 服务总线实现分布式事件总线。请参阅 Azure 服务总线集成文档了解如何配置。RabbitMqDistributedEventBus使用 RabbitMQ 实现分布式事件总线。请参阅 RabbitMQ 集成文档了解如何配置。KafkaDistributedEventBus使用 Kafka 实现分布式事件总线。请参阅 Kafka 集成文档了解如何配置。RebusDistributedEventBus使用 Rebus 实现分布式事件总线。请参阅 Rebus 集成文档了解如何配置。
使用本地事件总线作为默认设置有几个重要优势。最重要的一点是:它允许您编写兼容分布式架构的代码。您现在可以编写一个单体应用程序,以后可以拆分为微服务。通过分布式事件(而不是本地事件)在有界上下文之间(或应用程序模块之间)进行通信是一个很好的实践。
例如,预构建的应用程序模块被设计为在分布式系统中作为服务工作,同时它们也可以作为单体应用程序中的模块工作,而不依赖于外部消息代理。
发布事件
以下各节介绍了发布分布式事件的两种方式。
使用 IDistributedEventBus 发布事件
可以注入IDistributedEventBus并用于发布分布式事件。
示例:当产品的库存数量发生变化时发布分布式事件
using System;
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
namespace AbpDemo
{
public class MyService : ITransientDependency
{
private readonly IDistributedEventBus _distributedEventBus;
public MyService(IDistributedEventBus distributedEventBus)
{
_distributedEventBus = distributedEventBus;
}
public virtual async Task ChangeStockCountAsync(Guid productId, int newCount)
{
await _distributedEventBus.PublishAsync(
new StockCountChangedEto
{
ProductId = productId,
NewCount = newCount
}
);
}
}
}
PublishAsync方法接收事件对象,该对象负责保存与事件相关的数据。它是一个简单的普通类:
using System;
namespace AbpDemo
{
[EventName("MyApp.Product.StockChange")]
public class StockCountChangedEto
{
public Guid ProductId { get; set; }
public int NewCount { get; set; }
}
}
即使您不需要传输任何数据,也需要创建一个类(在这种情况下是一个空类)。
Eto是我们约定使用的事件传输对象后缀。虽然不是必需的,但我们发现它有助于识别此类事件类(就像应用程序层上的DTOs一样)。
事件名称
EventName属性是可选的,但建议使用。如果您没有为事件类型(ETO类)声明它,则事件名称将是事件类的全名,在本例中为AbpDemo.StockCountChangedEto。
关于事件对象的序列化
事件传输对象(ETO)必须是可序列化的,因为当它们被传输到进程外时,将被序列化/反序列化为JSON或其他格式。
避免循环引用、多态性、私有设置器,并作为良好实践(虽然某些序列化器可能容忍),如果存在其他构造函数,请提供默认(空)构造函数,就像DTO一样。
在实体/聚合根类内部发布事件
实体无法通过依赖注入注入服务,但在实体/聚合根类内部发布分布式事件是非常常见的。
示例:在聚合根方法内发布分布式事件
using System;
using Volo.Abp.Domain.Entities;
namespace AbpDemo
{
public class Product : AggregateRoot<Guid>
{
public string Name { get; set; }
public int StockCount { get; private set; }
private Product() { }
public Product(Guid id, string name)
: base(id)
{
Name = name;
}
public void ChangeStockCount(int newCount)
{
StockCount = newCount;
//添加要发布的事件
AddDistributedEvent(
new StockCountChangedEto
{
ProductId = Id,
NewCount = newCount
}
);
}
}
}
AggregateRoot类定义了AddDistributedEvent来添加一个新的分布式事件,当聚合根对象保存(创建、更新或删除)到数据库时,该事件会被发布。
如果一个实体发布了这样的事件,以受控的方式更改相关属性是一个好的做法,就像上面的例子一样 -
StockCount只能通过ChangeStockCount方法更改,这保证了事件的发布。
IGeneratesDomainEvents 接口
实际上,添加分布式事件并不是AggregateRoot类独有的。您可以为任何实体类实现IGeneratesDomainEvents。但是,AggregateRoot默认实现了它,并使其对您来说更容易。
不建议为非聚合根的实体实现此接口,因为它可能不适用于某些数据库提供程序。例如,它适用于EF Core,但不适用于MongoDB。
它是如何实现的?
调用AddDistributedEvent不会立即发布事件。事件在您将更改保存到数据库时发布;
- 对于EF Core,它在
DbContext.SaveChanges时发布。 - 对于MongoDB,当您调用存储库的
InsertAsync、UpdateAsync或DeleteAsync方法时发布(因为MongoDB没有变更跟踪系统)。
订阅事件
服务可以实现IDistributedEventHandler<TEvent>来处理事件。
示例:处理上面定义的StockCountChangedEto事件
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EventBus.Distributed;
namespace AbpDemo
{
public class MyHandler
: IDistributedEventHandler<StockCountChangedEto>,
ITransientDependency
{
public async Task HandleEventAsync(StockCountChangedEto eventData)
{
var productId = eventData.ProductId;
}
}
}
就是这样。
MyHandler会被ABP自动发现,并且每当发生StockCountChangedEto事件时,都会调用HandleEventAsync。- 如果您使用分布式消息代理,如RabbitMQ,ABP会自动订阅消息代理上的事件,获取消息,并执行处理程序。
- 如果事件处理程序成功执行(未抛出任何异常),它会向消息代理发送确认(ACK)。
您可以在此处注入任何服务并执行任何所需的逻辑。单个事件处理程序类可以通过为每个事件类型实现IDistributedEventHandler<TEvent>接口来订阅多个事件。
如果您在事件处理程序中执行数据库操作并使用存储库,您可能需要创建一个工作单元,因为某些存储库方法需要在活动的工作单元内工作。将处理方法标记为virtual并为该方法添加[UnitOfWork]属性,或者手动使用IUnitOfWorkManager创建工作单元作用域。
处理程序类必须注册到依赖注入(DI)系统中。上面的示例使用
ITransientDependency来实现这一点。有关更多选项,请参阅DI文档。
监控分布式事件
ABP允许您在应用程序接收或发送分布式事件时保持通知。此功能使您能够跟踪应用程序内的事件流,并根据接收或发送的分布式事件采取适当的操作。
接收的事件
当您的应用程序从分布式事件总线接收到事件时,会发布DistributedEventReceived本地事件。DistributedEventReceived类具有以下字段:
Source: 它表示分布式事件的来源。来源可以是Direct、Inbox、Outbox。EventName: 它表示接收到的事件的名称。EventData: 它表示与接收到的事件关联的实际数据。由于它是object类型,因此可以保存任何类型的数据。
示例:在您的应用程序从分布式事件总线接收到事件时获得通知
public class DistributedEventReceivedHandler : ILocalEventHandler<DistributedEventReceived>, ITransientDependency
{
public async Task HandleEventAsync(DistributedEventReceived eventData)
{
// TODO: 实现您的逻辑...
}
}
发送的事件
当您的应用程序向分布式事件总线发送事件时,会发布DistributedEventSent本地事件。DistributedEventSent类具有以下字段:
Source: 它表示分布式事件的来源。来源可以是Direct、Inbox、Outbox。EventName: 它表示发送的事件的名称。EventData: 它表示与发送的事件关联的实际数据。由于它是object类型,因此可以保存任何类型的数据。
示例:在您的应用程序向分布式事件总线发送事件时获得通知
public class DistributedEventSentHandler : ILocalEventHandler<DistributedEventSent>, ITransientDependency
{
public async Task HandleEventAsync(DistributedEventSent eventData)
{
// TODO: 实现您的逻辑...
}
}
通过如上例所示订阅DistributedEventReceived和DistributedEventSent本地事件,您可以将事件跟踪功能无缝集成到您的应用程序中。这使您能够有效地监控消息流,诊断任何潜在问题,并深入了解分布式消息系统的行为。
预定义事件
一旦您配置了,ABP会为实体的创建、更新和删除操作自动发布分布式事件。
事件类型
有三种预定义的事件类型:
EntityCreatedEto<T>在类型为T的实体被创建时发布。EntityUpdatedEto<T>在类型为T的实体被更新时发布。EntityDeletedEto<T>在类型为T的实体被删除时发布。
这些类型是泛型的。T实际上是事件传输对象(ETO)的类型,而不是实体的类型。因为实体对象不能作为事件数据的一部分传输。因此,通常为实体类定义一个ETO类,例如为Product实体定义ProductEto。
订阅事件
订阅自动事件与订阅常规分布式事件相同。
示例:在产品更新时获得通知
using System.Threading.Tasks;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities.Events.Distributed;
using Volo.Abp.EventBus.Distributed;
namespace AbpDemo
{
public class MyHandler :
IDistributedEventHandler<EntityUpdatedEto<ProductEto>>,
ITransientDependency
{
public async Task HandleEventAsync(EntityUpdatedEto<ProductEto> eventData)
{
var productId = eventData.Entity.Id;
//TODO
}
}
}
MyHandler实现了IDistributedEventHandler<EntityUpdatedEto<ProductEto>>。- 需要将您的处理程序类注册到依赖注入系统中。像本示例中那样实现
ITransientDependency是一种简单的方法。
配置
您可以在模块的ConfigureServices中配置AbpDistributedEntityEventOptions以添加选择器。
示例:配置示例
Configure<AbpDistributedEntityEventOptions>(options =>
{
//为所有实体启用
options.AutoEventSelectors.AddAll();
//为单个实体启用
options.AutoEventSelectors.Add<Product>();
//为命名空间(及子命名空间)中的所有实体启用
options.AutoEventSelectors.AddNamespace("MyProject.Products");
//应返回true以选择类型的自定义谓词表达式
options.AutoEventSelectors.Add(
type => type.Namespace.StartsWith("MyProject.")
);
//忽略单个实体
options.IgnoredEventSelectors.Add<IgnoredProductEntity>();
});
type.Namespace.StartsWith("MyProject.")提供了灵活性来决定是否应为给定的实体类型发布事件。返回true以接受Type。IgnoredEventSelectors用于忽略指定实体类型的事件。如果您为所有实体启用了事件,但想忽略某些实体,这很有用。
您可以添加多个选择器。如果一个实体类型匹配了任何一个选择器,那么它将被选中。
事件传输对象
一旦您为实体启用自动事件,ABP就会开始在此实体发生更改时发布事件。如果您没有为实体指定相应的事件传输对象(ETO),ABP将使用一个标准类型,名为EntityEto,它只有两个属性:
EntityType(string): 实体类的全名(包括命名空间)。KeysAsString(string): 已更改实体的主键。如果它有一个主键,此属性将是主键值。对于复合键,它将包含所有键,用,(逗号)分隔。
因此,您可以实现IDistributedEventHandler<EntityUpdatedEto<EntityEto>>来订阅更新事件。然而,订阅这样一个通用事件并不是一个好的做法,因为您将在单个处理程序中处理所有实体的更新事件(因为它们都使用相同的ETO对象)。您可以为实体类型定义相应的ETO类型。
示例:声明为Product实体使用ProductEto
public class ProductEto
{
public Guid Id { get; set; }
public string Name { get; set; }
public float Price { get; set; }
}
然后您可以使用AbpDistributedEntityEventOptions.EtoMappings选项将您的Product实体映射到ProductEto:
Configure<AbpDistributedEntityEventOptions>(options =>
{
options.AutoEventSelectors.Add<Product>();
options.EtoMappings.Add<Product, ProductEto>();
});
此示例;
- 添加了一个选择器,允许为
Product实体发布创建、更新和删除事件。 - 配置使用
ProductEto作为与Product相关事件发布的事件传输对象。
分布式事件系统使用对象到对象映射系统将
Product对象映射到ProductEto对象。因此,您还需要配置对象映射(Product->ProductEto)。您可以查看对象到对象映射文档了解如何操作。
实体同步器
在分布式(或微服务)系统中,通常需要订阅另一个服务的实体类型的更改事件,这样您就可以在所订阅的实体更改时获得通知。在这种情况下,您可以如前一部分所述使用ABP的预定义事件。
如果您的目的是存储远程实体的本地副本,您通常会订阅远程实体的创建、更新和删除事件,并在事件处理程序中更新本地数据库。ABP提供了一个预构建的EntitySynchronizer基类,以使该操作更容易。
假设在目录微服务中有一个Product实体(可能是一个聚合根实体),并且您希望在Ordering微服务中保存产品的副本,使用一个本地OrderProduct实体。实际上,OrderProduct类的属性将是Product属性的子集,因为在Ordering微服务中不需要所有产品数据(但是,如果需要,可以进行完整复制)。此外,OrderProduct实体可能具有在Ordering微服务中填充和使用的其他属性。
建立同步的第一步是在目录微服务中定义一个ETO(事件传输对象)类,用于传输事件数据。假设Product实体有一个Guid主键,您的ETO可以如下所示:
[EventName("product")]
public class ProductEto : EntityEto<Guid>
{
// 您的 Product 属性在这里...
}
ProductEto可以放在一个共享项目(DLL)中,该库被目录和Ordering微服务引用。或者,如果您不想在服务之间引入公共项目依赖,可以在Ordering微服务中放置ProductEto类的副本。在这种情况下,EventName属性变得至关重要,用于映射两个服务中的ProductEto类(您应该使用相同的事件名称)。
定义ETO类后,您应该配置ABP为Product实体发布自动(创建、更新和删除)事件,如前一部分所述:
Configure<AbpDistributedEntityEventOptions>(options =>
{
options.AutoEventSelectors.Add<Product>();
options.EtoMappings.Add<Product, ProductEto>();
});
最后,您应该在Ordering微服务中创建一个继承自EntitySynchronizer类的类:
public class ProductSynchronizer : EntitySynchronizer<OrderProduct, Guid, ProductEto>
{
public ProductSynchronizer(
IObjectMapper objectMapper,
IRepository<OrderProduct, Guid> repository
) : base(objectMapper, repository)
{
}
}
这个类的主要点是它订阅源实体的创建、更新和删除事件,并在数据库中更新本地实体。它使用对象映射器系统从ProductEto对象创建或更新OrderProduct对象。因此,您还应该配置对象映射器以使其正常工作。否则,您应该通过覆盖ProductSynchronizer类中的MapToEntityAsync(TSourceEntityEto)和MapToEntityAsync(TSourceEntityEto,TEntity)方法手动执行对象映射。
如果您的实体具有复合主键(请参阅实体文档),那么您应该继承EntitySynchronizer<TEntity, TSourceEntityEto>类(只需在前面的示例中不使用Guid泛型参数),并实现FindLocalEntityAsync以使用Repository在本地数据库中查找实体。
EntitySynchronizer与实体版本控制系统兼容(请参阅实体文档)。因此,即使事件接收顺序错乱,它也能按预期工作。如果本地数据库中的实体版本比接收到的事件中的实体新,则忽略该事件。您应该为实体和ETO类实现IHasEntityVersion接口(对于此示例,您应该为Product、ProductEto和OrderProduct类实现)。
如果您想忽略某些类型的更改事件,可以在类的构造函数中设置IgnoreEntityCreatedEvent、IgnoreEntityUpdatedEvent和IgnoreEntityDeletedEvent。示例:
public class ProductSynchronizer
: EntitySynchronizer<OrderProduct, Guid, ProductEto>
{
public ProductSynchronizer(
IObjectMapper objectMapper,
IRepository<OrderProduct, Guid> repository
) : base(objectMapper, repository)
{
IgnoreEntityDeletedEvent = true;
}
}
请注意,
EntitySynchronizer只能在您使用它之后创建/更新实体。如果您有一个具有现有数据的现有系统,您应该手动复制数据一次,因为EntitySynchronizer将开始工作。
事务与异常处理
分布式事件总线在进程内工作(因为默认实现是LocalDistributedEventBus),除非您配置了实际的提供者(例如Kafka或RabbitMQ)。进程内事件总线总是在您发布事件的同一个工作单元作用域内执行事件处理程序。这意味着,如果事件处理程序抛出异常,则相关工作单元(数据库事务)将回滚。通过这种方式,您的应用程序逻辑和事件处理逻辑变得具有事务性(原子性)和一致性。如果您想忽略事件处理程序中的错误,必须在处理程序中使用try-catch块,并且不应重新抛出异常。
当您切换到实际的分布式事件总线提供者(例如Kafka或RabbitMQ)时,事件处理程序将在不同的进程/应用程序中执行,因为它们的目的是创建分布式系统。在这种情况下,实现事务性事件发布的唯一方法是使用事务性事件的发件箱/收件箱部分中说明的发件箱/收件箱模式。
如果您未配置发件箱/收件箱模式或使用LocalDistributedEventBus,则默认情况下,事件在工作单元结束时发布,恰好在工作单元完成之前(这意味着事件处理程序中的异常仍会回滚工作单元),即使您在工作单元中间发布它们。如果您想立即发布事件,可以在使用IDistributedEventBus.PublishAsync方法时将onUnitOfWorkComplete设置为false。
建议保持默认行为,除非您有特殊要求。在实体/聚合根类内部发布事件时(请参阅在实体/聚合根类内部发布事件部分),
onUnitOfWorkComplete选项不可用。
事务性事件的发件箱/收件箱
事务性发件箱模式用于在与操作应用程序数据库相同的事务中发布分布式事件。当您启用发件箱时,分布式事件将与您的数据更改在同一事务中保存到数据库中,然后由单独的 后台工作器 通过重试系统发送到实际的消息代理。通过这种方式,它确保了数据库状态与已发布事件之间的一致性。
另一方面,事务性收件箱模式首先将传入事件保存到数据库中。然后(在 后台工作器 中)以事务性方式执行事件处理程序,并在同一事务中将事件从收件箱队列中删除。它通过将已处理的消息保留一段时间并丢弃从消息代理接收到的重复事件,确保事件仅被执行一次。
为您的应用程序启用事件发件箱和收件箱系统需要一些手动步骤。请按照以下部分的说明进行操作以使其运行。
发件箱和收件箱可以分别启用和配置,因此如果您愿意,可以只使用其中之一。
先决条件
- 当您运行应用程序/服务的多个实例时,发件箱/收件箱系统使用分布式锁系统来处理并发性。因此,您应该按照本文档中的说明使用其中一个提供者配置分布式锁系统。
- 发件箱/收件箱系统开箱即用地支持 Entity Framework Core(EF Core)和 MongoDB 数据库提供者。因此,您的应用程序应使用其中一种数据库提供者。对于其他数据库提供者,请参阅实现自定义数据库提供者部分。
如果您使用MongoDB,请确保启用了MongoDB 4.0版本中引入的多文档数据库事务。请参阅 MongoDB 文档的事务部分。
启用事件发件箱
启用事件发件箱取决于您的数据库提供者。
为 Entity Framework Core 启用事件发件箱
打开您的DbContext类,实现IHasEventOutbox接口。您最终应该在DbContext类中添加一个DbSet属性:
public DbSet<OutgoingEventRecord> OutgoingEvents { get; set; }
在DbContext类的OnModelCreating方法中添加以下行:
builder.ConfigureEventOutbox();
使用标准的Add-Migration和Update-Database命令将更改应用到您的数据库。如果想使用命令行终端,请在数据库集成项目的根目录中运行以下命令:
dotnet ef migrations add "Added_Event_Outbox"
dotnet ef database update
最后,在您的模块类的ConfigureServices方法中编写以下配置代码(将YourDbContext替换为您自己的DbContext类):
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Outboxes.Configure(config =>
{
config.UseDbContext<YourDbContext>();
});
});
为 MongoDB 启用事件发件箱
打开您的DbContext类,实现IHasEventOutbox接口。您最终应该在DbContext类中添加一个IMongoCollection属性:
public IMongoCollection<OutgoingEventRecord> OutgoingEvents => Collection<OutgoingEventRecord>();
在DbContext类的CreateModel方法中添加以下行:
modelBuilder.ConfigureEventOutbox();
最后,在您的模块类的ConfigureServices方法中编写以下配置代码(将YourDbContext替换为您自己的DbContext类):
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Outboxes.Configure(config =>
{
config.UseMongoDbContext<MyProjectNameDbContext>();
});
});
发件箱的分布式锁定
重要提示:发件箱发送服务使用分布式锁来确保应用程序的单个实例并发使用发件箱队列。每个数据库的分布式锁键应该是唯一的。
config对象(在options.Outboxes.Configure(...)方法中)有一个DatabaseName属性,该属性用于分布式锁键以确保唯一性。DatabaseName由UseDbContext方法自动设置,从YourDbContext类的ConnectionStringName属性获取数据库名称。因此,如果您的系统中有多个数据库,请确保对同一数据库使用相同的连接字符串名称,但对不同的数据库使用不同的连接字符串名称。如果无法确保这一点,可以手动设置config.DatabaseName(在UseDbContext行之后)以确保唯一性。
启用事件收件箱
启用事件收件箱取决于您的数据库提供者。
为 Entity Framework Core 启用事件收件箱
打开您的DbContext类,实现IHasEventInbox接口。您最终应该在DbContext类中添加一个DbSet属性:
public DbSet<IncomingEventRecord> IncomingEvents { get; set; }
在DbContext类的OnModelCreating方法中添加以下行:
builder.ConfigureEventInbox();
使用标准的Add-Migration和Update-Database命令将更改应用到您的数据库。如果想使用命令行终端,请在数据库集成项目的根目录中运行以下命令:
dotnet ef migrations add "Added_Event_Inbox"
dotnet ef database update
最后,在您的模块类的ConfigureServices方法中编写以下配置代码(将YourDbContext替换为您自己的DbContext类):
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Inboxes.Configure(config =>
{
config.UseDbContext<YourDbContext>();
});
});
为 MongoDB 启用事件收件箱
打开您的DbContext类,实现IHasEventInbox接口。您最终应该在DbContext类中添加一个IMongoCollection属性:
public IMongoCollection<IncomingEventRecord> IncomingEvents => Collection<IncomingEventRecord>();
在DbContext类的CreateModel方法中添加以下行:
modelBuilder.ConfigureEventInbox();
最后,在您的模块类的ConfigureServices方法中编写以下配置代码(将YourDbContext替换为您自己的DbContext类):
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Inboxes.Configure(config =>
{
config.UseMongoDbContext<MyProjectNameDbContext>();
});
});
收件箱的分布式锁定
重要提示:收件箱处理服务使用分布式锁来确保应用程序的单个实例并发使用收件箱队列。每个数据库的分布式锁键应该是唯一的。
config对象(在options.Inboxes.Configure(...)方法中)有一个DatabaseName属性,该属性用于分布式锁键以确保唯一性。DatabaseName由UseDbContext方法自动设置,从YourDbContext类的ConnectionStringName属性获取数据库名称。因此,如果您的系统中有多个数据库,请确保对同一数据库使用相同的连接字符串名称,但对不同的数据库使用不同的连接字符串名称。如果无法确保这一点,可以手动设置config.DatabaseName(在UseDbContext行之后)以确保唯一性。
额外配置
默认配置在大多数情况下已足够。但是,您可能希望为发件箱和收件箱设置一些选项。
发件箱配置
记住发件箱是如何配置的:
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Outboxes.Configure(config =>
{
// TODO: 设置选项
});
});
此处,config对象上提供以下属性:
IsSendingEnabled(默认值:true):可以设置为false以禁用向实际事件总线发送发件箱事件。如果禁用此功能,事件仍可以添加到发件箱,但不会发送。如果您有多个应用程序(或应用程序实例)写入发件箱,但使用其中一个发送事件,这会很有帮助。Selector:用于过滤用于此配置的事件(ETO)类型的谓词。应返回true以选择事件。默认情况下选择所有事件。如果您想忽略发件箱中的某些ETO类型,或者想定义命名的发件箱配置并将事件分组到这些配置中,这尤其有用。请参阅命名配置部分。ImplementationType:实现发件箱数据库操作的类的类型。这通常在您调用UseDbContext时设置,如前所示。有关高级用法,请参阅实现自定义发件箱/收件箱数据库提供者部分。DatabaseName:用于此发件箱配置的数据库的唯一名称。请参阅启用事件发件箱/收件箱部分末尾的重要提示段落。
收件箱配置
记住收件箱是如何配置的:
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Inboxes.Configure(config =>
{
// TODO: 设置选项
});
});
此处,config对象上提供以下属性:
IsProcessingEnabled(默认值:true):可以设置为false以禁用处理(执行)收件箱中的事件。如果禁用此功能,事件仍可以接收,但不会执行。如果您有多个应用程序(或应用程序实例),但使用其中一个执行事件处理程序,这会很有帮助。EventSelector:用于过滤用于此配置的事件(ETO)类型的谓词。如果您想忽略收件箱中的某些ETO类型,或者想定义命名的收件箱配置并将事件分组到这些配置中,这尤其有用。请参阅命名配置部分。HandlerSelector:用于过滤用于此配置的事件处理类型(实现IDistributedEventHandler<TEvent>接口的类)的谓词。如果您想忽略收件箱处理中的某些事件处理程序类型,或者想定义命名的收件箱配置并将事件处理程序分组到这些配置中,这尤其有用。请参阅命名配置部分。ImplementationType:实现收件箱数据库操作的类的类型。这通常在您调用UseDbContext时设置,如前所示。有关高级用法,请参阅实现自定义发件箱/收件箱数据库提供者部分。DatabaseName:用于此发件箱配置的数据库的唯一名称。请参阅启用事件收件箱部分末尾的重要提示段落。
AbpEventBusBoxesOptions
AbpEventBusBoxesOptions可用于微调收件箱和发件箱系统的工作方式。对于大多数系统,使用默认值就足够了,但您可以根据需要进行配置以优化系统。
就像所有的选项类一样,可以在模块类的ConfigureServices方法中配置AbpEventBusBoxesOptions,如下面的代码块所示:
Configure<AbpEventBusBoxesOptions>(options =>
{
// TODO: 配置选项
});
AbpEventBusBoxesOptions有以下属性可以配置:
BatchPublishOutboxEvents:可用于启用或禁用批量发布事件到消息代理。如果分布式事件总线提供者支持,批量发布将工作。如果不支持,则作为后备逻辑逐个发送事件。保持启用状态,因为它在可能的情况下具有显著的性能优势。默认值为true(启用)。PeriodTimeSpan:收件箱和发件箱消息处理器检查数据库中是否有新事件的周期。默认值为2秒(TimeSpan.FromSeconds(2))。CleanOldEventTimeIntervalSpan:事件收件箱系统定期检查并删除数据库中旧的已处理事件。您可以设置此值以确定检查周期。默认值为6小时(TimeSpan.FromHours(6))。WaitTimeToDeleteProcessedInboxEvents:收件箱事件即使成功处理,也不会从数据库中立即删除。这是为了防止系统多次处理同一事件(如果事件代理发送两次)。此配置值决定了保留已处理事件的时间。默认值为2小时(TimeSpan.FromHours(2))。InboxWaitingEventMaxCount:一次从数据库收件箱中查询的最大事件数。默认值为1000。OutboxWaitingEventMaxCount:一次从数据库发件箱中查询的最大事件数。默认值为1000。DistributedLockWaitDuration:当运行同一应用程序的多个实例时,ABP使用分布式锁来防止并发访问数据库中的收件箱和发件箱消息。如果应用程序的实例无法获取锁,它会在一段时间后重试。这是该时间的配置。默认值为15秒(TimeSpan.FromSeconds(15))。InboxProcessorFailurePolicy:收件箱处理器失败的处理策略。默认值为Retry。可能的值有:Retry:当前异常及后续事件将在下一个周期中继续按顺序处理。RetryLater:跳过导致异常的事件,继续处理后续事件。失败的事件将在延迟后重试,延迟时间随每次重试加倍,从配置的InboxProcessorRetryBackoffFactor开始(例如,10、20、40、80秒)。默认最大重试次数为10(可配置)。如果在达到最大重试次数后仍然失败,则丢弃事件。Discard:导致异常的事件将被丢弃,不会重试。
InboxProcessorRetryBackoffFactor:当InboxProcessorFailurePolicy为RetryLater时使用的初始重试延迟因子(双精度)。重试延迟计算为:delay = InboxProcessorRetryBackoffFactor × 2^retryCount。默认值为10。
跳过发件箱
IDistributedEventBus.PublishAsync方法提供了一个可选参数useOutbox,默认设置为true。如果您想绕过发件箱并立即发布事件,可以为特定的事件发布操作将其设置为false。
高级主题
命名配置
本节解释的所有概念也适用于收件箱配置。我们将仅针对发件箱提供示例,以保持文档简洁。
请参见以下发件箱配置代码:
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Outboxes.Configure(config =>
{
//TODO
});
});
这等同于以下代码:
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Outboxes.Configure("Default", config =>
{
//TODO
});
});
此代码中的Default表示配置名称。如果您未指定它(如前面的代码块),则使用Default作为配置名称。
这意味着您可以定义多个具有不同名称的发件箱(以及收件箱)配置。ABP将运行所有已配置的发件箱。
如果您的应用程序有多个数据库,并且您希望为不同的数据库运行不同的发件箱队列,则可能需要多个发件箱。在这种情况下,您可以使用Selector选项来决定哪些事件应由某个发件箱处理。请参阅上面的额外配置部分。
实现自定义发件箱/收件箱数据库提供者
如果您的应用程序或服务使用的数据库提供者不是 EF Core 和 MongoDB,您应该手动将发件箱/收件箱系统与您的数据库提供者集成。
发件箱和收件箱表/数据必须与您的应用程序数据存储在同一个数据库中(因为我们希望创建一个单一数据库事务,该事务包含应用程序的数据库操作和发件箱/收件箱表操作)。否则,您应该关心分布式(多数据库)事务支持,大多数供应商不提供此支持,并且可能需要额外配置。
ABP提供了IEventOutbox和IEventInbox抽象作为发件箱/收件箱系统的扩展点。您可以通过实现这些接口来创建类,并将它们注册到依赖注入。
一旦实现了自定义事件箱,就可以配置AbpDistributedEventBusOptions以使用您的事件箱类:
Configure<AbpDistributedEventBusOptions>(options =>
{
options.Outboxes.Configure(config =>
{
config.ImplementationType = typeof(MyOutbox); //您的发件箱类
});
options.Inboxes.Configure(config =>
{
config.ImplementationType = typeof(MyInbox); //您的收件箱类
});
});
抠丁客


