
阻塞队列:


BlockingQueue

什么情况下我们会使用阻塞队列?多线程并发处理时,线程池中用得较多!
学会使用队列
添加、移除
四组API
| 方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
|---|---|---|---|---|
| 添加 | add() | offer() | put() | offer(e, timeout, unit) |
| 移除 | remove() | poll() | take() | poll(timeout, unit) |
| 检测队首元素 | element() | peek() | - | - |
java
/** * 抛出异常 */
package com.mystpet.queueTest;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
/**
 * Demonstrates the exception-throwing queue operations {@code add} and {@code remove}
 * on a bounded {@link ArrayBlockingQueue}.
 *
 * <p>Prints the result of three successful {@code add} calls, a separator line, and then
 * the three elements returned by {@code remove}.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Capacity of the queue. The variable is parameterized instead of being a raw type so
    // that every add/remove call is type-checked, matching the other queue demos.
    ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);
    System.out.println(blockingQueue.add("a"));
    System.out.println(blockingQueue.add("b"));
    System.out.println(blockingQueue.add("c"));
    // The queue is full here; a fourth add would throw
    // java.lang.IllegalStateException: Queue full
    // System.out.println(blockingQueue.add("d"));
    System.out.println("====================");
    System.out.println(blockingQueue.remove());
    System.out.println(blockingQueue.remove());
    System.out.println(blockingQueue.remove());
    // The queue is empty here; a fourth remove would throw
    // java.util.NoSuchElementException
    // System.out.println(blockingQueue.remove());
}
}
java
/** * 有返回值,不抛出异常 */
package com.mystpet.queueTest;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
/**
 * Demonstrates the non-throwing queue operations {@code offer} and {@code poll}
 * on a bounded {@link ArrayBlockingQueue}.
 *
 * <p>Offers one element more than the queue can hold and polls one time more than
 * there are elements, printing every return value.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final int capacity = 3;
    ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(capacity);

    // The fourth offer finds the queue full and reports false instead of throwing.
    for (String element : new String[] {"a", "b", "c", "d"}) {
        System.out.println(queue.offer(element));
    }

    System.out.println("=========================");

    // The fourth poll finds the queue empty and reports null instead of throwing.
    for (int i = 0; i <= capacity; i++) {
        System.out.println(queue.poll());
    }
}
}
java
/** * 等待,阻塞(一直阻塞) */
package com.mystpet.queueTest;
import java.util.concurrent.ArrayBlockingQueue;
public class Test {
/**
 * Demonstrates the blocking queue operations {@code put} and {@code take}
 * on a bounded {@link ArrayBlockingQueue}.
 *
 * <p>Fills the queue to capacity and then prints each element as it is taken out again.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the thread is interrupted while waiting in
 *     {@code put} or {@code take}
 */
public static void main(String[] args) throws InterruptedException {
    ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);
    String[] elements = {"a", "b", "c"};

    // put blocks for as long as it takes for a slot to become free.
    for (String element : elements) {
        queue.put(element);
    }
    // queue.put("d"); // no slot left in the queue, so this call would wait forever

    for (int i = 0; i < elements.length; i++) {
        System.out.println(queue.take());
    }
}
}
java
/** * 等待,阻塞(等待超时) */
package com.mystpet.queueTest;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test {
/**
 * Demonstrates the timed queue operations {@code offer(e, timeout, unit)} and
 * {@code poll(timeout, unit)} on a bounded {@link ArrayBlockingQueue}.
 *
 * <p>Fills the queue, attempts one more timed offer, prints a separator, drains the
 * queue, and attempts one more timed poll. Every return value is printed.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the thread is interrupted during a timed wait
 */
public static void main(String[] args) throws InterruptedException {
    final long timeoutSeconds = 2;
    final String[] elements = {"a", "b", "c"};
    ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(3);

    for (String element : elements) {
        System.out.println(queue.offer(element));
    }
    // The queue is full: wait up to two seconds for a free slot, then give up.
    boolean accepted = queue.offer("d", timeoutSeconds, TimeUnit.SECONDS);
    System.out.println(accepted);

    System.out.println("==================");

    for (int i = 0; i < elements.length; i++) {
        System.out.println(queue.poll());
    }
    // The queue is empty: wait up to two seconds for an element, then give up.
    Object last = queue.poll(timeoutSeconds, TimeUnit.SECONDS);
    System.out.println(last);
}
}
SynchronousQueue 同步队列
没有容量。
放进去一个元素后,必须等它被取出来,才能再往里面放下一个元素!
put:放入 take:取出
java
package com.mystpet.queueTest;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
/**
 * Demonstrates the hand-off behaviour of a {@link SynchronousQueue} with two threads.
 *
 * <p>Thread {@code T1} puts the strings "1", "2" and "3" one after another and prints a
 * line after each {@code put} returns. Thread {@code T2} sleeps three seconds before each
 * of its three {@code take} calls and prints the element it received.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>(); // synchronous queue

    // Producer: hands over one element at a time.
    new Thread(() -> {
        try {
            for (String value : new String[] {"1", "2", "3"}) {
                synchronousQueue.put(value);
                System.out.println(Thread.currentThread().getName() + " put " + value + " ");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so it is not silently lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }, "T1").start();

    // Consumer: waits three seconds before taking each element.
    new Thread(() -> {
        try {
            for (int i = 0; i < 3; i++) {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + " => " + synchronousQueue.take());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so it is not silently lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }, "T2").start();
}
}
运行结果:
