SSE(Server-Sent Events)通讯服务

什么是SSE

SSE(Server-Sent Events)是一种用于实现服务器主动向客户端推送数据的技术,也被称为“事件流”(Event Stream)。它基于 HTTP 协议,利用了其长连接特性,在客户端与服务器之间建立一条持久化连接,并通过这条连接实现服务器向客户端的实时数据推送。

SSE技术的基本原理

客户端向服务器发送一个GET请求,带有指定的header,表示可以接收事件流类型,并禁用任何的事件缓存。
服务器返回一个响应,带有指定的header,表示事件的媒体类型和编码,以及使用分块传输编码(chunked)来流式传输动态生成的内容。
服务器在有数据更新时,向客户端发送一个或多个名称:值字段组成的事件,由单个换行符分隔。事件之间由两个换行符分隔。服务器可以发送事件数据、事件类型、事件ID和重试时间等字段。
客户端使用EventSource接口来创建一个对象,打开连接,并订阅onopen、onmessage和onerror等事件处理程序来处理连接状态和接收消息。
客户端可以使用GET查询参数来传递数据给服务器,也可以使用close方法来关闭连接。

SSE和Socket的区别

SSE(Server-Sent Events)和 WebSocket 都是实现服务器向客户端实时推送数据的技术,但它们在某些方面还是有一定的区别。

技术实现
SSE 基于 HTTP 协议,利用了其长连接特性,通过浏览器向服务器发送一个 HTTP 请求,建立一条持久化的连接。而 WebSocket 则是通过特殊的升级协议(HTTP/1.1 Upgrade 或者 HTTP/2)建立新的 TCP 连接,与传统 HTTP 连接不同。

数据格式
SSE 可以传输文本和二进制格式的数据,但只支持单向数据流,即只能由服务器向客户端推送数据。WebSocket 支持双向数据流,客户端和服务器可以互相发送消息,并且没有消息大小限制。

连接状态
SSE 的连接状态仅有三种:已连接、连接中、已断开。连接状态是由浏览器自动维护的,客户端无法手动关闭或重新打开连接。而 WebSocket 连接的状态更灵活,可以手动打开、关闭、重连等。

兼容性
SSE 是标准的 Web API,可以在大部分现代浏览器和移动设备上使用。但如果需要兼容老版本的浏览器(如 IE6/7/8),则需要使用 polyfill 库进行兼容。而 WebSocket 在一些老版本 Android 手机上可能存在兼容性问题,需要使用一些特殊的 API 进行处理。

安全性
SSE 的实现比较简单,都是基于 HTTP 协议的,与普通的 Web 应用没有太大差异,因此风险相对较低。WebSocket 则需要通过额外的安全措施(如 SSL/TLS 加密)来确保数据传输的安全性,避免被窃听和篡改,否则可能会带来安全隐患。

总体来说,SSE 和 WebSocket 都有各自的优缺点,适用于不同的场景和需求。如果只需要服务器向客户端单向推送数据,并且应用在前端的浏览器环境中,则 SSE 是一个更加轻量级、易于实现和维护的选择。而如果需要双向传输数据、支持自定义协议、或者在更加复杂的网络环境中应用,则 WebSocket 可能更加适合。

SSE适用于场景

SSE适用场景是指服务器向客户端实时推送数据的场景,例如:

  • 股票价格更新:服务器可以根据股市的变化,实时地将股票价格推送给客户端,让客户端能够及时了解股票的走势和行情。

  • 新闻实时推送:服务器可以根据新闻的更新,实时地将新闻内容或标题推送给客户端,让客户端能够及时了解最新的新闻动态和信息。

  • 在线聊天:服务器可以根据用户的发送,实时地将聊天消息推送给客户端,让客户端能够及时收到和回复消息。

  • 实时监控:服务器可以根据设备的状态,实时地将监控数据或报警信息推送给客户端,让客户端能够及时了解设备的运行情况和异常情况。

SSE适用场景的特点是:

  • 数据更新频繁:服务器需要不断地将最新的数据推送给客户端,保持数据的实时性和准确性。

  • 低延迟:服务器需要尽快地将数据推送给客户端,避免数据的延迟和过期。

  • 单向通信:服务器只需要向客户端推送数据,而不需要接收客户端的数据。

