首页 课程 师资 教程 报名

Java线程安全的队列示例

  • 2022-08-24 10:26:36
  • 2893次 动力节点

在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。

注:什么叫线程安全?这个首先要明确。线程安全的类 ,指的是类内共享的全局变量的访问必须保证是不受多线程形式影响的。如果由于多线程的访问(比如修改、遍历、查看)而使这些变量结构被破坏或者针对这些变量操作的原子性被破坏,则这个类就不是线程安全的。

本文来讲讲这两种Queue,本文分为以下两个部分,用分割线分开:

BlockingQueue 阻塞算法

ConcurrentLinkedQueue,非阻塞算法

首先来看看BlockingQueue:

Queue是什么就不需要多说了吧,一句话:队列是先进先出。相对的,栈是后进先出。如果不熟悉的话先找本基础的数据结构的书看看吧。

BlockingQueue,顾名思义,“阻塞队列”:可以提供阻塞功能的队列。

首先,看看BlockingQueue提供的常用方法:

  可能报异常 返回布尔值 可能阻塞 设定等待时间
入队 add(e) offer(e) put(e) offer(e, timeout, unit)
出队 remove() poll() take() poll(timeout, unit)
查看 element() peek()

从上表可以很明显看出每个方法的作用,这个不用多说。我想说的是:

add(e) remove() element() 方法不会阻塞线程。当不满足约束条件时,会抛出IllegalStateException 异常。例如:当队列被元素填满后,再调用add(e),则会抛出异常。

offer(e) poll() peek() 方法即不会阻塞线程,也不会抛出异常。例如:当队列被元素填满后,再调用offer(e),则不会插入元素,函数返回false。

要想要实现阻塞功能,需要调用put(e) take() 方法。当不满足约束条件时,会阻塞线程。

BlockingQueue 阻塞算法

BlockingQueue作为线程容器,可以为线程同步提供有力的保障。

BlockingQueue定义的常用方法

BlockingQueue定义的常用方法如下:

  抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用

1. ArrayBlockingQueue

基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。

ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

2. LinkedBlockingQueue

基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

阻塞队列:线程安全

按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列检索操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。

注意:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingDeque {
    //阻塞队列,FIFO
    private static LinkedBlockingQueue<Integer> concurrentLinkedQueue = new LinkedBlockingQueue<Integer>();           
 public static void main(String[] args) {  
     ExecutorService executorService = Executors.newFixedThreadPool(2);  
     executorService.submit(new Producer("producer1"));  
     executorService.submit(new Producer("producer2"));  
     executorService.submit(new Producer("producer3"));  
     executorService.submit(new Consumer("consumer1"));  
     executorService.submit(new Consumer("consumer2"));  
     executorService.submit(new Consumer("consumer3"));  
 }  
 static class Producer implements Runnable {  
     private String name;  
     public Producer(String name) {  
         this.name = name;  
     }  
     public void run() {  
         for (int i = 1; i < 10; ++i) {  
             System.out.println(name+ "  生产: " + i);  
             //concurrentLinkedQueue.add(i);  
             try {
                concurrentLinkedQueue.put(i);
                Thread.sleep(200); //模拟慢速的生产,产生阻塞的效果
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }             
         }  
     }  
 }  
 static class Consumer implements Runnable {  
     private String name;  
     public Consumer(String name) {  
         this.name = name;  
     }  
     public void run() {  
         for (int i = 1; i < 10; ++i) {  
             try {          
                    //必须要使用take()方法在获取的时候阻塞
                      System.out.println(name+"消费: " +  concurrentLinkedQueue.take());  
                      //使用poll()方法 将产生非阻塞效果
                      //System.out.println(name+"消费: " +  concurrentLinkedQueue.poll());                       
                     //还有一个超时的用法,队列空时,指定阻塞时间后返回,不会一直阻塞
                     //但有一个疑问,既然可以不阻塞,为啥还叫阻塞队列?
                    //System.out.println(name+" Consumer " +  concurrentLinkedQueue.poll(300, TimeUnit.MILLISECONDS));                    
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }  
         }  
     }  
 }  
}

ConcurrentLinkedQueue,非阻塞算法

非阻塞队列

基于链接节点的、无界的、线程安全。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列检索操作从队列头部获得元素。当许多线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择。此队列不允许 null 元素。

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
public class NoBlockQueue {  
       private static ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<Integer>();             
    public static void main(String[] args) {  
        ExecutorService executorService = Executors.newFixedThreadPool(2);  
        executorService.submit(new Producer("producer1"));  
        executorService.submit(new Producer("producer2"));  
        executorService.submit(new Producer("producer3"));  
        executorService.submit(new Consumer("consumer1"));  
        executorService.submit(new Consumer("consumer2"));  
        executorService.submit(new Consumer("consumer3"));  
    }    
    static class Producer implements Runnable {  
        private String name;    
        public Producer(String name) {  
            this.name = name;  
        }    
        public void run() {  
            for (int i = 1; i < 10; ++i) {  
                System.out.println(name+ " start producer " + i);  
                concurrentLinkedQueue.add(i);  
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                //System.out.println(name+"end producer " + i);  
            }  
        }  
    }    
    static class Consumer implements Runnable {  
        private String name;    
        public Consumer(String name) {  
            this.name = name;  
        }  
        public void run() {  
            for (int i = 1; i < 10; ++i) {  
                try { 
                    System.out.println(name+" Consumer " +  concurrentLinkedQueue.poll());
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }  
//                System.out.println();  
//                System.out.println(name+" end Consumer " + i);  
            }  
        }  
    }  
} 

在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。

使用非阻塞队列,虽然能即时返回结果(消费结果),但必须自行编码解决返回为空的情况处理(以及消费重试等问题)。

另外他们都是线程安全的,不用考虑线程同步问题。

选你想看

你适合学Java吗?4大专业测评方法

代码逻辑 吸收能力 技术学习能力 综合素质

先测评确定适合在学习

在线申请免费测试名额
价值1998元实验班免费学
姓名
手机
提交