0%

Java JUC DelayQueue使用

场景

数据处理项目, 新数据源的数据不是100%完整, 会由多个topic组合出一条完整数据, 处理后发送到项目。 可是具体完成的topic未知, 为保证尽可能将数据推送,需要一个延迟队列。

遇到的问题

项目没有redis资源, MQ只有发送功能, 需要定期将之前异常的消息进行重试

解决方案

使用Java自带的DelayQueue, 延迟指定时间, 重启丢失问题忽略, 通过其他定时任务手段进行补全

由于是Java本地数据, 因此对象创建需要保证不会过多, 信息量不能过大, 避免OOM问题

示例

  1. 创建实现类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class CustomizeDelayQueue implements Delayed {

    public Long time;

    public Long id;


    public CustomizeDelayQueue(Long time, Long id, TimeUnit unit) {
    this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    this.id = id;
    }

    @Override
    public long getDelay(TimeUnit unit) {
    return time - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
    CustomizeDelayQueue delay = (CustomizeDelayQueue) o;
    long diff = this.time - delay.time;
    return diff <= 0 ? -1 : 1;
    }
    }
  2. 创建自定义Pool

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28

    import com.record.model.CustomizeDelayQueue;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.TimeUnit;

    @Component
    public class DelayQueueComponent {

    DelayQueue<CustomizeDelayQueue> delayQueue = new DelayQueue<>();

    public void put(Long time, Long id, TimeUnit unit) {
    delayQueue.put(new CustomizeDelayQueue(time, id, unit));
    }

    public CustomizeDelayQueue poll() {
    return delayQueue.poll();
    }

    public int size() {
    return delayQueue.size();
    }

    public void offer(CustomizeDelayQueue obj) {
    delayQueue.offer(obj);
    }
    }
  3. 调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Slf4j
    public class DelayQueueTest extends CaseApplicationTest {

    @Autowired
    private DelayQueueComponent queueComponent;

    @Test
    public void delayTest() {
    log.info("start delayTest");
    queueComponent.put(10L, 1L, TimeUnit.SECONDS);
    while (true) {
    if (queueComponent.size() > 0) {
    CustomizeDelayQueue object = queueComponent.poll();
    if (Objects.nonNull(object)) {
    log.info("poll:{}", object);
    // do something
    Long id = object.id;
    log.info("id:{}", id);

    break;
    }
    }
    }
    }
1
2
3
CASE  2024-10-27 00:43:06.167  INFO 75635 --- [           main] record.service.DelayQueueTest            : start delayTest
CASE 2024-10-27 00:43:16.167 INFO 75635 --- [ main] record.service.DelayQueueTest : poll:com.record.model.CustomizeDelayQueue@7ee2423
CASE 2024-10-27 00:43:16.167 INFO 75635 --- [ main] record.service.DelayQueueTest : id:1

代码记录

Code: https://github.com/swzxsyh/Case
Rivision Num: 7196623c7e6bcaf0f6566f3c0eb272c79d7d923e