50°

Java 多线程基础(十二)生产者与消费者

 Java 多线程基础(十二)生产者与消费者

一、生产者与消费者模型

生产者与消费者问题是个非常典型的多线程问题,涉及到的对象包括“生产者”、“消费者”、“仓库”和“产品”。他们之间的关系如下:

①、生产者仅仅在仓储未满时候生产,仓满则停止生产。
②、消费者仅仅在仓储有产品时候才能消费,仓空则等待。
③、当消费者发现仓储没产品可消费时候会通知生产者生产。
④、生产者在生产出可消费产品时候,应该通知等待的消费者去消费。

生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。生产消费者模式如下图。

在日益发展的服务类型中,譬如注册用户这种服务,它可能解耦成好几种独立的服务(账号验证,邮箱验证码,手机短信码等)。它们作为消费者,等待用户输入数据,在前台数据提交之后会经过分解并发送到各个服务所在的url,分发的那个角色就相当于生产者。消费者在获取数据时候有可能一次不能处理完,那么它们各自有一个请求队列,那就是内存缓冲区了。做这项工作的框架叫做消息队列。

二、生产者与消费者实现

下面通过生产包子的例子及wait()/notify()方式实现该模型(后面学习线程池相关内容之后,再通过其它方式实现生产/者消费者模型)。

面包类:

public class Bread {
    private int capacity;    // 面包的容量
    private int size;        // 面包的实际数量
    public Bread(int capacity) {
        this.capacity = capacity;
        this.size = 0;
    }
</span><span style="color: #008000;">//</span><span style="color: #008000;"> 生产面包</span>
<span style="color: #0000ff;">public</span> <span style="color: #0000ff;">synchronized</span> <span style="color: #0000ff;">void</span> produce(<span style="color: #0000ff;">int</span><span style="color: #000000;"> val) {
    </span><span style="color: #0000ff;">try</span><span style="color: #000000;"> {
         </span><span style="color: #008000;">//</span><span style="color: #008000;"> left 表示“想要生产的数量”(有可能生产量太多,需多此生产)</span>
        <span style="color: #0000ff;">int</span> left =<span style="color: #000000;"> val;
        </span><span style="color: #0000ff;">while</span> (left &gt; 0<span style="color: #000000;">) {
            </span><span style="color: #008000;">//</span><span style="color: #008000;"> 库存已满时,等待“消费者”消费产品。</span>
            <span style="color: #0000ff;">while</span> (size &gt;=<span style="color: #000000;"> capacity)
                wait();
            </span><span style="color: #008000;">//</span><span style="color: #008000;"> 获取“实际生产的数量”(即库存中新增的数量)
            </span><span style="color: #008000;">//</span><span style="color: #008000;"> 如果“库存”+“想要生产的数量”&gt;“总的容量”,则“实际增量”=“总的容量”-“当前容量”。(此时填满仓库)
            </span><span style="color: #008000;">//</span><span style="color: #008000;"> 否则“实际增量”=“想要生产的数量”</span>
            <span style="color: #0000ff;">int</span> inc = (size+left)&gt;capacity ? (capacity-<span style="color: #000000;">size) : left;
            size </span>+=<span style="color: #000000;"> inc;
            left </span>-=<span style="color: #000000;"> inc;
            System.out.printf(</span>"%s produce(%3d) --&gt; left=%3d, inc=%3d, size=%3d\n"<span style="color: #000000;">,
                    Thread.currentThread().getName(), val, left, inc, size);
            </span><span style="color: #008000;">//</span><span style="color: #008000;"> 通知“消费者”可以消费了。</span>

notifyAll(); } }catch(InterruptedException e) { e.printStackTrace(); } } // 消费面包 public synchronized void consume(int val) { try { // left 表示“客户要消费数量”(有可能消费量太大,库存不够,需多此消费) int left = val; while (left > 0) { // 库存为0时,等待“生产者”生产产品。 while (size <= 0) wait(); // 获取“实际消费的数量”(即库存中实际减少的数量) // 如果“库存”<“客户要消费的数量”,则“实际消费量”=“库存”; // 否则,“实际消费量”=“客户要消费的数量”。 int dec = (size<left) ? size : left; size -= dec; left -= dec; System.out.printf("%s consume(%3d) <-- left=%3d, dec=%3d, size=%3d\n", Thread.currentThread().getName(), val, left, dec, size); notifyAll(); } } catch (InterruptedException e) { e.printStackTrace(); } } }

生产者类

public class Producer{
    Bread bread;
    public Producer(Bread bread) {
        this.bread = bread;
    }
    public void produce(final int val) {
        new Thread(() -> {
            bread.produce(val);
        }).start();;
    }
}

消费者类

public class Customer {
    private Bread bread;
    public Customer(Bread bread) {
        this.bread = bread;
    }
    public void consume(final int val) {
        new Thread(() -> {
            bread.consume(val);
        }).start();;
    }
}

测试类代码

public class Demo {
    public static void main(String[] args) {
        Bread bread = new Bread(100);
        Producer producer = new Producer(bread);
        Cunstomer customer = new Customer(bread);
    producer.produce(</span>60<span style="color: #000000;">);
    producer.produce(</span>120<span style="color: #000000;">);
    consumer.consume(</span>90<span style="color: #000000;">);
    consumer.consume(</span>150<span style="color: #000000;">);
    producer.produce(</span>110<span style="color: #000000;">);
}

}

// 运行结果
Thread-1 produce( 60) --> left=  0, inc= 60, size= 60
Thread-5 produce(110) --> left= 70, inc= 40, size=100
Thread-4 consume(150) <-- left= 50, dec=100, size=  0
Thread-2 produce(120) --> left= 20, inc=100, size=100
Thread-3 consume( 90) <-- left=  0, dec= 90, size= 10
Thread-4 consume(150) <-- left= 40, dec= 10, size=  0
Thread-5 produce(110) --> left=  0, inc= 70, size= 70
Thread-4 consume(150) <-- left=  0, dec= 40, size= 30
Thread-2 produce(120) --> left=  0, inc= 20, size= 50

说明:

①、Producer是“生产者”类,它与“面包(bread)”关联。当调用“生产者”的produce()方法时,它会新建一个线程并向“面包类”中生产产品。
②、Customer是“消费者”类,它与“面包(bread)”关联。当调用“消费者”的consume()方法时,它会新建一个线程并消费“面包类”中的产品。
③、Bread是面包类,记录“面包的产量(capacity)”以及面包当前实际数目(size)”。
        面包类的生产方法produce()和消费方法consume()方法都是synchronized方法,进入synchronized方法体,意味着这个线程获取到了该“面包”对象的同步锁。这也就是说,同一时间,生产者和消费者线程只能有一个能运行。通过同步锁,实现了对“残酷”的互斥访问。
       对于生产方法 produce() 而言:当面包量满时,生产者线程等待,需要等待消费者消费产品之后,生产线程才能生产;生产者线程生产完面包之后,会通过 notifyAll() 唤醒同步锁上的所有线程,包括“消费者线程”,即我们所说的“通知消费者进行消费”。
      对于消费方法consume()而言:当仓库为空时,消费者线程等待,需要等待生产者生产产品之后,消费者线程才能消费;消费者线程消费完产品之后,会通过 notifyAll() 唤醒同步锁上的所有线程,包括“生产者线程”,即我们所说的“通知生产者进行生产”。

本文转载自博客园,原文链接:https://www.cnblogs.com/lingq/p/13040572.html

全部评论: 0

    我有话说: