Redis 实现等待队列

前言

甲方爸爸:现在老平台流程不支持并发运行,能否写个等待队列,给用户一种感觉平台支持并发运行流程。

我:没问题。

于是乎,开始思考如何依托于现有的环境(MQ 和 Redis)实现等待队列。

问题描述:存在若干个不固定数量流程,每个流程所需要的运行时间不同,每个流程需要执行多个任务(参数不同)。同一时间段内,可以执行不同流程类型的任务,但是不能同时处理相同类型流程的任务。现需要实现一个等待队列,以应对并发请求执行若干任务。

实现思路

假设有 A、B、C 三类流程,每类流程有若干个任务(每个任务对应的流程参数不同)。给每类流程设置一个分布式锁和一个先进先出的队列(等待队列),当需要执行任务时,先尝试获取该任务所属流程类型的分布式锁。如果获取到锁,则直接处理任务,反之进入队列。当任务处理完成后,解除当前任务所属流程的分布式锁并从对应流程类型的等待队列中取出一个任务执行,直到队列为空。

实现细节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class TaskInfo {

private String flowId;

private String taskName;

private String taskParams;

public String getFlowId() {
return flowId;
}

public void setFlowId(String flowId) {
this.flowId = flowId;
}

public String getTaskName() {
return taskName;
}

public void setTaskName(String taskName) {
this.taskName = taskName;
}

public String getTaskParams() {
return taskParams;
}

public void setTaskParams(String taskParams) {
this.taskParams = taskParams;
}

@Override
public String toString() {
return "TaskInfo{" +
"flowId='" + flowId + '\'' +
", taskName='" + taskName + '\'' +
", taskParams='" + taskParams + '\'' +
'}';
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class TaskWaitQueueService {

private final Redisson redisson;

private final RedisTemplate<String, Object> redisTemplate;

private final static TASK_WAIT_QUEUE_LOCK = "taskWaitQueueLock:";

private final static TASK_WAIT_QUEUE = "taskWaitQueue:";

private final static Logger logger = LoggerFactory.getLogger(TaskWaitQueueService.class);

/**
*
* @param flowId 流程id,用于区分不同任务所属的流程类型
* @param taskName 任务名
* @param taskParams 任务参数,执行流程所需要用到的参数
*/
public void executeTask(String flowId, String taskName, String taskParams) {
String taskWaitQueueLockKey = TASK_WAIT_QUEUE_LOCK.concat(taskType);
RLock lock = redisson.getLock(taskWaitQueueLockKey);
if (lock.isLocked()) {
// 进入等待队列
String taskWaitQueueKey = TASK_WAIT_QUEUE.concat(flowId);
redisTemplate.opsForList().rightPush(taskWaitQueueKey, new TaskInfo(flowId, taskName, taskParams));
} else {
lock.lock();
try {
doSomething(flowId, taskName, taskParams); // 具体任务的执行
} catch (Exception e) {
logger.error("执行任务失败", e);
}
}
}

public unlockWaitQueueLock(String flowId) {
String taskWaitQueueLockKey = TASK_WAIT_QUEUE_LOCK.concat(flowId);
RLock lock = redisson.getLock(taskWaitQueueLockKey);
if (lock.isLocked()) {
lock.forceUnlock();
}
String taskWaitQueueKey = TASK_WAIT_QUEUE.concat(flowId);
Long taskWaitQueueSize = redisTemplate.opsForList().size(taskWaitQueueKey);
int size = taskWaitQueueSize != null ? taskWaitQueueSize.intValue() : 0;
if (size > 0) {
// 从 redis 队列中取出数据,调用执行方法
TaskInfo taskInfo = (TaskInfo) redisTemplate.opsForList().leftPop(taskWaitQueueKey);
if (taskInfo != null) {
this.executeTask(taskInfo.getFlowId(), taskInfo.getTaskName(), taskInfo.getTaskParams());
}
}
}

}

总结

记一次自己实现等待队列的过程,主要使用 Redis 分布式锁以及 Redis List 数据类型的先进先出的特性实现的。当然也考虑过使用消息中间件实现等待队列,最后没选择使用的原因是因为同一时间段内,可以执行不同流程类型的任务,但是不能同时处理相同类型流程的任务。可能需要多个消费者去消费数据(流程数量不固定,可能需要动态创建消费者)

对于上述实现方案有疑问或者有更好的实现方式,欢迎在评论区留言或邮件交流,愿与君共勉。

Powered by Hexo and Hexo-theme-hiker

Copyright © 2018 - 2023 Leamx's Blog All Rights Reserved.

UV : | PV :