设置WebSocket连接及请求处理

public class WebsocketService
{
    public static ConcurrentDictionary<string, WebSocket> Sockets = new ConcurrentDictionary<string, WebSocket>();
    //当前请求实例
    public WebSocket Socket = null;

    public string SocketId;

    public WebsocketService()
    {

    }

    public async Task DoWork(HttpContext ctx)
    {
        try
        {
            if (ctx.WebSockets.IsWebSocketRequest)
            {
                Socket = await ctx.WebSockets.AcceptWebSocketAsync();
                SocketId = ctx.Connection.Id;
                Sockets.TryAdd(SocketId, Socket);
                //执行监听
                await EchoLoop();
            }
        }
        catch (Exception ex)
        {

        }
    }

    /// <summary>
    /// 监听
    /// </summary>
    /// <returns></returns>
    public async Task EchoLoop()
    {
        //创建缓存区
        var buffer = new byte[1024];
        var seg = new ArraySegment<byte>(buffer);
        while (Socket.State == WebSocketState.Open)
        {
            var incoming = await Socket.ReceiveAsync(seg, CancellationToken.None);
            //判断类型读取
            if (incoming.MessageType == WebSocketMessageType.Text)
            {
                //incoming.Count  代表,请求内容字节数量
                string userMessage = Encoding.UTF8.GetString(seg.Array, 0, incoming.Count);
                await ReceiveMsgAsync(userMessage);
            }
        }
    }

    /// <summary>
    /// 接收信息
    /// </summary>
    /// <param name="data"></param>
    /// <returns></returns>
    public async Task ReceiveMsgAsync(string data)
    {
        try
        {
            // 处理接收信息
            var msg = JsonHelper.DeserializeObject<object>(data ?? "");
            // 发送信息
            await SendMsgAsync("");
        }
        catch (Exception ex)
        {

        }
    }

    /// <summary>
    /// 发送信息(从哪个接收发送回哪个)
    /// </summary>
    /// <param name="obj"></param>
    /// <returns></returns>
    public async Task SendMsgAsync(object obj)
    {
        try
        {
            if (Socket.State == WebSocketState.Open)
            {
                var msg = JsonHelper.SerializeObject(obj);
                ArraySegment<byte> segResult = new ArraySegment<byte>(Encoding.UTF8.GetBytes(msg));
                await Socket.SendAsync(segResult, WebSocketMessageType.Text, true, CancellationToken.None);
            }
            else
            {
                Sockets.TryRemove(SocketId, out Socket);
            }
        }
        catch (Exception ex)
        {

        }
    }

    /// <summary>
    /// 数据推送(所有客户端)
    /// </summary>
    /// <param name="obj"></param>
    /// <returns></returns>
    public static async Task PushMsgAsync(object obj)
    {
        try
        {
            var msg = JsonHelper.SerializeObject(obj);
            ArraySegment<byte> segResult = new ArraySegment<byte>(Encoding.UTF8.GetBytes(msg));
            foreach (var item in Sockets)
            {
                if (item.Value.State != WebSocketState.Open)
                {
                    Sockets.TryRemove(item);
                    continue;
                }
                await item.Value.SendAsync(segResult, WebSocketMessageType.Text, true, CancellationToken.None);
            }
        }
        catch (Exception ex)
        {

        }
    }
}

添加WebSocket中间件并监听WebSocket请求

Program.csapp.Run();前添加

//开启并绑定websocket
app.UseWebSockets();
app.Map("/tunnel/runtime", con => {
    con.UseWebSockets();
    con.Use(async (ctx, next) => {
        //创建监听websocket
        var ws = new WebsocketService();
        await ws.DoWork(ctx);
        await next.Invoke();
    });
});