Event Source,即Server-sent events,主要优点是基于HTTP协议进行长连接,除了在网页里用JavaScript可以接收消息,其他的应用只要支持HTTP协议都没问题;而RabbitMQ完整实现了AMQP协议,即Advanced Message Queuing Protocol,非常成熟稳定。本文所谓的“代替”,适合的场景非常有限,如果要追求消息推送的稳定可靠,还是建议直接使用RabbitMQ。
EventSource和RabbitMQ的比较
注意,这里的比较,是限定在服务端推送消息到客户端这个使用场景。
EventSource
优点: 基于http,可以支持web网页,可以支持大量终端连接(扩展应用实例的数量即可),集成账号的权限。
缺点: 新技术风险(是否支持代理转发?推断是“可以”,待实测);终端是长连接到分布式部署的应用实例上,消息推送代码逻辑会变复杂,增加了应用实例的负载。
RabbitMQ
优点: 客户端直接连接RabbitMQ,充分利用RabbitMQ的处理能力,成熟技术。
缺点: 不支持Web网页;终端增加后,RabbitMQ可能会成为瓶颈;RabbitMQ的密码记录在终端,有安全风险。
Spring Boot推送EventSource(Server-sent events) 方式一:DeferredResult Controller 1 2 3 4 5 6 7 8 9 10 11 @GetMapping(value = "/msg", produces = "text/event-stream;charset=UTF-8") public DeferredResult<String> msg (HttpServletRequest request, HttpServletResponse response) throws IOException { response.setCharacterEncoding(StandardCharsets.UTF_8.name()); response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE); response.setStatus(HttpServletResponse.SC_OK); msgService.removeErrorResponse(); msgService.getResponseList().add(response); return new DeferredResult <>(0L ); }
Service 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 @Lazy(value = false) @Service public class MsgService implements ApplicationListener <ContextClosedEvent> { private final CopyOnWriteArrayList<HttpServletResponse> responseList = new CopyOnWriteArrayList <>(); public CopyOnWriteArrayList<HttpServletResponse> getResponseList () { return this .responseList; } public synchronized void removeErrorResponse () { for (HttpServletResponse response : this .responseList) { try { PrintWriter writer = response.getWriter(); if (writer == null ) { this .responseList.remove(response); continue ; } else if (writer.checkError()) { writer.close(); this .responseList.remove(response); continue ; } } catch (Exception e) { e.printStackTrace(); this .responseList.remove(response); } } } @Scheduled(initialDelay = 0, fixedDelay = 3 * 1000) public void run () { this .removeErrorResponse(); Iterator<HttpServletResponse> iterator = responseList.iterator(); Random rand = new Random (); int num = rand.nextInt(100 ); while (iterator.hasNext()) { PrintWriter writer; try { writer = iterator.next().getWriter(); if (writer == null || writer.checkError()) { continue ; } writer.write("data:msg: hello, the random num is: " + num + "\n\n" ); writer.flush(); } catch (Exception e) { e.printStackTrace(); } } } @Override public void onApplicationEvent (ContextClosedEvent event) { for (HttpServletResponse response : this .responseList) { try { PrintWriter writer = response.getWriter(); if (writer != null ) { writer.flush(); writer.close(); } this .responseList.remove(response); } catch (Exception e) { e.printStackTrace(); } } } }
方式二:SseEmitter Controller 1 2 3 4 5 6 7 @GetMapping("/stream-sse") public SseEmitter streamSseMvc (HttpServletRequest request) { SseEmitter emitter = new SseEmitter (0L ); emitter.send(SseEmitter.event().name("connected" ).data("connected" )); sseService.getEmitterList().add(emitter); return emitter; }
Service 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Lazy(value = false) @Service public class SseService implements ApplicationListener <ContextClosedEvent> { private final Logger logger = LoggerFactory.getLogger(this .getClass()); private final CopyOnWriteArrayList<SseEmitter> emitterList = new CopyOnWriteArrayList <>(); public CopyOnWriteArrayList<SseEmitter> getEmitterList () { return this .emitterList; } @Scheduled(initialDelay = 0, fixedDelay = 3 * 1000) public void run () { Random rand = new Random (); int num = rand.nextInt(100 ); for (SseEmitter emitter : this .emitterList) { try { emitter.send("data:msg: hello, the random num is: " + num + "\n\n" ); } catch (Exception e) { if (e instanceof IllegalStateException || e instanceof IOException) { logger.info("error: {}" , e.getMessage()); emitter.completeWithError(e); this .emitterList.remove(emitter); } else { e.printStackTrace(); } } } } @Override public void onApplicationEvent (ContextClosedEvent event) { for (SseEmitter emitter : this .emitterList) { try { emitter.complete(); } catch (Exception ex) { ex.printStackTrace(); } } this .emitterList.clear(); } }
C#客户端接收消息 C#客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 _cancellationTokenSource = new CancellationTokenSource(); await Task.Run(async () =>{ using (var client = new HttpClient()) { var request = new HttpRequestMessage(HttpMethod.Get, "http://localhost/stream-sse" ); request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream" )); using (var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead)) { response.EnsureSuccessStatusCode(); using (var stream = await response.Content.ReadAsStreamAsync()) using (var reader = new StreamReader(stream)) { while (!reader.EndOfStream) { if (_cancellationTokenSource.IsCancellationRequested) { break ; } var line = await reader.ReadLineAsync(); if (string .IsNullOrEmpty(line)) continue ; if (line.StartsWith("data:" )) { var data = line.Substring("data:" .Length); Console.WriteLine(data); } } } } } }, _cancellationTokenSource.Token);
扩展优化以及填坑实录 如何设置一个合适的超时时间? 在前面的示例代码里,不管是DeferredResult还是SseEmitter,在新建实例设置超时参数时,我们都是给的0(永不超时)。在实际案例里,可以结合当前用户的会话来设置超时,比如根据会话超时时间或者JWT(Json Web Token)的超时时间来设置,这样可以及时进行资源的释放。
如何解决Nginx代理的问题? 在前面分析Server-sent events这个技术方案时,就有担心是否能通过代理的问题。实测的情况来看,有Nginx时,确实会受到影响。
好在Nginx非常成熟,这些问题早有解决方案。
客户端无法建立长连接 Nginx使用自定义Header“X-Accel-Buffering”来设置连接的代理缓存,no适用于Comet和HTTP流式应用程序的无缓冲响应,yes缓存响应,默认yes。
所以较合理的解决方案是在Spring Boot服务端建立连接时设置一下这个Header,而不是去修改Nginx的proxy_buffering
为off
,那是全局设置,影响所有的连接。
1 2 3 4 5 @GetMapping("/stream-sse") public SseEmitter streamSse (HttpServletRequest request, HttpServletResponse response) { response.setHeader("X-Accel-Buffering" , "no" ); ... }
502 Bad Gateway(错误的网关) Server-sent events的长连接,会让Nginx和Spring Boot应用的连接超限,表现出来的现象就是客服端请求时报502错误。
设置Nginx的最大连接数
Nginx的最大连接数受限于系统的最大打开文件数,如果系统的最大打开文件数小于Nginx的最大连接数,也会出现502错误。所以,在调整Nginx的最大连接数之前,先调整系统的最大打开文件数。
nginx.conf 1 2 3 4 5 6 7 8 # 进程最大可打开文件数 worker_rlimit_nofile 65535; events { ... # 设置Nginx的最大连接数 worker_connections 65535; ... }
核实配置是否生效(先获取nginx worker的进程id,然后查看该id的limits):
1 2 3 4 5 6 7 8 9 10 11 $ ps -aux|grep nginx ... www-data 868753 0.0 1.3 107728 53208 ? S May04 0:13 nginx: worker process www-data 868754 0.0 1.0 96876 42040 ? S May04 0:00 nginx: worker process ... $ cat /proc/868753/limits Limit Soft Limit Hard Limit Units ... Max open files 65535 65535 files ...
让Nginx更好地支持长连接 可以针对Server-sent events的请求,设置一些Nginx的参数,让Nginx更好地支持长连接。
nginx.conf 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 server { ... location /your-path/sse { proxy_pass http://your-apps; # 设置HTTP协议版本为1.1,该协议的TCP连接默认是keepalive的,不需要再设置`proxy_set_header Connection 'keep-alive'`。 proxy_http_version 1.1; proxy_set_header Connection ''; chunked_transfer_encoding off; proxy_buffering off; proxy_cache off; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; # 等待后端服务器响应的时间,单位为秒,默认值60。 proxy_read_timeout 360; } ... }
让Spring Boot应用更好地支持长连接 Undertow的worker线程默认是CPU内核数乘以8,可以调整为更大的值,来应对Server-sent events建立的长连接比较多的情况:
application.yml 1 spring.undertow.threads.worker: 256
数据库连接池溢出 在使用Server-sent events进行消息推送之后,服务端运行一段时间就会报下述异常:
1 2 3 HikariPool-1 - Connection is not available, request timed out after 6001ms. ... [org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection]
这就是所谓的“数据库连接池溢出”了,查了一下数据库的连接数监控,确实已经满载:
上图中间的波谷是重启应用的现象,应用刚启动的时候数据库连接数量比较低,然后很快就达到连接池的上限,这个时候再有获取连接的请求就会报错。
这个问题非常严重,不解决的话服务端很快就会因为连接池溢出而无法正常提供服务。
这个问题也比较隐蔽,不容易排查。
代码中是不是忘了释放连接? 最初是指点打点,既然“连接池溢出”,那就是代码哪里写得不科学,没有及时释放连接了。
在Server-sent events这部分,就是设置SseEmitter的超时,是根据当前用户的会话超时来设置的,查询了一下当前用户的登录日志。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public Long getExpiredSeconds (String jwtId) { if (StringUtils.hasText(jwtId)) { Optional<LoginUser> optional = loginUserRepository.findById(jwtId); if (optional.isPresent()) { LoginUser loginUser = optional.get(); if (dLoginUser.getExpireTime() != null ) { return (loginUser.getExpireTime().getTime() - System.currentTimeMillis()) / 1000 ; } } } return 0L ; }
仔细核查了新增的代码,都是典型的JPA数据操作,没有涉及到数据库连接的管理,所以也就不存在连接释放的问题。
SseEmitter的使用方式不当? 既然不是忘了释放连接,那是不是SseEmitter的使用方式需要注意什么呢?查了Spring framework的官方文档,也没有特殊的说明。
ResponseBodyEmitter
SseEmitter是对ResponseBodyEmitter的扩展。
使用leak-detection-threshold定位没有释放连接的代码 HikariCP连接池配置的leak-detection-threshold参数,可以用来检测连接是否及时释放。大致的逻辑是应用在从连接池获取连接时,就根据leak-detection-threshold这个参数,来设置一个定时检测,如果时间到了连接还没有释放就触发一个异常。
所以这个参数平时不用打开,徒增服务端对CPU和内存的消耗而已。
尽管只是触发异常,不会执行其他操作,比如去关闭连接,但是参数设置之后,对连接池溢出的排查很有用,让我们准确定位到问题所在。没有释放连接的代码,就是getExpiredSeconds这个方法。
解决问题(open-in-view) 定位到问题之后,我们先注释掉这个查询操作,验证一下是不是它的原因。从数据库监控来看,不进行查询操作之后,即不调用getExpiredSeconds,确实没有连接释放方面的问题了。
定位到问题所在,就比较好解决了。
比较简单粗暴的方案,就是设置固定的SseEmitter超时参数,不在Controller方法里执行数据库操作。
但是排查到这里,我们已经可以总结出来了,SseEmitter是长连接,所以数据的连接也一直Hold住了?也就是说Spring framework为这个request自动开了一个连接,直到request结束才关闭?
带着这个现象和疑惑,最后发现可以通过spring JPA的这个设置来解决问题: spring.jpa.open-in-view
1 Register OpenEntityManagerInViewInterceptor. Binds a JPA EntityManager to the thread for the entire processing of the request.
大致意思是open-in-view设置为true时,Spring Framework会注册一个EntityManage拦截器,这个拦截器在判断到request中有JPA操作时,就为这个请求绑定一个EntityManager,并且在request的整个处理过程中一直Hold(维持)住这个EntityManage不释放,默认值是true。
这就跟我们这个案例对上了,Server-sent events是长连接,所以为这个连接绑定的EntityManager就不会关闭,一直占住,这就完全失去连接池的意义了,没有共享只有独占。
所以折腾这么久,问题的解决很简单,修改Spring Boot的配置:
application.yml 1 spring.jpa.open-in-view: false