先贴一段实际应用代码,应用场景是一个基于http请求拦截的用户行为分析数据录入片段

package com.howbuy.coop.interceptor;import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.ModelAndView;import com.alibaba.dubbo.config.annotation.Reference;
import com.howbuy.coop.constant.MixAll;
import com.howbuy.mring.base.common.model.BehaviorMapping;
import com.howbuy.mring.base.common.model.BehaviorRecord;
import com.howbuy.mring.base.facade.behavior.BehaviorFacade;
import com.howbuy.mring.base.facade.common.BasicResponse;
import com.howbuy.mring.base.facade.common.ListObjectResponse;import httl.util.StringUtils;public class BehaviorInterceptor implements HandlerInterceptor {private static Logger logger = LoggerFactory.getLogger(BehaviorInterceptor.class);@ReferenceBehaviorFacade behaviorFacade;@Value("${systemName}")private String systemName;private static Map<String, Integer> ruleMap = new HashMap<String, Integer>();// 线程池private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 2000, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(100));@PostConstructpublic void initialize() {// logger.info("loading user behavior rules from db");if (ruleMap.isEmpty()) {ListObjectResponse<BehaviorMapping> rules = behaviorFacade.getBehaviorRules(systemName);if (rules != null && rules.getList() != null ) {List<BehaviorMapping> mappings = rules.getList();for(BehaviorMapping rule : mappings) {if ("0".equals(rule.getFlag())) {ruleMap.put(rule.getSystem() + rule.getUri(), rule.getId());}}}}}public void loadBehaviorRule() {// logger.info("according to schedule loading user behavior rules from// db");ListObjectResponse<BehaviorMapping> rules = behaviorFacade.getBehaviorRules(systemName);if (rules != null && rules.getList() != null ) {List<BehaviorMapping> mappings = rules.getList();for(BehaviorMapping rule : mappings) {if ("0".equals(rule.getFlag())) {ruleMap.put(rule.getSystem() + rule.getUri(), rule.getId());} else {ruleMap.remove((rule.getSystem() + rule.getUri()));}}}}@Overridepublic boolean preHandle(final HttpServletRequest arg0, HttpServletResponse arg1, Object arg2) throws Exception {executor.execute(new Thread() {public void run() {try {Integer actionID = ruleMap.get(systemName + arg0.getRequestURI());if (actionID == null) {ListObjectResponse<BehaviorMapping> rules = behaviorFacade.getBehaviorRules(systemName);if (rules != null && rules.getList() != null) {List<String> arrayList = new ArrayList<String>();List<BehaviorMapping> mappings = rules.getList();for (BehaviorMapping rule : mappings) {arrayList.add(rule.getSystem() + rule.getUri());}if (arrayList.indexOf(systemName + arg0.getRequestURI()) < 0) {BehaviorMapping behaviorMapping = new BehaviorMapping();behaviorMapping.setSystem(systemName);behaviorMapping.setUri(arg0.getRequestURI());behaviorMapping.setFlag("1");behaviorFacade.saveMapping(behaviorMapping);}}}} catch (Exception e) {logger.info(e.getMessage());}}});arg0.setAttribute("startTime", new Timestamp(System.currentTimeMillis()));return true;}@Overridepublic void postHandle(HttpServletRequest arg0, HttpServletResponse arg1, Object arg2, ModelAndView arg3)throws Exception {// Auto-generated method stub// logger.info("this is http request post handle");}@Overridepublic void afterCompletion(HttpServletRequest arg0, HttpServletResponse arg1, Object arg2, Exception arg3)throws Exception {Integer actionID = ruleMap.get(systemName + arg0.getRequestURI());if (actionID != null && actionID > 0) {Timestamp startTime = (Timestamp) arg0.getAttribute("startTime");final BehaviorRecord br = new BehaviorRecord();br.setActionID(actionID);br.setStartTime(startTime);if (arg0.getRequestURI().contains("loginAction")) {Map<String, String[]> parameterMap = arg0.getParameterMap();br.setOpsUser(parameterMap.get("username")[0].toString());} else {if (arg0.getSession().getAttribute(MixAll.USER_SESSION_FLAG) != null) {br.setOpsUser(arg0.getSession().getAttribute(MixAll.USER_SESSION_FLAG).toString());} else {br.setOpsUser("mring");}}br.setTerminal(check(arg0));br.setTerminalIp(getIpInfo(arg0));br.setStatus("operating");br.setActionID(actionID);br.setEndTime(new Timestamp(System.currentTimeMillis()));long duration = br.getEndTime().getTime() - startTime.getTime();br.setDuration(String.valueOf(duration));if (arg3 == null) {br.setStatus("success");} else {br.setStatus("failure");}executor.execute(new Thread() {public void run() {try {BasicResponse resp = behaviorFacade.save(br);logger.info(com.alibaba.fastjson.JSONObject.toJSONString(br));if (resp.getId() != null && resp.getId() > 0) {logger.info("this is behavior save OK");} else {logger.info("this is behavior save error");}} catch (Exception e) {logger.info(e.getMessage());}}});}}/*** 判断是电脑访问还是手机访问* * @param request* @return*/public String check(HttpServletRequest request) {boolean isMoblie = false;String[] mobileAgents = { "iphone", "android", "phone", "mobile", "wap", "netfront", "java", "opera mobi","opera mini", "ucweb", "windows ce", "symbian", "series", "webos", "sony", "blackberry", "dopod","nokia", "samsung", "palmsource", "xda", "pieplus", "meizu", "midp", "cldc", "motorola", "foma","docomo", "up.browser", "up.link", "blazer", "helio", "hosin", "huawei", "novarra", "coolpad", "webos","techfaith", "palmsource", "alcatel", "amoi", "ktouch", "nexian", "ericsson", "philips", "sagem","wellcom", "bunjalloo", "maui", "smartphone", "iemobile", "spice", "bird", "zte-", "longcos", "pantech","gionee", "portalmmm", "jig browser", "hiptop", "benq", "haier", "^lct", "320x320", "240x320","176x220", "w3c ", "acs-", "alav", "alca", "amoi", "audi", "avan", "benq", "bird", "blac", "blaz","brew", "cell", "cldc", "cmd-", "dang", "doco", "eric", "hipt", "inno", "ipaq", "java", "jigs", "kddi","keji", "leno", "lg-c", "lg-d", "lg-g", "lge-", "maui", "maxo", "midp", "mits", "mmef", "mobi", "mot-","moto", "mwbp", "nec-", "newt", "noki", "oper", "palm", "pana", "pant", "phil", "play", "port", "prox","qwap", "sage", "sams", "sany", "sch-", "sec-", "send", "seri", "sgh-", "shar", "sie-", "siem", "smal","smar", "sony", "sph-", "symb", "t-mo", "teli", "tim-", /* "tosh", */ "tsm-", "upg1", "upsi", "vk-v","voda", "wap-", "wapa", "wapi", "wapp", "wapr", "webc", "winw", "winw", "xda", "xda-","Googlebot-Mobile" };if (request.getHeader("User-Agent") != null) {for (String mobileAgent : mobileAgents) {if (request.getHeader("User-Agent").toLowerCase().indexOf(mobileAgent) >= 0) {isMoblie = true;break;}}}if (isMoblie) {return "PHONE";}return "PC";}/*** 获取用户真实IP地址,不使用request.getRemoteAddr()的原因是有可能用户使用了代理软件方式避免真实IP地址,* 可是,如果通过了多级反向代理的话,X-Forwarded-For的值并不止一个,而是一串IP值* * @return ip*/public String getIpInfo(HttpServletRequest request) {String ip = request.getHeader("X-Forwarded-For");if (StringUtils.isNotEmpty(ip) && !"unKnown".equalsIgnoreCase(ip)) {// 多次反向代理后会有多个ip值,第一个ip才是真实ipint index = ip.indexOf(",");if (index != -1) {return ip.substring(0, index);} else {return ip;}}ip = request.getHeader("X-Real-IP");if (StringUtils.isNotEmpty(ip) && !"unKnown".equalsIgnoreCase(ip)) {return ip;}return request.getRemoteAddr();}}

1. ThreadPoolExecutor数据成员

1

Private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));

