在我15年的职业生涯中,生产者和消费者的问题是我仅遇到过几次。 在大多数编程情况下,我们正在做的事情是以同步方式执行功能,其中JVM或Web容器自行处理多线程的复杂性。 但是,在编写某些需要的用例时。 上周,我遇到了一个这样的用例,使我在上一次这样做的时候回溯了三年。 但是,上次完成的方式却大不相同。

当我第一次听到问题陈述时,我立即知道需要什么。 但是,这次我的做法与上次有所不同。 这与我今天如何看待技术有关。 我不会涉足任何非技术方面,并且会直接跳入问题及其解决方案。 我开始研究市场上存在的东西,并发现了几篇文章,这些文章帮助我以正确的方式传达了我的想法。

问题陈述

我们需要一个用于批量迁移的解决方案。 我们正在将数据从系统1迁移到系统2,在此过程中,我们需要执行以下三个任务:

  • 根据组从数据库加载数据
  • 处理数据
  • 通过修改来更新在步骤1中加载的记录

我们必须处理100个小组,每个小组大约有4万条记录。 您可以想象如果我们以同步方式执行此练习将花费多少时间。 这里的图像有效地解释了这个问题。

生产者消费者:问题

生产者和消费者模式

首先让我们看一下生产者消费者模式。 如果您参考上面的问题说明并查看图片,我们会看到有太多实体准备使用其部分数据。 但是,没有足够的工人可以处理所有数据。 因此,随着生产者继续排队,它只会继续增长。 我们看到系统开始占用线程并花费大量时间。

中级解决方案

生产者消费者:中级方法

我们确实有一个中间解决方案。 参考该图像,您将立即注意到,生产者将他们的工作堆积在文件柜中,而工人在完成上一项任务时继续将其捡起来。 但是,这种方法确实存在一些明显的缺点:

  1. 仍然只有一名工人必须完成所有工作。 外部系统可能很高兴,但是任务将继续存在,直到工作人员完成所有任务为止
  2. 生产者将他们的数据堆积在队列中,并且需要资源来保存它们。 就像在此示例中,机柜可以装满一样,JVM资源也可能发生同样的情况。 我们需要注意要在内存中放入多少数据,在某些情况下可能不会太多。

解决方案

生产者消费者:解决方案

解决方案是我们每天在很多地方都能看到的,例如电影院大厅排队,汽油泵等。有很多人来订票,而根据进来的人数,增加了更多的人来发行票。 本质上,请参考此处的图像,您会注意到生产者将继续向内阁添加他们的工作,并且我们有更多的工人来处理工作量。

Java提供了并发包来解决此问题。 到现在为止,我一直在较低级别上进行线程工作,这是我第一次使用此程序包。 当我开始浏览网络并阅读其他博客作者的言论时,我遇到了一篇非常好的文章 。 它有助于非常有效地理解BlockingQueue的使用。 但是,Dhruba提供的解决方案并不能帮助我实现所需的高吞吐量。 因此,我开始探索对ArrayBlockingQueue的使用。

控制器

这是管理生产者和消费者之间的合同的第一类。 控制器将为生产者设置1个线程,为消费者设置2个线程。 根据需要,我们可以创建所需数量的线程。 甚至甚至可以从属性中读取数据或做一些动态魔术。 现在,我们将保持简单。

package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestProducerConsumer
{
public static void main(String args[])
{
try
{
Broker broker = new Broker();
ExecutorService threadPool = Executors.newFixedThreadPool(3);
threadPool.execute(new Consumer("1", broker));
threadPool.execute(new Consumer("2", broker));
Future producerStatus = threadPool.submit(new Producer(broker));
// this will wait for the producer to finish its execution.
producerStatus.get();
threadPool.shutdown();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}

我正在使用ExecuteService创建线程池并对其进行管理。 代替使用基本的Thread实现,这是一种更有效的方法,因为它将根据需要处理退出和重新启动线程。 您还将注意到,我正在使用Future类来获取生产者线程的状态。 该类非常有效,它将使我的程序停止进一步执行。 这是在线程上替换“ .join”方法的一种好方法。 注意:在这个例子中,我并不是很有效地使用Future。 因此您可能需要尝试一些适合自己的事情。
另外,您还应注意在生产者和消费者之间用作文件柜的Broker类。 我们将在短时间内看到它的实现。

生产者

此类负责产生需要处理的数据。

package com.kapil.techieforever.producerconsumer;
public class Producer implements Runnable
{
private Broker broker;
public Producer(Broker broker)
{
this.broker = broker;
}
@Override
public void run()
{
try
{
for (Integer i = 1; i < 5 + 1; ++i)
{
System.out.println("Producer produced: " + i);
Thread.sleep(100);
broker.put(i);
}
this.broker.continueProducing = Boolean.FALSE;
System.out.println("Producer finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}

此类正在做它所能做的最简单的事情-向代理添加一个整数。 需要注意的一些关键领域是:
1. Broker上有一个属性,生产者在完成生产后最终会对其进行更新。 这也称为“最终”或“毒药”条目。 消费者使用它来知道不再有数据 2.我使用Thread.sleep来模拟某些生产者可能需要更多时间来生产数据。 您可以调整此值并查看消费者的行为

消费者

此类负责从代理读取数据并完成其工作

package com.kapil.techieforever.producerconsumer;
public class Consumer implements Runnable
{
private String name;
private Broker broker;
public Consumer(String name, Broker broker)
{
this.name = name;
this.broker = broker;
}
@Override
public void run()
{
try
{
Integer data = broker.get();
while (broker.continueProducing || data != null)
{
Thread.sleep(1000);
System.out.println("Consumer " + this.name + " processed data from broker: " + data);
data = broker.get();
}
System.out.println("Comsumer " + this.name + " finished its job; terminating.");
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}

这还是一个简单的类,它读取Integer并将其打印在控制台上。 但是,要注意的关键点是:
1.处理数据的循环是一个无限循环,它在两种情况下运行–直到生产者消费并且经纪人有一些数据为止
2.同样,Thread.sleep用于创建有效的不同方案

经纪人

package com.kapil.techieforever.producerconsumer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class Broker
{
public ArrayBlockingQueue queue = new ArrayBlockingQueue(100);
public Boolean continueProducing = Boolean.TRUE;
public void put(Integer data) throws InterruptedException
{
this.queue.put(data);
}
public Integer get() throws InterruptedException
{
return this.queue.poll(1, TimeUnit.SECONDS);
}
}

首先要注意的是,我们使用ArrayBlockingQueue作为数据持有人。 我不会说这是什么,而是要您在此处的JavaDocs上阅读它。 但是,我将解释生产者将把数据放入队列,而使用者将以FIFO格式从队列中获取数据。 但是,如果生产者运行缓慢,则消费者将等待数据进入,如果阵列已满,生产者将等待数据填满。

另外,请注意,我使用的是“投票”功能,而不是进入队列。 这是为了确保消费者不会一直等待,等待会在几秒钟后超时。 这有助于我们进行相互交流,并在处理完所有数据后杀死消费者。 (注意:尝试用get代替poll,您将看到一些有趣的输出)。

我的代码位于Google项目托管上 。 随意浏览并从那里下载。 本质上,这是一个蚀(Spring STS)项目。 根据下载时间,您可能还会在下载时获得其他软件包和类。 也可以随意查看这些内容并分享您的评论
–您可以在SVN浏览器中浏览源代码,或者;
–您可以从项目本身下载它 。

侧面解决方案

最初,我在中间发布了此解决方案,但是后来我意识到这不是做事的方法,因此我从主要内容中删除了此内容,并将其放在最后。 最终解决方案的另一种变体是,工人/消费者一次不处理一项工作,而是一起处理多个工作,并在完成下一个工作之前先完成工作。 这种方法可以产生相似的结果,但是在某些情况下,如果我们有一些工作需要花费不同的时间才能完成,那么从本质上讲,这意味着某些工人比其他工人最终会更快地结束工作,从而造成了瓶颈。 并且,如果作业是事先分配的,这意味着所有消费者将在加工之前拥有所有作业(不是生产者-消费者模式),那么这个问题可能加起来甚至更多,并导致处理逻辑的更多延迟。

相关文章

  • 队列是Devil自己的数据结构 (petewarden.typepad.com)
  • 我对撒但的小帮手排队有错吗? (petewarden.typepad.com)
  • http://code.google.com/p/disruptor/

参考: 并发模式: JCG合作伙伴的 生产者和消费者   Scratch Pad博客上的Kapil Viren Ahuja。

翻译自: https://www.javacodegeeks.com/2012/02/concurrency-pattern-producer-and.html

并发模式:生产者和消费者相关推荐

  1. python 生产者和消费者模式_Python爬虫:生产者和消费者模式

    认识生产者和消费者模式 生产者和消费者是多线程中很常见的一个问题.产生数据的模块,我们称之为生产者,而处理数据的模块,就称为消费者.但是单单只有生产者和消费者显然还是不够的,一般来说,我们还有一个缓冲 ...

  2. 认识生产者和消费者模式

    认识生产者和消费者模式 生产者和消费者是多线程中很常见的一个问题.产生数据的模块儿,我们称之为生产者,而处理数据的模块,就称为消费者.但是单单只有生产者和消费者显然还是不够的,一般来说,我们还有一个缓 ...

  3. python中的 生产者和消费者

    什么是生产者消费者模式 在软件开发过程中,经常会遇到这样的情景: 某些模块负责生产数据,这些数据由其他模块来负责处理(此处的模块可能是:函数.线程.进程等).产生数据的模块称为生产者,而处理数据的模块 ...

  4. 生产者,消费者,CDN

    1 生产者消费者模型应用场景及优势? 什么是生产者消费者模型 在 工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进 ...

  5. 线程案例:生产者与消费者

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程. 在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据. 同样的道理,如 ...

  6. [19/04/11-星期四] 多线程_并发协作(生产者/消费者模式_2种解决方案(管程法和信号灯法))...

    一.概念 多线程环境下,我们经常需要多个线程的并发和协作.这个时候,就需要了解一个重要的多线程并发协作模型"生产者/消费者模式". Ø 什么是生产者? 生产者指的是负责生产数据的模 ...

  7. python 多线程并发编程(生产者、消费者模式),边读图像,边处理图像,处理完后保存图像实现提高处理效率

    文章目录 需求 实现 先导入本次需要用到的包 一些辅助函数 如下函数是得到指定后缀的文件 如下的函数一个是读图像,一个是把RGB转成BGR 下面是主要的几个处理函数 在上面几个函数构建对应的处理函数 ...

  8. Java并发编程系列18:多线程之生产者和消费者模式_信号灯法(wait/notify通知机制)

    1.生产者消费者模式 生产者消费者问题(Producer-consumer problem),也称为有限缓冲问题(Bounded-buffer problem),是一个多线程同步问题的经典案例.该问题 ...

  9. Java 并发(生产者/消费者 模式)

    >生产者/消费者 模式角色:生产者,消费者都是线程,两者中间是容器,容器内部是产品. 要求: 容器 里面要定义容量 容器 往里面添加(满时等待) 或者 从里面删除(空时等待) ,都要是阻塞的(等 ...

最新文章

  1. Android中通过自定义签名控件实现手写签名
  2. 2.1 数个常用的网络命令
  3. C++学习笔记-----输出数据的另一种写法(std::copy)
  4. 引用 看下面图片是向左转还是向右转呢?
  5. 计算机 ieee access,计算机 | IEEE Access 诚邀专刊稿件 (IF:3.557)
  6. 处理测试环境硬盘爆满
  7. dotnet程序优化心得(三)
  8. TortoiseSVN的bin目录下面没有svn.exe
  9. linux如何确认账号过期了,linux下非root用户秘密过期如何确认,如果确认,该如何延期使其有效?...
  10. HDU 3466 Proud Merchants 带有限制的01背包问题
  11. 4位格雷码的顺序编码_格雷码的编码和译码算法.doc
  12. 计算机专业实习报告-5000字+,以及计算机专业实习周记-15篇
  13. C# 关于浏览器——CefSharp篇
  14. 软件生命周期模型汇总
  15. 【RX解码MIPI输出】XS9922B 4通道模拟复合视频解码芯片 功能对标TP2815
  16. 操作系统虚拟存储管理实验
  17. 关于互联网+分布式光伏运维平台的应用介绍-李亚俊
  18. 订单需求BOM合并 自动计算
  19. JAVA学习3-抽象类、内部类、数组、Object、System、String、基本包装类型
  20. ACM比赛中如何加速c++的输入输出

热门文章

  1. java线程同步——竞争条件的荔枝+锁对象
  2. java日志——基本日志+高级日志
  3. 在Spring Boot中使用切面统一处理自定义的异常
  4. 普罗米修斯使用es数据库_用普罗米修斯和格拉法纳仪法来豪猪
  5. java char类型空值_展望Java的未来:空值类型
  6. glacier2_Amazon Glacier的Scala客户端
  7. 怎么运行aws的示例程序_使Spring Boot应用程序在AWS上无服务器运行
  8. java crud_Java 8流中的数据库CRUD操作
  9. java自动生成合同_Java 7和Java 8之间的细微自动关闭合同更改
  10. Java XMPP负载测试工具