Skip to content
DAILY QUOTE

“ ”

阻塞队列:

BlockingQueue

什么情况下我们会使用 阻塞队列?:多线程并发处理,线程池用的较多 !

学会使用队列

添加、移除

四组API

方式抛出异常有返回值不抛出异常阻塞 等待超市等待
添加addoffer()put()offer()
移除removepoll()take()poll()
检测队首元素elementpeek()--
java
/** * 抛出异常 */
package com.mystpet.queueTest;  
  
import java.util.concurrent.ArrayBlockingQueue;  
  
public class Test {  
    public static void main(String[] args) {  
        // 队列的大小  
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);  
  
        System.out.println(blockingQueue.add("a"));  
        System.out.println(blockingQueue.add("b"));  
        System.out.println(blockingQueue.add("c"));  
        // java.lang.IllegalStateException: Queue full 抛出异常  
        // System.out.println(blockingQueue.add("d"));  
  
        System.out.println("====================");  
  
        System.out.println(blockingQueue.remove());  
        System.out.println(blockingQueue.remove());  
        System.out.println(blockingQueue.remove());  
        // java.util.NoSuchElementException 抛出异常  
        // System.out.println(blockingQueue.remove());  
    }  
}
java
/** * 有返回值,不抛出异常 */
package com.mystpet.queueTest;  
  
import java.util.concurrent.ArrayBlockingQueue;  
  
public class Test {  
    public static void main(String[] args) {  
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);  
  
        System.out.println(blockingQueue.offer("a"));  
        System.out.println(blockingQueue.offer("b"));  
        System.out.println(blockingQueue.offer("c"));  
        System.out.println(blockingQueue.offer("d"));   // false 不抛出异常  
  
        System.out.println("=========================");  
  
        System.out.println(blockingQueue.poll());  
        System.out.println(blockingQueue.poll());  
        System.out.println(blockingQueue.poll());  
        System.out.println(blockingQueue.poll());   // null 不出抛出异常  
    }  
}
java
/** * 等待,阻塞(一直阻塞) */
package com.mystpet.queueTest;  
  
import java.util.concurrent.ArrayBlockingQueue;  
  
public class Test {  
    public static void main(String[] args) throws InterruptedException {  
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);  
  
        // 一直阻塞  
        blockingQueue.put("a");  
        blockingQueue.put("b");  
        blockingQueue.put("c");  
        // blockingQueue.put("d"); // 队列没有位置了,一直等待  
  
        System.out.println(blockingQueue.take());  
        System.out.println(blockingQueue.take());  
        System.out.println(blockingQueue.take());  
    }  
}
java
/** * 等待,阻塞(等待超时) */
package com.mystpet.queueTest;  
  
import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.TimeUnit;  
  
public class Test {  
    public static void main(String[] args) throws InterruptedException {  
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);  
  
        System.out.println(blockingQueue.offer("a"));  
        System.out.println(blockingQueue.offer("b"));  
        System.out.println(blockingQueue.offer("c"));  
        System.out.println(blockingQueue.offer("d", 2, TimeUnit.SECONDS));  // 等待超过2秒就退出  
  
        System.out.println("==================");  
  
        System.out.println(blockingQueue.poll());  
        System.out.println(blockingQueue.poll());  
        System.out.println(blockingQueue.poll());  
        System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));    // 等待超过2秒就退出  
    }  
}

SynchronousQueue同步队列

没有容量,

进去一个元素,必须等待取出来之后,才能再往里面放一个元素!

put:放入 take:取出

java
package com.mystpet.queueTest;  
  
import java.util.concurrent.SynchronousQueue;  
import java.util.concurrent.TimeUnit;  
  
public class SynchronousQueueDemo {  
    public static void main(String[] args) {  
        SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();   // 同步队列  
  
        new Thread(() -> {  
            try {  
  
                synchronousQueue.put("1");  
                System.out.println(Thread.currentThread().getName() + " put 1 ");  
                synchronousQueue.put("2");  
                System.out.println(Thread.currentThread().getName() + " put 2 ");  
                synchronousQueue.put("3");  
                System.out.println(Thread.currentThread().getName() + " put 3 ");  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }, "T1").start();  
  
        new Thread(() -> {  
            try {  
                TimeUnit.SECONDS.sleep(3);  
                System.out.println(Thread.currentThread().getName() + " => " + synchronousQueue.take());  
                TimeUnit.SECONDS.sleep(3);  
                System.out.println(Thread.currentThread().getName() + " => " + synchronousQueue.take());  
                TimeUnit.SECONDS.sleep(3);  
                System.out.println(Thread.currentThread().getName() + " => " + synchronousQueue.take());  
  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }, "T2").start();  
    }  
}

运行结果: