RabbitMQ中的Back-off和retry

使用第三方API的一个常见问题是峰值期间可用性问题。即便是非峰值期间,API请求仍可能被拒绝、超时或者失败。这篇文章中,我将讲解对于RabbitMQ消费者轻量级的指数退避和重试机制。

New API, new service

当用户从我们的一个网站删除了一张照片之后,我们会将它从我们的CDN服务中删除。我们的后端是面向服务的架构,这里使用了CDN purge服务。这个服务对核心web应用发生的事情做出反应,发送消息队列,并且调用CDN的API清除资源。最近我更新了这个服务来使用Akamai CCU REST API

由于现有的服务已经存在很久了,我们决定重写它。对于一个初级开发者来说,这是一个很好的机会使用RabbitMQNode.js来实现后台任务。

正如Paul在他最近的blog中所说RabbitMQ: Front the Front Line,我们是RabbitMQ的重度使用者。尽管我们很多消费者都是实用Ruby来实现的,但是我们发现Node.js在很少的业务逻辑下也是非常实用。事件驱动和消息流很契合,并且适用于快速编码。我们实用node-amqp作为我们的RabbitMQ客户端。

一旦新的服务运行起来,很显然负载会很高,尤其是高峰8pm-2am。我们很快发现,我们的API被集中在晚上调用。这个API有一个内部队列用来处理purge请求,当队列满了的时候它会返回507 queue is full

CDN response pattern over one week

观察调用成功和失败的API,很显然我们在高峰时期需要强大的退避和重试机制。

Starting point

我们的平台发送多个不同的消息路由至CDN purge队列。CDN purge服务消费这些消息并且发送对应的请求给CDN API。下图描述了从平台到API的信息流。

Diagram of our CDN purge process

当API请求被拒绝的时候,我们想过一段时间再重试。与此同时,我们该如何处理消息?

Let RabbitMQ do the work

实现退避和重试机制,我的第一直觉是创建一个新的wait queue,把失败的请求放在里面,过一段时间继续重试。由于我刚刚使用RabbitMQ,有这么几个问题需要考虑:

  • 是否我需要消费者处理wait queue中的消息
  • 是否我能控制每个消息的重试等待时间
  • 是否我能追踪每个API请求我重试的次数
  • 是否能在一个wait queue中处理多个平台的事情

感谢的是,RabbitMQ有很多protocol extensions扩展了AMQP规范。
dead letter exchangesper-message TTL正是wait queue需要的功能。

Dead letter exchanges (DLX)

术语dead letter mail 死信邮件在邮政行业仍在使用,用来描述信件没有办法被送达。在RabbitMQ中,消息有以下几种情况的时候会被认为dead-lettered

  • 消息被拒绝
  • 消息已经过期
  • 队列已经满了

邮政服务会将死信邮件返还给发送者,和这很相似,RabbitMQ会做一些工作并且重新将dead-lettered的消息发送给我们选择的exchange-dead letter exchange。

既然我们想要一个wait queue,消息过期最容易触发dead-lettering。我们将控制消息短期内过期。

任何队列都能设置dead-letter消息。dead letter exchange是队列的一个参数,当你声明的时候可以通过x-dead-letter-exchange参数来设定。这是一个使用node-amqp的例子:

1
2
3
4
5
var queueOptions = { arguments: { "x-dead-letter-exchange": "exchange" } };

connection.queue("wait-queue", queueOptions, function(waitQueue) {
// Bind to exchange
});

尽管dead letter exchanges没有做什么特别的配置,我们仍是有了wait queue。下一步我们将设置消息过期,这样RabbitMQ就能为我们重新分配。

更多信息: RabbitMQ docs on DLX

Per-message TTL

消息可以使用默认的expiry或者TTL来声明。然而为了实现指数退避,我们需要对每个消息逐个设置过期时间。

当你发布消息的时候,你可以设置expiration字段的毫秒值:

1
2
3
var messageOptions = { expiration: 10000 };

exchange.publish("routing-key", "body", messageOptions);

看起很简单,但是这也意味着,当一个purge请求失败的时候,因为需要expiration字段,我们的消费者需要复制消息。注意:如果你声明了你的队列使用消息确认机制,不要忘记确认原来的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var subscribeOptions = { ack: true };

queue.subscribe(subscribeOptions, function(message, headers, deliveryInfo, messageObject) {
// Post request to API
// ...

// If the API request fails
var messageOptions = {
appId: messageObject.appId,
timestamp: messageObject.timestamp,
contentType: messageObject.contentType,
deliveryMode: messageObject.deliveryMode,
headers: headers,
expiration: 10000
};
exchange.publish(deliveryInfo.routingKey, message, messageOptions);
messageObject.acknowledge(false);
});

你需要确保你复制你自己的消息的详细信息。下面让我们增加每次API调用失败时的的超时时间。

更多信息: RabbitMQ docs on per-message TTL

Handling dead-lettered messages

当消息dead-lettered之后,RabbitMQ对它做了一些很有用的变化,在header中记录了这些变化。对于我们的wait queue,我们仅仅关心对expiration字段发生了什么。

expiration字段被移除了,并且重新在headers中的x-death中作为original-expiration值记录着。这允许我们找到上次的过期时间并且防止消息再次过期。重要的是,x-deathheader是有序数组,第一个是最近的值。

1
2
3
4
5
6
7
8
9
var expiration;

if (headers["x-death"]) {
expiration = (headers["x-death"][0]["original-expiration"] * 3);
} else {
expiration = 10000;
}
// Apply some randomness to the expiration
// ...

这个例子中,第一个个过期时间是10000毫秒,每次重试的时候它都会乘以3。这是指数退避算法中比较常见的实践。我们的例子中,我们增加了一点随即值来提高API调用的成功可能性。

下面我们来组织我们的队列,这样他能管理多个平台的事件。

Routing dead-lettered messages

我们的CDN purge服务对于多个平台发生的事情做出反应,每个都有它自己的routing key。最简单的方式路由多个routing key的是声明单独的wait exchange

使用单独的wait exchange之后,你可以不用理会routing keys。当失败的消息发送到wait exchange时,你不需要改变routing key。仅仅需要将wait queueprimary queue绑定同样的routing key

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var routingKeys = ["routing-key-a", "routing-key-b"];

connection.exchange("wait-exchange", waitExchangeOptions, function(waitExchange) {
var waitQueueOptions = { arguments: { "x-dead-letter-exchange": "exchange" } };

connection.queue("wait-queue", waitQueueOptions, function(waitQueue) {
// Bind wait queue to all routing keys on wait exchange
routingKeys.map(function(routingKey) {
waitQueue.bind("wait-exchange", routingKey);
});
});
});

connection.exchange("primary-exchange", primaryExchangeOptions, function(primaryExchange) {
connection.queue("primary-queue", primaryQueueOptions, function(primaryQueue) {
// Bind primary queue to all routing keys on primary exchange
routingKeys.map(function(routingKey) {
primaryQueue.bind("primary-exchange", routingKey);
});
// Subscribe to messages
// ...
});
});

这样配置之后,当消息被wait queue dead-lettered之后,会重新发布至primary exchangerouting key会保持不变。这样以后添加和移除routing key会很简单。

All together now

现在把所有的代码合并在一起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
var amqp = require("amqp");

connection = amqp.createConnection({ host: "localhost" });

connection.on("ready", function() {
var waitExchange,
routingKeys = ["routing-key-a", "routing-key-b"];

waitExchange = connection.exchange("wait-exchange", waitExchangeOptions, function(waitExchange) {
var waitQueueOptions = { arguments: { "x-dead-letter-exchange": "primary-exchange" } };

connection.queue("wait-queue", waitQueueOptions, function(waitQueue) {
routingKeys.map(function(routingKey) {
waitQueue.bind("wait-exchange", routingKey);
});
});
});

connection.exchange("primary-exchange", primaryExchangeOptions, function(primaryExchange) {
connection.queue("primary-queue", primaryQueueOptions, function(primaryQueue) {
var subscribeOptions = { ack: true };

routingKeys.map(function(routingKey) {
primaryQueue.bind("primary-exchange", routingKey);
});

primaryQueue.subscribe(subscribeOptions, function(message, headers, deliveryInfo, messageObject) {
var expiration, messageOptions;
// Post request to API
// ...

// If the API request fails
if (headers["x-death"]) {
expiration = (headers["x-death"][0]["original-expiration"] * 3);
} else {
expiration = 10000;
}
messageOptions = {
appId: messageObject.appId,
timestamp: messageObject.timestamp,
contentType: messageObject.contentType,
deliveryMode: messageObject.deliveryMode,
headers: headers,
expiration: expiration
};
waitExchange.publish(deliveryInfo.routingKey, message, messageOptions);
messageObject.acknowledge(false);
});
});
});
});

Summary

我已经介绍了如何使用dead letter exchangesper-message TTL来实现轻量级别的指数退避和重试机制。上面的示例代码展示了在Node.js中使用node-amqp如何实现。下图表述了这个机制原理:

Diagram of back-off and retry mechanism

如果你和第一个图比较,我希望可以清楚的解释其中原理。最后,下面对开始的问题做一些简要的回答:

是否我需要消费者处理wait queue中的消息?

否。RabbitMQ可以做这个工作,声明一个带x-dead-letter-exchange参数的wait queue,RabbitMQ会在他们过期的时候重新发布。

是否我能控制每个消息的重试等待时间?

可以。但是per-message TTL仅能在你发送消息的时候设置。所以的你的消费者需要复制消息体并且发送的时候需要携带expiration字段。注意:如果使用了消息确认机制,不要忘了确认原来的消息。

是否我能追踪每个API请求我重试的次数?

是。每次消息变为dead-lettered时,RabbitMQ都会记录有用的信息在它的x-deathheader。第一条记录的是最新的,并且会包含original-expiration

是否能在一个wait queue中处理多个平台的事情

是。最简单的管理多个routing key的做法是为你的wait queue声明一个单独的exchange,然后和primary exchange绑定同样的routing key

原文 Back-off and retry with RabbitMQ