博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
《 Java并发编程从入门到精通》第5章 多线程之间交互:线程阀
阅读量:6909 次
发布时间:2019-06-27

本文共 9853 字,大约阅读时间需要 32 分钟。

作者:张振华    购买链接:

(投入多少,收获多少。参与多深,领悟多深,京东,亚马逊,当当均有销售。)

 


5.1 线程安全的阻塞队列BlockingQueue

(1)先理解一下Queue、Deque、BlockingQueue的概念:

Queue(队列) :用于保存一组元素,不过在存取元素的时候必须遵循先进先出原则。队列是一种特殊的线性表,它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作。进行插入操作的端称为队尾,进行删除操作的端称为队头。队列中没有元素时,称为空队列。在队列这种数据结构中,最先插入的元素将是最先被删除的元素;反之最后插入的元素将是最后被删除的元素,因此队列又称为“先进先出”(FIFO—first in first out)的线性表。

Deque(双端队列): 两端都可以进出的队列。当我们约束从队列的一端进出队时,就形成了另外一种存取模式,它遵循先进后出原则,这就是栈结构。双端队列主要是用于栈操作。使用站结构让操作有可追溯性(如windows窗口地址栏内的路径前进栈、后退栈)。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
  • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

(2)Java里的阻塞队列最新JDK中提供了7个阻塞队列。分别是:

 

BlockingQueue常用的方法有,更多方法请查询API:

1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则招聘异常

2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.

3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

其中:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用作指示poll 操作失败的警戒值。


5.2 ArrayBlockingQueue


    ArrayBlockingQueue一个由数组支持的有界的。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。

这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。

先看一下ArrayBlockingQueue的部分源码:理解一下ArrayBlockingQueue的实现原理和机制

public 
class 
ArrayBlockingQueue <E> 
extends 
AbstractQueue<E>
        
implements 
BlockingQueue<E>, java.io.Serializable {
 
    
//数组的储存结构
    
final 
Object[] 
items
;
    
   //锁采用的机制
    
final 
ReentrantLock 
lock
;
    
public 
ArrayBlockingQueue( 
int 
capacity, 
boolean 
fair) {
        
if 
(capacity <= 0)
            
throw 
new 
IllegalArgumentException();
        
this
.
items 
= 
new 
Object[capacity];
        //通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程
        
lock 
= 
new 
ReentrantLock(fair);
        
notEmpty 
= 
lock 
.newCondition();
        
notFull 
 
lock 
.newCondition();
    }
    
public 
boolean 
offer(E e) {
        checkNotNull(e);
        //使用ReentrantLock 锁机制
        
final 
ReentrantLock lock = 
this
.
lock
;
        lock.lock();//加锁
        
try 
{
            
if 
(
count 
== 
items
.
length
)
                
return 
false 
;
            
else 
{
                enqueue(e);
                
return 
true 
;
            }
        } 
finally 
{
            lock.unlock();//释放锁
        }
    }
    
private 
void 
enqueue(E x) {
        
final 
Object[] items = 
this
.
items
;
        items[ 
putIndex
] = x;//通过数组进行储存
        
if 
(++
putIndex 
== items.
length
)
            
putIndex 
= 0;
        
count
++;
        
notEmpty
.signal();
    }
…….
}
使用实例是:
import 
java.util.concurrent.ArrayBlockingQueue;
import 
java.util.concurrent.BlockingQueue;
/*
* 现有的程序代码模拟产生了16个日志对象,并且需要运行16秒才能打印完这些日志,
* 请在程序中增加4个线程去调用parseLog()方法来分头打印这16个日志对象,
* 程序只需要运行4秒即可打印完这些日志对象。
*/
public 
class 
BlockingQueueTest {
       
public 
static 
void 
main(String[] args) 
throws 
Exception {
             
// 新建一个等待队列
             
final 
BlockingQueue<String> bq = 
new 
ArrayBlockingQueue<String>(16);
             
// 四个线程
             
for 
(
int 
i = 0; i < 4; i++) {
                   
new 
Thread(
new 
Runnable() {
                         
@Override
                         
public 
void 
run() {
                               
while 
(
true 
) {
                                     
try 
{
                                          String log = (String) bq.take();
                                           parseLog(log);
                                    } 
catch 
(Exception e) {
                                    }
                              }
                        }
                  }).start();
            }
             
for 
(
int 
i = 0; i < 16; i++) {
                  String log = (i + 1) + 
” –>  “
;
                  bq.put(log); 
// 将数据存到队列里!
            }
      }
       
// parseLog方法内部的代码不能改动
       
public 
static 
void 
parseLog(String log) {
            System. 
out
.println(log + System.currentTimeMillis());
             
try 
{
                  Thread. sleep(1000);
            } 
catch 
(InterruptedException e) {
                  e.printStackTrace();
            }
      }
}

