CAP 是 .NET 生态中的轻量级分布式事务与事件驱动解决方案,提供可靠的事务性消息传递机制,支持跨服务的发布/订阅模式,无缝集成 Kafka、RabbitMQ 等主流消息队列。CAP 确保分布式系统在异步通信场景下实现最终一致性,有效简化微服务架构中复杂事务的协调与管理工作。
更多信息请参阅 CAP 官方文档。
若尚未安装 RabbitMQ,建议先完成 RabbitMQ 安装部署 再进行后续开发。
一、环境配置
1.1 NuGet 包依赖
在 Host.csproj 中按需配置 CAP 的存储(SQL Server)和消息队列(RabbitMQ):
1 2 3 4 5 6 7 8 9
| <ItemGroup> <PackageReference Include="DotNetCore.CAP.SqlServer" Version="8.3.2" /> <PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="8.3.2" /> <PackageReference Include="DotNetCore.CAP.Dashboard" Version="8.3.2" />
<PackageReference Include="FreeSql.Provider.SqlServer" Version="3.5.104" /> </ItemGroup>
|
1.2 全局配置(Program.cs)
在 Program.cs 中初始化 CAP 并注入相关依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| new HostApp(new HostAppOptions { ConfigurePostServices = context => { var appConfig = AppInfo.GetRequiredService<AppConfig>(false); var assemblies = AssemblyHelper.GetAssemblyList(appConfig.AssemblyNames);
var dbConfig = AppInfo.GetRequiredService<DbConfig>(false); var rabbitMQ = context.Configuration.GetSection("CAP:RabbitMq").Get<RabbitMQOptions>();
context.Services.AddCap(config => { config.Version = "v1"; config.UseSqlServer(dbConfig.ConnectionString); config.UseRabbitMQ(mqConfig => { mqConfig.HostName = rabbitMQ.HostName; mqConfig.Port = rabbitMQ.Port; mqConfig.UserName = rabbitMQ.UserName; mqConfig.Password = rabbitMQ.Password; mqConfig.ExchangeName = rabbitMQ.ExchangeName; }); config.UseDashboard(); }).AddSubscriberAssembly(assemblies); } }).Run(args, typeof(Program).Assembly);
|
二、定义订阅与事件
2.1 订阅命名(契约层)
在 Api.Contracts 项目的 Core/Consts 目录下定义订阅名常量:
1 2 3 4 5 6 7 8
|
public class SubscribeNames { [Description("模块操作事件")] public const string ModuleAdd = "zhontai.admin.module.add"; }
|
2.2 事件类(契约层)
在 Api.Contracts 项目的 Services/Module/Events 目录下定义事件类:
1 2 3 4 5 6 7 8 9
|
public class ModuleAddEvent { public long ModuleId { get; set; } public string ModuleName { get; set; } }
|
三、发布消息
通过 ICapPublisher 在服务接口中发送消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| [Order(1010)] [DynamicApi(Area = ApiConsts.AreaName)] public class ModuleService : IDynamicApi { private readonly AppRepositoryBase<ModuleEntity> _moduleRep; private readonly ICapPublisher _capPublisher;
public ModuleService( AppRepositoryBase<ModuleEntity> moduleRep, ICapPublisher capPublisher) { _moduleRep = moduleRep; _capPublisher = capPublisher; }
public async Task<long> AddAsync(ModuleAddInput input) { var entity = Mapper.Map<ModuleEntity>(input); await _moduleRep.InsertAsync(entity);
var moduleAddEvent = input.Adapt<ModuleAddEvent>(); await _capPublisher.PublishAsync( SubscribeNames.ModuleAdd, moduleAddEvent );
return entity.Id; } }
|
四、订阅消息
创建订阅服务类,继承 ICapSubscribe 并标记订阅方法:
1 2 3 4 5 6 7 8 9 10 11 12
| public class ModuleSubscribeService : ICapSubscribe { [NonAction] [CapSubscribe(SubscribeNames.ModuleAdd)] public async Task HandleModuleAddAsync(ModuleAddEvent @event) { } }
|
关键规则
| 规则 |
说明 |
| 接口继承 |
必须实现ICapSubscribe 接口 |
| 特性标记 |
订阅方法需标注[CapSubscribe("事件名")] |
| 参数匹配 |
事件参数类型需与发布时保持一致 |
五、监控与调试
5.1 CAP Dashboard
通过 http://your-domain/cap 访问监控面板,可查看消息状态、重试记录等关键信息。
5.2 日志分析
- 成功订阅:检查日志确认事件是否被正常消费
- 失败处理:CAP 自动重试(默认 50 次),可在 Dashboard 中监控处理进度
注意事项
- 版本隔离:通过
config.Version 区分不同环境的消息,避免环境间消息串扰
- 配置检查:确保 RabbitMQ 和 SQL Server 连接字符串配置正确
- 事件序列化:事件类需支持序列化,建议使用简单数据结构
#消息队列 #RabbitMQ #分布式 #分布式/消息传递 #中台/分布式微服务