C#教程

关注公众号 jb51net

关闭
首页 > 软件编程 > C#教程 > .net cap消息处理

.net使用cap实现消息异步处理

作者:假装我不帅

CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点,本文给大家介绍了.net下使用cap实现消息异步处理,需要的朋友可以参考下

介绍

CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。

新建项目

新建.net7web项目

安装依赖包

安装软件

安装redis和Sql Server

修改代码

新建RedisConfigModel

namespace CAPStu01.Models;

public class RedisConfigModel
{
    /// <summary>
    /// 服务器地址
    /// </summary>
    public string Host { get; set; }

    /// <summary>
    /// 端口号
    /// </summary>
    public int Port { get; set; }

    /// <summary>
    /// 密码
    /// </summary>
    public string Pwd { get; set; }
}

修改appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "ConnectionStrings": {
    "SQlServer": "server=127.0.0.1;User ID=sa;Password=xxxx;database=capstu;Encrypt=True;TrustServerCertificate=True;connection timeout=600;"
  },
  "RedisConfig": {
    "Host": "127.0.0.1",
    "Port": 6379,
    "Pwd": ""
  }
}

修改Program.cs

using CAPStu01.Models;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
var redisConfig = builder.Configuration.GetSection("RedisConfig").Get<RedisConfigModel>();
var connectionStr = builder.Configuration.GetConnectionString("SQlServer") ?? "";
builder.Services.AddCap(x =>
{
    x.UseRedis(options =>
    {
        if (options.Configuration != null && redisConfig != null)
        {
            options.Configuration.EndPoints.Add(redisConfig.Host, redisConfig.Port);
            options.Configuration.DefaultDatabase = 0;
            options.Configuration.Password = redisConfig?.Pwd ?? "";
        }
    });
    x.UseSqlServer(sqlServerOptions =>
    {
        sqlServerOptions.Schema = "dbo";
        sqlServerOptions.ConnectionString = connectionStr;
    });
    //开启面板
    x.UseDashboard(d =>
    {
        //允许匿名访问
        d.AllowAnonymousExplicit = true;
    });
});
var app = builder.Build();

app.UseRouting();
app.MapControllers();
app.Run();

新建HomeController

using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

namespace CAPStu01.Controllers;

[ApiController]
public class HomeController:ControllerBase
{
    public HomeController()
    {
        
    }

    /// <summary>
    /// 发送消息
    /// </summary>
    /// <returns></returns>
    [HttpGet("/")]
    public IActionResult Index([FromServices]ICapPublisher capBus)
    {
        capBus.Publish("test.show.time","你好,CAP");
        return Content("发送消息成功");
    }
    
    /// <summary>
    /// 接受消息
    /// </summary>
    /// <param name="data"></param>
    [NonAction]
    [CapSubscribe("test.show.time")]
    public void ReceiveMessage(string data)
    {
        Console.WriteLine("message data is:" + data);
    }
}

结果

如果使用redis需要定期清理streams内容

安装freeredis,修改Program.cs

builder.Services.AddSingleton<IRedisClient>(new RedisClient($"{redisConfig.Host}:{redisConfig.Port},password={redisConfig.Pwd},defaultDatabase=0"));

新增清除方法

private readonly IRedisClient _redisClient;

public HomeController(IRedisClient redisClient)
{
    _redisClient = redisClient;
}

/// <summary>
/// 清除已处理的redis数据
/// </summary>
/// <returns></returns>
[HttpGet("/clear")]
public IActionResult ClearAckStream()
{
    var groups = _redisClient.XInfoGroups("test.show.time");
    var unreandMsgs = new List<string>();
    //获取所有的未读消息
    foreach (var group in groups)
    {
        if (group.pending > 0)
        {
            //有未读消息
            var unReadList = _redisClient.XPending("test.show.time", group.name);
            if (unReadList.count > 0)
            {
                var groupInfo = _redisClient.XPending("test.show.time", group.name);
                var unreandList = _redisClient.XPending("test.show.time", group.name, groupInfo.minId, groupInfo.maxId,
                    groupInfo.count);
                foreach (var unre in unreandList)
                {
                    unreandMsgs.Add(unre.id);
                }
            }
        }
    }
    //获取全部的消息
    var allMsgs = _redisClient.XRange("test.show.time", "-", "+");
    foreach (var msg in allMsgs)
    {
        if (unreandMsgs.Contains(msg.id))
        {
            //这个消息未读则跳过
            continue;
        }
        //删除已处理的消息
        _redisClient.XDel("test.show.time", msg.id);
    }

    return Content($"共处理未读消息:{unreandMsgs.Count}个,已读消息{allMsgs.Length}个");
}

以上就是.net使用cap实现消息异步处理的详细内容,更多关于.net cap消息处理的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文