SSE(Server-Sent Events)通讯服务
什么是SSE
SSE(Server-Sent Events)是一种用于实现服务器主动向客户端推送数据的技术,也被称为“事件流”(Event Stream)。它基于 HTTP 协议,利用了其长连接特性,在客户端与服务器之间建立一条持久化连接,并通过这条连接实现服务器向客户端的实时数据推送。
SSE技术的基本原理
客户端向服务器发送一个GET请求,带有指定的header,表示可以接收事件流类型,并禁用任何的事件缓存。
服务器返回一个响应,带有指定的header,表示事件的媒体类型和编码,以及使用分块传输编码(chunked)来流式传输动态生成的内容。
服务器在有数据更新时,向客户端发送一个或多个名称:值字段组成的事件,由单个换行符分隔。事件之间由两个换行符分隔。服务器可以发送事件数据、事件类型、事件ID和重试时间等字段。
客户端使用EventSource接口来创建一个对象,打开连接,并订阅onopen、onmessage和onerror等事件处理程序来处理连接状态和接收消息。
客户端可以使用GET查询参数来传递数据给服务器,也可以使用close方法来关闭连接。
SSE和Socket的区别
SSE(Server-Sent Events)和 WebSocket 都是实现服务器向客户端实时推送数据的技术,但它们在某些方面还是有一定的区别。
技术实现
SSE 基于 HTTP 协议,利用了其长连接特性,通过浏览器向服务器发送一个 HTTP 请求,建立一条持久化的连接。而 WebSocket 则是通过特殊的升级协议(HTTP/1.1 Upgrade 或者 HTTP/2)建立新的 TCP 连接,与传统 HTTP 连接不同。
数据格式
SSE 可以传输文本和二进制格式的数据,但只支持单向数据流,即只能由服务器向客户端推送数据。WebSocket 支持双向数据流,客户端和服务器可以互相发送消息,并且没有消息大小限制。
连接状态
SSE 的连接状态仅有三种:已连接、连接中、已断开。连接状态是由浏览器自动维护的,客户端无法手动关闭或重新打开连接。而 WebSocket 连接的状态更灵活,可以手动打开、关闭、重连等。
兼容性
SSE 是标准的 Web API,可以在大部分现代浏览器和移动设备上使用。但如果需要兼容老版本的浏览器(如 IE6/7/8),则需要使用 polyfill 库进行兼容。而 WebSocket 在一些老版本 Android 手机上可能存在兼容性问题,需要使用一些特殊的 API 进行处理。
安全性
SSE 的实现比较简单,都是基于 HTTP 协议的,与普通的 Web 应用没有太大差异,因此风险相对较低。WebSocket 则需要通过额外的安全措施(如 SSL/TLS 加密)来确保数据传输的安全性,避免被窃听和篡改,否则可能会带来安全隐患。
总体来说,SSE 和 WebSocket 都有各自的优缺点,适用于不同的场景和需求。如果只需要服务器向客户端单向推送数据,并且应用在前端的浏览器环境中,则 SSE 是一个更加轻量级、易于实现和维护的选择。而如果需要双向传输数据、支持自定义协议、或者在更加复杂的网络环境中应用,则 WebSocket 可能更加适合。
SSE适用于场景
SSE适用场景是指服务器向客户端实时推送数据的场景,例如:
股票价格更新:服务器可以根据股市的变化,实时地将股票价格推送给客户端,让客户端能够及时了解股票的走势和行情。
新闻实时推送:服务器可以根据新闻的更新,实时地将新闻内容或标题推送给客户端,让客户端能够及时了解最新的新闻动态和信息。
在线聊天:服务器可以根据用户的发送,实时地将聊天消息推送给客户端,让客户端能够及时收到和回复消息。
实时监控:服务器可以根据设备的状态,实时地将监控数据或报警信息推送给客户端,让客户端能够及时了解设备的运行情况和异常情况。
SSE适用场景的特点是:
数据更新频繁:服务器需要不断地将最新的数据推送给客户端,保持数据的实时性和准确性。
低延迟:服务器需要尽快地将数据推送给客户端,避免数据的延迟和过期。
单向通信:服务器只需要向客户端推送数据,而不需要接收客户端的数据。
ChatGPT 返回的数据 就是使用的SSE 技术
实时数据大屏 如果只是需要展示 实时的数据可以使用SSE技术 而不是非要使用webSocket
服务端(.Net Core API)
在服务器端,需要使用 text/event-stream
作为响应的Content-Type,并按照以下格式发送数据
event: <event name>
data: <data content>
id: <event id>
retry: <reconnection time>
其中:
- event字段是可选的,用于指定事件的名称;
- data字段是必须的,用于指定数据的内容;
- id字段是可选的,用于指定事件的标识符;
- retry字段是可选的,用于指定客户端在连接断开后重新连接的时间间隔(以毫秒为单位)。
- 每个字段都必须以换行符(==\n==)结尾,并且每个消息都必须以两个换行符(==\n\n==)结尾。
using Microsoft.AspNetCore.Mvc;
using System.Diagnostics;
namespace API.Controllers
{
[ApiController]
[Route("api/[controller]/[action]")]
public class AIController : ControllerBase
{
[HttpGet]
public async Task GetMessage(CancellationToken cancellationToken)
{
Response.Headers.Add("Content-Type", "text/event-stream;charset=utf-8");
Response.Headers.Add("Cache-Control", "no-cache");
Response.Headers.Add("Connection", "keep-alive");
try
{
while (!cancellationToken.IsCancellationRequested)
{
Debug.WriteLine("输出中");
await Response.WriteAsync("event: custom" + "\n");
await Response.WriteAsync("data: " + DateTime.Now.ToLongTimeString() + "\n\n");
await Response.Body.FlushAsync(); // 刷新缓冲区,立即发送数据到客户端
await Task.Delay(1000); // 等待1秒后再次发送
}
}
catch (OperationCanceledException)
{
}
}
}
}
//下面是一个实际的接口Demo,此接口, 可能返回json格式, 也可能返回event-stream流
/// <summary>
/// 规范+监理档案查询
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
[HttpGet]
public async Task StandardSpvFileSearchChat(string message, CancellationToken cancellationToken, bool stream = false)
{
var r = await _chatService.StandardSpvFileSearchChat(message);
if (stream&& r.ResultCode=="0000")
{
Response.ContentType = "text/event-stream;charset=utf-8";
Response.Headers.Add("Cache-Control", "no-cache");
Response.Headers.Add("Connection", "keep-alive");
string rstr = r.Data["anwser"];
//模拟随机生成效果, 一次不一定发多少字
// 确保字符串不为空
if (string.IsNullOrEmpty(rstr))
{
Response.Headers.Add("Content-Type", "application/json");
string rrr = JsonConvert.SerializeObject(r);
await Response.WriteAsync(rrr);
await Response.Body.FlushAsync();
}
Random _random = new Random();
int _lastIndex = 0;
try
{
while (!cancellationToken.IsCancellationRequested && _lastIndex < rstr.Length)
{
// 随机选择本次要取的字符数量(1到10个)
int charsToTake = _random.Next(1, 11);
// 确保charsToTake不会超过字符串剩余长度
if (_lastIndex + charsToTake > rstr.Length)
{
charsToTake = rstr.Length - _lastIndex;
}
// 取子串并打印
string substring = rstr.Substring(_lastIndex, charsToTake);
await Response.WriteAsync("event: message" + "\n");
await Response.WriteAsync("data: " + substring + "\n\n");
await Response.Body.FlushAsync(); // 刷新缓冲区,立即发送数据到客户端
// 更新_lastIndex以准备下一次迭代
_lastIndex += charsToTake;
// 模拟0.1秒延迟
await Task.Delay(100);
}
await Response.WriteAsync("event: message" + "\n");
await Response.WriteAsync("data: [Done]" + "\n\n");
await Response.Body.FlushAsync();
//普通循环, 一次发一个字
//for (int i = 0; i < rstr.Length; i++)
//{
// await Response.WriteAsync("event: message" + "\n");
// await Response.WriteAsync("data: " + rstr[i] + "\n\n");
// await Response.Body.FlushAsync(); // 刷新缓冲区,立即发送数据到客户端
// await Task.Delay(100); // 等待1秒后再次发送
//}
}
catch (OperationCanceledException)
{
}
}
else
{
Response.Headers.Add("Content-Type", "application/json");
string rrr = JsonConvert.SerializeObject(r);
await Response.WriteAsync(rrr);
await Response.Body.FlushAsync();
}
}
前端(JS)
//开启sse连接
if (typeof(EventSource) !== "undefined") {
// 创建一个EventSource实例,指向你的SSE服务的URL
var source = new EventSource("http://localhost:5260/api/AI/GetMessage");
// 当服务器发送一个消息时,会触发这个事件
source.onmessage = function(event) {
// 处理服务器发送的消息
console.log("Received message: ", event.data);
};
// 当与服务器的连接打开时,会触发这个事件
source.onopen = function(event) {
console.log("Connection is open!");
};
// 当与服务器的连接关闭时,会触发这个事件
source.onerror = function(event) {
console.log("Connection is closed!");
};
// 监听服务端的test事件类型, 并输出data数据
source.addEventListener('custom', function(event) {
console.log(event.data); // 输出接收到的数据
}, false);
} else {
// 浏览器不支持SSE
console.log("Sorry, your browser does not support server-sent events...");
}
//退出/关闭sse连接
source.close()
//如果后端接口既可能返回json也可能返回event-stream, 则前端可使用如下方式进行判断
let url="http://10.8.1.108:8099/api/Chat/SpvFileSearchChat?message=安全帽佩戴要求&stream=true"
fetch(url)
.then(response => {
const contentType = response.headers.get('content-type');
if (contentType.includes('application/json')) {
// 处理JSON响应
return response.json().then(jsonData => {
console.log("Received JSON:", jsonData);
});
} else if (contentType.includes('text/event-stream')) {
// 处理SSE响应
const reader = response.body.getReader();
function processSSEStream() {
reader.read().then(({done, value}) => {
if (done) {
console.log('SSE Stream closed');
return;
}
// SSE数据通常以"data: "开头,这里简单处理以提取data部分
const dataText = new TextDecoder('utf-8').decode(value);
const lines = dataText.split('\n').filter(line => line.trim());
lines.forEach(line => {
if (line.startsWith('data: ')) {
console.log("SSE Data:", line.substring(6)); // 去除"data: "前缀
}
});
// 继续读取流
processSSEStream();
}).catch(error => {
console.error('Error reading SSE stream', error);
});
}
// 开始处理SSE流
processSSEStream();
} else {
throw new Error(`Unsupported content type: ${contentType}`);
}
})
.catch(error => {
console.error('Fetch error:', error);
});
前端(.Net 控制台)
nuget 搜索安装-> 3v.EvtSource
using EvtSource;
var url = "http://localhost:5260/api/AI/GetMessage"; // 替换为你的SSE服务地址
var evt = new EventSourceReader(new Uri(url)).Start();
evt.MessageReceived += (object sender, EventSourceMessageEventArgs e) => Console.WriteLine($"{e.Event} : {e.Message}");
evt.Disconnected += async (object sender, DisconnectEventArgs e) =>
{
Console.WriteLine($"Retry: {e.ReconnectDelay} - Error: {e.Exception}");
await Task.Delay(e.ReconnectDelay);
if (!evt.IsDisposed)
{
evt.Start(); // Reconnect to the same URL
}
};
Task.Run(() =>
{
Task.Delay(4000).GetAwaiter().GetResult();
evt.Dispose();
});
Console.ReadKey();
前端(JS, 服务端接口为post请求时)
@microsoft/fetch-event-source 是一个由微软提供的库,用于在客户端和服务器之间建立基于 EventSource 的连接。EventSource 是一种 HTTP 协议,允许服务器向客户端推送实时事件流。该库提供了对 EventSource 协议的封装,使得在前端 JavaScript 中使用 EventSource 变得更加方便。
GitHub地址: https://github.com/Azure/fetch-event-source
在 @microsoft/fetch-event-source 中,主要使用 fetchEventSource 函数来创建一个新的 EventSource 连接。这个函数接受一个 URL 参数,以及一个配置对象,其中可以包含一些选项,如请求方法、请求头、请求体等。当服务器向客户端推送事件时,可以通过 onmessage 回调函数来处理这些事件。此外,还可以提供 onerror 和 onclose 回调函数来处理连接错误和关闭事件
安装
pnpm install @microsoft/fetch-event-source
使用
// 测试前端SSE调用 import { fetchEventSource } from '@microsoft/fetch-event-source' const testSSE = () => { const OPENAI_API_KEY = 'YOUR_OPENAI_API_KEY' const OPENAI_COMPLETION_ENDPOINT = 'https://api.openai.com/v1/chat/completions' const requestData = { model: 'gpt-3.5-turbo', messages: [ { role: 'user', content: '我想去西安旅游7天' } ], stream: true } let respString = '' fetchEventSource(OPENAI_COMPLETION_ENDPOINT, { method: 'POST', headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${OPENAI_API_KEY}`, }, body: JSON.stringify(requestData), async onopen(response) { if (response.ok && response.headers.get('content-type') === 'text/event-stream') { // everything's good console.log('everything's good') } else if (response.status >= 400 && response.status < 500 && response.status !== 429) { console.log('请求错误') } else { console.log('其他错误') } }, async onmessage(event) { // 表示整体结束 if (event.data === '[DONE]') { console.log('结束') return } const jsonData = JSON.parse(event.data) // 如果等于stop表示结束 if (jsonData.choices[0].finish_reason === 'stop') { return } // 判断role存在,进行排除 if (jsonData.choices[0].delta.role !== undefined) { respString = jsonData.choices[0].delta.role + ': ' return } if (jsonData.choices[0].delta.content !== undefined) { respString += jsonData.choices[0].delta.content console.log(respString) } }, async onerror(error) { console.error('Error:', error) }, async onclose() { // if the server closes the connection unexpectedly, retry: console.log('关闭连接') } }) console.log('测试SSE') }
前端(.Net 控制台, 服务端接口为post请求时)
// 引入 https://github.com/lknbv5/EvtSource 库,(3v.EvtSource的改进版本)
using EvtSource;
var url = "http://10.8.2.251:3001/v1/chat/completions"; // 替换为你的SSE服务地址
Console.Write("提问:");
while (true)
{
var a = Console.ReadLine();
if (!string.IsNullOrWhiteSpace(a))
{
var data = $@"{{
""model"":""qwen-long"",
""messages"": [
{{ ""role"": ""system"", ""content"": ""用中文回答用户的问题"" }},
{{ ""role"": ""user"", ""content"": ""{a}""}}
],
""temperature"": 0.7,
""stream"": true
}}";
var content = new StringContent(data, Encoding.UTF8, "application/json");
var evt = new EventSourceReader(new Uri(url), content, ("Authorization", "Bearer sk-Z2egX6DGBQ6BcRN19fBf460d192045869d6b1c60867d4e2b")).Start();
Console.WriteLine("思考中...请稍后");
var isfirst = 0;
evt.MessageReceived += (object sender, EventSourceMessageEventArgs e) =>
{
if (isfirst == 0)
{
Console.Write("回答:");
isfirst = 1;
}
if (e.Message == "[DONE]")
{
Console.WriteLine("\n");
Console.Write("提问:");
}
else
{
var message = JObject.Parse(e.Message)["choices"][0]["delta"]["content"].ToString();
Thread.Sleep(100);
Console.Write(message);
}
};
}
}