java-延时队列-DelayQueue源码分析

小编 2026-06-05 阅读:678 评论:0
首先延时队列的发现是基于一个实际场景:如何处理超时的订单 这个问题很容易想到解决方案 (1)写一个定时任务,轮询超时的订单(缺点:性能消耗过大,对数据库造成压力) (2)放入延时队列当中 那么我就选...

首先延时队列的发现是基于一个实际场景:如何处理超时的订单

这个问题很容易想到解决方案

(1)写一个定时任务,轮询超时的订单(缺点:性能消耗过大,对数据库造成压力)

(2)放入延时队列当中

那么我就选择一下延时队列,看一下java的实现吧。

贴出一个入门的博客:https://www.cnblogs.com/barrywxx/p/8525907.html

然后我们就以这个博客来入手源码

1.首先看下如何使用

public static void main(String[] args) {    
            // 创建延时队列    
            DelayQueue<Message> queue = new DelayQueue<Message>();    
            // 添加延时消息,m1 延时3s    
            Message m1 = new Message(1, \"world\", 3000);    
            // 添加延时消息,m2 延时10s    
            Message m2 = new Message(2, \"hello\", 10000);    
            //将延时消息放到延时队列中  
            queue.offer(m2);    
            queue.offer(m1);    
            // 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间   
            ExecutorService exec = Executors.newFixedThreadPool(1);  
            exec.execute(new Consumer(queue));  
            exec.shutdown();  
}
public class Consumer implements Runnable {  
    // 延时队列 ,消费者从其中获取消息进行消费  
    private DelayQueue<Message> queue;  
  
    public Consumer(DelayQueue<Message> queue) {  
        this.queue = queue;  
    }  
  
    @Override  
    public void run() {  
        while (true) {  
            try {  
                Message take = queue.take();  
                System.out.println(\"消费消息id:\" + take.getId() + \" 消息体:\" + take.getBody());  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}

那么从上面可以看得出来,最基础的两个方法便是queue.offer(X),queue.take();

1.那么就从offer()开始看起

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //这里DelayQueue其实内置了一个队列,大部分功能是沿用了优先级队列
            //优先级队列大致就是讲的是 1 2 3 4 5 6 7这7个数你随便指定先后顺序放入
            //都只会有一种结果
            //private final PriorityQueue<E> q = new PriorityQueue<E>();
            q.offer(e);
            //如果e是队首的话就唤醒其余等待线程
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
}
//接着看q.offer(e);
public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        //这里是扩容操作
        if (i >= queue.length)
            grow(i + 1);
        size = i + 1;
        //队列第一个没有的话,就直接放入
        if (i == 0)
            queue[0] = e;
        //继续看
        else
            siftUp(i, e);
        return true;
}
private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
}
//一个一个的对比,一般来说是自己实现compareTo的实现
//一般就是对比时间
private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
}

2.take()

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                //peek(): return (size == 0) ? null : (E) queue[0];
                //就是获取第一个元素
                E first = q.peek();
                if (first == null)
                    //availabe是一个condition
                    available.await();
                else {
                    //对比当前时间是否达到
                    long delay = first.getDelay(NANOSECONDS);
                    //已经到了
                    if (delay <= 0)
                        //返回头结点,并将其移除
                        return q.poll();
                    first = null; // don\'t retain ref while waiting
                    //leader是用来提高效率的
                    //因为前面是lock.lockInterruptibly(),
                    //可能存在多个线程一起去take()
                    //所以避开过多线程一起操作
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

 

版权声明

本文仅代表作者观点,不代表百度立场。
本文系作者授权百度百家发表,未经许可,不得转载。

热门文章
  • 机房智能化温湿度解决方式之POE供电以太网温湿度传感器

    机房智能化温湿度解决方式之POE供电以太网温湿度传感器
    机房智能化温湿度解决方式之POE供电以太网温湿度传感器 北京盈创力和电子科技有限公司 智能型TCP网口温湿度记录仪 北京IP网络温湿度记录仪厂家,北京盈创力和 北京智能型TCP网口温湿度记录仪IP网络温湿度记录仪是一种新型的基于TCP/IP协议双绞线以太网标准温湿度采集模块,利用它可以实现现场温度值、相对湿度值的采集,同时利用其自身的RJ45通信接口可以方便地和机房监控主机或交换机集线器进行联网。 工作于-40℃~85℃工业级带...
  • Sequential Monte Carlo Methods (SMC) 序列蒙特卡洛/粒子滤波/Bootstrap Filtering

    Sequential Monte Carlo Methods (SMC) 序列蒙特卡洛/粒子滤波/Bootstrap Filtering
    Problem Statement 我们考虑一个具有马尔可夫性质、非线性、非高斯的状态空间模型(State Space Model):对于一个时间序列上的观测结果{yt,t∈N}\\{ y_t , t \\in N \\}{yt​,t∈N},我们认为每个观测结果yty_tyt​的生成依赖于一个无法直接观察的隐变量xt∈{xt,t∈N}x_t \\in \\{x_t , t \\in N \\}xt​∈{xt​,t∈N},即:p(...
  • HTTP状态保持的原理

    HTTP状态保持的原理
    a)在用户登录之后,浏览器返回响应的时候会在响应中添加上cookieb)浏览器接收到cookie之后会自动保存c)当用户再次请求同一服务器中的其他网页的时候,浏览器会自动带上之前保存的cookied)服务接收到请求之后可以请 request 对象中取到cookie 判断当前用户是否登录  Http是无状态的,就是连接时数据互通,关闭后...
  • Hive 系统函数及示例

    Hive 系统函数及示例
    查看所有系统函数 show functions; 函数分类 内置函数【系统函数】 数学函数: floor、round、ceil、cos、log2等 字符串函数: length、reverse、trim、lower、get_json_object、repeat等 收集函数: size 转换函数: cast 日期函数: year、month、datediff、date、date_add等 条件函数: coalesce、case…w...
  • CSRF的原理和防范措施

    CSRF的原理和防范措施
    a)攻击原理:i.用户C访问正常网站A时进行登录,浏览器保存A的cookieii.用户C再访问攻击网站B,网站B上有某个隐藏的链接或者图片标签会自动请求网站A的URL地址,例如表单提交,传指定的参数iii.而攻击网站B在访问网站A的时候,浏览器会自动带上网站A的cookieiv.所以网站A在接收到请求之后可判断当前用户是登录状态,所以...
标签列表