rl 相当于 一个生产者的仓库(消费者消费的时候会从这个仓库中拿东西);
创新互联是一家专业提供陕州企业网站建设,专注与成都网站建设、成都网站制作、html5、小程序制作等业务。10年已为陕州众多企业、政府机构等服务。创新互联专业网络公司优惠进行中。
wp 相当于库存数量,生产者生产一次 wp数量会+1, 消费者消费一次 wp数量会-1
开启线程:同步进行生产跟消费的动作。(一个只生产,一个只消费)
两个方法(shengchan xiaofei) 是互相notify的,因为他们的wait对象都是 Ck,所以理论上不会存在wait后不释放的情况。可以再完善一下,用 reentrantLock 或者自己写一个方法,比如在wait后几秒 如果还未被notify 强制notify。
小小回顾 写了个简单的demo 项目中没有过 还能记得 纪念一下
产品的entity
Java代码
public class Product {
//产品名称
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this name = name;
}
}
生产者
Java代码
/**
* 生产者
* @author 饭香
*
*/
public class Producer implements Runnable{
private Shop shop;//要去送货的商店
public Producer(Shop shop){
this shop=shop;
}
public void run() {
for(int i= ;i ;i++){
shop produ();
}
}
}
消费者
Java代码
/**
* 消费者
* @author 饭香
*
*/
public class Cousumer implements Runnable{
private Shop shop;//要去消费的商店
public Cousumer(Shop shop){
this shop=shop;
}
public void run() {
for(int i= ;i ;i++){//消费 次
usu();
}
}
}
模拟商店(一切围绕商店 商店只能有一个 产品随便多个实例 这个也是别人问我总是出错的地方 对象思想)
Java代码
import java util ArrayList;
import java util List;
/**
* 模拟商店 (进货/销售)
* @author fx
*
*/
public class Shop {
private static int i= ;
//产品的容器;达到容器暂停生产 消费到 等待生产
private static ListProduct list;
static{
list= new ArrayListProduct();
}
/**
* 生产产品
*/
public synchronized void produ(){
if(list size()= ){
try {
System out println( 生产商品 +i+ 时 达到了总数暂停生产 );
this wait();//进入休眠
} catch (InterruptedException e) {
System out println(e toString());
e=null;
}
} //生产商品
Product product= new Product();
product setName( 商品 +i);
list add(product);
System out println( 生产了商品 +product getName()+ 商品总数 +i);
System out println( 容器容量 +list size());
i++;
super notify();
}
/**
* 消费产品
* @return
*/
public synchronized void cousu(){
if(list size()== ){//消费完时 挂起
System out println( +++++++++++++++++++++++商品消费完了 等待+++++++++++++++= );
try {
this wait();
} catch (InterruptedException e) {
// TODO Auto generated catch block
System out println(e toString());
e=null;
}
}
Product product=list get( );
list remove( );
System out println( 消费了获得了商品 +product getName());
System out println( 容器容量 +list size());
super notify();
}
}
测试代码
Java代码
public static void main(String[] args) {
Shop shop=new Shop();//商店
Producer pro=new Producer(shop);
Cousumer cou = new Cousumer(shop);
new Thread(pro pro ) start();
new Thread(cou cou ) start();
}
生产了商品 商品 商品总数
容器容量
消费了获得了商品 商品
容器容量
+++++++++++++++++++++++商品消费完了 等待+++++++++++++++=
生产了商品 商品 商品总数
容器容量
消费了获得了商品 商品
容器容量
+++++++++++++++++++++++商品消费完了 等待+++++++++++++++=
生产了商品 商品 商品总数
容器容量
消费了获得了商品 商品
容器容量
+++++++++++++++++++++++商品消费完了 等待+++++++++++++++=
生产了商品 商品 商品总数
容器容量
生产了商品 商品 商品总数
容器容量
消费了获得了商品 商品
容器容量
生产了商品 商品 商品总数
容器容量
消费了获得了商品 商品
容器容量
生产了商品 商品 商品总数
容器容量
消费了获得了商品 商品
容器容量
消费了获得了商品 商品
lishixinzhi/Article/program/Java/gj/201311/27600
我晕看了好久,别人写的代码看的真是辛苦,也没注释...修改了一堆括号!!
妥妥的没问题,你的资源用的数组,换句话说,
你数组被A1线程增加索引1,然后B过来拿走索引1; 数组里面此刻是什么?当然是0了啊;因为你递减了...
然后A2被拿到执行权,怎么样?是不是还去增加索引1??明白了?
如果你想要不重复,就别递减就行了!
另外你这么改,有什么问题看的很醒目,知道发生在哪个线程上!
public class ProduceConsumerDemo {
public static void main(String[] args) {
// 1.创建资源
Resource resource = new Resource();
// 2.创建两个任务
Producer producer = new Producer(resource);
Consumer consumer = new Consumer(resource);
// 3.创建线程
/*
* 多生产多消费产生的问题:重复生产、重复消费
*/
Thread thread0 = new Thread(producer);
Thread thread1 = new Thread(producer);
thread0.setName("生产者(NO0)");
thread1.setName("生产者(NO1)");
Thread thread2 = new Thread(consumer);
Thread thread3 = new Thread(consumer);
thread2.setName("消费者(NO2)");
thread3.setName("消费者(NO3)");
thread0.start();
thread1.start();
thread2.start();
thread3.start();
}
}
class Resource {
private String name;
private int count = 1;
// 定义标记
private boolean flag;
// 提供给商品赋值的方法
public synchronized void setName(String name) {// thread0, thread1在这里运行
while (flag)// 判断标记为true,执行wait等待,为false则生产
/*
* 这里使用while,而不使用if的理由如下:
*
* thread0有可能第二次也抢到锁的执行权,判断为真,则有面包不生产,所以接下来执行等待,此时thread0在线程池中。
* 接下来活的线程有3个(除了thread0),这三个线程都有可能获取到执行权.
* 假设thread1获得了执行权,判断为真,则有面包不生产,执行等待。此时thread1又进入到了线程池中。
* 接下来有两个活的线程thread2和thread3。 假设thread2又抢到了执行权,所以程序转到了消费get处……
*/
try {
this.wait();//这里wait语句必须包含在try/catch块中,抛出异常。
} catch (InterruptedException e) {
e.printStackTrace();
}
this.name = name + count;// 第一个面包
count++;// 2
System.out.println(Thread.currentThread().getName() + this.name);// thread0线程生产了面包1
// 生产完毕,将标记改成true.
flag = true;// thread0第一次生产完面包以后,将标记改为真,表示有面包了
// 唤醒消费者(这里使用notifyAll而不使用notify的原因在下面)
this.notifyAll();// 第一次在这里是空唤醒,没有意义
}
/*
* 通过同步,解决了没生产就消费的问题
* 生产完以后,生产者释放了this锁,此时,生产者和消费者同时去抢锁,又是生产者抢到了锁,所以就出现了一直生产的情况。
* 与“生产一个就消费一个的需求不符合” 等待唤醒机制 wait();该方法可以使线程处于冻结状态,并将线程临时存储到线程池
* notify();唤醒指定线程池中的任意一个线程。 notifyAll();唤醒指定线程池中的所有线程
* 这些方法必须使用在同步函数中,因为他们用来操作同步锁上的线程上的状态的。
* 在使用这些方法时候,必须标识他们所属于的锁,标识方式就是锁对象.wait(); 锁对象.notify(); 锁对象.notifyAll();
* 相同锁的notify()可以获取相同锁的wait();
*/
public synchronized void getName() {// thread2,thread3在这里运行
while (!flag)
/*
* ……接着上面的程序执行分析 thread2拿到锁获取执行权之后,判断!flag为假,则不等待,直接消费面包1,输出一次.
* 消费完成之后将flag改为假 接下来又唤醒了thread0或者thread1生产者中的一个
* 假设又唤醒了thread0线程,现在活的线程有thread0,thread2,thread3三个线程
* 假设接下来thread2又抢到了执行权,判断!flag为真,没面包了,停止消费,所以thread2执行等待.
* 此时活着的线程有thread0和thread3。
* 假设thread3得到了执行权,拿到锁之后进来执行等待,此时活着的线程只有thread0.
* 所以thread0只能抢到执行权之后,生产面包2,将标记改为true告诉消费者有面包可以消费了。
* 接下来执行notify唤醒,此时唤醒休眠中的3个线程中的任何一个都有可能。
* 如果唤醒了消费者thread2或者thread3中的任何一个,程序都是正常。如果此时唤醒thread1则不正常。
* 如果唤醒了thread1,此时活着的线程有thread0和thread1两个线程。
* 假设thread0又获得了执行权,判读为真有面包,则又一次执行等待。
* 接下来只有thread1线程有执行权(此时没有判断标记直接生产了,出错了),所以又生产了面包3。 在这个过程中,面包2没有被消费。
* 这就是连续生产和消费容易出现的问题。
*
* 原因:被唤醒的线程没有判断标记就开始执行了,导致了重复的生产和消费发生。
*
* 解决:被唤醒的线程必须判断标记,使用while循环标记,而不使用if判断的理由。
*
* 但是接下来会出现死锁,原因在于:
* 上面的程序中thread0在执行notify的时候唤醒了thread1,而此时thread2和thread3两个消费者线程都处于等待状态
* thread1在执行while判断语句之后判断为真,则执行等待,此时所有的线程都处于冻结等待状态了。
*
* 原因:本方线程在执行唤醒的时候又一次唤醒了本方线程,而本方线程循环判断标记又继续等待,而导致所有的线程都等待。
*
* 解决:本方线程唤醒对方线程, 可以使用notifyAll()方法
* 唤醒之后,既有本方,又有对方,但是本方线程判断标记之后,会继续等待,这样就有对方线程在执行。
*/
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + this.name);
// 将标记改为false
flag = false;
// 唤醒生产者
this.notify();
}
}
// 生产者
class Producer implements Runnable {
private Resource resource;
public Producer(Resource resource) {
this.resource = resource;
}
public void run() {
while (true) {
resource.setName("面包");
}
}
}
// 消费者
class Consumer implements Runnable {
private Resource resource;
public Consumer(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
while (true) {
resource.getName();
}
}
}
public static void main(String[] args) {
Buffer buffer=new Buffer(); //创建一个临界区对象
new Producer(buffer,100).start(); //创建一个生产者对象,并启动其线程
new Consumer(buffer,200).start(); //创建一个消费者对象,并启动其线程
new Consumer(buffer,201).start(); //创建第二个消费者对象,并启动其线程
}
这一段代码,多加入
new Consumer(buffer,201).start(); //创建第二个消费者对象,并启动其线程
多加一段代码创建一个消费者
多加入
new Producer(buffer,100).start();
创建一个生产者。
想要很多生产者和消费者?加就是了啊。
第四个文件 Buffer.java
这个是实现同步的主要缓存类。想要实现同步
在每个方法的声明前都加入synchronized 就行
synchronized 线程锁,很好用的。源代码已经加入了
就比如
public synchronized void put(int value) { //同步方法控制临界区内容写入
使用的生产者和消费者模型具有如下特点:
(1)本实验的多个缓冲区不是环形循环的,也不要求按顺序访问。生产者可以把产品放到目前某一个空缓冲区中。
(2)消费者只消费指定生产者的产品。
(3)在测试用例文件中指定了所有的生产和消费的需求,只有当共享缓冲区的数据满足了所有关于它的消费需求后,此共享缓冲区才可以作为空闲空间允许新的生产者使用。
(4)本实验在为生产者分配缓冲区时各生产者间必须互斥,此后各个生产者的具体生产活动可以并发。而消费者之间只有在对同一产品进行消费时才需要互斥,同时它们在消费过程结束时需要判断该消费对象是否已经消费完毕并清除该产品。
Windows
用来实现同步和互斥的实体。在Windows
中,常见的同步对象有:信号量(Semaphore)、
互斥量(Mutex)、临界段(CriticalSection)和事件(Event)等。本程序中用到了前三个。使用这些对象都分
为三个步骤,一是创建或者初始化:接着请求该同步对象,随即进入临界区,这一步对应于互斥量的
上锁;最后释放该同步对象,这对应于互斥量的解锁。这些同步对象在一个线程中创建,在其他线程
中都可以使用,从而实现同步互斥。当然,在进程间使用这些同步对象实现同步的方法是类似的。
1.用锁操作原语实现互斥
为解决进程互斥进人临界区的问题,可为每类临界区设置一把锁,该锁有打开和关闭两种状态,进程执行临界区程序的操作按下列步骤进行:
①关锁。先检查锁的状态,如为关闭状态,则等待其打开;如已打开了,则将其关闭,继续执行步骤②的操作。
②执行临界区程序。
③开锁。将锁打开,退出临界区。
2.信号量及WAIT,SIGNAL操作原语
信号量的初值可以由系统根据资源情况和使用需要来确定。在初始条件下信号量的指针项可以置为0,表示队列为空。信号量在使用过程中它的值是可变的,但只能由WAIT,SIGNAL操作来改变。设信号量为S,对S的WAIT操作记为WAIT(S),对它的SIGNAL操作记为SIGNAL(S)。
WAIT(S):顺序执行以下两个动作:
①信号量的值减1,即S=S-1;
②如果S≥0,则该进程继续执行;
如果
S(0,则把该进程的状态置为阻塞态,把相应的WAITCB连人该信号量队列的末尾,并放弃处理机,进行等待(直至其它进程在S上执行SIGNAL操作,把它释放出来为止)。
SIGNAL(S):顺序执行以下两个动作
①S值加
1,即
S=S+1;
②如果S)0,则该进程继续运行;
如果S(0则释放信号量队列上的第一个PCB(既信号量指针项所指向的PCB)所对应的进程(把阻塞态改为就绪态),执行SIGNAL操作的进程继续运行。
在具体实现时注意,WAIT,SIGNAL操作都应作为一个整体实施,不允许分割或相互穿插执行。也就是说,WAIT,SIGNAL操作各自都好像对应一条指令,需要不间断地做下去,否则会造成混乱。
从物理概念上讲,信号量S)时,S值表示可用资源的数量。执行一次WAIT操作意味着请求分配一个单位资源,因此S值减1;当S0时,表示已无可用资源,请求者必须等待别的进程释放了该类资源,它才能运行下去。所以它要排队。而执行一次SIGNAL操作意味着释放一个单位资源,因此S值加1;若S(0时,表示有某些进程正在等待该资源,因而要把队列头上的进程唤醒,释放资源的进程总是可以运行下去的。
---------------
/**
*
生产者
*
*/
public
class
Producer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Producer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
empty.p();
mutex.p();
System.out.println(name+"
inserts
a
new
product
into
"+buf.nextEmptyIndex);
buf.nextEmptyIndex
=
(buf.nextEmptyIndex+1)%buf.size;
mutex.v();
full.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
---------------
/**
*
消费者
*
*/
public
class
Customer
implements
Runnable{
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Customer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf){
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
}
public
void
run(){
while(true){
full.p();
mutex.p();
System.out.println(name+"
gets
a
product
from
"+buf.nextFullIndex);
buf.nextFullIndex
=
(buf.nextFullIndex+1)%buf.size;
mutex.v();
empty.v();
try
{
Thread.sleep(1000);
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
}
-------------------------
/**
*
缓冲区
*
*/
public
class
Buffer{
public
Buffer(int
size,int
nextEmpty,int
nextFull){
this.nextEmptyIndex
=
nextEmpty;
this.nextFullIndex
=
nextFull;
this.size
=
size;
}
public
int
size;
public
int
nextEmptyIndex;
public
int
nextFullIndex;
}
-----------------
/**
*
此类用来模拟信号量
*
*/
public
class
Semaphore{
private
int
semValue;
public
Semaphore(int
semValue){
this.semValue
=
semValue;
}
public
synchronized
void
p(){
semValue--;
if(semValue0){
try
{
this.wait();
}
catch
(InterruptedException
e)
{
e.printStackTrace();
}
}
}
public
synchronized
void
v(){
semValue++;
if(semValue=0){
this.notify();
}
}
}
------------------------
public
class
Test
extends
Thread
{
public
static
void
main(String[]
args)
{
Buffer
bf=new
Buffer(10,0,0);
Semaphore
mutex=new
Semaphore(1);
Semaphore
full=new
Semaphore(0);
Semaphore
empty=new
Semaphore(10);
//new
Thread(new
Producer("p001",mutex,full,empty,bf)).start();
Producer
p=new
Producer("p001",mutex,full,empty,bf);
new
Thread(new
Producer("p002",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p003",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p004",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p005",mutex,full,empty,bf)).start();
try{
sleep(3000);
}
catch(Exception
ex)
{
ex.printStackTrace();
}
new
Thread(new
Customer("c001",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c002",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c003",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c004",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c005",mutex,full,empty,bf)).start();
}
}
--------------------------------------------