在桌面应用里使用EventSource来代替直连RabbitMQ

Event Source,即Server-sent events,主要优点是基于HTTP协议进行长连接,除了在网页里用JavaScript可以接收消息,其他的应用只要支持HTTP协议都没问题;而RabbitMQ完整实现了AMQP协议,即Advanced Message Queuing Protocol,非常成熟稳定。本文所谓的“代替”,适合的场景非常有限,如果要追求消息推送的稳定可靠,还是建议直接使用RabbitMQ。

EventSource和RabbitMQ的比较

注意,这里的比较,是限定在服务端推送消息到客户端这个使用场景。

EventSource

  • 优点: 基于http,可以支持web网页,可以支持大量终端连接(扩展应用实例的数量即可),集成账号的权限。
  • 缺点: 新技术风险(是否支持代理转发?推断是“可以”,待实测);终端是长连接到分布式部署的应用实例上,消息推送代码逻辑会变复杂,增加了应用实例的负载。

RabbitMQ

  • 优点: 客户端直接连接RabbitMQ,充分利用RabbitMQ的处理能力,成熟技术。
  • 缺点: 不支持Web网页;终端增加后,RabbitMQ可能会成为瓶颈;RabbitMQ的密码记录在终端,有安全风险。

Spring Boot推送EventSource(Server-sent events)

方式一:DeferredResult

