首页 课程 师资 教程 报名

Java线程安全的阻塞队列

  • 2022-07-29 10:58:53
  • 1623次 动力节点

在本Java教程中,动力节点小编将通过相同的生产者/消费者概念来解释BlockingQueue in Java.

Java中阻塞队列的优点是什么?

Ajava.util.Queue 支持在检索元素时等待队列变为非空,并在存储元素时等待队列中的空间变为可用的操作。

我们需要创建四个 Java 类:

CrunchifyMessage.java 放置和获取消息

CrunchifyBlockingProducer.java将消息放入队列

CrunchifyBlockingConsumer.java 从队列中获取消息

CrunchifyBlockingMain.java 开始测试

BlockingQueue 实现是thread-safe. 所有排队方法本质上都是原子的并且使用内部锁。

让我们开始在 Java 中实现线程安全的 BlockingQueue

第1步

创建类 CrunchifyMessage.java。这是简单的Java 对象。

package com.crunchify.example;
/**
 * @author Crunchify.com 
 * simple Message class to put and get message into queue
 */
public class CrunchifyMessage {
    private String crunchifyMsg;    
    public CrunchifyMessage(String string) {
        this.crunchifyMsg = string;
    }    
    public String getMsg() {
        return crunchifyMsg;
    }
}

第2步

CrunchifyBlockingProducer.java 创建创建简单味精并将其放入队列的生产者 。

package com.crunchify.example;
import java.util.concurrent.BlockingQueue;
/**
 * @author Crunchify.com
 * 
 */
public class CrunchifyBlockingProducer implements Runnable {  
    private BlockingQueue<CrunchifyMessage> crunchQueue;    
    public CrunchifyBlockingProducer(BlockingQueue<CrunchifyMessage> queue) {
        this.crunchQueue = queue;
    }    
    @Override
    public void run() {
        // producing CrunchifyMessage messages
        for (int i = 1; i <= 5; i++) {
            CrunchifyMessage msg = new CrunchifyMessage("i'm msg " + i);
            try {
                Thread.sleep(10);
                crunchQueue.put(msg);
                System.out.println("CrunchifyBlockingProducer: Message - " + msg.getMsg() + " produced.");
            } catch (Exception e) {
                System.out.println("Exception:" + e);
            }
        }        
        // adding exit message
        CrunchifyMessage msg = new CrunchifyMessage("All done from Producer side. Produced 50 CrunchifyMessages");
        try {
            crunchQueue.put(msg);
            System.out.println("CrunchifyBlockingProducer: Exit Message - " + msg.getMsg());
        } catch (Exception e) {
            System.out.println("Exception:" + e);
        }
    }   
}

第3步

创建 CrunchifyBlockingConsumer.java 从队列中消费消息的类。

package com.crunchify.example;
import java.util.concurrent.BlockingQueue;
/**
 * @author Crunchify.com
 * 
 */ 
public class CrunchifyBlockingConsumer implements Runnable {   
    private BlockingQueue<CrunchifyMessage> queue;    
    public CrunchifyBlockingConsumer(BlockingQueue<CrunchifyMessage> queue) {
        this.queue = queue;
    }    
    @Override
    public void run() {
        try {
            CrunchifyMessage msg;            
            // consuming messages until exit message is received
            while ((msg = queue.take()).getMsg() != "exit") {
                Thread.sleep(10);
                System.out.println("CrunchifyBlockingConsumer: Message - " + msg.getMsg() + " consumed.");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }   
}

第4步

创建 CrunchifyBlockingMain.java 运行 BlockingQueue测试的简单方法。运行这个程序来检查 BlockingQueue 的行为。

package com.crunchify.example;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; 
/**
 * @author Crunchify.com
 * 
 */
public class CrunchifyBlockingMain {   
    public static void main(String[] args) {       
        // Creating BlockingQueue of size 10
        // BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and
        // wait for space to become available in the queue when storing an element.
        BlockingQueue<CrunchifyMessage> crunchQueue = new ArrayBlockingQueue<>(10);
        CrunchifyBlockingProducer crunchProducer = new CrunchifyBlockingProducer(crunchQueue);
        CrunchifyBlockingConsumer crunchConsumer = new CrunchifyBlockingConsumer(crunchQueue);        
        // starting producer to produce messages in queue
        new Thread(crunchProducer).start();        
        // starting consumer to consume messages from queue
        new Thread(crunchConsumer).start();        
        System.out.println("Let's get started. Producer / Consumer Test Started.
");
    }   
}

BlockingQueue不接受空元素。在尝试添加、放置或提供null时,实现会抛出NullPointerException。

null用作标记值以指示轮询操作失败。

结果:

Let's get started. Producer / Consumer Test Started.
CrunchifyBlockingProducer: Message - i'm msg 1 produced.
CrunchifyBlockingProducer: Message - i'm msg 2 produced.
CrunchifyBlockingConsumer: Message - i'm msg 1 consumed.
CrunchifyBlockingConsumer: Message - i'm msg 2 consumed.
CrunchifyBlockingProducer: Message - i'm msg 3 produced.
CrunchifyBlockingConsumer: Message - i'm msg 3 consumed.
CrunchifyBlockingProducer: Message - i'm msg 4 produced.
CrunchifyBlockingConsumer: Message - i'm msg 4 consumed.
CrunchifyBlockingProducer: Message - i'm msg 5 produced.
CrunchifyBlockingProducer: Exit Message - All done from Producer side. Produced 50 CrunchifyMessages
CrunchifyBlockingConsumer: Message - i'm msg 5 consumed.
CrunchifyBlockingConsumer: Message - All done from Producer side. Produced 50 CrunchifyMessages consumed.

什么时候应该使用 java.util.concurrent.BlockingQueue?

当您想限制某种传入请求时,您应该使用相同的

生产者可以通过无限队列远远领先于消费者。如果消费者没有赶上生产者,那么它可能会导致 OutOfMemoryError. 在这种情况下,最好向潜在的生产者发出队列已满的信号,并在失败后迅速放弃。

换句话说:生产者自然受到限制。

阻塞队列通常用于并发应用程序

它提供了正确的、线程安全的实现

内存消耗也应该受到限制

选你想看

你适合学Java吗?4大专业测评方法

代码逻辑 吸收能力 技术学习能力 综合素质

先测评确定适合在学习

在线申请免费测试名额
价值1998元实验班免费学
姓名
手机
提交