java多线程消费者.docx
《java多线程消费者.docx》由会员分享,可在线阅读,更多相关《java多线程消费者.docx(11页珍藏版)》请在冰点文库上搜索。
![java多线程消费者.docx](https://file1.bingdoc.com/fileroot1/2023-5/2/9741307a-6ad2-419d-9dce-0297d1024460/9741307a-6ad2-419d-9dce-0297d10244601.gif)
java多线程消费者
java多线程消费者-生产者
java多线程一般都会讲消费者-生产者模型
生产者与消费者模型中,要保证以下几点:
1同一时间内只能有一个生产者生产
2同一时间内只能有一个消费者消费
3生产者生产的同时消费者不能消费
4消息队列满时生产者不能继续生产
5消息队列空时消费者不能继续消费
参考了下网上一个代码实例 发现作者写得有问题修改了一些代码 现在ok了
----------------------------------------------------Message类
packagecom.example.test;
publicclassMessage{
publicstaticintid;
publicStringcontent;
publicStringgetContent(){
returncontent;
}
publicvoidsetContent(Stringcontent){
this.content=content;
}
publicintgetId(){
returnid;
}
publicvoidsetId(intid){
Message.id=id;
}
}
----------------------------------------------------Queue类
packagecom.example.test;
importjava.util.ArrayList;
importjava.util.List;
publicclassQueue{
Listqueue=newArrayList();
/**队列中message对象的最大值,默认为5*/
intmaxMessageNum=5;
publicsynchronizedvoidproduce(Messagemessage){
this.notifyAll();
while(queue.size()==maxMessageNum){
System.out.println(Thread.currentThread().getName()
+" 队列满!
等待中。
。
。
");
try{
this.wait();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
queue.add(message);
System.out.println(Thread.currentThread().getName()+"正在生产"
+message.getContent()+"。
。
。
,当前个数:
"+getCount());
}
publicsynchronizedvoidconsume(){
this.notifyAll();
while(queue.size()==0){
System.out.println(Thread.currentThread().getName()
+" 队列空!
等待中。
。
。
");
try{
System.out.println("begin!
");
wait();
System.out.println("end!
");
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
Messagemessage=queue.get(0);
queue.remove(0);
System.out.println(Thread.currentThread().getName()+"正在消费"
+message.getContent()+"。
。
。
当前个数:
"+getCount());
}
publicsynchronizedintgetCount(){
returnqueue.size();
}
}
----------------------------------------------------Test类
packagecom.example.test;
publicclassTest{
publicstaticvoidmain(String[]args){
QueueQ=newQueue();
ProducerwQ1=newProducer(Q);
ProducerwQ2=newProducer(Q);
ConsumerrQ1=newConsumer(Q);
ConsumerrQ2=newConsumer(Q);
ConsumerrQ3=newConsumer(Q);
ThreadthreadWQ1=newThread(wQ1,"thread-wQ1");
ThreadthreadWQ2=newThread(wQ2,"thread-wQ2");
ThreadthreadRQ1=newThread(rQ1,"thread-rQ1");
ThreadthreadRQ2=newThread(rQ2,"thread-rQ2");
ThreadthreadRQ3=newThread(rQ3,"thread-rQ3");
threadWQ1.start();
threadWQ2.start();
threadRQ1.start();
threadRQ2.start();
threadRQ3.start();
}
}
classProducerextendsThread{
privateQueuequeue;
Producer(Queuequeue){
this.queue=queue;
}
publicvoidrun(){
while(true){
Messagemessage=newMessage();
message.setId(++Message.id);
message.setContent("food"+Message.id);
queue.produce(message);
try{
sleep(100);
}catch(Exceptione){
}
}
}
}
classConsumerextendsThread{
privateQueuequeue;
Consumer(Queuequeue){
this.queue=queue;
}
publicvoidrun(){
while(true){
queue.consume();
try{
sleep(100);
}catch(Exceptione){
}
}
}
}
为什么在consume方法和produce方法开始的时候要调用this.notifyAll();这个应该是生产者在生产完产品后调用通知其他线程,同样消费者在消费完产品后也要调用this.notifyAll();方法来通知其他线程,为什么一上来调用它呢?
我认为在consume和produce的结尾分别获得producer和consumer的锁,然后notify它们。
在方法一开始唤醒不知道有什么意义。
java多线程一般都会讲消费者-生产者模型
生产者与消费者模型中,要保证以下几点:
1同一时间内只能有一个生产者生产
2同一时间内只能有一个消费者消费
3生产者生产的同时消费者不能消费
4消息队列满时生产者不能继续生产
5消息队列空时消费者不能继续消费
---------------------------------------------
除了4和5,其他都不是必要条件.实际上,更多的生产者消费者应用都不遵循1,2,3.
问下各位,“ publicsynchronizedvoidproduce(Messagemessage){”
该方法锁定的资源是“message”还是“queue”?
下面的是我写的:
importjava.util.ArrayList;
importjava.util.List;
publicclassProductQueue{
privateListproducts;
privateintmaxSize;
publicProductQueue(intmaxSize){
this.maxSize=maxSize;
products=newArrayList(maxSize);
}
publicsynchronizedvoidaddProdcut(Productproduct){
while(isFull()){
try{
wait();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
products.add(product);
System.out.println("Produce:
"+product.getId()+""+product.getMadeDate());
notifyAll();
}
publicsynchronizedvoidremoveProduct(){
while(isEmpty()){
try{
wait();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
Productproduct=products.get(products.size()-1);
products.remove(product);
System.out.println("Consume:
"+product.getId()+""+product.getMadeDate());
notifyAll();
}
publicsynchronizedbooleanisFull(){
booleanisFull= products.size()==maxSize;
if(isFull){
System.out.println("Thequeueisfull.");
}
returnisFull;
}
publicsynchronizedbooleanisEmpty(){
booleanisEmpty=products.size()<=0;
if(isEmpty){
System.out.println("Thequeueisempty.");
}
returnisEmpty;
}
}
publicclassConsumerimplementsRunnable{
privateProductQueuequeue;
publicConsumer(ProductQueuequeue){
this.queue=queue;
}
@Override
publicvoidrun(){
while(true){
queue.removeProduct();
try{
Thread.sleep(100l);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
}
importjava.util.Date;
publicclassProducerimplementsRunnable{
privatestaticintid;
privateProductQueuequeue;
publicProducer(ProductQueuequeue){
this.queue=queue;
}
@Override
publicvoidrun(){
while(true){
Productproduct=newProduct(id++,newDate());
queue.addProdcut(product);
try{
Thread.sleep(100);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
}
importjava.util.Date;
publicclassProduct{
privateintid;
privateDatemadeDate;
publicProduct(intid,DatemadeDate){
this.id=id;
this.madeDate=madeDate;
}
publicintgetId(){
returnid;
}
publicvoidsetId(intid){
this.id=id;
}
publicDategetMadeDate(){
returnmadeDate;
}
publicvoidsetMadeDate(DatemadeDate){
this.madeDate=madeDate;
}
}
publicclassThreadMain{
publicstaticvoidmain(String[]args){
ProductQueuequeue=newProductQueue(50);
for(inti=0;i<10;i++){
Producerproducer=newProducer(queue);
newThread(producer).start();
}
for(inti=0;i<10;i++){
Consumerconsumer=newConsumer(queue);
newThread(consumer).start();
}
}
}