Controller
1
2
3
4
5
6
7
8
9
10
11
@GetMapping(value = "/msg", produces = "text/event-stream;charset=UTF-8")
public DeferredResult<String> msg(HttpServletRequest request, HttpServletResponse response) throws IOException {
response.setCharacterEncoding(StandardCharsets.UTF_8.name());
response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
response.setStatus(HttpServletResponse.SC_OK);

msgService.removeErrorResponse();
msgService.getResponseList().add(response);

return new DeferredResult<>(0L);
}
Service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@Lazy(value = false)
@Service
public class MsgService implements ApplicationListener<ContextClosedEvent> {
private final CopyOnWriteArrayList<HttpServletResponse> responseList = new CopyOnWriteArrayList<>();

public CopyOnWriteArrayList<HttpServletResponse> getResponseList() {
return this.responseList;
}

public synchronized void removeErrorResponse() {
for (HttpServletResponse response : this.responseList) {
try {
PrintWriter writer = response.getWriter();
if (writer == null) {
this.responseList.remove(response);
continue;
} else if (writer.checkError()) {
writer.close();
this.responseList.remove(response);
continue;
}
} catch (Exception e) {
e.printStackTrace();
this.responseList.remove(response);
}
}
}

@Scheduled(initialDelay = 0, fixedDelay = 3 * 1000)
public void run() {
this.removeErrorResponse();
Iterator<HttpServletResponse> iterator = responseList.iterator();
Random rand = new Random();
int num = rand.nextInt(100);
while (iterator.hasNext()) {
PrintWriter writer;
try {
writer = iterator.next().getWriter();
if (writer == null || writer.checkError()) {
continue;
}
writer.write("data:msg: hello, the random num is: " + num + "\n\n");
writer.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
}

/**
* 清理
* 不适合用@PreDestroy,客户端长连接会导致一直“Waiting for active requests to complete”,直到超时
*
* @param event
*/
@Override
public void onApplicationEvent(ContextClosedEvent event) {
for (HttpServletResponse response : this.responseList) {
try {
PrintWriter writer = response.getWriter();
if (writer != null) {
writer.flush();
writer.close();
}
this.responseList.remove(response);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

方式二:SseEmitter

Controller
1
2
3
4
5
6
7
@GetMapping("/stream-sse")
public SseEmitter streamSseMvc(HttpServletRequest request) {
SseEmitter emitter = new SseEmitter(0L);
emitter.send(SseEmitter.event().name("connected").data("connected"));
sseService.getEmitterList().add(emitter);
return emitter;
}
Service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Lazy(value = false)
@Service
public class SseService implements ApplicationListener<ContextClosedEvent> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final CopyOnWriteArrayList<SseEmitter> emitterList = new CopyOnWriteArrayList<>();

public CopyOnWriteArrayList<SseEmitter> getEmitterList() {
return this.emitterList;
}

@Scheduled(initialDelay = 0, fixedDelay = 3 * 1000)
public void run() {
Random rand = new Random();
int num = rand.nextInt(100);
for (SseEmitter emitter : this.emitterList) {
try {
emitter.send("data:msg: hello, the random num is: " + num + "\n\n");
} catch (Exception e) {
if (e instanceof IllegalStateException || e instanceof IOException) {
logger.info("error: {}", e.getMessage());
emitter.completeWithError(e);
this.emitterList.remove(emitter);
} else {
e.printStackTrace();
}
}
}
}

@Override
public void onApplicationEvent(ContextClosedEvent event) {
for (SseEmitter emitter : this.emitterList) {
try {
emitter.complete();
} catch (Exception ex) {
ex.printStackTrace();
}
}
this.emitterList.clear();
}
}

C#客户端接收消息

C#客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
_cancellationTokenSource = new CancellationTokenSource();

await Task.Run(async () =>
{
using (var client = new HttpClient())
{
var request = new HttpRequestMessage(HttpMethod.Get, "http://localhost/stream-sse");
request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("text/event-stream"));

using (var response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead))
{
response.EnsureSuccessStatusCode();

using (var stream = await response.Content.ReadAsStreamAsync())
using (var reader = new StreamReader(stream))
{
while (!reader.EndOfStream)
{
if (_cancellationTokenSource.IsCancellationRequested)
{
break;
}

var line = await reader.ReadLineAsync();
if (string.IsNullOrEmpty(line)) continue;

if (line.StartsWith("data:"))
{
var data = line.Substring("data:".Length);
Console.WriteLine(data);
}
}
}
}
}
}, _cancellationTokenSource.Token);

扩展优化以及填坑实录

如何设置一个合适的超时时间?

在前面的示例代码里,不管是DeferredResult还是SseEmitter,在新建实例设置超时参数时,我们都是给的0(永不超时)。在实际案例里,可以结合当前用户的会话来设置超时,比如根据会话超时时间或者JWT(Json Web Token)的超时时间来设置,这样可以及时进行资源的释放。

如何解决Nginx代理的问题?

在前面分析Server-sent events这个技术方案时,就有担心是否能通过代理的问题。实测的情况来看,有Nginx时,确实会受到影响。

好在Nginx非常成熟,这些问题早有解决方案。

客户端无法建立长连接

Nginx使用自定义Header“X-Accel-Buffering”来设置连接的代理缓存,no适用于Comet和HTTP流式应用程序的无缓冲响应,yes缓存响应,默认yes。

所以较合理的解决方案是在Spring Boot服务端建立连接时设置一下这个Header,而不是去修改Nginx的proxy_bufferingoff,那是全局设置,影响所有的连接。

1
2
3
4
5
@GetMapping("/stream-sse")
public SseEmitter streamSse(HttpServletRequest request, HttpServletResponse response) {
response.setHeader("X-Accel-Buffering", "no");
...
}
502 Bad Gateway(错误的网关)

Server-sent events的长连接,会让Nginx和Spring Boot应用的连接超限,表现出来的现象就是客服端请求时报502错误。

设置Nginx的最大连接数

Nginx的最大连接数受限于系统的最大打开文件数,如果系统的最大打开文件数小于Nginx的最大连接数,也会出现502错误。所以,在调整Nginx的最大连接数之前,先调整系统的最大打开文件数。

nginx.conf
1
2
3
4
5
6
7
8
# 进程最大可打开文件数
worker_rlimit_nofile 65535;
events {
...
# 设置Nginx的最大连接数
worker_connections 65535;
...
}

核实配置是否生效(先获取nginx worker的进程id,然后查看该id的limits):

1
2
3
4
5
6
7
8
9
10
11
$ ps -aux|grep nginx
...
www-data 868753 0.0 1.3 107728 53208 ? S May04 0:13 nginx: worker process
www-data 868754 0.0 1.0 96876 42040 ? S May04 0:00 nginx: worker process
...

$ cat /proc/868753/limits
Limit Soft Limit Hard Limit Units
...
Max open files 65535 65535 files
...
让Nginx更好地支持长连接

可以针对Server-sent events的请求,设置一些Nginx的参数,让Nginx更好地支持长连接。

nginx.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
server {
...

location /your-path/sse {
proxy_pass http://your-apps;

# 设置HTTP协议版本为1.1,该协议的TCP连接默认是keepalive的,不需要再设置`proxy_set_header Connection 'keep-alive'`。
proxy_http_version 1.1;
proxy_set_header Connection '';

chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;

# 等待后端服务器响应的时间,单位为秒,默认值60。
proxy_read_timeout 360;
}

...
}
让Spring Boot应用更好地支持长连接

Undertow的worker线程默认是CPU内核数乘以8,可以调整为更大的值,来应对Server-sent events建立的长连接比较多的情况:

application.yml
1
spring.undertow.threads.worker: 256

数据库连接池溢出

在使用Server-sent events进行消息推送之后,服务端运行一段时间就会报下述异常:

1
2
3
HikariPool-1 - Connection is not available, request timed out after 6001ms.
...
[org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection]

这就是所谓的“数据库连接池溢出”了,查了一下数据库的连接数监控,确实已经满载:

上图中间的波谷是重启应用的现象,应用刚启动的时候数据库连接数量比较低,然后很快就达到连接池的上限,这个时候再有获取连接的请求就会报错。

这个问题非常严重,不解决的话服务端很快就会因为连接池溢出而无法正常提供服务。

这个问题也比较隐蔽,不容易排查。

代码中是不是忘了释放连接?

最初是指点打点,既然“连接池溢出”,那就是代码哪里写得不科学,没有及时释放连接了。

在Server-sent events这部分,就是设置SseEmitter的超时,是根据当前用户的会话超时来设置的,查询了一下当前用户的登录日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 获取令牌过期时间(秒)
*
* @param jwtId
* @return
*/
public Long getExpiredSeconds(String jwtId) {
if (StringUtils.hasText(jwtId)) {
Optional<LoginUser> optional = loginUserRepository.findById(jwtId);
if (optional.isPresent()) {
LoginUser loginUser = optional.get();
if (dLoginUser.getExpireTime() != null) {
return (loginUser.getExpireTime().getTime() - System.currentTimeMillis()) / 1000;
}
}
}

return 0L;
}

仔细核查了新增的代码,都是典型的JPA数据操作,没有涉及到数据库连接的管理,所以也就不存在连接释放的问题。

SseEmitter的使用方式不当?

既然不是忘了释放连接,那是不是SseEmitter的使用方式需要注意什么呢?查了Spring framework的官方文档,也没有特殊的说明。

ResponseBodyEmitter

SseEmitter是对ResponseBodyEmitter的扩展。

使用leak-detection-threshold定位没有释放连接的代码

HikariCP连接池配置的leak-detection-threshold参数,可以用来检测连接是否及时释放。大致的逻辑是应用在从连接池获取连接时,就根据leak-detection-threshold这个参数,来设置一个定时检测,如果时间到了连接还没有释放就触发一个异常。

所以这个参数平时不用打开,徒增服务端对CPU和内存的消耗而已。

尽管只是触发异常,不会执行其他操作,比如去关闭连接,但是参数设置之后,对连接池溢出的排查很有用,让我们准确定位到问题所在。没有释放连接的代码,就是getExpiredSeconds这个方法。

解决问题(open-in-view)

定位到问题之后,我们先注释掉这个查询操作,验证一下是不是它的原因。从数据库监控来看,不进行查询操作之后,即不调用getExpiredSeconds,确实没有连接释放方面的问题了。

定位到问题所在,就比较好解决了。

比较简单粗暴的方案,就是设置固定的SseEmitter超时参数,不在Controller方法里执行数据库操作。

但是排查到这里,我们已经可以总结出来了,SseEmitter是长连接,所以数据的连接也一直Hold住了?也就是说Spring framework为这个request自动开了一个连接,直到request结束才关闭?

带着这个现象和疑惑,最后发现可以通过spring JPA的这个设置来解决问题: spring.jpa.open-in-view

1
Register OpenEntityManagerInViewInterceptor. Binds a JPA EntityManager to the thread for the entire processing of the request.

大致意思是open-in-view设置为true时,Spring Framework会注册一个EntityManage拦截器,这个拦截器在判断到request中有JPA操作时,就为这个请求绑定一个EntityManager,并且在request的整个处理过程中一直Hold(维持)住这个EntityManage不释放,默认值是true。

这就跟我们这个案例对上了,Server-sent events是长连接,所以为这个连接绑定的EntityManager就不会关闭,一直占住,这就完全失去连接池的意义了,没有共享只有独占。

所以折腾这么久,问题的解决很简单,修改Spring Boot的配置:

application.yml
1
spring.jpa.open-in-view: false