ctl主要用于存储线程池的工作状态以及池中正在运行的线程数。显然要在一个整型变量存储两个数据,只能将其一分为二。其中高3bit用于存储线程池的状态,低位的29bit用于存储正在运行的线程数。

线程池具有以下五种状态,当创建一个线程池时初始化状态为RUNNING

RUNNING

允许提交并处理任务

SHUTDOWN

不允许提交新的任务,但是会处理完已提交的任务

STOP

不允许提交新的任务,也不会处理阻塞队列中未执行的任务,并设置正在执行的线程的中断标志位

TIDYING

所有任务执行完毕,池中工作的线程数为0,等待执行terminated()勾子方法

TERMINATED

terminated()勾子方法执行完毕

注意,这里说的是线程池的状态而不是池中线程的状态。

调用线程池的shutdown方法,将线程池由RUNNING(运行状态)转换为SHUTDOWN状态。

调用线程池的shutdownNow方法,将线程池由RUNNING或SHUTDOWN状态转换为STOP状态。

SHUTDOWN状态和STOP状态先会转变为TIDYING状态,最终都会变为TERMINATED

1

2

3

Private static int runStateOf(int c)

Private static int workerCountOf(int c)

Private static int ctlOf(int rs,int wc)

ThreadPoolExecutor同时提供上述三个方法用于池中的线程查看线程池的状态和计算正在运行的线程数。

1

2

3

4

5

6

7

Private int largestPoolSize;

Private final BlockingQueue<Runnable>workQueue;

Private volatile long keepAliveTime;

private volatile int corePoolSize;

private volatile int maximumPoolSize;

private volatile ThreadFactory threadFactory;

private volatile RejectedExecutionHandler handler;

上述数据成员对线程池的性能也有很大的影响,我会将它们放到构造中讲解。

1

2

3

4

Privatefinal HashSet<Worker> workers= new HashSet<Worker>();

Privatelong completedTaskCount;

Private volatile boolean allowCoreThreadTimeOut;

private int largestPoolSize;

completedTaskCount表示线程池已完成的任务数。

allowCoreThreadTimeeOut表示是否允许核心线程在空闲状态下自行销毁。

largestPoolSize 表示线程池从创建到现在,池中线程的最大数量

1

private final HashSet<Worker> workers = new HashSet<Worker>();

workers是个HashSet容器,它存储的是Worker类的对象,Worker是线程池的内部类,它继承了Runnable接口,不严格的情况下,可以将一个Worker对象看成Thread对象,也就是工作的线程。shutdown和shutdownNow方法中会使用workers完成对所有线程的遍历。

1

2

Privatefinal ReentrantLock mainLock =new ReentrantLock();

Privatefinal Condition termination = mainLock.newCondition();

mainLock主要用于同步访问(或者说改变)线程池的状态以及线程池的各项参数,比如completedTaskCount和workers等。

在awaitTermination方法中,(mianLock的)termination是用于延时的条件队列。

2. 构造函数

1

2

3

4

5

6

7

publicThreadPoolExecutor(intcorePoolSize,

        int maximumPoolSize,

        long keepAliveTime,

        TimeUnit unit,

        BlockingQueue<Runnable> workQueue,

        ThreadFactory threadFactory,

        RejectedExecutionHandler handler)

线程池的构造函数参数多达7个,现在我们一一来分析它们对线程池的影响。

corePoolSize:线程池中核心线程数的最大值

maximumPoolSize:线程池中能拥有最多线程数

workQueue:用于缓存任务的阻塞队列

我们现在通过向线程池添加新的任务来说明着三者之间的关系。

(1)如果没有空闲的线程执行该任务且当前运行的线程数少于corePoolSize,则添加新的线程执行该任务。

(2)如果没有空闲的线程执行该任务且当前的线程数等于corePoolSize同时阻塞队列未满,则将任务入队列,而不添加新的线程。

(3)如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数小于maximumPoolSize,则创建新的线程执行任务。

(4)如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数等于maximumPoolSize,则根据构造函数中的handler指定的策略来拒绝新的任务。

注意,线程池并没有标记哪个线程是核心线程,哪个是非核心线程,线程池只关心核心线程的数量。

