Java多线程设计模式之线程池模式.docx

上传人:b****1 文档编号:346733 上传时间:2023-04-29 格式:DOCX 页数:12 大小:19.23KB
下载 相关 举报
Java多线程设计模式之线程池模式.docx_第1页
第1页 / 共12页
Java多线程设计模式之线程池模式.docx_第2页
第2页 / 共12页
Java多线程设计模式之线程池模式.docx_第3页
第3页 / 共12页
Java多线程设计模式之线程池模式.docx_第4页
第4页 / 共12页
Java多线程设计模式之线程池模式.docx_第5页
第5页 / 共12页
Java多线程设计模式之线程池模式.docx_第6页
第6页 / 共12页
Java多线程设计模式之线程池模式.docx_第7页
第7页 / 共12页
Java多线程设计模式之线程池模式.docx_第8页
第8页 / 共12页
Java多线程设计模式之线程池模式.docx_第9页
第9页 / 共12页
Java多线程设计模式之线程池模式.docx_第10页
第10页 / 共12页
Java多线程设计模式之线程池模式.docx_第11页
第11页 / 共12页
Java多线程设计模式之线程池模式.docx_第12页
第12页 / 共12页
亲,该文档总共12页,全部预览完了,如果喜欢就下载吧!
下载资源
资源描述

Java多线程设计模式之线程池模式.docx

《Java多线程设计模式之线程池模式.docx》由会员分享,可在线阅读,更多相关《Java多线程设计模式之线程池模式.docx(12页珍藏版)》请在冰点文库上搜索。

Java多线程设计模式之线程池模式.docx

Java多线程设计模式之线程池模式

Java多线程设计模式之线程池模式

  前序:

  Thread-Per-MessagePattern,是一种对于每个命令或请求,都分配一个线程,由这个线程执行工作。

它将“委托消息的一端”和“执行消息的一端”用两个不同的线程来实现。

该线程模式主要包括三个部分:

  1,Request参与者(委托人,也就是消息发送端或者命令请求端

  2,Host参与者,接受消息的请求,负责为每个消息分配一个工作线程。

  3,Worker参与者,具体执行Request参与者的任务的线程,由Host参与者来启动。

  由于常规调用一个方法后,必须等待该方法完全执行完毕后才能继续执行下一步操作,而利用线程后,就不必等待具体任务执行完毕,就可以马上返回继续执行下一步操作。

  背景:

  由于在Thread-Per-MessagePattern中对于每一个请求都会生成启动一个线程,而线程的启动是很花费时间的工作,所以鉴于此,提出了WorkerThread,重复利用已经启动的线程。

  线程池:

  WorkerThread,也称为工人线程或背景线程,不过一般都称为线程池。

该模式主要在于,事先启动一定数目的工作线程。

当没有请求工作的时候,所有的工人线程都会等待新的请求过来,一旦有工作到达,就马上从线程池中唤醒某个线程来执行任务,执行完毕后继续在线程池中等待任务池的工作请求的到达。

  任务池:

主要是存储接受请求的集合,利用它可以缓冲接受到的请求,可以设置大小来表示同时能够接受最大请求数目。

这个任务池主要是供线程池来访问。

  线程池:

这个是工作线程所在的集合,可以通过设置它的大小来提供并发处理的工作量。

对于线程池的大小,可以事先生成一定数目的线程,根据实际情况来动态增加或者减少线程数目。

线程池的大小不是越大越好,线程的切换也会耗时的。

  存放池的数据结构,可以用数组也可以利用集合,在集合类中一般使用Vector,这个是线程安全的。

  WorkerThread的所有参与者:

  1,Client参与者,发送Request的参与者

  2,Channel参与者,负责缓存Request的请求,初始化启动线程,分配工作线程

  3,Worker参与者,具体执行Request的工作线程

  4,Request参与者

  注意:

将在Worker线程内部等待任务池非空的方式称为正向等待。

  将在Channel线程提供Worker线程来判断任务池非空的方式称为反向等待。

  线程池实例1:

  利用同步方法来实现,使用数组来作为任务池的存放数据结构。

在Channel有缓存请求方法和处理请求方法,利用生成者与消费者模式来处理存储请求,利用反向等待来判断任务池的非空状态。

  Channel参与者:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

package whut.threadpool;

//用到了生产者与消费者模式

//生成线程池,接受客户端线程的请求,找到一个工作线程分配该客户端请求

public class Channel{

    private static final int MAX_REQUEST= 100;//并发数目,就是同时可以接受多少个客户端请求

    //利用数组来存放请求,每次从数组末尾添加请求,从开头移除请求来处理

    private final Request[]requestQueue;//存储接受客户线程的数目

    private int tail;//下一次存放Request的位置

    private int head;//下一次获取Request的位置

    private int count;//当前request数量

    private final WorkerThread[]threadPool;//存储线程池中的工作线程

    //运用数组来存储

    public Channel(int threads{

        this.requestQueue= new Request[MAX_REQUEST];

        this.head= 0;

        this.head= 0;

        this.count= 0;

        threadPool= new WorkerThread[threads];

        //启动工作线程

        for (int i= 0;i

            threadPool[i]= new WorkerThread("Worker-" +i, this;

        }

    }

    public void startWorkers({

        for (int i= 0;i

            threadPool[i].start(;

        }

    }

    //接受客户端请求线程

    public synchronized void putRequest(Requestrequest{

        //当Request的数量大于或等于同时接受的数目时候,要等待

        while (count>=requestQueue.length

            try {

                wait(;

            } catch (InterruptedExceptione{

            }

        requestQueue[tail]=request;

        tail=(tail+ 1%requestQueue.length;

        count++;

        notifyAll(;

    }

    //处理客户端请求线程

    public synchronized RequesttakeRequest({

        while (count<= 0

            try {

                wait(;

            } catch (InterruptedExceptione{

            }

        Requestrequest=requestQueue[head];

        head=(head+ 1%requestQueue.length;

        count--;

        notifyAll(;

        return request;

    }

}

  客户端请求线程:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

package whut.threadpool;

import java.util.Random;

//向Channel发送Request请求的

public class ClientThread extends Thread{

    private final Channelchannel;

    private static final Randomrandom=new Random(;

                                                               

    public ClientThread(Stringname,Channelchannel

    {

        super(name;

        this.channel=channel;

    }

    public void run(

    {

        try{

            for(int i=0;true;i++

            {

                Requestrequest=new Request(getName(,i;

                channel.putRequest(request;

                Thread.sleep(random.nextInt(1000;

            }

        }catch(InterruptedExceptione

        {

        }

    }

}

  工作线程:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

package whut.threadpool;

//具体工作线程

public class WorkerThread extends Thread{

                                                      

    private final Channelchannel;

    public WorkerThread(Stringname,Channelchannel

    {

      super(name;

      this.channel=channel;

    }

                                                      

    public void run(

    {

        while(true

        {

            Requestrequest=channel.takeRequest(;

            request.execute(;

        }

    }

}

  线程池实例2:

  工作线程:

  利用同步块来处理,利用Vector来存储客户端请求。

在Channel有缓存请求方法和处理请求方法,利用生成者与消费者模式来处理存储请求,利用正向等待来判断任务池的非空状态。

  这种实例,可以借鉴到网络ServerSocket处理用户请求的模式中,有很好的扩展性与实用性。

  利用Vector来存储,依旧是每次集合的最后一个位置添加请求,从开始位置移除请求来处理。

  Channel参与者:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

package whut.threadpool2;

import java.util.Vector;

/*

 *这个主要的作用如下

 *0,缓冲客户请求线程(利用生产者与消费者模式

 *1,存储客户端请求的线程

 *2,初始化启动一定数量的线程

 *3,主动来唤醒处于任务池中waitset的一些线程来执行任务

 */

public class Channel{

    public final static int THREAD_COUNT=4;

    public static void main(String[]args{

      //定义两个集合,一个是存放客户端请求的,利用Vector,

      //一个是存储线程的,就是线程池中的线程数目

                             

      //Vector是线程安全的,它实现了Collection和List

      //Vector类可以实现可增长的对象数组。

与数组一样,

      //它包含可以使用整数索引进行访问的组件。

但Vector的大小可以根据需要增大或缩小,

      //以适应创建Vector后进行添加或移除项的操作。

      //Collection中主要包括了list相关的集合以及set相关的集合,Queue相关的集合

      //注意:

Map不是Collection的子类,都是java.util.*下的同级包

      Vectorpool=new Vector(;

      //工作线程,初始分配一定限额的数目

      WorkerThread[]workers=new WorkerThread[THREAD_COUNT];

                          

      //初始化启动工作线程

      for(int i=0;i

      {

          workers[i]=new WorkerThread(pool;

          workers[i].start(;

      }

                           

      //接受新的任务,并且将其存储在Vector中

      Objecttask=new Object(;//模拟的任务实体类

      //此处省略具体工作

      //在网络编程中,这里就是利用ServerSocket来利用ServerSocket.accept接受一个Socket从而唤醒线程

                           

      //当有具体的请求达到

      synchronized(pool

      {

          pool.add(pool.size(,task;

          pool.notifyAll(;//通知所有在poolwaitset中等待的线程,唤醒一个线程进行处理

      }

      //注意上面这步骤添加任务池请求,以及通知线程,都可以放在工作线程内部实现

      //只需要定义该方法为static,在方法体用同步块,且共享的线程池也是static即可

                           

      //下面这步,可以有可以没有根据实际情况

      //取消等待的线程

      for(int i=0;i

      {

          workers[i].interrupt(;

      }

    }

}

   工作线程:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

package whut.threadpool2;

import java.util.List;

public class WorkerThread extends Thread{

    private Listpool;//任务请求池

    private static int fileCompressed=0;//所有实例共享的

                     

    public WorkerThread(Listpool

    {

          this.pool=pool; 

    }

                     

    //利用静态synchronized来作为整个synchronized类方法,仅能同时一个操作该类的这个方法

    private static synchronized void incrementFilesCompressed(

    {

        fileCompressed++;

    }

                     

    public void run(

    {

        //保证无限循环等待中

        while(true

        {

            //共享互斥来访问pool变量

            synchronized(pool

            {

                //利用多线程设计模式中的

                //GuardedSuspensionPattern,警戒条件为pool不为空,否则无限的等待中

                while(pool.isEmpty(

                {

                    try{

                        pool.wait(;//进入pool的waitset中等待着,释放了pool的锁

                    }catch(InterruptedExceptione

                    {

                    }

                }

                //当线程被唤醒,需要重新获取pool的锁,

                //再次继续执行synchronized代码块中其余的工作

                //当不为空的时候,继续再判断是否为空,如果不为空,则跳出循环

                //必须先从任务池中移除一个任务来执行,统一用从末尾添加,从开始处移除

                                 

                pool.remove(0;//获取任务池中的任务,并且要进行转换

            }

            //下面是线程所要处理的具体工作

        }

    }

}

展开阅读全文
相关资源
猜你喜欢
相关搜索
资源标签

当前位置:首页 > 初中教育 > 语文

copyright@ 2008-2023 冰点文库 网站版权所有

经营许可证编号:鄂ICP备19020893号-2