ChatGPT 返回的数据 就是使用的SSE 技术
实时数据大屏 如果只是需要展示 实时的数据可以使用SSE技术 而不是非要使用webSocket

服务端(.Net Core API)

在服务器端,需要使用 text/event-stream 作为响应的Content-Type,并按照以下格式发送数据

event: <event name>
data: <data content>
id: <event id>
retry: <reconnection time>

其中:

  • event字段是可选的,用于指定事件的名称;
  • data字段是必须的,用于指定数据的内容;
  • id字段是可选的,用于指定事件的标识符;
  • retry字段是可选的,用于指定客户端在连接断开后重新连接的时间间隔(以毫秒为单位)。
  • 每个字段都必须以换行符(==\n==)结尾,并且每个消息都必须以两个换行符(==\n\n==)结尾。
using Microsoft.AspNetCore.Mvc;
using System.Diagnostics;

namespace API.Controllers
{
    [ApiController]
    [Route("api/[controller]/[action]")]
    public class AIController : ControllerBase
    {
        [HttpGet]
        public async Task GetMessage(CancellationToken cancellationToken)
        {
           
            Response.Headers.Add("Content-Type", "text/event-stream;charset=utf-8");
            Response.Headers.Add("Cache-Control", "no-cache");
            Response.Headers.Add("Connection", "keep-alive");

            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    Debug.WriteLine("输出中");
                    await Response.WriteAsync("event: custom" + "\n");
                    await Response.WriteAsync("data: " + DateTime.Now.ToLongTimeString() + "\n\n");
                    await Response.Body.FlushAsync(); // 刷新缓冲区,立即发送数据到客户端
                    await Task.Delay(1000); // 等待1秒后再次发送
                }
            }
            catch (OperationCanceledException)
            {
				
            }
           
        }
    }
}


//下面是一个实际的接口Demo,此接口, 可能返回json格式, 也可能返回event-stream流

/// <summary>
/// 规范+监理档案查询
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
[HttpGet]
public async Task StandardSpvFileSearchChat(string message, CancellationToken cancellationToken, bool stream = false)
{
    var r = await _chatService.StandardSpvFileSearchChat(message);
    if (stream&& r.ResultCode=="0000")
    {
        Response.ContentType = "text/event-stream;charset=utf-8";
        Response.Headers.Add("Cache-Control", "no-cache");
        Response.Headers.Add("Connection", "keep-alive");
        string rstr = r.Data["anwser"];
        //模拟随机生成效果, 一次不一定发多少字
        // 确保字符串不为空
        if (string.IsNullOrEmpty(rstr))
        {
            Response.Headers.Add("Content-Type", "application/json");
            string rrr = JsonConvert.SerializeObject(r);
            await Response.WriteAsync(rrr);
            await Response.Body.FlushAsync();
        }
        Random _random = new Random();
        int _lastIndex = 0;
        try
        {
            while (!cancellationToken.IsCancellationRequested && _lastIndex < rstr.Length)
            {
                // 随机选择本次要取的字符数量(1到10个)
                int charsToTake = _random.Next(1, 11);

                // 确保charsToTake不会超过字符串剩余长度
                if (_lastIndex + charsToTake > rstr.Length)
                {
                    charsToTake = rstr.Length - _lastIndex;
                }

                // 取子串并打印
                string substring = rstr.Substring(_lastIndex, charsToTake);
                await Response.WriteAsync("event: message" + "\n");
                await Response.WriteAsync("data: " + substring + "\n\n");
                await Response.Body.FlushAsync(); // 刷新缓冲区,立即发送数据到客户端

                // 更新_lastIndex以准备下一次迭代
                _lastIndex += charsToTake;

                // 模拟0.1秒延迟
                await Task.Delay(100);
            }

            await Response.WriteAsync("event: message" + "\n");
            await Response.WriteAsync("data: [Done]" + "\n\n");
            await Response.Body.FlushAsync();
            //普通循环, 一次发一个字
            //for (int i = 0; i < rstr.Length; i++)
            //{
            //    await Response.WriteAsync("event: message" + "\n");
            //    await Response.WriteAsync("data: " + rstr[i] + "\n\n");
            //    await Response.Body.FlushAsync(); // 刷新缓冲区,立即发送数据到客户端
            //    await Task.Delay(100); // 等待1秒后再次发送
            //}
        }
        catch (OperationCanceledException)
        {

        }
    }
    else
    {
        Response.Headers.Add("Content-Type", "application/json");
        string rrr = JsonConvert.SerializeObject(r);
        await Response.WriteAsync(rrr);
        await Response.Body.FlushAsync();
    }
}

