microblog | 微博客
原创
访问
0
获赞
0
评论
相关推荐
暂无数据
最新文章
暂无数据
热门文章
暂无数据

小心!Spring WebFlux 的 ServerSentEventHttpMessageWriter并不是那么可靠!!!

写完bug就找女朋友 06月06日 11:19:35 1 19 0
分类专栏: Java SpringBoot JavaWeb 文章标签: SSE Flux

一、前言

      最近在做一个小项目,需要前后端进行对话交互,为了轻量化选择了因此SSE(Server-Sent Events,服务器发送事件)作为前后端交互的桥梁。前后端约定返回的消息类型有:

  • event:消息的状态(contentcompleteerror),由此来分辨后端返回的消息是否有异常
  • content:消息的内容

      后端返回格式大致如下:

@PostMapping(value = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE) public Flux<ServerSentEvent<String>> chat(@RequestBody Message msg) { return service.chat(msg); }
@Override public Flux<ServerSentEvent<String>> chat(MessageReq dto) { StringBuffer msgBuilder = new StringBuffer(); AtomicReference<String> sourceIds = new AtomicReference<>(); return buildChatFlux(dto) //数据源 .filter(res -> res != null && !res.isEmpty()) //过滤 .map(res -> { //转换 Map<String, Object> map = JsonUtil.stringToMap(res); Object content = map.getOrDefault("data", ""); ArrayList<String> quoteList= (ArrayList<String>) map.getOrDefault("quoteList",new ArrayList<>()); return Tuples.of(content, sourceIdList); }) .flatMap(tuple -> { //业务逻辑处理 String content = String.valueOf(tuple.getT1()); List<String> sourceIdList = tuple.getT2(); List<ServerSentEvent<String>> events = new ArrayList<>(); if (!sourceIdList.isEmpty()) { List<TextRecordVO> quoteList = recordMapper.listByIds(sourceIdList); String quoteJson = JsonUtil.beanToString(quoteList); events.add(ServerSentEvent.builder(quoteJson).event("quoteList").build()); } if (!content.isEmpty()) { events.add(ServerSentEvent.builder(content).event("content").build()); msgBuilder.append(content); } return Flux.fromIterable(events); }) .concatWith(Flux.defer(() -> doFinish(msgBuilder, sourceIds.get(), dto))) .onErrorResume(ex -> { log.error("GPT调用出现错误,原因:", ex); return Flux.just(ServerSentEvent.builder(ERROR_MSG).event("error").build()); }) .subscribeOn(Schedulers.boundedElastic()); }

      前端大致如下:

port.onMessage.addListener((msg) => { const data = msg.result; console.log(data); let event = data.event; switch (event) { case "content": this.msg += data.data; break; case "complete": this.loading = false; this.isShowClose = false; break; default: break; } this.scrollToBottom(); });

二、问题描述

      上线后前端收到的消息偶尔会丢失data.event属性导致消息缺失。

三、解决思路(可能出现问题的地方)

  1. 前端解析异常
  2. 后端GPT返回消息时出现异常
  3. 后端处理GPT返回的消息、准备发送给前端时出现异常
  4. 后端发送消息给前端过程中出现数据丢失

      通过后端打印接收到的GPT返回的消息、经过后端处理后准备发送给前端时的消息我们最终发现:

      后端接受GPT返回的消息、经过处理包装以后准备返回给前端这个过程中都没有问题,也就是上述前面3个方向都没有问题,最后我们锁定了第4条 后端发送消息给前端过程中出现数据丢失

@PostMapping(value = "/figuresEvent", produces = MediaType.TEXT_EVENT_STREAM_VALUE) publiclux<ServerSentEvent<String>> eventGeneration(@RequestBody MessageReq msg) { return gptService.figuresEvent(msg) .doOnNext(event -> { String eventName = event.event(); String eventData = event.data(); log.info(" SSE Event \n {} \n",event); log.info("event : " + (eventName != null ? eventName : "null")); log.info("data : " + (eventData != null ? eventData : "null")); log.info("-----------------------------\n"); }) ; }

      经过上述的思考以及验证之后,我们最终把目光锁定在了ServerSentEvent.builder(content).event("content").build() 这一行,我们怀疑Spring WebFlux 的 ServerSentEventHttpMessageWriter在处理我们的消息的时候偶尔丢失了我们的数据。为了验证我们的猜想,我们对SSE进行了更加深入地了解,最终验证了我们的猜想:Spring WebFlux 的 ServerSentEventHttpMessageWriter确实不是100%不可靠

      关于 Spring WebFlux 的 ServerSentEventHttpMessageWriter 可能被认为是不可靠的原因,主要集中在几个方面:

  1. 事件类型(event type)丢失问题:在某些情况下,ServerSentEventHttpMessageWriter 可能不会正确地输出所有的 SSE 事件字段。例如,event 字段有时可能不会被写入响应流中,导致前端只能接收到 data 字段而丢失了 event 类型信息。这使得客户端难以区分不同类型的事件。
  2. 兼容性与版本问题:Spring 框架的不同版本之间可能存在不一致的行为,特别是在处理复杂的响应类型时。如果开发者依赖于特定行为,而在升级 Spring 版本后发现这种行为发生了改变,可能会遇到意外的问题。
  3. 自定义需求的支持有限:默认的 ServerSentEventHttpMessageWriter 提供的功能可能不足以满足所有自定义需求,比如添加额外的字段(如 id, retry 等),或者对如何格式化每个事件有更多的控制要求。
  4. 错误处理和调试难度:当使用 ServerSentEventHttpMessageWriter 遇到问题时,可能较难进行调试,因为它的实现细节对于大多数用户来说是隐藏的。如果出现异常情况,理解具体哪里出了问题可能需要深入研究框架内部的工作原理。

四、解决方案 

      经过上述的猜想与验证以后,解决问题的答案已经呼之欲出了:我们可以自己构造一个包含eventdata的对象,通过封装以后把这个对象直接返回给前端,再由前端解析、展示:

SseMessage对象:

import lombok.Data; import lombok.NoArgsConstructor; /** * SSE数据对象 */ @Data @NoArgsConstructor public class SseMessage { private String data; private String event; public SseMessage(String data,String event) { this.data = data; this.event = event; } }

chat方法:

@Override @SuppressWarnings("unchecked") public Flux<String> chat(MessageReq dto) { StringBuffer msgBuilder = new StringBuffer(); AtomicReference<String> quoteList = new AtomicReference<>(); return buildChatFlux(dto) //数据源 .filter(res -> res != null && !res.isEmpty()) //过滤 .map(res -> { //转换 Map<String, Object> map = JsonUtil.stringToMap(res); Object content = map.getOrDefault("data", ""); ArrayList<String> sourceIdList = (ArrayList<String>) map.getOrDefault( "quoteList", new ArrayList<>()); return Tuples.of(content, sourceIdList); }) .flatMap(tuple -> { //业务逻辑处理 String content = String.valueOf(tuple.getT1()); List<String> sourceIdList = tuple.getT2(); Flux<String> quoteFlux = null; if (!sourceIdList.isEmpty()) { List<TextRecordVO> quoteList = recordMapper.listByIds(sourceIdList); String quoteJson = JsonUtil.beanToString(quoteList); quoteFlux = Flux.just(packSSEMessage(quoteJson, "quoteList")); } if (!content.isEmpty()) { msgBuilder.append(content); } Flux<String> contentFlux = Flux.just(packSSEMessage(content, "content")); if (quoteFlux == null) return contentFlux; return contentFlux.concatWith(quoteFlux); }) .concatWith(Flux.defer(() -> doFinish(msgBuilder, quoteList.get(), dto))) .onErrorResume(ex -> { log.error("GPT调用出现错误,原因:", ex); return Flux.just(packSSEMessage(ERROR_MSG, "error")); }) .subscribeOn(Schedulers.boundedElastic()); }

packSSEMessage消息包装方法:

private String packSSEMessage(String content, String event) { try { SseMessage sse = new SseMessage(content, event); return new ObjectMapper().writeValueAsString(sse); } catch (JsonProcessingException e) { throw new RuntimeException(e); } }

      通过上述调整以后再由前端通过json解析,我们就能保证EES在进行数据传输的时候,不会丢失必要属性。



评论区

登录后参与交流、获取后续更新提醒

目录
暂无数据