5.3 LinkedBlockingQueue


      LinkedBlockingQueue : 基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列 中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时 (LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反 之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别 采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大 小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于 消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

先看一下LinkedBlockingDeque的部分源码:理解一下ArrayBlockingQueue的实现原理和机制
public 
class 
LinkedBlockingDeque <E>
    
extends 
AbstractQueue<E>
    
implements 
BlockingDeque<E>, java.io.Serializable {
    
final 
ReentrantLock 
lock 
= 
new 
ReentrantLock();//线程安全
    
/**
     * 
@throws 
NullPointerException 
{@inheritDoc}
     */
    
public 
boolean 
offerLast(E e) {
        
if 
(e == 
null
) 
throw 
new 
NullPointerException();
        Node<E> node = 
new 
Node<E>(e);//每次插入后都将动态地创建链接节点
        
final 
ReentrantLock lock = 
this
.
lock
;
        lock.lock();
        
try 
{
            
return 
linkLast(node);
        } 
finally 
{
            lock.unlock();
        }
    }
    
public 
boolean 
offer(E e) {
        
return 
offerLast(e);
    }
    
public 
boolean 
add(E e) {
        addLast(e);
        
return 
true 
;
    }
    
public 
void 
addLast(E e) {
        
if 
(!offerLast(e))
            
throw 
new 
IllegalStateException(
“Deque full”
);
    }
    
public 
E removeFirst() {
        E x = pollFirst();
        
if 
(x == 
null
) 
throw 
new 
NoSuchElementException();
        
return 
x;
    }
    
public 
E pollFirst() {
        
final 
ReentrantLock lock = 
this
.
lock
;
        lock.lock();
        
try 
{
            
return 
unlinkFirst();
        } 
finally 
{
            lock.unlock();
        }
    }
 
……
}
使用实例是:
将ArrayBlockingQueue的例子换成LinkedBlockingQueue即可:
 
// 新建一个等待队列
 
final
 BlockingQueue<String> bq = 
new
 ArrayBlockingQueue<String>(16);
换成:
final 
BlockingQueue<String> bq = 
new 
LinkedBlockingQueue<String>(16);

5.4 PriorityBlockingQueue 


PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

先看一下PriorityBlockingQueue的部分源码,理解一下PriorityBlockingQueue的实现原理和机制:

public 
class 
PriorityBlockingQueue <E> 
extends 
AbstractQueue<E>
    
implements 
BlockingQueue<E>, java.io.Serializable {
    
private 
final 
ReentrantLock 
lock 
;//说明本类使用一个lock来同步读写等操作
    
private 
transient 
Comparator<? 
super 
E> 
comparator
;
     // 使用指定的初始容量创建一个 PriorityBlockingQueue,并根据指定的比较器对其元素进行排序。
    
public 
PriorityBlockingQueue( 
int 
initialCapacity,
                                 Comparator<? 
super 
E> comparator) {
        
if 
(initialCapacity < 1)
            
throw 
new 
IllegalArgumentException();
        
this
.
lock 
= 
new 
ReentrantLock();
        
this
.
notEmpty 
= 
lock
.newCondition();
        
this
.
comparator 
= comparator;
        
this
.
queue 
= 
new 
Object[initialCapacity];
    }
     public 
E poll() {
        
final 
ReentrantLock lock = 
this
.
lock
;
        lock.lock();
        
try 
{
            
return 
dequeue();
        } 
finally 
{
            lock.unlock();
        }
    }
……
}
 

5.5 DelayQueue


     DelayQueue:是一个支持延时获取元素的使用优先级队列的实现的无界阻塞队列。队列中的元素必须实现Delayed接口和Comparable接口,也就是说
DelayQueue里面的元素必须有
public 
int 
compareTo( T o)和
long 
getDelay(TimeUnit unit)方法存在,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将DelayQueue运用在以下应用场景:
  • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  • 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。
我们来看一下DelayQueue的源码来理解一下:
//可以看出来E元素必须继承Delayed和而Delayed又继承Comparable;
public 
class 
DelayQueue<E 
extends 
Delayed> 
extends 
AbstractQueue<E>
    
implements 
BlockingQueue<E> {
 
    
private 
final 
transient 
ReentrantLock 
lock 
= 
new 
ReentrantLock();//安全锁机制
    
private 
final 
PriorityQueue<E> 
q 
= 
new 
PriorityQueue<E>();//PriorityQueue来存取元素
    
public 
E take() 
throws 
InterruptedException {
        
final 
ReentrantLock lock = 
this
.
lock
;
        lock.lockInterruptibly();
        
try 
{
            
for 
(;;) {
                E first = 
q
.peek();
                
if 
(first == 
null
)
                    
available
.await();
                
else 
{
                    //根据元素的Delay进行判断
                    
long 
delay = first.getDelay(
NANOSECONDS
);
                    
if 
(delay <= 0)
                        
return 
q 
.poll();
                    first = 
null
; 
// don’t retain ref while waiting
                    
if 
(
leader 
!= 
null
)
                       //没到时间阻塞等待
                        
available
.await();
                    
else 
{
                        Thread thisThread = Thread. currentThread();
                        
leader 
= thisThread;
                        
try 
{
                            
available
.awaitNanos(delay);
                        } 
finally 
{
                            
if 
(
leader 
== thisThread)
                                
leader 
= 
null 
;
                        }
                    }
                }
            }
        } 
finally 
{
            
if 
(
leader 
== 
null 
&& 
q
.peek() != 
null
)
                
available
.signal();
            lock.unlock();
        }
    }
……
}
我们来看一下DelayQueue的使用实例:
(1)实现一个Student对象作为DelayQueue的元素必须实现Delayed 接口的两个方法;
import 
java.util.concurrent.Delayed;
import 
java.util.concurrent.TimeUnit;
public 
class 
Student 
implements 
Delayed {
 
//必须实现Delayed接口
       
private 
String 
name 
;
       
private 
long 
submitTime 
;
// 交卷时间
       
private 
long 
workTime 
;
// 考试时间
       
public 
String getName() {
             
return 
this 
.
name 
+ 
” 交卷,用时” 
+ 
workTime
;
      }
       
public 
Student(String name, 
long 
submitTime) {
             
this
.
name 
= name;
             
this
.
workTime 
= submitTime;
             
this
.
submitTime 
= TimeUnit.
NANOSECONDS
.convert(submitTime, TimeUnit.
MILLISECONDS 
) + System.nanoTime ();
            System. 
out
.println(
this
.
name 
+ 
” 交卷,用时” 
+ 
workTime
);
      }
       
//必须实现getDelay方法
       
public 
long 
getDelay(TimeUnit unit) {
//          返回一个延迟时间
             
return 
unit.convert(
submitTime 
– System.nanoTime (), unit.
NANOSECONDS 
);
      }
       
//必须实现compareTo方法
       
public 
int 
compareTo(Delayed o) {
//          比较的方法
            Student that = (Student) o;
             
return 
submitTime 
> that.
submitTime 
? 1 : ( 
submitTime 
< that.
submitTime 
? -1 : 0);
      }
}
(2)执行运行类如下:
package 
demo.thread;
import 
java.util.concurrent.DelayQueue;
public 
class 
DelayQueueTest {
       
public 
static 
void 
main(String[] args) 
throws 
Exception {
             
// 新建一个等待队列
             
final 
DelayQueue<Student> bq = 
new 
DelayQueue<Student>();
             
for 
(
int 
i = 0; i < 5; i++) {
                  Student student = 
new 
Student(
“学生” 
+i,Math.round((Math. random()*10+i)));
                  bq.put(student); 
// 将数据存到队列里!
            }
             
//获取但不移除此队列的头部;如果此队列为空,则返回 null。
            System. 
out
.println(“bq.peek()”+bq.peek().getName());
             
//获取并移除此队列的头部,在可从此队列获得到期延迟的元素,或者到达指定的等待时间之前一直等待(如有必要)。
             
//poll(long timeout, TimeUnit unit) 大家可以试一试这个方法
      }
}

运行结果如下:每次运行结果都不一样,一问,我们获得永远是队列里面的第一个元素;

学生0 交卷,用时8学生1 交卷,用时6学生2 交卷,用时10学生3 交卷,用时10学生4 交卷,用时9bq.peek()学生1 交卷,用时6

可以慢慢的在以后的工作当中体会DelayQueue的用法;

转载地址:http://nrwcl.baihongyu.com/

你可能感兴趣的文章
沈颖刚:生物柴油或是高原柴油货车污染治理有效途径
查看>>
掌阅公布数字阅读报告:00后成第二大阅读群体
查看>>
冬季风暴席卷美国致航班取消车祸频发 20万人断电
查看>>
民航局正式启动北斗星基增强系统民航应用验证评估工作
查看>>
北京新机场 严打无人机“黑飞”
查看>>
8点1氪|阿里巴巴第三财季营收破千亿;传滴滴拟裁员25%;饿了么口碑超30亿美元融资已逐步到位...
查看>>
程序员用代码将近200个小时,为自己DIY一个手机音乐播放器
查看>>
CentOS 7安装TCP BBR拥塞算法
查看>>
JDBC【PreparedStatment、批处理、处理二进制、自动主键、调用存储过程、函数】...
查看>>
微软洪小文:真正的AI不应基于大数据,而需从小数据、零数据着手
查看>>
koroFileHeader 非常实用的Vscode 插件[用于添加文件头部注释]
查看>>
java多线程的杂谈
查看>>
BCH支付服务商Bitek为哥伦比亚商家提供比索兑换服务
查看>>
Centos6.8安装node生产环境
查看>>
这次不会说我的正则教程没写全了吧??
查看>>
LeetCode16.最接近的三数之和 JavaScript
查看>>
2017 Material design 第三章第二节《Icons》
查看>>
你凭什么做好互联网?
查看>>
【火炉炼AI】机器学习014-用SVM构建非线性分类模型
查看>>
Java线程的CPU时间片
查看>>