一、前言
最近在做一个小项目,需要前后端进行对话交互,为了轻量化选择了因此SSE(Server-Sent Events
,服务器发送事件)作为前后端交互的桥梁。前后端约定返回的消息类型有:
event
:消息的状态(content
、complete
、error
),由此来分辨后端返回的消息是否有异常content
:消息的内容
后端返回格式大致如下:
@PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE, consumes =
MediaType.APPLICATION_JSON_VALUE)
public Flux<ServerSentEvent<String>> chat(@RequestBody Message msg) {
return service.chat(msg);
}
@Override
public Flux<ServerSentEvent<String>> chat(MessageReq dto) {
StringBuffer msgBuilder = new StringBuffer();
AtomicReference<String> sourceIds = new AtomicReference<>();
return buildChatFlux(dto) //数据源
.filter(res -> res != null && !res.isEmpty()) //过滤
.map(res -> { //转换
Map<String, Object> map = JsonUtil.stringToMap(res);
Object content = map.getOrDefault("data", "");
ArrayList<String> quoteList= (ArrayList<String>) map.getOrDefault("quoteList",new ArrayList<>());
return Tuples.of(content, sourceIdList);
})
.flatMap(tuple -> { //业务逻辑处理
String content = String.valueOf(tuple.getT1());
List<String> sourceIdList = tuple.getT2();
List<ServerSentEvent<String>> events = new ArrayList<>();
if (!sourceIdList.isEmpty()) {
List<TextRecordVO> quoteList = recordMapper.listByIds(sourceIdList);
String quoteJson = JsonUtil.beanToString(quoteList);
events.add(ServerSentEvent.builder(quoteJson).event("quoteList").build());
}
if (!content.isEmpty()) {
events.add(ServerSentEvent.builder(content).event("content").build());
msgBuilder.append(content);
}
return Flux.fromIterable(events);
})
.concatWith(Flux.defer(() -> doFinish(msgBuilder, sourceIds.get(), dto)))
.onErrorResume(ex -> {
log.error("GPT调用出现错误,原因:", ex);
return Flux.just(ServerSentEvent.builder(ERROR_MSG).event("error").build());
})
.subscribeOn(Schedulers.boundedElastic());
}
前端大致如下:
port.onMessage.addListener((msg) => {
const data = msg.result;
console.log(data);
let event = data.event;
switch (event) {
case "content":
this.msg += data.data;
break;
case "complete":
this.loading = false;
this.isShowClose = false;
break;
default:
break;
}
this.scrollToBottom();
});
二、问题描述
上线后前端收到的消息偶尔会丢失data.event
属性导致消息缺失。
三、解决思路(可能出现问题的地方)
- 前端解析异常
- 后端GPT返回消息时出现异常
- 后端处理GPT返回的消息、准备发送给前端时出现异常
- 后端发送消息给前端过程中出现数据丢失
通过后端打印接收到的GPT返回的消息、经过后端处理后准备发送给前端时的消息我们最终发现:
后端接受GPT返回的消息、经过处理包装以后准备返回给前端这个过程中都没有问题,也就是上述前面3个方向都没有问题,最后我们锁定了第4条 后端发送消息给前端过程中出现数据丢失
:
@PostMapping(value = "/figuresEvent", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
publiclux<ServerSentEvent<String>> eventGeneration(@RequestBody MessageReq msg) {
return gptService.figuresEvent(msg)
.doOnNext(event -> {
String eventName = event.event();
String eventData = event.data();
log.info(" SSE Event \n {} \n",event);
log.info("event : " + (eventName != null ? eventName : "null"));
log.info("data : " + (eventData != null ? eventData : "null"));
log.info("-----------------------------\n");
})
;
}
经过上述的思考以及验证之后,我们最终把目光锁定在了ServerSentEvent.builder(content).event("content").build()
这一行,我们怀疑Spring WebFlux 的 ServerSentEventHttpMessageWriter在处理我们的消息的时候偶尔丢失了我们的数据。为了验证我们的猜想,我们对SSE进行了更加深入地了解,最终验证了我们的猜想:Spring WebFlux 的 ServerSentEventHttpMessageWriter确实不是100%不可靠:
关于 Spring WebFlux 的 ServerSentEventHttpMessageWriter
可能被认为是不可靠的原因,主要集中在几个方面:
- 事件类型(event type)丢失问题:在某些情况下,
ServerSentEventHttpMessageWriter
可能不会正确地输出所有的 SSE 事件字段。例如,event
字段有时可能不会被写入响应流中,导致前端只能接收到data
字段而丢失了event
类型信息。这使得客户端难以区分不同类型的事件。 - 兼容性与版本问题:Spring 框架的不同版本之间可能存在不一致的行为,特别是在处理复杂的响应类型时。如果开发者依赖于特定行为,而在升级 Spring 版本后发现这种行为发生了改变,可能会遇到意外的问题。
- 自定义需求的支持有限:默认的
ServerSentEventHttpMessageWriter
提供的功能可能不足以满足所有自定义需求,比如添加额外的字段(如id
,retry
等),或者对如何格式化每个事件有更多的控制要求。 - 错误处理和调试难度:当使用
ServerSentEventHttpMessageWriter
遇到问题时,可能较难进行调试,因为它的实现细节对于大多数用户来说是隐藏的。如果出现异常情况,理解具体哪里出了问题可能需要深入研究框架内部的工作原理。
四、解决方案
经过上述的猜想与验证以后,解决问题的答案已经呼之欲出了:我们可以自己构造一个包含event
和data
的对象,通过封装以后把这个对象直接返回给前端,再由前端解析、展示:
SseMessage
对象:
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* SSE数据对象
*/
@Data
@NoArgsConstructor
public class SseMessage {
private String data;
private String event;
public SseMessage(String data,String event) {
this.data = data;
this.event = event;
}
}
chat方法
:
@Override
@SuppressWarnings("unchecked")
public Flux<String> chat(MessageReq dto) {
StringBuffer msgBuilder = new StringBuffer();
AtomicReference<String> quoteList = new AtomicReference<>();
return buildChatFlux(dto) //数据源
.filter(res -> res != null && !res.isEmpty()) //过滤
.map(res -> { //转换
Map<String, Object> map = JsonUtil.stringToMap(res);
Object content = map.getOrDefault("data", "");
ArrayList<String> sourceIdList = (ArrayList<String>) map.getOrDefault(
"quoteList", new ArrayList<>());
return Tuples.of(content, sourceIdList);
})
.flatMap(tuple -> { //业务逻辑处理
String content = String.valueOf(tuple.getT1());
List<String> sourceIdList = tuple.getT2();
Flux<String> quoteFlux = null;
if (!sourceIdList.isEmpty()) {
List<TextRecordVO> quoteList = recordMapper.listByIds(sourceIdList);
String quoteJson = JsonUtil.beanToString(quoteList);
quoteFlux = Flux.just(packSSEMessage(quoteJson, "quoteList"));
}
if (!content.isEmpty()) {
msgBuilder.append(content);
}
Flux<String> contentFlux = Flux.just(packSSEMessage(content, "content"));
if (quoteFlux == null) return contentFlux;
return contentFlux.concatWith(quoteFlux);
})
.concatWith(Flux.defer(() -> doFinish(msgBuilder, quoteList.get(), dto)))
.onErrorResume(ex -> {
log.error("GPT调用出现错误,原因:", ex);
return Flux.just(packSSEMessage(ERROR_MSG, "error"));
})
.subscribeOn(Schedulers.boundedElastic());
}
packSSEMessage
消息包装方法:
private String packSSEMessage(String content, String event) {
try {
SseMessage sse = new SseMessage(content, event);
return new ObjectMapper().writeValueAsString(sse);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
通过上述调整以后再由前端通过json解析,我们就能保证EES在进行数据传输的时候,不会丢失必要属性。