场景
数据处理项目, 新数据源的数据不是100%完整, 会由多个topic组合出一条完整数据, 处理后发送到项目。 可是具体完成的topic未知, 为保证尽可能将数据推送,需要一个延迟队列。
遇到的问题
项目没有redis资源, MQ只有发送功能, 需要定期将之前异常的消息进行重试
解决方案
使用Java自带的DelayQueue, 延迟指定时间, 重启丢失问题忽略, 通过其他定时任务手段进行补全
由于是Java本地数据, 因此对象创建需要保证不会过多, 信息量不能过大, 避免OOM问题
示例
创建实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class CustomizeDelayQueue implements Delayed {
public Long time;
public Long id;
public CustomizeDelayQueue(Long time, Long id, TimeUnit unit) {
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
this.id = id;
}
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
public int compareTo(Delayed o) {
CustomizeDelayQueue delay = (CustomizeDelayQueue) o;
long diff = this.time - delay.time;
return diff <= 0 ? -1 : 1;
}
}创建自定义Pool
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.record.model.CustomizeDelayQueue;
import org.springframework.stereotype.Component;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
public class DelayQueueComponent {
DelayQueue<CustomizeDelayQueue> delayQueue = new DelayQueue<>();
public void put(Long time, Long id, TimeUnit unit) {
delayQueue.put(new CustomizeDelayQueue(time, id, unit));
}
public CustomizeDelayQueue poll() {
return delayQueue.poll();
}
public int size() {
return delayQueue.size();
}
public void offer(CustomizeDelayQueue obj) {
delayQueue.offer(obj);
}
}调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class DelayQueueTest extends CaseApplicationTest {
private DelayQueueComponent queueComponent;
public void delayTest() {
log.info("start delayTest");
queueComponent.put(10L, 1L, TimeUnit.SECONDS);
while (true) {
if (queueComponent.size() > 0) {
CustomizeDelayQueue object = queueComponent.poll();
if (Objects.nonNull(object)) {
log.info("poll:{}", object);
// do something
Long id = object.id;
log.info("id:{}", id);
break;
}
}
}
}
1 | CASE 2024-10-27 00:43:06.167 INFO 75635 --- [ main] record.service.DelayQueueTest : start delayTest |
代码记录
Code: https://github.com/swzxsyh/Case
Rivision Num: 7196623c7e6bcaf0f6566f3c0eb272c79d7d923e