官网: https://cap.dotnetcore.xyz/
是一个 EventBus,同时也是一个在微服务或者 SOA 系统中解决分布式事务问题的一个框架。它有助于创建可扩展,可靠并且易于更改的微服务系统。
事件总线是一种机制,它允许不同的组件彼此通信而不彼此了解。 组件可以将事件发送到 Eventbus,而无需知道是谁来接听或有多少其他人来接听。 组件也可以侦听 Eventbus 上的事件,而无需知道谁发送了事件。 这样,组件可以相互通信而无需相互依赖。 同样,很容易替换一个组件。 只要新组件了解正在发送和接收的事件,其他组件就永远不会知道.
在你的.NET Core 项目中,通过 NuGet 包管理器安装 CAP。
dotnet add package DotNetCore.CAP
dotnet add package DotNetCore.CAP.RabbitMQ
dotnet add package DotNetCore.CAP.MySql
dotnet add package DotNetCore.CAP.Dashboard #Dashboard
dotnet add package Pomelo.EntityFrameworkCore.MySql #这个之后主要用于幂等性判断,可以不要
/// <summary>
/// 添加分布式事务服务
/// </summary>
/// <param name="services">服务集合</param>
/// <param name="capSection">cap链接项</param>
/// <param name="rabbitMQSection">rabbitmq配置项</param>
/// <param name="expiredTime">成功消息过期时间</param>
/// <returns></returns>
public static IServiceCollection AddMCodeCap(this IServiceCollection services, Action<CapOptions> configure = null, string capSection = "cap", string rabbitMQSection = "rabbitmq")
{
var rabbitMQOptions = ServiceProviderServiceExtensions.GetRequiredService<IConfiguration>(services.BuildServiceProvider()).GetSection(rabbitMQSection).Get<RabbitMQOptions>();
var logger = ServiceProviderServiceExtensions.GetRequiredService<ILogger<CapContext>>(services.BuildServiceProvider());
if (rabbitMQOptions == null)
{
throw new ArgumentNullException("rabbitmq not config.");
}
var capJson = ServiceProviderServiceExtensions.GetRequiredService<IConfiguration>(services.BuildServiceProvider()).GetValue<string>(capSection);
if (string.IsNullOrEmpty(capJson))
{
throw new ArgumentException("cap未设置");
}
//services.AddDbContext<CapContext>(options => options.UseMySql(capJson, ServerVersion.AutoDetect(capJson)));
services.AddCap(x =>
{
//使用RabbitMQ传输
x.UseRabbitMQ(opt => { opt = rabbitMQOptions; });
////使用MySQL持久化
x.UseMySql(capJson);
//x.UseEntityFramework<CapContext>();
x.UseDashboard();
//成功消息的过期时间(秒)
x.SucceedMessageExpiredAfter = 10 * 24 * 3600;
x.FailedRetryCount = 5;
//失败回调,通过企业微信,短信通知人工干预
x.FailedThresholdCallback = (e) =>
{
if (e.MessageType == MessageType.Publish)
{
logger.LogError("Cap发送消息失败;" + JsonExtension.Serialize(e.Message));
}
else if (e.MessageType == MessageType.Subscribe)
{
logger.LogError("Cap接收消息失败;" + JsonExtension.Serialize(e.Message));
}
};
configure?.Invoke(x);
});
return services;
}
internal class JsonExtension
{
private static readonly JsonSerializerSettings _jsonSerializerSettings;
internal static JsonSerializerSettings CustomSerializerSettings;
static JsonExtension()
{
_jsonSerializerSettings = DefaultSerializerSettings;
}
internal static JsonSerializerSettings DefaultSerializerSettings
{
get
{
var settings = new JsonSerializerSettings();
// 设置如何将日期写入JSON文本。默认值为“IsoDateFormat”
//settings.DateFormatHandling = DateFormatHandling.IsoDateFormat;
// 设置在序列化和反序列化期间如何处理DateTime时区。默认值为 “RoundtripKind”
//settings.DateTimeZoneHandling = DateTimeZoneHandling.RoundtripKind;
// 设置在序列化和反序列化期间如何处理默认值。默认值为“Include”
//settings.DefaultValueHandling = DefaultValueHandling.Include;
// 设置写入JSON文本时DateTime和DateTimeOffset值的格式,以及读取JSON文本时预期的日期格式。默认值为“ yyyy'-'MM'-'dd'T'HH':'mm':'ss.FFFFFFFK ”。
settings.DateFormatString = "yyyy-MM-dd HH:mm:ss";
// 设置在序列化和反序列化期间如何处理空值。默认值为“Include”
//settings.NullValueHandling = NullValueHandling.Include;
// 设置序列化程序在将.net对象序列化为JSON时使用的契约解析器
settings.ContractResolver = new CamelCasePropertyNamesContractResolver();
// 设置如何处理引用循环(例如,类引用自身)。默认值为“Error”。
settings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore;
// 是否格式化文本
settings.Formatting = Formatting.Indented;
//支持将Enum 由默认 Number类型 转换为String
//settings.SerializerSettings.Converters.Add(new StringEnumConverter());
//将long类型转为string
//settings.SerializerSettings.Converters.Add(new NumberConverter(NumberConverterShip.Int64));
return settings;
}
}
public static T Deserialize<T>(string json, JsonSerializerSettings serializerSettings = null)
{
if (string.IsNullOrEmpty(json)) return default;
if (serializerSettings == null) serializerSettings = _jsonSerializerSettings;
//值类型和String类型
if (typeof(T).IsValueType || typeof(T) == typeof(string))
{
return (T)Convert.ChangeType(json, typeof(T));
}
return JsonConvert.DeserializeObject<T>(json, CustomSerializerSettings ?? serializerSettings);
}
public static object Deserialize(string json, Type type, JsonSerializerSettings serializerSettings = null)
{
if (string.IsNullOrEmpty(json)) return default;
if (serializerSettings == null) serializerSettings = _jsonSerializerSettings;
return JsonConvert.DeserializeObject(json,type, CustomSerializerSettings ?? serializerSettings);
}
public static string Serialize<T>(T obj, JsonSerializerSettings serializerSettings = null)
{
if (obj is null) return string.Empty;
if (obj is string) return obj.ToString();
if (serializerSettings == null) serializerSettings = _jsonSerializerSettings;
return JsonConvert.SerializeObject(obj, CustomSerializerSettings ?? serializerSettings);
}
}
appsettings.json
{
"cap": "Server=127.0.0.1;Port=3306;Database=spring;Uid=root;Pwd=123456;Allow User Variables=true;Pooling=true;Min Pool Size=0;Max Pool Size=100;Connection Lifetime=0;",
"rabbitmq": {
"HostName": "127.0.0.1",
"Port": 5672,
"UserName": "guest",
"Password": "guest",
"VirtualHost": "/"
}
}
public class YourService
{
private readonly ICapPublisher _capPublisher;
public YourService(ICapPublisher capPublisher)
{
_capPublisher = capPublisher;
}
public async Task DoSomethingAsync()
{
// ... 业务逻辑 ...
await _capPublisher.PublishAsync("your.event.name", new YourEventData { /* ... */ },"callback.name");
}
}
你需要实现一个事件处理器来订阅并处理事件。这通常是通过继承 ICapSubscribe 接口或使用 CAP 的[CapSubscribe]属性来实现的
public class YourEventHandler : ICapSubscribe
{
[CapSubscribe("your.event.name")]
public async Task Handle(YourEventData eventData)
{
// 处理事件逻辑
}
}
或者,使用特性:
[CapSubscribe("your.event.name")]
public class YourEventHandler
{
public async Task Handle(YourEventData eventData)
{
// 处理事件逻辑
}
}
默认值:cap.queue.
默认的消费者组的名字,在不同的 Transports 中对应不同的名字,可以通过自定义此值来自定义不同 Transports 中的名字,以便于查看。
默认值:Null
为订阅 Group 统一添加前缀。 https://github.com/dotnetcore/CAP/pull/780
默认值: Null
为 Topic 统一添加前缀。 https://github.com/dotnetcore/CAP/pull/780
默认值:v1
用于给消息指定版本来隔离不同版本服务的消息,常用于A/B测试或者多服务版本的场景。以下是其应用场景:
默认值:60 秒
在消息发送的时候,如果发送失败,CAP将会对消息进行重试,此配置项用来配置每次重试的间隔时间。
在消息消费的过程中,如果消费失败,CAP将会对消息进行重试消费,此配置项用来配置每次重试的间隔时间。
默认值:1
消费者线程并行处理消息的线程数,当这个值大于1时,将不能保证消息执行的顺序。
默认值:50
重试的最大次数。当达到此设置值时,将不会再继续重试,通过改变此参数来设置重试的最大次数。
默认值:24*3600 秒(1天后)
成功消息的过期时间(秒)。 当消息发送或者消费成功时候,在时间达到
SucceedMessageExpiredAfter
秒时候将会从 Persistent 中删除,你可以通过指定此值来设置过期的时间。
默认值:15 24 3600 秒(15天后)
失败消息的过期时间(秒)。 当消息发送或者消费失败时候,在时间达到
FailedMessageExpiredAfter
秒时候将会从 Persistent 中删除,你可以通过指定此值来设置过期的时间。
默认值: false
默认情况下,发送的消息都先放置到内存同一个Channel中,然后线性处理。 如果设置为 true,则发送消息的任务将由.NET线程池并行处理,这会大大提高发送的速度。
某些情况下,消费者需要返回值以告诉发布者执行结果,以便于发布者实施一些动作,通常情况下这属于补偿范围。
你可以在消费者执行的代码中通过重新发布一个新消息来通知上游,CAP 提供了一种简单的方式来做到这一点。 你可以在发送的时候指定
callbackName
来得到消费者的执行结果,通常这仅适用于点对点的消费。以下是一个示例。
例如,在一个电商程序中,订单初始状态为 pending,当商品数量成功扣除时将状态标记为 succeeded ,否则为 failed。
意味着你可以调整序列化配置
自定义序列化
public class MessageSerializer : ISerializer
{
public Message Deserialize(string json)
{
return JsonExtension.Deserialize<Message>(json);
}
public object Deserialize(object value, Type valueType)
{
if (value is JToken jToken)
{
return jToken.ToObject(valueType);
}
throw new NotSupportedException("Type is not of type JToken");
}
public ValueTask<Message> DeserializeAsync(TransportMessage transportMessage, Type valueType)
{
if (valueType == null || transportMessage.Body.IsEmpty)
{
return ValueTask.FromResult(new DotNetCore.CAP.Messages.Message(transportMessage.Headers, null));
}
var json = Encoding.UTF8.GetString(transportMessage.Body.ToArray());
return ValueTask.FromResult(new DotNetCore.CAP.Messages.Message(transportMessage.Headers, JsonExtension.Deserialize(json, valueType)));
}
public bool IsJsonType(object jsonObject)
{
return jsonObject is JsonToken || jsonObject is JToken;
}
public string Serialize(Message message)
{
return JsonExtension.Serialize(message);
}
public ValueTask<TransportMessage> SerializeAsync(Message message)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}
if (message.Value == null)
{
return ValueTask.FromResult(new TransportMessage(message.Headers, null));
}
var json = JsonExtension.Serialize(message.Value);
return ValueTask.FromResult(new TransportMessage(message.Headers, Encoding.UTF8.GetBytes(json)));
}
}
然后将你的实现注册到容器中:
services.AddSingleton<DotNetCore.CAP.Serialization.ISerializer, MessageSerializer>();
services.AddCap(x =>
{ xxx
}
CAP 不直接提供开箱即用的基于 DTC 或者 2PC 的分布式事务,相反我们提供一种可以用于解决在分布式事务遇到的问题的一种解决方案。
在分布式环境中,由于涉及通讯的开销,使用基于2PC或DTC的分布式事务将非常昂贵,在性能方面也同样如此。另外由于基于2PC或DTC的分布式事务同样受 CAP定理 的约束,当发生网络分区时它将不得不放弃可用性(CAP中的A)。
针对于分布式事务的处理,CAP 采用的是“异步确保”这种方案。类似于Java中Seata的Saga模式
在说幂等性之前,我们先来说下关于消费端的消息交付。
由于CAP不是使用的 MS DTC 或其他类型的2PC分布式事务机制,所以存在至少消息严格交付一次的问题,具体的说在基于消息的系统中,存在以下三种可能:
在CAP中,我们采用的交付保证为 At Least Once。
由于我们具有临时存储介质(数据库表),也许可以做到 At Most Once, 但是为了严格保证消息不会丢失,我们没有提供相关功能或配置。
通常情况下,保证消息被执行多次而不会产生意外结果是很自然的一种方式是采用操作对象自带的一些幂等功能。比如:
数据库提供的
insert ON DUPLICATE KEY update
或者是采取类型的程序判断行为。
另外一种处理幂等性的方式就是在消息传递的过程中传递ID,然后由单独的消息跟踪器来处理。
下面我们基于MySql和Redis实现显式处理幂等消息
public interface IMessageTracker
{
Task<bool> HasProcessedAsync(string msgId);
bool HasProcessed(string msgId);
Task MarkAsProcessedAsync(string msgId);
void MarkAsProcessed(string msgId);
}
internal class MessageTrackLog
{
public MessageTrackLog(string messageId)
{
MessageId = messageId;
CreatedTime = DateTime.Now;
}
public string MessageId { get; set; }
public DateTime CreatedTime { get; set; }
}
public class MessageData<T>
{
public string Id { get; set; }
public T MessageBody { get; set; }
public DateTime CreatedTime { get; set; }
public MessageData(T messageBody)
{
MessageBody = messageBody;
CreatedTime = DateTime.Now;
Id = SnowflakeGenerator.Instance().GetId().ToString();
}
}
internal class SnowflakeGenerator
{
private static long machineId;//机器ID
private static long datacenterId = 0L;//数据ID
private static long sequence = 0L;//计数从零开始
private static long twepoch = 687888001020L; //惟一时间随机量
private static long machineIdBits = 5L; //机器码字节数
private static long datacenterIdBits = 5L;//数据字节数
public static long maxMachineId = -1L ^ -1L << (int)machineIdBits; //最大机器ID
private static long maxDatacenterId = -1L ^ (-1L << (int)datacenterIdBits);//最大数据ID
private static long sequenceBits = 12L; //计数器字节数,12个字节用来保存计数码
private static long machineIdShift = sequenceBits; //机器码数据左移位数,就是后面计数器占用的位数
private static long datacenterIdShift = sequenceBits + machineIdBits;
private static long timestampLeftShift = sequenceBits + machineIdBits + datacenterIdBits; //时间戳左移动位数就是机器码+计数器总字节数+数据字节数
public static long sequenceMask = -1L ^ -1L << (int)sequenceBits; //一微秒内能够产生计数,若是达到该值则等到下一微妙在进行生成
private static long lastTimestamp = -1L;//最后时间戳
private static object syncRoot = new object();//加锁对象
static SnowflakeGenerator snowflake;
static SnowflakeGenerator()
{
snowflake = new SnowflakeGenerator();
}
public static SnowflakeGenerator Instance()
{
if (snowflake == null)
snowflake = new SnowflakeGenerator();
return snowflake;
}
public SnowflakeGenerator()
{
Snowflakes(0L, -1);
}
public SnowflakeGenerator(long machineId)
{
Snowflakes(machineId, -1);
}
public SnowflakeGenerator(long machineId, long datacenterId)
{
Snowflakes(machineId, datacenterId);
}
private void Snowflakes(long machineId, long datacenterId)
{
if (machineId >= 0)
{
if (machineId > maxMachineId)
{
throw new Exception("机器码ID非法");
}
SnowflakeGenerator.machineId = machineId;
}
if (datacenterId >= 0)
{
if (datacenterId > maxDatacenterId)
{
throw new Exception("数据中心ID非法");
}
SnowflakeGenerator.datacenterId = datacenterId;
}
}
/// <summary>
/// 生成当前时间戳
/// </summary>
/// <returns>毫秒</returns>
private static long GetTimestamp()
{
//让他2000年开始
return (long)(DateTime.UtcNow - new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalMilliseconds;
}
/// <summary>
/// 获取下一微秒时间戳
/// </summary>
/// <param name="lastTimestamp"></param>
/// <returns></returns>
private static long GetNextTimestamp(long lastTimestamp)
{
long timestamp = GetTimestamp();
int count = 0;
while (timestamp <= lastTimestamp)//这里获取新的时间,可能会有错,这算法与comb同样对机器时间的要求很严格
{
count++;
if (count > 10)
throw new Exception("机器的时间可能不对");
System.Threading.Thread.Sleep(1);
timestamp = GetTimestamp();
}
return timestamp;
}
/// <summary>
/// 获取长整形的ID
/// </summary>
/// <returns></returns>
public long GetId()
{
lock (syncRoot)
{
long timestamp = GetTimestamp();
if (SnowflakeGenerator.lastTimestamp == timestamp)
{ //同一微妙中生成ID
sequence = (sequence + 1) & sequenceMask; //用&运算计算该微秒内产生的计数是否已经到达上限
if (sequence == 0)
{
//一微妙内产生的ID计数已达上限,等待下一微妙
timestamp = GetNextTimestamp(SnowflakeGenerator.lastTimestamp);
}
}
else
{
//不一样微秒生成ID
sequence = 0L;
}
if (timestamp < lastTimestamp)
{
throw new Exception("时间戳比上一次生成ID时时间戳还小,故异常");
}
SnowflakeGenerator.lastTimestamp = timestamp; //把当前时间戳保存为最后生成ID的时间戳
long Id = ((timestamp - twepoch) << (int)timestampLeftShift)
| (datacenterId << (int)datacenterIdShift)
| (machineId << (int)machineIdShift)
| sequence;
return Id;
}
}
}
internal class RedisMessageTracker : IMessageTracker
{
#region 属性和字段
private const string KEY_PREFIX = "msgtracker:"; // 默认Key前缀
private const int DEFAULT_CACHE_TIME = 60 * 60 * 24 * 3; // 默认缓存时间为3天,单位为秒
private readonly IDatabase _redisDatabase;
#endregion
//依赖StackExchange.Redis;
public RedisMessageTracker(ConnectionMultiplexer multiplexer)
{
_redisDatabase = multiplexer.GetDatabase();
}
public bool HasProcessed(string msgId)
{
return _redisDatabase.KeyExists(KEY_PREFIX + msgId);
}
public async Task<bool> HasProcessedAsync(string msgId)
{
return await _redisDatabase.KeyExistsAsync(KEY_PREFIX + msgId);
}
public void MarkAsProcessed(string msgId)
{
var msgRecord = new MessageTrackLog(msgId);
_redisDatabase.StringSet($"{KEY_PREFIX}{msgId}", JsonExtension.Serialize(msgRecord), TimeSpan.FromMinutes(DEFAULT_CACHE_TIME));
}
public async Task MarkAsProcessedAsync(string msgId)
{
var msgRecord = new MessageTrackLog(msgId);
await _redisDatabase.StringSetAsync($"{KEY_PREFIX}{msgId}", JsonExtension.Serialize(msgRecord), TimeSpan.FromMinutes(DEFAULT_CACHE_TIME));
}
}
public static IServiceCollection AddRedisMessageTracker(this IServiceCollection services)
{
services.AddScoped<IMessageTracker, RedisMessageTracker>();
return services;
}
internal class MySqlMessageTracker : IMessageTracker
{
private readonly CapContext _capContext;
public MySqlMessageTracker(CapContext capContext)
{
_capContext = capContext;
}
public bool HasProcessed(string msgId)
{
return _capContext.MessageTrackLogs.Any(x => x.MessageId == msgId);
}
public Task<bool> HasProcessedAsync(string msgId)
{
return _capContext.MessageTrackLogs.AnyAsync(x => x.MessageId == msgId);
}
public void MarkAsProcessed(string msgId)
{
MessageTrackLog messageTrackLog = new MessageTrackLog(msgId);
_capContext.MessageTrackLogs.Add(messageTrackLog);
_capContext.SaveChanges();
}
public async Task MarkAsProcessedAsync(string msgId)
{
MessageTrackLog messageTrackLog = new MessageTrackLog(msgId);
await _capContext.MessageTrackLogs.AddAsync(messageTrackLog);
await _capContext.SaveChangesAsync();
}
}
internal class CapContext : DbContext
{
public CapContext(DbContextOptions<CapContext> options)
: base(options)
{
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
// 可以在这里进行模型配置
modelBuilder.Entity<MessageTrackLog>().ToTable("message_track_log");
modelBuilder.Entity<MessageTrackLog>().HasKey(b => b.MessageId);
}
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
base.OnConfiguring(optionsBuilder);
}
public DbSet<MessageTrackLog> MessageTrackLogs { get; set; }
}
public static IServiceCollection AddMySqlMessageTracker(this IServiceCollection services)
{
services.AddScoped<IMessageTracker, MySqlMessageTracker>();
var serviceProvider = services.BuildServiceProvider();
using (var context = serviceProvider.GetService<CapContext>())
{
context.Database.ExecuteSqlRaw(@"
CREATE TABLE IF NOT EXISTS `message_track_log` (
`MessageId` varchar(255) CHARACTER SET utf8mb4 NOT NULL,
`CreatedTime` datetime NOT NULL,
CONSTRAINT `PK_message_track_log` PRIMARY KEY (`MessageId`)
) CHARACTER SET=utf8mb4;
");
}
return services;
}
使用
[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{
private readonly IMessageTracker _messageTracker;
public WeatherForecastController(IMessageTracker messageTracker)
{
_messageTracker = messageTracker;
}
[CapSubscribe("order.test")]
[NonAction]
public void OrderTest(MessageData<string> messageData)
{
try
{
if (_messageTracker.HasProcessed(messageData.Id))
return;
Console.WriteLine("业务逻辑:"+messageData.MessageBody);
//xxxx
_messageTracker.MarkAsProcessed(messageData.Id);
}
catch (Exception ex)
{
throw ex;
}
}
}
CAP的 Dashboard 使用 Consul 作为服务发现来显示其他节点的数据,然后你就在任意节点的 Dashboard 中切换到 Servers 页面看到其他的节点。
通过点击 Switch 按钮来切换到其他的节点看到其他节点的数据,而不必访问很多地址来分别查看。
以下是一个配置示例, 你需要在每个节点分别配置:
services.AddCap(x =>
{
x.UseMySql(Configuration.GetValue<string>("ConnectionString"));
x.UseRabbitMQ("localhost");
x.UseDashboard();
x.UseConsulDiscovery(_ =>
{
_.DiscoveryServerHostName = "localhost";
_.DiscoveryServerPort = 8500;
_.CurrentNodeHostName = Configuration.GetValue<string>("ASPNETCORE_HOSTNAME");
_.CurrentNodePort = Configuration.GetValue<int>("ASPNETCORE_PORT");
_.NodeId = Configuration.GetValue<string>("NodeId");
_.NodeName = Configuration.GetValue<string>("NodeName");
});
});
首先,你需要安装Dashboard的 NuGet 包。
PM> Install-Package DotNetCore.CAP.Dashboard
然后,在配置中添加如下代码:
services.AddCap(x =>
{
x.UseDashboard();
});
默认情况下,你可以访问
http://localhost:xxx/cap
这个地址打开Dashboard。
默认值:N/A
当位于代理后时,通过配置此参数可以指定代理请求前缀。
默认值:'/cap'
你可以通过修改此配置项来更改Dashboard的访问路径。
默认值:2000 毫秒
此配置项用来配置Dashboard 前端 获取状态接口(/stats)的轮询时间
Default: true
显式允许对 CAP 仪表板 API 进行匿名访问,当启用ASP.NET Core 全局授权筛选器请启用 AllowAnonymous。
Default: null.
Dashboard 的授权策略。 需设置
AllowAnonymousExplicit
为 false。