Admin Core

CAP 事件总线

CAP 是 .NET 生态中的轻量级分布式事务与事件驱动解决方案,提供可靠的事务性消息传递机制,支持跨服务的发布/订阅模式,无缝集成 Kafka、RabbitMQ 等主流消息队列。CAP 确保分布式系统在异步通信场景下实现最终一致性,有效简

CAP 是 .NET 生态中的轻量级分布式事务与事件驱动解决方案,提供可靠的事务性消息传递机制,支持跨服务的发布/订阅模式,无缝集成 Kafka、RabbitMQ 等主流消息队列。CAP 确保分布式系统在异步通信场景下实现最终一致性,有效简化微服务架构中复杂事务的协调与管理工作。

更多信息请参阅 CAP 官方文档
若尚未安装 RabbitMQ,建议先完成 RabbitMQ 安装部署 再进行后续开发。


一、环境配置

1.1 NuGet 包依赖

Host.csproj 中按需配置 CAP 的存储(SQL Server)和消息队列(RabbitMQ):

1
2
3
4
5
6
7
8
9
<ItemGroup>
<!-- CAP 核心包 -->
<PackageReference Include="DotNetCore.CAP.SqlServer" Version="8.3.2" />
<PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="8.3.2" />
<PackageReference Include="DotNetCore.CAP.Dashboard" Version="8.3.2" />

<!-- 数据库驱动 -->
<PackageReference Include="FreeSql.Provider.SqlServer" Version="3.5.104" />
</ItemGroup>

1.2 全局配置(Program.cs)

Program.cs 中初始化 CAP 并注入相关依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
new HostApp(new HostAppOptions
{
ConfigurePostServices = context =>
{
// 加载应用配置和程序集
var appConfig = AppInfo.GetRequiredService<AppConfig>(false);
var assemblies = AssemblyHelper.GetAssemblyList(appConfig.AssemblyNames);

// 获取数据库和 RabbitMQ 配置
var dbConfig = AppInfo.GetRequiredService<DbConfig>(false);
var rabbitMQ = context.Configuration.GetSection("CAP:RabbitMq").Get<RabbitMQOptions>();

// 配置 CAP
context.Services.AddCap(config =>
{
config.Version = "v1"; // 环境隔离标识
config.UseSqlServer(dbConfig.ConnectionString); // SQL Server 存储
config.UseRabbitMQ(mqConfig => // RabbitMQ 传输
{
mqConfig.HostName = rabbitMQ.HostName;
mqConfig.Port = rabbitMQ.Port;
mqConfig.UserName = rabbitMQ.UserName;
mqConfig.Password = rabbitMQ.Password;
mqConfig.ExchangeName = rabbitMQ.ExchangeName;
});
config.UseDashboard(); // 启用监控面板
}).AddSubscriberAssembly(assemblies); // 自动注册订阅者
}
}).Run(args, typeof(Program).Assembly);

二、定义订阅与事件

2.1 订阅命名(契约层)

Api.Contracts 项目的 Core/Consts 目录下定义订阅名常量:

1
2
3
4
5
6
7
8
/// <summary>
/// 订阅命名常量(确保全局唯一)
/// </summary>
public class SubscribeNames
{
[Description("模块操作事件")]
public const string ModuleAdd = "zhontai.admin.module.add";
}

2.2 事件类(契约层)

Api.Contracts 项目的 Services/Module/Events 目录下定义事件类:

1
2
3
4
5
6
7
8
9
/// <summary>
/// 模块新增事件(约定以 Event 作为后缀名,需支持序列化)
/// </summary>
public class ModuleAddEvent
{
public long ModuleId { get; set; }
public string ModuleName { get; set; }
// 其他业务字段...
}

三、发布消息

通过 ICapPublisher 在服务接口中发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
[Order(1010)]
[DynamicApi(Area = ApiConsts.AreaName)]
public class ModuleService : IDynamicApi
{
private readonly AppRepositoryBase<ModuleEntity> _moduleRep;
private readonly ICapPublisher _capPublisher;

public ModuleService(
AppRepositoryBase<ModuleEntity> moduleRep,
ICapPublisher capPublisher)
{
_moduleRep = moduleRep;
_capPublisher = capPublisher;
}

/// <summary>
/// 新增模块并发布事件
/// </summary>
public async Task<long> AddAsync(ModuleAddInput input)
{
var entity = Mapper.Map<ModuleEntity>(input);
await _moduleRep.InsertAsync(entity);

// 发布 CAP 消息
var moduleAddEvent = input.Adapt<ModuleAddEvent>();
await _capPublisher.PublishAsync(
SubscribeNames.ModuleAdd,
moduleAddEvent
);

return entity.Id;
}
}

四、订阅消息

创建订阅服务类,继承 ICapSubscribe 并标记订阅方法:

1
2
3
4
5
6
7
8
9
10
11
12
public class ModuleSubscribeService : ICapSubscribe
{
/// <summary>
/// 处理模块新增事件
/// </summary>
[NonAction] // 防止被暴露为 API 端点
[CapSubscribe(SubscribeNames.ModuleAdd)]
public async Task HandleModuleAddAsync(ModuleAddEvent @event)
{
// 业务逻辑:发送通知、更新缓存等
}
}

关键规则

规则 说明
接口继承 必须实现ICapSubscribe 接口
特性标记 订阅方法需标注[CapSubscribe("事件名")]
参数匹配 事件参数类型需与发布时保持一致

五、监控与调试

5.1 CAP Dashboard

通过 http://your-domain/cap 访问监控面板,可查看消息状态、重试记录等关键信息。

5.2 日志分析

  • 成功订阅:检查日志确认事件是否被正常消费
  • 失败处理:CAP 自动重试(默认 50 次),可在 Dashboard 中监控处理进度

注意事项

  1. 版本隔离:通过 config.Version 区分不同环境的消息,避免环境间消息串扰
  2. 配置检查:确保 RabbitMQ 和 SQL Server 连接字符串配置正确
  3. 事件序列化:事件类需支持序列化,建议使用简单数据结构

#消息队列 #RabbitMQ #分布式 #分布式/消息传递 #中台/分布式微服务