版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u013332124/article/details/82694076
本篇重点介绍httpclient连接池的相关原理以及介绍,顺带的介绍httpclient发送请求时的简单介绍,并会带上一些源码分析。本篇博文是基于httpclient的4.5.2版本进行介绍的。

一、发送请求的流程原理
几个关键的类和接口介绍
在介绍架构原理前,先介绍几个类和接口,方便读者对httpclient的整体设计有个大概的概念。

HttpClient:一个接口,即http客户端的抽象,主要就是用它发送请求http请求。它的主要实现有CloseableHttpClient,相信读者们比较熟悉。

HttpRequestBase:一个抽象类,是请求内容的抽象。包括了请求协议、uri、还有一些配置。我们常用的HttpGet和HttpPost都是它的子类。

HttpClientConnectionManager:一个接口,连接管理的抽象。一般要发送http请求前,需要和目标服务建立连接,然后再发送数据包。这个连接管理器可以对连接以池的方式进行管理。

HttpRoute:一个final类,用来表示目标服务器(ip+端口)。

发送流程图

一个HttpRequestBase在被httpclient执行后,会经过一个链路被一个个组件处理。这里使用了职责链的设计模式,一个组件处理完后,就会交给下一个组件处理。这样做的好处就是如果要移除一个组件或者添加一个新的组件来实现对请求的一些处理非常方便。这里要说一下,上图列的组件中的一些是根据配置决定是否加入到该执行链中。

我们一般通过CloseableHttpClient httpClient = HttpClients.custom().build();获取到一个httpClient。这里返回的实际对象其实是InternalHttpClient类,所以执行httpclient.execute(request)时候最终会调用InternalHttpClient#doExecute()。我们看下对应的源码

protected CloseableHttpResponse doExecute(
            final HttpHost target,
            final HttpRequest request,
            final HttpContext context) throws IOException, ClientProtocolException {
        Args.notNull(request, "HTTP request");
        HttpExecutionAware execAware = null;
        if (request instanceof HttpExecutionAware) {
            execAware = (HttpExecutionAware) request;
        }
        try {
            final HttpRequestWrapper wrapper = HttpRequestWrapper.wrap(request, target);
            final HttpClientContext localcontext = HttpClientContext.adapt(
                    context != null ? context : new BasicHttpContext());
            RequestConfig config = null;
            if (request instanceof Configurable) {
                config = ((Configurable) request).getConfig();
            }
            if (config == null) {
                final HttpParams params = request.getParams();
                if (params instanceof HttpParamsNames) {
                    if (!((HttpParamsNames) params).getNames().isEmpty()) {
                        config = HttpClientParamConfig.getRequestConfig(params);
                    }
                } else {
                    config = HttpClientParamConfig.getRequestConfig(params);
                }
            }
            if (config != null) {
                localcontext.setRequestConfig(config);
            }
            setupContext(localcontext);
            final HttpRoute route = determineRoute(target, wrapper, localcontext);
            return this.execChain.execute(route, wrapper, localcontext, execAware);
        } catch (final HttpException httpException) {
            throw new ClientProtocolException(httpException);
        }
    }

通过代码可以看到,这里主要是做两件事

获取requestConfig,然后设置到请求上下文中
通过请求获取对应的目标服务器HttpRoute
之后就交给处理链处理请求。

处理链的构造是在InternalHttpClient的构造中完成的,也就是HttpClients.custom().build()方法中构造起来的。我们看下处理链的构造代码

public CloseableHttpClient build() {
        ...
        ClientExecChain execChain = createMainExec(
                requestExecCopy,
                connManagerCopy,
                reuseStrategyCopy,
                keepAliveStrategyCopy,
                new ImmutableHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
                targetAuthStrategyCopy,
                proxyAuthStrategyCopy,
                userTokenHandlerCopy);

execChain = decorateMainExec(execChain);

...
        execChain = new ProtocolExec(execChain, httpprocessorCopy);

execChain = decorateProtocolExec(execChain);

// Add request retry executor, if not disabled
        if (!automaticRetriesDisabled) {
            HttpRequestRetryHandler retryHandlerCopy = this.retryHandler;
            if (retryHandlerCopy == null) {
                retryHandlerCopy = DefaultHttpRequestRetryHandler.INSTANCE;
            }
            execChain = new RetryExec(execChain, retryHandlerCopy);
        }

HttpRoutePlanner routePlannerCopy = this.routePlanner;
        if (routePlannerCopy == null) {
            SchemePortResolver schemePortResolverCopy = this.schemePortResolver;
            if (schemePortResolverCopy == null) {
                schemePortResolverCopy = DefaultSchemePortResolver.INSTANCE;
            }
            if (proxy != null) {
                routePlannerCopy = new DefaultProxyRoutePlanner(proxy, schemePortResolverCopy);
            } else if (systemProperties) {
                routePlannerCopy = new SystemDefaultRoutePlanner(
                        schemePortResolverCopy, ProxySelector.getDefault());
            } else {
                routePlannerCopy = new DefaultRoutePlanner(schemePortResolverCopy);
            }
        }
        // Add redirect executor, if not disabled
        if (!redirectHandlingDisabled) {
            RedirectStrategy redirectStrategyCopy = this.redirectStrategy;
            if (redirectStrategyCopy == null) {
                redirectStrategyCopy = DefaultRedirectStrategy.INSTANCE;
            }
            execChain = new RedirectExec(execChain, routePlannerCopy, redirectStrategyCopy);
        }

// Optionally, add service unavailable retry executor
        final ServiceUnavailableRetryStrategy serviceUnavailStrategyCopy = this.serviceUnavailStrategy;
        if (serviceUnavailStrategyCopy != null) {
            execChain = new ServiceUnavailableRetryExec(execChain, serviceUnavailStrategyCopy);
        }
        // Optionally, add connection back-off executor
        if (this.backoffManager != null && this.connectionBackoffStrategy != null) {
            execChain = new BackoffStrategyExec(execChain, this.connectionBackoffStrategy, this.backoffManager);
        }
        ...
        return new InternalHttpClient(
                execChain,
                connManagerCopy,
                routePlannerCopy,
                cookieSpecRegistryCopy,
                authSchemeRegistryCopy,
                defaultCookieStore,
                defaultCredentialsProvider,
                defaultRequestConfig != null ? defaultRequestConfig : RequestConfig.DEFAULT,
                closeablesCopy);
    }

由于这个方法太长,所以只保留了处理链的那部分代码。

下面介绍处理链中各个组件的一个大概功能:

MainClientExec:主要执行客户端请求的,通过连接管理器,来把请求绑定到具体的连接上面,接着发送请求。同时也是在这个组件里面做连接的池化处理等。
ProtocolExec:通过一系列的HttpProcessor处理链对Http消息按格式编码以及解码。每一个processor处理一个范畴的事情,比如处理header,content以及cookie等等。我们可以往HttpRequestInterceptor和HttpResponseInterceptor中添加我们自己定义的拦截器。这样,HttpProcessor在处理请求和响应前,就会经过我们设定的拦截器进行相应的操作。
RetryExec:进行重连操作。是否要重连的判断的根据配置的HttpRequestRetryHandler。
RedirectExec:处理重定向的情况
ServiceUnavailableRetryExec:返回503进行重试
BackoffStrategyExec:对出现连接或者响应超时异常的route进行降级,缩小该route上连接数,能使得服务质量更好的route能得到更多的连接。降级的速度可以通过因子设置,默认是每次降级减少一半的连接数,即降级因子是0.5。
上图中的HttpRequestExecutor只是MainClientExec中的组件,用于真正的发送http请求给目标服务器。

二、连接池的管理
通过下面这段代码,我们可以给httpclient设置连接管理器

private static CloseableHttpClient createHttpClient() {
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        // 将最大连接数添加
        cm.setMaxTotal(200);
        // 将每一个路由基础的连接添加
        cm.setDefaultMaxPerRoute(40);
        HttpHost httpHost = new HttpHost("www.baidu.com", 80);
        HttpRoute httpRoute = new HttpRoute(httpHost);
        cm.setMaxPerRoute(httpRoute, 80);

HttpRequestRetryHandler httpRetryHandler = new DefaultHttpRequestRetryHandler(5, false);

//默认连接配置
        RequestConfig defaultRequestConfig = RequestConfig.custom()
                .setConnectionRequestTimeout(10000)
                .setConnectTimeout(10000).setSocketTimeout(10000).build();

return HttpClients.custom()
                .setDefaultRequestConfig(defaultRequestConfig)
                .evictExpiredConnections()
                .evictIdleConnections(10, TimeUnit.SECONDS)
                .setConnectionManager(cm)
                .setRetryHandler(httpRetryHandler).build();
    }

在上面的例子中,我们初始化了一个基于连接池的连接管理器。这个连接池中最多持有200个连接,每个目标服务器最多持有40个连接。其中,我们专门设定了www.baidu.com:80的目标服务器的可以持有的最大连接数是80个。

如果我们不给httpclient配置指定的连接管理器,在默认情况下,httpclient也会自动使用PoolingHttpClientConnectionManager作为连接管理器。但是PoolingHttpClientConnectionManager默认的maxConnPerRoute和maxConnTotal分别是是2和20。也就是对于每个服务器最多只会维护2个连接,看起来有点少。所以,在日常使用时我们尽量使用自己配置的连接管理器比较好。

a. 连接池结构如下

连接池管理器会为每个httpRoute单独的维护一个连接池。管理着available、leased、pending这些对象。同时,CPool本身也会维护一个总的available、leased、pending对象,是那些routeToPool中available、leased、pending的总和。这里介绍一下上图的几个概念

1. PoolEntity
2. LinkedList available – 存放可用连接
available 表示可用的连接,他们是用LinkedList来存放的。

当连接被使用完后,会被放入链表的头部。同时,当需要取连接时,也是从链表的头部开始遍历,直到获取可用的连接为止。这样做的目的是为了尽快的获取到可用的连接,因为在链表头部的都是刚放入链表的连接,离过期时间肯定是最远的。如果从链表尾部获取的话,那么很可能会获取到失效的连接。

同时,删除链表的失效连接时从链表尾部开始遍历的。

3. HashSet leased – 存放被租用的连接
leased存放正在被使用的连接。如果一个连接被创建或者从available 链表中取出,就会先放入leased集合中。同时,用完连接后,就从leased集合中移除掉掉。因为就add和remove操作,所以使用HashSet的数据结构,效率很高。

maxTotal的配置就是available链表和leased集合的总和限制。

4. LinkedList pending – 存放等待获取连接的线程的Future
当从池中获取连接时,如果available链表没有现成可用的连接,且当前路由或连接池已经达到了最大数量的限制,也不能创建连接了,此时不会阻塞整个连接池,而是将当前线程用于获取连接的Future放入pending链表的末尾,之后当前线程调用await(),释放持有的锁,并等待被唤醒。

当有连接被release()释放回连接池时,会从pending链表头获取future,并唤醒其线程继续获取连接,做到了先进先出。

5. RouteToPool
每个RouteToPool都会管理一个池,也就是持有pending 、leased 、available 这些对象。

同时,CPool本身也会维护一个总的available、leased、pending对象,是那些routeToPool中链表和集合的总和。

b. 分配连接 & 建立连接
分配连接
连接分配的过程:

如果available中有可用连接,则直接返回该连接
判断routeToPool和全局的连接数量是否分别达到maxPerRoute和MaxTotal的限制,如果都没达到,则创建一个连接,然后返回
如果上面的条件都没达成,就挂起当前线程,然后构造一个Future对象放入pending队列,等待有连接释放后唤醒自己。
建立连接
当分配到PoolEntry连接实体后,会调用establishRoute(),建立socket连接并与ManagedHttpClientConnection绑定。

c. 回收连接 & 保持连接
回收连接

保持连接
连接是否要保持
连接是否被标记成可重用是根据连接是否可以保持。

客户端如果希望保持长连接,应该在发起请求时告诉服务器希望服务器保持长连接(http 1.0设置connection字段为keep-alive,http 1.1字段默认保持)。根据服务器的响应来确定是否保持长连接,判断原则如下:

检查返回response报文头的Transfer-Encoding字段,若该字段值存在且不为chunked,则连接不保持,直接关闭
检查返回的response报文头的Content-Length字段,若该字段值为空或者格式不正确(多个长度,值不是整数)或者小于0,则连接不保持,直接关闭
检查返回的response报文头的connection字段值,如果字段存在,且字段值为close 则连接不保持,直接关闭,若字段值为keep-alive则连接标记为保持。字段不存在或者不为这两个中的一个,则http 1.1版本默认为保持,将连接标记为保持, 1.0版本默认为连接不保持,直接关闭
连接保持时间
连接保持时,会更新PoolEntry的expiry到期时间,计算逻辑为:

如果response头中的keep-alive字段中timeout属性值存在且为正值:newExpiry = System.currentTimeMillis() + timeout;。如果timeout值不存在或者为负数,则newExpiry = Long.MAX_VALUE。
最后拿原来的expire和新的expire取最小值:expire=Math.min(oldExpire,newExpire)
如何释放连接
httpclient会将我们的请求响应封装成CloseableHttpResponse对象,我们可以通过这个对象获取响应的内容。获取内容时,最终是通过底层的InputStream.read()进行读取的,httpclient中对于该InputStream的实现是org.apache.http.conn.EofSensorInputStream,它会在read过程中不断的检查流是否读完,一旦检测到读完,就会自动根据该连接是否可重用来选择把连接返回给连接池或者关闭连接。

同时,我们也可以人为功能CloseableHttpResponse.close()方法来回收该连接或者关闭该连接。

d. 连接的过期和失效
每次从连接池中获取连接时,都会先检测该连接是否已经过期或者关闭,如果是的话,就分别从routeToPool、httpConnPool的available队列移除,然后继续获取下一个连接。

expire到期
每个连接都有一个expire时间,这个过期时间是连接管理器用来管理连接的,并不是说过了这个时间tcp连接就不能用了。只是连接管理器不会再使用这个连接了。

底层连接被关闭
可能连接被服务端单方面关闭。那么,httpclient是怎么判断连接被关闭的呢?

httpclient会通过socket输入流尝试读取数据,它将soTimeout设置为1ms,然后读取数据。如果返回的字节数小于0,则说明该连接关闭了。

/**BHttpConnectionBase#isStale()*/
public boolean isStale() {
    if (!isOpen()) {
        return true;
    }
    try {
        final int bytesRead = fillInputBuffer(1);
        return bytesRead < 0;
    } catch (final SocketTimeoutException ex) {
        return false;
    } catch (final IOException ex) {
        return true;
    }
socket输入流读取数据的底层也是通过recv系统指令完成的,执行recv指令时,在阻塞的情况下,如果返回的值是-1,则表示连接被异常关闭。如果返回的是0,则表示连接被关闭了。我们这里讲soTimeout设置为1ms,所以最多只会阻塞1ms就返回,如果连接被服务端单方面关闭的话,这里就会返回-1,我们马上就知道连接被关闭了。

e. 后台线程清除过期和闲置过久的连接
如果每次获取连接时都要去判断连接是否过期或者关闭,会造成一定的性能损耗。另外如果连接长时间没用,长期闲置在那也是一种资源浪费。所以httpclient提供了一个机制,也就是开启后台线程定时的清除过期和闲置过久的连接。注意,4.5.2版本默认是有这个机制的,以前的版本不太确定有没有,如果没有我们也可以自己写一个。PoolingHttpClientConnectionManager提供了两个方法,closeExpiredConnections和closeIdleConnections,分别用来清除过期的连接以及闲置过久的连接。

这个线程默认是不开启的,我们可以在构建httpclient的时候设置

return HttpClients.custom()
                .setDefaultRequestConfig(defaultRequestConfig)
                //开启后台线程清除过期的连接
                .evictExpiredConnections()
                //开启后台线程清除闲置30秒以上的连接
                .evictIdleConnections(30, TimeUnit.SECONDS)
                .setConnectionManager(cm)
                .setRetryHandler(httpRetryHandler).build();
1
2
3
4
5
6
7
8
evictExpiredConnections和evictIdleConnections中只要有一个被调用了,就会开启那个后台线程。

if (!this.connManagerShared) {
            if (closeablesCopy == null) {
                closeablesCopy = new ArrayList<Closeable>(1);
            }
            final HttpClientConnectionManager cm = connManagerCopy;
            //只要有一个为true,就开启清除线程
            if (evictExpiredConnections || evictIdleConnections) {
                final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(cm,
                        maxIdleTime > 0 ? maxIdleTime : 10, maxIdleTimeUnit != null ? maxIdleTimeUnit : TimeUnit.SECONDS);
                closeablesCopy.add(new Closeable() {

@Override
                    public void close() throws IOException {
                        connectionEvictor.shutdown();
                    }

});
                //启动线程
                connectionEvictor.start();
            }
            closeablesCopy.add(new Closeable() {

@Override
                public void close() throws IOException {
                    cm.shutdown();
                }

});
        }

线程开启后,会执行下面的方法

public void run() {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        //休眠sleepTimeMs时间
                        Thread.sleep(sleepTimeMs);
                        //清除过期的连接
                        connectionManager.closeExpiredConnections();
                        if (maxIdleTimeMs > 0) {
                            //清除闲置过久的连接
                            connectionManager.closeIdleConnections(maxIdleTimeMs, TimeUnit.MILLISECONDS);
                        }
                    }
                } catch (final Exception ex) {
                    exception = ex;
                }

}
//关闭过期的连接
    @Override
    public void closeExpiredConnections() {
        this.log.debug("Closing expired connections");
        this.pool.closeExpired();
    }

public void closeExpired() {
        final long now = System.currentTimeMillis();
        enumAvailable(new PoolEntryCallback<T, C>() {

@Override
            public void process(final PoolEntry<T, C> entry) {
                if (entry.isExpired(now)) {
                    entry.close();
                }
            }

});
    }
//关闭闲置太久的连接 
    @Override
    public void closeIdleConnections(final long idleTimeout, final TimeUnit tunit) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit);
        }
        this.pool.closeIdle(idleTimeout, tunit);
    }

public void closeIdle(final long idletime, final TimeUnit tunit) {
        Args.notNull(tunit, "Time unit");
        long time = tunit.toMillis(idletime);
        if (time < 0) {
            time = 0;
        }
        final long deadline = System.currentTimeMillis() - time;
        enumAvailable(new PoolEntryCallback<T, C>() {

@Override
            public void process(final PoolEntry<T, C> entry) {
                if (entry.getUpdated() <= deadline) {
                    entry.close();
                }
            }

});
    }

另外,从上面的代码可以看出,这个线程其实只会关闭过期的连接以及闲置太久的连接,对于那些被服务端异常关闭的连接,是不会处理的。

三、总结
httpclient的介绍大概就到这里。由于时间有限,并没有很深入的研究其源码实现,但是对其架构也有了一定的认识。再遇到相关的问题时也可以较快的分析出来,实在不行也可以跟踪源码查。

另外,本文如果哪里有说的不对的地方,欢迎指出,一起交流~

本文参考链接:

https://blog.csdn.net/umke888/article/details/54881946

https://blog.csdn.net/szwandcj/article/details/51291967
————————————————
版权声明:本文为CSDN博主「疯狂哈丘」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u013332124/article/details/82694076

httpclient架构原理介绍 连接池详解相关推荐

  1. python requests 异步调用_构建高效的python requests长连接池详解

    前文: 最近在搞全网的CDN刷新系统,在性能调优时遇到了requests长连接的一个问题,以前关注过长连接太多造成浪费的问题,但因为系统都是分布式扩展的,针对这种各别问题就懒得改动了. 现在开发的缓存 ...

  2. Spring Boot 使用 Druid 连接池详解

    Spring Boot 使用 Druid 连接池详解 Alibaba Druid 是一个 JDBC 组件库,包含数据库连接池.SQL Parser 等组件,被大量业务和技术产品使用或集成,经历过严苛的 ...

  3. Mysql连接池详解——原理部分

    引言 为什么要使用连接池 线程池如何配合连接池使用, 连接池和线程池数量 不能根据经验值直接设置,需要根据io同步的具体时间去测试得到最优的值 同步连接池和异步连接池的区别 连接池的扩展 一.池化技术 ...

  4. Hyperf连接池详解

    开头语 Hyperf官网介绍了为什么要使用连接池及连接池的简单案例,但是对新手却不太友好,不过我还是想基于官方文档,来做一篇更深的讲解. 先来看看官方文档对于连接池的介绍 为什么要使用连接池 当并发量 ...

  5. jedis连接池详解(Redis)

    转自:http://tianxingzhe.blog.51cto.com/3390077/1684306 原子性(atomicity): 一个事务是一个不可分割的最小工作单位,事务中包括的诸操作要么都 ...

  6. java连接池详解与自定义es连接池

    目录 1 版本选择 2 依赖选择 3 使用commons-pool构造连接池 3.1 pom.xml 3.2 对象池类 GenericObjectPool普通对象池 GenericKeyedObjec ...

  7. Redis Lettuce客户端异步连接池详解

    前言 异步/非阻塞编程模型需要非阻塞API才能获得Redis连接.阻塞的连接池很容易导致阻塞事件循环并阻止您的应用程序进行处理的状态.Lettuce带有异步,非阻塞池实现,可与Lettuces异步连接 ...

  8. java class文件常量池_《Java虚拟机原理图解》 1.2.3、Class文件中的常量池详解(下)...

    Java内存区域         1.程序计数器(Program Counter Register)(线程私有的)         2.Java虚拟机栈 (Java Virtual Machine S ...

  9. 【初篇】DHT11连接STM32、One wire单总线原理、GPIO代码详解

    目录 一.DHT11单总线原理 二.代码详解 三.代码 代码见文章末尾 一.DHT11单总线原理 DHT11温湿度传感器只需要一根线即可和MCU进行数据交换,无数据传输时,单线应为高电平状态,具体流程 ...

  10. 大型网站系统架构系列:负载均衡详解(一)

    大型网站系统架构系列:负载均衡详解(一) 2016-03-20 架构说 面对大量用户访问.高并发请求,海量数据,可以使用高性能的服务器.大型数据库,存储设备,高性能Web服务器,采用高效率的编程语言比 ...

最新文章

  1. 2021-04-29 Python绘制柱状图之可视化神器Pyecharts
  2. 高并发之并发容器详解(从入门到超神)
  3. c语言编程文件中删除数据结构,C语言数据结构实战(一)顺序表的插入与删除
  4. DZ论坛系统 UC_KEY拿webshell
  5. 定义表格的指定列的属性
  6. Linux守护进程的启动方法
  7. ecu故障现象_传感器坏了,会导致什么故障现象?
  8. 计算机编程和机器人编程有什么不同,编程和机器人编程的区别
  9. xshell的注册码
  10. (更新至v0.108)termux下载、安装教程 版本v0.88
  11. 服务器硬盘坏道修复教程视频,硬盘坏道修复工具使用教程
  12. Class文件格式总结
  13. espanso-跨平台文本扩展工具
  14. 二维数组正确初始化规则
  15. Android 输入系统 ANR机制的设计与实现
  16. 利用python和boto3包从amazon s3 bucket中下载数据
  17. 煤炭企业内部调拨物资称重问题如何管理(二)
  18. ***技巧(转载暗组)
  19. 基于BS的传统中医诊断系统软件开发与实现
  20. 马化腾豪掷千金,张一鸣爱而不得:这家美国公司要独立上市了

热门文章

  1. LVS负载均衡群集-NAT
  2. [Struts]使用tiles管理界面遇到困难
  3. Android在程序中浏览网页
  4. mysql数据库忘记密码时如何修改
  5. 百度云存储教程---免费建立自己的静态网站
  6. 所有子线程全部结束的判断
  7. 如何实现Windows Network所有会话的限制登录和访问控制
  8. 怎么让envi中影像背景为0_eCogniton波段权重设置——基于ENVI的波段信息量计算
  9. slub释放过程-do_slab_free
  10. 彻底理解Cisco NAT内部的一些事