设置WebSocket连接及请求处理
public class WebsocketService
{
public static ConcurrentDictionary<string, WebSocket> Sockets = new ConcurrentDictionary<string, WebSocket>();
//当前请求实例
public WebSocket Socket = null;
public string SocketId;
public WebsocketService()
{
}
public async Task DoWork(HttpContext ctx)
{
try
{
if (ctx.WebSockets.IsWebSocketRequest)
{
Socket = await ctx.WebSockets.AcceptWebSocketAsync();
SocketId = ctx.Connection.Id;
Sockets.TryAdd(SocketId, Socket);
//执行监听
await EchoLoop();
}
}
catch (Exception ex)
{
}
}
/// <summary>
/// 监听
/// </summary>
/// <returns></returns>
public async Task EchoLoop()
{
//创建缓存区
var buffer = new byte[1024];
var seg = new ArraySegment<byte>(buffer);
while (Socket.State == WebSocketState.Open)
{
var incoming = await Socket.ReceiveAsync(seg, CancellationToken.None);
//判断类型读取
if (incoming.MessageType == WebSocketMessageType.Text)
{
//incoming.Count 代表,请求内容字节数量
string userMessage = Encoding.UTF8.GetString(seg.Array, 0, incoming.Count);
await ReceiveMsgAsync(userMessage);
}
}
}
/// <summary>
/// 接收信息
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
public async Task ReceiveMsgAsync(string data)
{
try
{
// 处理接收信息
var msg = JsonHelper.DeserializeObject<object>(data ?? "");
// 发送信息
await SendMsgAsync("");
}
catch (Exception ex)
{
}
}
/// <summary>
/// 发送信息(从哪个接收发送回哪个)
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public async Task SendMsgAsync(object obj)
{
try
{
if (Socket.State == WebSocketState.Open)
{
var msg = JsonHelper.SerializeObject(obj);
ArraySegment<byte> segResult = new ArraySegment<byte>(Encoding.UTF8.GetBytes(msg));
await Socket.SendAsync(segResult, WebSocketMessageType.Text, true, CancellationToken.None);
}
else
{
Sockets.TryRemove(SocketId, out Socket);
}
}
catch (Exception ex)
{
}
}
/// <summary>
/// 数据推送(所有客户端)
/// </summary>
/// <param name="obj"></param>
/// <returns></returns>
public static async Task PushMsgAsync(object obj)
{
try
{
var msg = JsonHelper.SerializeObject(obj);
ArraySegment<byte> segResult = new ArraySegment<byte>(Encoding.UTF8.GetBytes(msg));
foreach (var item in Sockets)
{
if (item.Value.State != WebSocketState.Open)
{
Sockets.TryRemove(item);
continue;
}
await item.Value.SendAsync(segResult, WebSocketMessageType.Text, true, CancellationToken.None);
}
}
catch (Exception ex)
{
}
}
}
添加WebSocket中间件并监听WebSocket请求
在Program.cs中app.Run();前添加
//开启并绑定websocket
app.UseWebSockets();
app.Map("/tunnel/runtime", con => {
con.UseWebSockets();
con.Use(async (ctx, next) => {
//创建监听websocket
var ws = new WebsocketService();
await ws.DoWork(ctx);
await next.Invoke();
});
}); 


评论一下吧
取消回复 评论守则