通俗解释,如果把线程池比作一个单位的话,corePoolSize就表示正式工,线程就可以表示一个员工。当我们向单位委派一项工作时,如果单位发现正式工还没招满,单位就会招个正式工来完成这项工作。随着我们向这个单位委派的工作增多,即使正式工全部满了,工作还是干不完,那么单位只能按照我们新委派的工作按先后顺序将它们找个地方搁置起来,这个地方就是workQueue,等正式工完成了手上的工作,就到这里来取新的任务。如果不巧,年末了,各个部门都向这个单位委派任务,导致workQueue已经没有空位置放新的任务,于是单位决定招点临时工吧(临时工:又是我!)。临时工也不是想招多少就找多少,上级部门通过这个单位的maximumPoolSize确定了你这个单位的人数的最大值,换句话说最多招maximumPoolSize–corePoolSize个临时工。当然,在线程池中,谁是正式工,谁是临时工是没有区别,完全同工同酬。

keepAliveTime:表示空闲线程的存活时间。

TimeUnitunit:表示keepAliveTime的单位。

为了解释keepAliveTime的作用,我们在上述情况下做一种假设。假设线程池这个单位已经招了些临时工,但新任务没有继续增加,所以随着每个员工忙完手头的工作,都来workQueue领取新的任务(看看这个单位的员工多自觉啊)。随着各个员工齐心协力,任务越来越少,员工数没变,那么就必定有闲着没事干的员工。这样的话领导不乐意啦,但是又不能轻易fire没事干的员工,因为随时可能有新任务来,于是领导想了个办法,设定了keepAliveTime,当空闲的员工在keepAliveTime这段时间还没有找到事情干,就被辞退啦,毕竟地主家也没有余粮啊!当然辞退到corePoolSize个员工时就不再辞退了,领导也不想当光杆司令啊!

handler:表示当workQueue已满,且池中的线程数达到maximumPoolSize时,线程池拒绝添加新任务时采取的策略。

为了解释handler的作用,我们在上述情况下做另一种假设。假设线程池这个单位招满临时工,但新任务依然继续增加,线程池从上到下,从里到外真心忙的不可开交,阻塞队列也满了,只好拒绝上级委派下来的任务。怎么拒绝是门艺术,handler一般可以采取以下四种取值。

ThreadPoolExecutor.AbortPolicy()

抛出RejectedExecutionException异常

ThreadPoolExecutor.CallerRunsPolicy()

由向线程池提交任务的线程来执行该任务

ThreadPoolExecutor.DiscardOldestPolicy()

抛弃最旧的任务(最先提交而没有得到执行的任务)

ThreadPoolExecutor.DiscardPolicy()

抛弃当前的任务

workQueue:它决定了缓存任务的排队策略。对于不同的应用场景我们可能会采取不同的排队策略,这就需要不同类型的阻塞队列,在线程池中常用的阻塞队列有以下2种:

(1)SynchronousQueue<Runnable>:此队列中不缓存任何一个任务。向线程池提交任务时,如果没有空闲线程来运行任务,则入列操作会阻塞。当有线程来获取任务时,出列操作会唤醒执行入列操作的线程。从这个特性来看,SynchronousQueue是一个无界队列,因此当使用SynchronousQueue作为线程池的阻塞队列时,参数maximumPoolSizes没有任何作用。

(2)LinkedBlockingQueue<Runnable>:顾名思义是用链表实现的队列,可以是有界的,也可以是无界的,但在Executors中默认使用无界的。

threadFactory:指定创建线程的工厂

实际上ThreadPoolExecutor类中还有很多重载的构造函数,下面这个构造函数在Executors中经常用到。

1

2

3

4

5

6

7

8

public ThreadPoolExecutor(int corePoolSize,

        int maximumPoolSize,

        long keepAliveTime,

        TimeUnit unit,

        BlockingQueue<Runnable> workQueue) {

    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

    Executors.defaultThreadFactory(), defaultHandler);

}

注意到上述的构造方法使用Executors中的defaultThreadFactory()线程工厂和ThreadPoolExecutor中的defaultHandler抛弃策略。

使用defaultThreadFactory创建的线程同属于相同的线程组,具有同为Thread.NORM_PRIORITY的优先级,以及名为"pool-XXX-thread-"的线程名(XXX为创建线程时顺序序号),且创建的线程都是非守护进程。

defaultHandler缺省抛弃策是ThreadPoolExecutor.AbortPolicy()。

除了在创建线程池时指定上述参数的值外,还可在线程池创建以后通过如下方法进行设置。

1

2

3

4

5

6

Public void allowCoreThreadTimeOut(boolean value)

Public void setKeepAliveTime(long time,TimeUnit unit)

Public void setMaximumPoolSize(int maximumPoolSize)

Public void setCorePoolSize(int corePoolSize)

Public void setThreadFactory(ThreadFactory threadFactory)

Public void setRejectedExecutionHandler(RejectedExecutionHandler handler)

3. 其它有关涉及池中线程数量的相关方法

1

2

public void allowCoreThreadTimeOut(boolean value)

public int prestartAllCoreThreads()

默认情况下,当池中有空闲线程,且线程的数量大于corePoolSize时,空闲时间超过keepAliveTime的线程会自行销毁,池中仅仅会保留corePoolSize个线程。如果线程池中调用了allowCoreThreadTimeOut这个方法,则空闲时间超过keepAliveTime的线程全部都会自行销毁,而不必理会corePoolSize这个参数。

如果池中的线程数量小于corePoolSize时,调用prestartAllCoreThreads方法,则无论是否有待执行的任务,线程池都会创建新的线程,直到池中线程数量达到corePoolSize。

4. Executors中的线程池的工厂方法

为了防止使用者错误搭配ThreadPoolExecutor构造函数的各个参数以及更加方便简洁的创建ThreadPoolExecutor对象,JavaSE中又定义了Executors类,Eexcutors类提供了创建常用配置线程池的方法。以下是Executors常用的三个创建线程池的源代码。

从源码中可以看出,Executors间接的调用了重载的ThreadPoolExecutor构造函数,并帮助用户根据不同的应用场景,配置不同的参数。

1

2

3

4

5

public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                  60L, TimeUnit.SECONDS,

                                  new SynchronousQueue<Runnable>());

}

newCachedThreadPool:使用SynchronousQueue作为阻塞队列,队列无界,线程的空闲时限为60秒。这种类型的线程池非常适用IO密集的服务,因为IO请求具有密集、数量巨大、不持续、服务器端CPU等待IO响应时间长的特点。服务器端为了能提高CPU的使用率就应该为每个IO请求都创建一个线程,以免CPU因为等待IO响应而空闲。

1

2

3

4

5

public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,

                                  0L, TimeUnit.MILLISECONDS,

                                  new LinkedBlockingQueue<Runnable>());

}

newFixedThreadPool:需指定核心线程数,核心线程数和最大线程数相同,使用LinkedBlockingQueue 作为阻塞队列,队列无界,线程空闲时间0秒。这种类型的线程池可以适用CPU密集的工作,在这种工作中CPU忙于计算而很少空闲,由于CPU能真正并发的执行的线程数是一定的(比如四核八线程),所以对于那些需要CPU进行大量计算的线程,创建的线程数超过CPU能够真正并发执行的线程数就没有太大的意义。

1

2

3

4

5

6

public static ExecutorService newSingleThreadExecutor() {

    return new FinalizableDelegatedExecutorService

        (new ThreadPoolExecutor(1, 1,

                                0L, TimeUnit.MILLISECONDS,

                                new LinkedBlockingQueue<Runnable>()));

}

newSingleThreadExecutor:池中只有一个线程工作,阻塞队列无界,它能保证按照任务提交的顺序来执行任务。

5. 任务的提交过程

submit方法源码

1

2

3

4

5

6

7

8

9

10

11

12

13

public Future<?> submit(Runnable task) {

    if (task == null) throw new NullPointerException();

    RunnableFuture<Void> ftask = newTaskFor(task, null);

    execute(ftask);

    return ftask;

}

public <T> Future<T> submit(Callable<T> task) {

    if (task == null) throw new NullPointerException();

    RunnableFuture<T> ftask = newTaskFor(task);

    execute(ftask);

    return ftask;

}

submit的实现方法位于抽象类AbstractExecutorService中,而此时execute方法还未实现(而是在AbstractExecutorService的继承类ThreadPoolExecutor中实现)。submit有三种重载方法,这里我选取了两个常用的进行分析,可以看出无论哪个submit方法都最终调用了execute方法。

execute方法源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

public void execute(Runnable command) {

    if (command == null)

        throw new NullPointerException();

    int c = ctl.get();

    if (workerCountOf(c) < corePoolSize) {

        if (addWorker(command, true))

            return;

        c = ctl.get();

    }

    

    if (isRunning(c) && workQueue.offer(command)) {

        int recheck = ctl.get();

        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);

    }

    else if (!addWorker(command, false))

        reject(command);

}

由于execute方法中多次调用addWorker,我们这里就简要介绍一下它,这个方法的主要作用就是创建一个线程来执行Runnnable对象。

1

addWorker(Runnable firstTask, boolean core)

第一个参数firstTask不为null,则创建的线程就会先执行firstTask对象,然后去阻塞队列中取任务,否直接到阻塞队列中获取任务来执行。第二个参数,core参数为真,则用corePoolSize作为池中线程数量的最大值;为假,则以maximumPoolSize作为池中线程数量的最大值。

简要分析一下execute源码,执行一个Runnable对象时,首先通过workerCountOf(c)获取线程池中线程的数量,如果池中的数量小于corePoolSize就调用addWorker添加一个线程来执行这个任务。否则通过workQueue.offer(command)方法入列。如果入列成功还需要在一次判断池中的线程数,因为我们创建线程池时可能要求核心线程数量为0,所以我们必须使用addWorker(null, false)来创建一个临时线程去阻塞队列中获取任务来执行。

isRunning(c) 的作用是判断线程池是否处于运行状态,如果入列后发现线程池已经关闭,则出列。不需要在入列前判断线程池的状态,因为判断一个线程池工作处于RUNNING状态到执行入列操作这段时间,线程池可能被其它线程关闭了,所以提前判断毫无意义。

addWorker源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

private boolean addWorker(Runnable firstTask, boolean core) {

    //这个两个for循环主要是判断能否增加一个线程,

    //外循环来判断线程池的状态

    //内循环主要是个增加线程数的CAS操作

    retry:

    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);

        // Check if queue empty only if necessary.

        if (rs >= SHUTDOWN &&

            ! (rs == SHUTDOWN &&

               firstTask == null &&

               ! workQueue.isEmpty()))

            return false;

        for (;;) {

            int wc = workerCountOf(c);

            if (wc >= CAPACITY ||

                wc >= (core ? corePoolSize : maximumPoolSize))

                return false;

            if (compareAndIncrementWorkerCount(c))

                break retry;

            c = ctl.get();  // Re-read ctl

            if (runStateOf(c) != rs)

                continue retry;

            // 如果是因为线程数的改变导致CAS失败,只需要重复内循环

        }

    }

    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;

    try {

        w = new Worker(firstTask);//创建线程

        final Thread t = w.thread;

        if (t != null) {

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||

                    (rs == SHUTDOWN && firstTask == null)) {

                    if (t.isAlive()) // precheck that t is startable

                        throw new IllegalThreadStateException();

                    workers.add(w);

                    int s = workers.size();

                    if (s > largestPoolSize)

                        largestPoolSize = s;

                    workerAdded = true;

                }

            } finally {

                mainLock.unlock();

            }

            if (workerAdded) {

                t.start();//启动线程

                workerStarted = true;

            }

        }

    } finally {

        if (! workerStarted)

            addWorkerFailed(w);

    }

    return workerStarted;

}

6. 线程的执行过程

runWorker源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask = null;

    w.unlock(); // allow interrupts

    boolean completedAbruptly = true;

    try {

        while (task != null || (task = getTask()) != null) {

            w.lock();

            // If pool is stopping, ensure thread is interrupted;

            // if not, ensure thread is not interrupted.  This

            // requires a recheck in second case to deal with

            // shutdownNow race while clearing interrupt

            if ((runStateAtLeast(ctl.get(), STOP) ||

                 (Thread.interrupted() &&

                  runStateAtLeast(ctl.get(), STOP))) &&

                !wt.isInterrupted())

                wt.interrupt();

            try {

                beforeExecute(wt, task);

                Throwable thrown = null;

                try {

                    task.run();

                } catch (RuntimeException x) {

                    thrown = x; throw x;

                } catch (Error x) {

                    thrown = x; throw x;

                } catch (Throwable x) {

                    thrown = x; throw new Error(x);

                } finally {

                    afterExecute(task, thrown);

                }

            } finally {

                task = null;

                w.completedTasks++;

                w.unlock();

            }

        }

        completedAbruptly = false;

    } finally {

        processWorkerExit(w, completedAbruptly);

    }

}

Thread的run方法实际上调用了Worker类的runWorker方法,而Worker类继承了AQS类,并实现了lock、unlock、trylock方法。但是这些方法不是真正意义上的锁,所以在代码中加锁操作和解锁操作没有成对出现。runWorker方法中获取到任务就“加锁”,完成任务后就“解锁”。也就是说在“加锁”到“解锁”的这段时间内,线程处于忙碌状态,而其它时间段,处于空闲状态。线程池就可以通过trylock方法来确定这个线程是否空闲。

getTask方法的主要作用是从阻塞队列中获取任务。

beforeExecute(wt, task)和afterExecute(task, thrown)是个钩子函数,如果我们需要在任务执行之前和任务执行以后进行一些操作,那么我们可以自定义一个继承ThreadPoolExecutor类,并覆盖这两个方法。

getTask源代码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

private Runnable getTask() {

    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {

        int c = ctl.get();

        int rs = runStateOf(c);

        // Check if queue empty only if necessary.

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

            decrementWorkerCount();

            return null;

        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))

            && (wc > 1 || workQueue.isEmpty())) {

            if (compareAndDecrementWorkerCount(c))

                return null;

            continue;

        }

        try {

            Runnable r = timed ?

                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                workQueue.take();

            if (r != null)

                return r;

            timedOut = true;

        } catch (InterruptedException retry) {

            timedOut = false;

        }

    }

}

可以看出如果允许线程在keepAliveTime时间内未获取到任务线程就销毁就调用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),否则会调用workQueue.take()方法(该方法即使获取不到任务就会一直阻塞下去)。而确定是否使用workQueue.poll方法只有两个条件决定,一个是当前池中的线程是否大于核心线程数量,第二个是是否允许核心线程销毁,两者其一满足就会调用该方法。

7. 线程池的关闭过程

shutdown源码

1

2

3

4

5

6

7

8

9

10

11

12

13

public void shutdown() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        checkShutdownAccess();

        advanceRunState(SHUTDOWN);

        interruptIdleWorkers();

        onShutdown(); // hook for ScheduledThreadPoolExecutor

    } finally {

        mainLock.unlock();

    }

    tryTerminate();

}

advanceRunState(SHUTDOWN)的作用是通过CAS操作将线程池的状态更改为SHUTDOWN状态。

interruptIdleWorkers是对空闲的线程进行中断,它实际上调用了重载带参数的函数interruptIdleWorkers(false)

onShutdown也是一个钩子函数

interruptIdleWorkers源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

private void interruptIdleWorkers(boolean onlyOne) {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        for (Worker w : workers) {

            Thread t = w.thread;

            if (!t.isInterrupted() && w.tryLock()) {

                try {

                    t.interrupt();

                } catch (SecurityException ignore) {

                } finally {

                    w.unlock();

                }

            }

            if (onlyOne)

                break;

        }

    } finally {

        mainLock.unlock();

    }

}

通过workers容器,遍历池中的线程,对每个线程进行tryLock()操作,如果成功说明线程空闲,则设置其中断标志位。而线程是否响应中断则由任务的编写者决定。

ThreadPoolExecutor线程池使用及参数详解相关推荐

  1. Java线程池七个参数详解

    java多线程开发时,常常用到线程池技术,这篇文章是对创建java线程池时的七个参数的详细解释. 从源码中可以看出,线程池的构造函数有7个参数,分别是corePoolSize.maximumPoolS ...

  2. Java线程池及配置参数详解

    一.线程池的优点 合理利用线程池能够带来三个好处. 第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 第二:提高响应速度.当任务到达时,任务可以不需要的等到线程创建就能立即执 ...

  3. Java线程池七个参数详解:核心线程数、最大线程数、空闲线程存活时间、时间单位、工作队列、线程工厂、拒绝策略

    源码简介 ThreadPoolExecutor是JDK中的线程池实现,这个类实现了一个线程池需要的各个方法,它提供了任务提交.线程管理.监控等方法. 下面是ThreadPoolExecutor类的构造 ...

  4. java 重启线程_java 可重启线程及线程池类的设计(详解)

    了解JAVA多线程编程的人都知道,要产生一个线程有两种方法,一是类直接继承Thread类并实现其run()方法:二是类实现Runnable接口并实现其run()方法,然后新建一个以该类为构造方法参数的 ...

  5. future java 原理_Java线程池FutureTask实现原理详解

    前言 线程池可以并发执行多个任务,有些时候,我们可能想要跟踪任务的执行结果,甚至在一定时间内,如果任务没有执行完成,我们可能还想要取消任务的执行,为了支持这一特性,ThreadPoolExecutor ...

  6. Java 线程池原理和队列详解

    转载请标明出处:http://blog.csdn.net/xx326664162/article/details/51701508 文章出自:薛瑄的博客 你也可以查看我的其他同类文章,也会让你有一定的 ...

  7. 一文搞懂线程池原理——Executor框架详解

    文章目录 1 使用线程池的好处 2 Executor 框架 2.1 Executor 框架结构 2.2 Executor 框架使用示意图 2.3 Executor 框架成员 2.3.1 Executo ...

  8. java线程池使用最全详解

    线程池使用 前言 在执行一个异步任务或并发任务时,往往是通过直接new Thread()方法来创建新的线程,这样做弊端较多,更好的解决方案是合理地利用线程池,线程池的优势很明显,如下: 降低系统资源消 ...

  9. 线程池的好处,详解,单例(绝对好记)

    转载请注意出处:http://blog.csdn.net/fengye454545/article/details/79536986 前几天公司面试,问了很多线程池的问题,由于也是菜鸟一只本来对线程池 ...

最新文章

  1. 国贫县山西永和:“一揽子”保险“保”脱贫
  2. 准IT工作者如何择师、如何学习
  3. python怎么运行代码-python代码如何运行
  4. 「禾连健康」轻松实现弹性降本20%以上,竟然是因为使用了它!
  5. lisp ssget 浩辰_AutoCAD和浩辰CAD,gCAD都可以用的lisp
  6. 解决:Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408,
  7. html支持的脚本语言,能不能让日志内容在支持html语言的同时支持一下脚本语言,拜托!拜托!...
  8. Redis之数据结构和底层编码
  9. Mysql高可用方案mmm
  10. 为flash游戏终极实用提速
  11. PanDownloadSpeedPan迅雷极速版合集
  12. 树形结构的数据库表Schema设计
  13. 为提高 SDLC 安全,GitHub 发布新功能|GitHub Universe 2022
  14. mysql统计每半小时内的数据(查寻某段时间内的数据)
  15. 汉语数字或罗马数字转化为阿拉伯数字:例如:一百二十三为123、III为3
  16. C66X中断整理 6678中断配置(含例程)
  17. Redis高级应(2)-事务以及LUA脚本
  18. iphone为什么不能连接到服务器未响应,苹果手机(iPhone)连接电脑没反应?(这样就可以解决!)...
  19. mac air 2013 boot camp 装win10双系统
  20. window.close关闭当前页面

热门文章

  1. 机器学习之决策树模型最优属性选择方法
  2. 吴恩达机器学习作业1-线性回归
  3. Mybatis报错mapkey is required解决方案
  4. 云计算、云安全、云道德
  5. 水性气凝胶涂料的制备方法,具体实施方式举例
  6. 《程序员,你伤不起》编辑的话
  7. yaml.parser.ParserError
  8. ajax 失败textstatus,rails 3 jquery ajax调用与statusText失败:“parsererror”
  9. Python报错:pandas.errors.ParserError: Error tokenizing data. C error: Expected 3……
  10. “任务管理器已被管理员禁用”解决方案