爪哇中的ArrayBlockingQueue

在本文中,我们将了解Java并发队列BlockingQueue。然后我们将深入其中’其中之一的实现ArrayBlockingQueue。
什么是BlockingQueue
阻塞队列 整型erface was 整型roduced in 爪哇 5 under 同时 APIs and it represents a thread-safe queue in which elements can be added to and removed from. We can have multiple threads inserting and removing elements from 阻塞队列
同时ly.
The reason it is called blocking
because it has the ability to block a thread from inserting an element when 这个 queue is full and also it blocks a thread from removing an element when 这个 queue is empty. Of course, there are different methods which decide whether to block the thread or 返回 an exception.
ArrayBlockingQueue
ArrayBlockingQueue实现BlockingQueue 爪哇接口。它是一个并发且有界的阻塞队列实现,它使用数组来存储元素。该类提供阻塞和非阻塞功能,用于从队列中插入和删除元素。通过阻塞它意味着像take()和put()这样的函数将无限期阻塞使用者或生产者线程,除非删除或插入一个元素。
产品特点
- 它是阻塞队列的线程安全实现。
- 它是一个有界队列,其大小在创建对象时给出,并且可以’实例化后将无法更改。队列是使用内部数组实现的。
- 可以按插入顺序或FIFO使用ArrayBlockingQueue上的元素。
- 不允许使用null对象,如果放在阻塞队列中的对象为null,则它将引发异常。
- 它具有阻塞和非阻塞功能,可以从队列中放置元素或从队列中删除元素。
- 它允许生产者或消费者线程的公平政策。公平政策将在下面详细说明
进一步阅读
建设者
The ArrayBlockingQueue has three constructors. The 公平ness
flag set as 真正 implies that 如果 multiple Producer or 消费者 threads are waiting on the queue either for adding or removing an element from the queue then it will ensure a FIFO order for the waiting threads 其他 如果 it’假,然后不’t保证线程的顺序。
The ArrayBlockingQueue
in 爪哇 整型ernally uses a Reentrant 锁
object instance. It also uses the 锁 to create two conditions 不是空的 and 不完整.
The
不是空的 condition
will allow 消费者s to remove elements from the queue in a threadsafe manner and will block the 消费者 threads in case the queue gets full.The
不完整 condition
will in a similar way allow 制片人 threads to add elements in the queue in a thread-safe manner until it’满后,它将阻止所有生产者线程。
在第三 建设者,如果我们传递一个集合,则初始ArrayBlockingQueue将具有遍历顺序的那个集合的元素。
1 2 3 4 5 6 7 |
上市 ArrayBlockingQueue(整型 容量, 布尔值 公平) 上市 ArrayBlockingQueue(整型 容量) ArrayBlockingQueue(整型 容量, 布尔值 公平, 采集<? 延伸 E> c) |
方法
ArrayBlockingQueue
有许多受支持的方法,我们将列出所有这些方法,此外,我们还添加了某些功能的实际实现的源代码片段,以便更好地理解。添加的功能的源代码片段有助于理解如何使用构造函数中初始化的锁和条件对象。
-
提供(E)
:此函数如下所示,将在队列的尾部添加一个元素,如果成功返回true,否则如果达到队列容量,它将返回false。它’s是线程安全的方法,并且是非阻塞的,即,如果队列已满,则将赢得’t阻止生产者线程,但返回false。1234567891011121314151617上市 布尔值 提供(E e) {checkNotNull(e);最后 重入锁 锁 = 这个.锁;锁.锁();尝试 {如果 (计数 == 项目.长度)返回 假;其他 {入队(e);返回 真正;}} 最后 {锁.开锁();}} -
提供(E e, 长 超时, 时间单位 单元)
:此功能的行为与offer(E)相似,除了在阻塞队列已满的情况下,它没有’t会快速返回false,但是要等到超时值后,才能在阻塞队列返回false之前查看该空间是否在阻塞队列中可用于插入该元素。123456789101112131415161718192021上市 布尔值 提供(E e, 长 超时, 时间单位 单元)抛出 InterruptedException {checkNotNull(e);长 纳米 = 单元.往纳诺斯(超时);最后 重入锁 锁 = 这个.锁;锁.不断地锁定();尝试 {而 (计数 == 项目.长度) {如果 (纳米 <= 0)返回 假;纳米 = 不完整.等待纳米(纳米);}入队(e);返回 真正;} 最后 {锁.开锁();}} -
放(E e)
:此函数将在末尾将元素插入到阻塞队列中,并且将无限期等待,除非阻塞队列已满,除非插入中断。1234567891011121314上市 虚空 放(E e) 抛出 InterruptedException {checkNotNull(e);最后 重入锁 锁 = 这个.锁;锁.不断地锁定();尝试 {而 (计数 == 项目.长度)不完整.等待();入队(e);} 最后 {锁.开锁();}}pre>
-
add(E)
:此函数在内部使用offer(E)并以相同的方式运行,除了当阻塞队列已满时,它将引发IllegalStateException。 -
轮询()
:此函数删除并返回阻塞队列顶部的元素,如果队列为空,则返回null。这是一个非阻塞功能。1234567891011上市 E 轮询() {最后 重入锁 锁 = 这个.锁;锁.锁();尝试 {返回 (计数 == 0) ? 空值 : 出队();} 最后 {锁.开锁();}} -
轮询(long 超时, 时间单位 单元)
:此函数的行为与poll函数类似,不同之处在于,如果阻塞队列为空,它将在尝试获取阻塞队列顶部的元素之前等待超时参数值。1234567891011121314151617上市 E 轮询(长 超时, 时间单位 单元) 抛出 InterruptedException {长 纳米 = 单元.往纳诺斯(超时);最后 重入锁 锁 = 这个.锁;锁.不断地锁定();尝试 {而 (计数 == 0) {如果 (纳米 <= 0)返回 空值;纳米 = 不是空的.等待纳米(纳米);}返回 出队();} 最后 {锁.开锁();}} -
采取()
:如果队列不为空,则此函数将返回阻塞队列顶部的元素。如果阻塞队列为空,则调用此函数的线程将等待,直到将元素插入到阻塞队列中为止。12345678910111213上市 E 采取() 抛出 InterruptedException {最后 重入锁 锁 = 这个.锁;锁.不断地锁定();尝试 {而 (计数 == 0)不是空的.等待();返回 出队();} 最后 {锁.开锁();}} -
peek()
:此函数将返回阻塞队列顶部的元素,而不会删除该元素;如果阻塞队列为空,则返回null。 -
size()
:此函数返回阻塞队列的容量 -
剩余容量()
:此函数返回阻止队列可容纳的最大元素数减去当前阻止队列中的元素数之差。 -
remove(Object o)
:如果此元素等于传递给此函数的对象,则该函数从阻塞队列中删除该元素的单个实例。如果找到匹配的元素,则删除后返回true,否则返回false。 -
contains(Object o)
:如果阻塞队列中存在与作为输入参数传递的对象匹配的对象,则此函数返回true,否则返回false。 -
toArray()
:此函数返回一个Object [],它是支持阻塞队列的内部数组的有序副本。 系统.arraycopy()用于复制数组,以确保对返回的数组进行外部修改’不会影响阻塞队列。 -
clear()
:此函数将以原子方式删除阻塞队列中的所有元素。这还将在清空队列后向阻塞队列中等待的所有生产者线程发出信号。 -
drainTo()
:此功能将以原子方式耗尽阻塞队列中的所有元素。如果您的集合参数实例与调用此函数的实例相同,则将引发IllegalArgumentException。任何等待中的生产者线程都将收到信号,表明队列为空并准备接受新元素。
使用场景
当不需要完整的消息传递基础结构时,可以使用ArrayBlockingQueue实例解决生产者和使用者类型的问题。如果资源有限,它可以用作资源池来限制使用者。
实施代码
在下面的单元测试中,我们创建一个ArrayBlockingQueue实例并启动生产者,以及生产者线程是否等待某个使用者线程创建的空间。同样,我们还测试了一旦队列为空,使用者线程是否等待元素添加到队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
包 主要.com.千伏.公吨.同时.阻塞队列; 进口 爪哇.实用程序.随机; 进口 爪哇.实用程序.同时.ArrayBlockingQueue; 进口 爪哇.实用程序.同时.阻塞队列; / ** *下面的代码测试了ArrayBlockingQueue在队列已满时使生产者线程等待的地方 *并且类似地,如果队列为空,它将使使用者线程充满 * / 上市 类 爪哇2博客ArrayBlockingQueue { 上市 静态的 虚空 主要(串 args[]){ 最后 阻塞队列<整数> 阻塞队列 = 新 ArrayBlockingQueue<整数>(5); 最后 随机 随机 = 新 随机(); //生产者线程一直运行直到被中断 可运行 制片人 = () -> { 布尔值 isInterrupted = 假; 而(!isInterrupted) { 尝试 { 系统.出.打印(线.currentThread().getName() + “添加到队列”); 阻塞队列.放(随机.nextInt()); 系统.出.打印(线.currentThread().getName() + “添加到队列已完成”); } 抓住 (InterruptedException e) { 系统.出.打印(线.currentThread().getName() + “打断了”); isInterrupted = 真正; } }; }; //消费者线程一直运行直到被中断 可运行 消费者 = () -> { 布尔值 isInterrupted = 假; 而(!isInterrupted) { 尝试 { 系统.出.打印(线.currentThread().getName() + “从队列中检索”); 系统.出.打印(线.currentThread().getName() + “已检索” + 阻塞队列.采取() + “来自队列”); } 抓住 (InterruptedException e) { 系统.出.打印(线.currentThread().getName() + “打断了”); isInterrupted = 真正; } } }; 线 制片人Thread = 新 线(制片人); 制片人Thread.setName(“我的生产者”); 线 消费者Thread = 新 线(消费者); 消费者Thread.setName(“我的消费者”); 制片人Thread.开始(); //此代码将等待主线程等待,直到生产者完全填满阻塞队列 而(阻塞队列.剩余容量()!=0){ 尝试{ 线.睡觉(5000); }抓住 (InterruptedException 即){ 系统.出.打印(“主线程中断”); } } //此日志检查MyProducer线程状态,因为它现在应该处于等待状态,因为 阻塞队列已满 系统.出.打印(“队列已满,MyProducer线程状态:”+制片人Thread.getState()); 断言 (线.州.等候==制片人Thread.getState()); 断言(制片人Thread.活着()); //生产线程停止,以确保一旦所有整数都消耗完,阻塞队列将为空 制片人Thread.打断(); //现在启动使用者线程 消费者Thread.开始(); //等待使用者耗尽阻塞队列 而(((ArrayBlockingQueue) 阻塞队列).剩余容量()!=5){ 尝试{ 线.睡觉(5000); }抓住 (InterruptedException 即){ 系统.出.打印(“主线程中断”); } } //一旦阻塞队列为空,检查使用者线程的状态。我们应该处于等待状态吗 系统.出.打印(“队列为空,并且MyConsumer线程状态为:”+消费者Thread.getState()); 断言(线.州.等候==消费者Thread.getState()); 断言(消费者Thread.活着()); //停止消费者 消费者Thread.打断(); } } |
MyProducer完成添加到队列
MyProducer添加到队列
MyProducer完成添加到队列
MyProducer添加到队列
MyProducer完成添加到队列
MyProducer添加到队列
MyProducer完成添加到队列
MyProducer添加到队列
MyProducer完成添加到队列
MyProducer添加到队列
队列已满,MyProducer线程状态:WAITING
MyProducer中断
从队列中检索MyConsumer
MyConsumer从队列中检索了-65648598
从队列中检索MyConsumer
MyConsumer从队列中检索了-1141421021
从队列中检索MyConsumer
MyConsumer从队列中检索到1476346866
从队列中检索MyConsumer
MyConsumer从队列中检索到1937023750
从队列中检索MyConsumer
MyConsumer从队列中检索了-1723127356
从队列中检索MyConsumer
队列为空,并且MyConsumer线程状态为:WAITING
MyConsumer中断
概要
- We have understood what a 同时
阻塞队列
is and why it is 进口ant in a multi-threaded environment. - We have also seen the implementation of 阻塞队列, i.e.
ArrayBlockingQueue
. - We have gone through the constructors and methods of the
ArrayBlockingQueue
. - We have implemented our own
ArrayBlockingQueue
and tested it with 制片人 and 消费者.