前端(JS)

//开启sse连接
if (typeof(EventSource) !== "undefined") {
  // 创建一个EventSource实例,指向你的SSE服务的URL
  var source = new EventSource("http://localhost:5260/api/AI/GetMessage");
 
  // 当服务器发送一个消息时,会触发这个事件
  source.onmessage = function(event) {
    // 处理服务器发送的消息
    console.log("Received message: ", event.data);
  };
 
  // 当与服务器的连接打开时,会触发这个事件
  source.onopen = function(event) {
    console.log("Connection is open!");
  };
 
  // 当与服务器的连接关闭时,会触发这个事件
  source.onerror = function(event) {
    console.log("Connection is closed!");
  };
  
  // 监听服务端的test事件类型, 并输出data数据
  source.addEventListener('custom', function(event) {
        console.log(event.data); // 输出接收到的数据
    }, false);
    
} else {
  // 浏览器不支持SSE
  console.log("Sorry, your browser does not support server-sent events...");
}


//退出/关闭sse连接
source.close()


//如果后端接口既可能返回json也可能返回event-stream, 则前端可使用如下方式进行判断

let url="http://10.8.1.108:8099/api/Chat/SpvFileSearchChat?message=安全帽佩戴要求&stream=true"
fetch(url)
  .then(response => {
    const contentType = response.headers.get('content-type');

    if (contentType.includes('application/json')) {
      // 处理JSON响应
      return response.json().then(jsonData => {
        console.log("Received JSON:", jsonData);
      });
    } else if (contentType.includes('text/event-stream')) {
      // 处理SSE响应
      const reader = response.body.getReader();
      function processSSEStream() {
        reader.read().then(({done, value}) => {
          if (done) {
            console.log('SSE Stream closed');
            return;
          }
          // SSE数据通常以"data: "开头,这里简单处理以提取data部分
          const dataText = new TextDecoder('utf-8').decode(value);
          const lines = dataText.split('\n').filter(line => line.trim());
          lines.forEach(line => {
            if (line.startsWith('data: ')) {
              console.log("SSE Data:", line.substring(6)); // 去除"data: "前缀
            }
          });

          // 继续读取流
          processSSEStream();
        }).catch(error => {
          console.error('Error reading SSE stream', error);
        });
      }
      
      // 开始处理SSE流
      processSSEStream();
    } else {
      throw new Error(`Unsupported content type: ${contentType}`);
    }
  })
  .catch(error => {
    console.error('Fetch error:', error);
  });

前端(.Net 控制台)

nuget 搜索安装->  3v.EvtSource
using EvtSource;

var url = "http://localhost:5260/api/AI/GetMessage"; // 替换为你的SSE服务地址
var evt = new EventSourceReader(new Uri(url)).Start();
evt.MessageReceived += (object sender, EventSourceMessageEventArgs e) => Console.WriteLine($"{e.Event} : {e.Message}");
evt.Disconnected += async (object sender, DisconnectEventArgs e) =>
{
    Console.WriteLine($"Retry: {e.ReconnectDelay} - Error: {e.Exception}");
    await Task.Delay(e.ReconnectDelay);
    if (!evt.IsDisposed)
    {
        evt.Start(); // Reconnect to the same URL
    }
};

Task.Run(() =>
{
    Task.Delay(4000).GetAwaiter().GetResult();
    evt.Dispose();
});

Console.ReadKey();

前端(JS, 服务端接口为post请求时)

@microsoft/fetch-event-source 是一个由微软提供的库,用于在客户端和服务器之间建立基于 EventSource 的连接。EventSource 是一种 HTTP 协议,允许服务器向客户端推送实时事件流。该库提供了对 EventSource 协议的封装,使得在前端 JavaScript 中使用 EventSource 变得更加方便。
GitHub地址: https://github.com/Azure/fetch-event-source
在 @microsoft/fetch-event-source 中,主要使用 fetchEventSource 函数来创建一个新的 EventSource 连接。这个函数接受一个 URL 参数,以及一个配置对象,其中可以包含一些选项,如请求方法、请求头、请求体等。当服务器向客户端推送事件时,可以通过 onmessage 回调函数来处理这些事件。此外,还可以提供 onerror 和 onclose 回调函数来处理连接错误和关闭事件

  • 安装

    pnpm install @microsoft/fetch-event-source
  • 使用

    // 测试前端SSE调用
    import { fetchEventSource } from '@microsoft/fetch-event-source'
    const testSSE = () => {
      const OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY'
      const OPENAI_COMPLETION_ENDPOINT = 'https://api.openai.com/v1/chat/completions'
      const requestData = {
        model: 'gpt-3.5-turbo',
        messages: [
          {
            role: 'user',
            content: '我想去西安旅游7天'
          }
        ],
        stream: true
      }
      let respString = ''
      fetchEventSource(OPENAI_COMPLETION_ENDPOINT, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${OPENAI_API_KEY}`,
        },
        body: JSON.stringify(requestData),
        async onopen(response) {
          if (response.ok && response.headers.get('content-type') === 'text/event-stream') {
            // everything's good
            console.log('everything's good')
          } else if (response.status >= 400 && response.status < 500 && response.status !== 429) {
            console.log('请求错误')
          } else {
            console.log('其他错误')
          }
        },
        async onmessage(event) {
          // 表示整体结束
          if (event.data === '[DONE]') {
            console.log('结束')
            return
          }
          const jsonData = JSON.parse(event.data)
          // 如果等于stop表示结束
          if (jsonData.choices[0].finish_reason === 'stop') {
            return
          }
          // 判断role存在,进行排除
          if (jsonData.choices[0].delta.role !== undefined) {
            respString = jsonData.choices[0].delta.role + ': '
            return
          }
          if (jsonData.choices[0].delta.content !== undefined) {
            respString += jsonData.choices[0].delta.content
            console.log(respString)
          }
        },
        async onerror(error) {
          console.error('Error:', error)
        },
        async onclose() {
          // if the server closes the connection unexpectedly, retry:
          console.log('关闭连接')
        }
      })
      console.log('测试SSE')
    }

前端(.Net 控制台, 服务端接口为post请求时)

// 引入 https://github.com/lknbv5/EvtSource 库,(3v.EvtSource的改进版本)

using EvtSource;

var url = "http://10.8.2.251:3001/v1/chat/completions"; // 替换为你的SSE服务地址
Console.Write("提问:");
while (true)
{
    var a = Console.ReadLine();
    if (!string.IsNullOrWhiteSpace(a))
    {
        var data = $@"{{ 
          ""model"":""qwen-long"",
          ""messages"": [ 
            {{ ""role"": ""system"", ""content"": ""用中文回答用户的问题"" }},
            {{ ""role"": ""user"", ""content"": ""{a}""}}
          ], 
          ""temperature"": 0.7, 
          ""stream"": true
        }}";
        var content = new StringContent(data, Encoding.UTF8, "application/json");
        var evt = new EventSourceReader(new Uri(url), content, ("Authorization", "Bearer sk-Z2egX6DGBQ6BcRN19fBf460d192045869d6b1c60867d4e2b")).Start();
        Console.WriteLine("思考中...请稍后");
        var isfirst = 0;
        evt.MessageReceived += (object sender, EventSourceMessageEventArgs e) =>
        {
            if (isfirst == 0)
            {
                Console.Write("回答:");
                isfirst = 1;
            }
            if (e.Message == "[DONE]")
            {
                Console.WriteLine("\n");
                Console.Write("提问:");
            }
            else
            {
                var message = JObject.Parse(e.Message)["choices"][0]["delta"]["content"].ToString();
                Thread.Sleep(100);
                Console.Write(message);
            }
        };
    }
}