.net使用cap实现消息异步处理
作者:假装我不帅
CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点,本文给大家介绍了.net下使用cap实现消息异步处理,需要的朋友可以参考下
介绍
CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。
新建项目
新建.net7web项目
安装依赖包
安装软件
安装redis和Sql Server
修改代码
新建RedisConfigModel
namespace CAPStu01.Models; public class RedisConfigModel { /// <summary> /// 服务器地址 /// </summary> public string Host { get; set; } /// <summary> /// 端口号 /// </summary> public int Port { get; set; } /// <summary> /// 密码 /// </summary> public string Pwd { get; set; } }
修改appsettings.json
{ "Logging": { "LogLevel": { "Default": "Information", "Microsoft.AspNetCore": "Warning" } }, "ConnectionStrings": { "SQlServer": "server=127.0.0.1;User ID=sa;Password=xxxx;database=capstu;Encrypt=True;TrustServerCertificate=True;connection timeout=600;" }, "RedisConfig": { "Host": "127.0.0.1", "Port": 6379, "Pwd": "" } }
修改Program.cs
using CAPStu01.Models; var builder = WebApplication.CreateBuilder(args); builder.Services.AddControllers(); var redisConfig = builder.Configuration.GetSection("RedisConfig").Get<RedisConfigModel>(); var connectionStr = builder.Configuration.GetConnectionString("SQlServer") ?? ""; builder.Services.AddCap(x => { x.UseRedis(options => { if (options.Configuration != null && redisConfig != null) { options.Configuration.EndPoints.Add(redisConfig.Host, redisConfig.Port); options.Configuration.DefaultDatabase = 0; options.Configuration.Password = redisConfig?.Pwd ?? ""; } }); x.UseSqlServer(sqlServerOptions => { sqlServerOptions.Schema = "dbo"; sqlServerOptions.ConnectionString = connectionStr; }); //开启面板 x.UseDashboard(d => { //允许匿名访问 d.AllowAnonymousExplicit = true; }); }); var app = builder.Build(); app.UseRouting(); app.MapControllers(); app.Run();
新建HomeController
using DotNetCore.CAP; using Microsoft.AspNetCore.Mvc; namespace CAPStu01.Controllers; [ApiController] public class HomeController:ControllerBase { public HomeController() { } /// <summary> /// 发送消息 /// </summary> /// <returns></returns> [HttpGet("/")] public IActionResult Index([FromServices]ICapPublisher capBus) { capBus.Publish("test.show.time","你好,CAP"); return Content("发送消息成功"); } /// <summary> /// 接受消息 /// </summary> /// <param name="data"></param> [NonAction] [CapSubscribe("test.show.time")] public void ReceiveMessage(string data) { Console.WriteLine("message data is:" + data); } }
结果
如果使用redis需要定期清理streams内容
安装freeredis,修改Program.cs
builder.Services.AddSingleton<IRedisClient>(new RedisClient($"{redisConfig.Host}:{redisConfig.Port},password={redisConfig.Pwd},defaultDatabase=0"));
新增清除方法
private readonly IRedisClient _redisClient; public HomeController(IRedisClient redisClient) { _redisClient = redisClient; } /// <summary> /// 清除已处理的redis数据 /// </summary> /// <returns></returns> [HttpGet("/clear")] public IActionResult ClearAckStream() { var groups = _redisClient.XInfoGroups("test.show.time"); var unreandMsgs = new List<string>(); //获取所有的未读消息 foreach (var group in groups) { if (group.pending > 0) { //有未读消息 var unReadList = _redisClient.XPending("test.show.time", group.name); if (unReadList.count > 0) { var groupInfo = _redisClient.XPending("test.show.time", group.name); var unreandList = _redisClient.XPending("test.show.time", group.name, groupInfo.minId, groupInfo.maxId, groupInfo.count); foreach (var unre in unreandList) { unreandMsgs.Add(unre.id); } } } } //获取全部的消息 var allMsgs = _redisClient.XRange("test.show.time", "-", "+"); foreach (var msg in allMsgs) { if (unreandMsgs.Contains(msg.id)) { //这个消息未读则跳过 continue; } //删除已处理的消息 _redisClient.XDel("test.show.time", msg.id); } return Content($"共处理未读消息:{unreandMsgs.Count}个,已读消息{allMsgs.Length}个"); }
以上就是.net使用cap实现消息异步处理的详细内容,更多关于.net cap消息处理的资料请关注脚本之家其它相关文章!