Hystrix具有折叠(或批处理)请求的高级功能。 如果两个或多个命令同时运行相似的请求,Hystrix可以将它们组合在一起,运行一个批处理请求,并将拆分结果分派回所有命令。 首先让我们看看Hystrix如何工作而不会崩溃。 假设我们有一个StockPrice给定Ticker StockPrice的服务:

import lombok.Value;
import java.math.BigDecimal;
import java.time.Instant;@Value
class Ticker {String symbol;
}@Value
class StockPrice {BigDecimal price;Instant effectiveTime;
}interface StockPriceGateway {default StockPrice load(Ticker stock) {final Set<Ticker> oneTicker = Collections.singleton(stock);return loadAll(oneTicker).get(stock);}ImmutableMap<Ticker, StockPrice> loadAll(Set<Ticker> tickers);
}

为了方便起见, StockPriceGateway核心实现必须提供loadAll()批处理方法,而实现load()方法则是为了方便。 因此,我们的网关能够批量加载多个价格(例如,以减少延迟或网络协议开销),但是目前我们不使用此功能,始终一次加载一个股票的价格:

class StockPriceCommand extends HystrixCommand<StockPrice> {private final StockPriceGateway gateway;private final Ticker stock;StockPriceCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stock = stock;}@Overrideprotected StockPrice run() throws Exception {return gateway.load(stock);}
}

这样的命令将始终为每个Ticker调用StockPriceGateway.load() ,如以下测试所示:

class StockPriceCommandTest extends Specification {def gateway = Mock(StockPriceGateway)def 'should fetch price from external service'() {given:gateway.load(TickerExamples.any()) >> StockPriceExamples.any()def command = new StockPriceCommand(gateway, TickerExamples.any())when:def price = command.execute()then:price == StockPriceExamples.any()}def 'should call gateway exactly once when running Hystrix command'() {given:def command = new StockPriceCommand(gateway, TickerExamples.any())when:command.execute()then:1 * gateway.load(TickerExamples.any())}def 'should call gateway twice when command executed two times'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:commandOne.execute()commandTwo.execute()then:2 * gateway.load(TickerExamples.any())}def 'should call gateway twice even when executed in parallel'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:2 * gateway.load(TickerExamples.any())}}

如果您不了解Hystrix,则通过将外部调用包装在命令中可以获得许多功能,例如超时,断路器等。但这不是本文的重点。 看一下最后两个测试:当两次,顺序或并行( queue() )两次询问任意行情的价格时,我们的外部gateway也被两次调用。 上一次测试特别有趣–我们几乎同时要求相同的报价,但Hystrix不能弄清楚。 这两个命令是完全独立的,将在不同的线程中执行,彼此之间一无所知-即使它们几乎同时运行。

折叠就是找到类似的请求并将其组合。 批处理(我将这个术语与崩溃互换使用)不会自动发生,并且需要一些编码。 但是首先让我们看看它的行为:

def 'should collapse two commands executed concurrently for the same stock ticker'() {given:def anyTicker = TickerExamples.any()def tickers = [anyTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, anyTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:0 * gateway.load(_)1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any())
}def 'should collapse two commands executed concurrently for the different stock tickers'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other())
}def 'should correctly map collapsed response into individual requests'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setgateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other())and:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:def anyPrice = futureOne.get()def otherPrice = futureTwo.get()then:anyPrice == StockPriceExamples.any()otherPrice == StockPriceExamples.other()
}

第一个测试证明,不是两次调用load() ,而是几乎一次调用loadAll() 。 还要注意,由于我们要求相同的Ticker (来自两个不同的线程),因此loadAll()仅请求一个代码。 第二个测试显示两个并发请求,两个不同的行情收录器被折叠为一个批处理调用。 第三次测试可确保我们仍能对每个单独的请求得到正确的响应。 相反,延长HystrixCommand我们必须扩展更加复杂HystrixCollapser 。 现在是时候看到StockTickerPriceCollapsedCommand实现,它无缝替换了StockPriceCommand

class StockTickerPriceCollapsedCommand extends HystrixCollapser<ImmutableMap<Ticker, StockPrice>, StockPrice, Ticker> {private final StockPriceGateway gateway;private final Ticker stock;StockTickerPriceCollapsedCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCollapser.Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Stock")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));this.gateway = gateway;this.stock = stock;}@Overridepublic Ticker getRequestArgument() {return stock;}@Overrideprotected HystrixCommand<ImmutableMap<Ticker, StockPrice>> createCommand(Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {final Set<Ticker> stocks = collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(toSet());return new StockPricesBatchCommand(gateway, stocks);}@Overrideprotected void mapResponseToRequests(ImmutableMap<Ticker, StockPrice> batchResponse, Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {collapsedRequests.forEach(request -> {final Ticker ticker = request.getArgument();final StockPrice price = batchResponse.get(ticker);request.setResponse(price);});}}

这里有很多事情要做,所以让我们逐步回顾StockTickerPriceCollapsedCommand 。 前三种通用类型:

  • BatchReturnType (在我们的示例中为ImmutableMap<Ticker, StockPrice> )是批处理命令响应的类型。 如您将在后面看到的那样,崩溃程序会将多个小命令变成批处理命令。 这是该批处理命令的响应的类型。 请注意,它与StockPriceGateway.loadAll()类型相同。
  • ResponseTypeStockPrice )是要折叠的每个命令的类型。 在我们的例子中,我们正在折叠HystrixCommand<StockPrice> 。 稍后,我们将BatchReturnTypeBatchReturnType为多个StockPrice
  • RequestArgumentTypeTicker )是我们将要折叠(批处理)的每个命令的输入。 当多个命令一起批处理时,我们最终将所有这些替换为一个批处理命令。 此命令应接收所有单个请求,以便执行一个批处理请求。

withTimerDelayInMilliseconds(100)将在稍后说明。 createCommand()创建一个批处理命令。 此命令应替换所有单个命令并执行批处理逻辑。 在我们的情况下,我们不会进行多次单独的load()调用:

class StockPricesBatchCommand extends HystrixCommand<ImmutableMap<Ticker, StockPrice>> {private final StockPriceGateway gateway;private final Set<Ticker> stocks;StockPricesBatchCommand(StockPriceGateway gateway, Set<Ticker> stocks) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stocks = stocks;}@Overrideprotected ImmutableMap<Ticker, StockPrice> run() throws Exception {return gateway.loadAll(stocks);}
}

此类与StockPriceCommand之间的唯一区别是,它需要一堆Ticker并返回所有价格。 Hystrix将收集几个StockTickerPriceCollapsedCommand实例,一旦它具有足够StockTickerPriceCollapsedCommand (稍后再介绍),它将创建一个StockPriceCommand 。 希望这很清楚,因为mapResponseToRequests()涉及的更多。 折叠后的StockPricesBatchCommand完成后,我们必须以某种方式拆分批处理响应,并将回复传达回各个命令,而不会崩溃。 从这个角度来看, mapResponseToRequests()实现非常简单:我们收到批处理响应和包装CollapsedRequest<StockPrice, Ticker>的集合。 现在,我们必须遍历所有正在等待的单个请求并完成它们( setResponse() )。 如果我们不完成某些请求,它们将无限期挂起并最终超时。

怎么运行的

这是描述如何实现折叠的正确时机。 我之前说过崩溃是在同时发生两个请求时发生的。 没有一样的时间了 。 实际上,当第一个可折叠请求进入时,Hystrix会启动一个计时器。 在我们的示例中,我们将其设置为100毫秒。 在此期间,我们的命令被挂起,等待其他命令加入。 在此可配置时间段之后,Hystrix将调用createCommand() ,收集所有请求键(通过调用getRequestArgument() )并运行它。 批处理命令完成后,它将使我们将结果分发给所有等待中的单个命令。 如果我们担心创建庞大的批处理,也可以限制已折叠请求的数量–另一方面,在此短时间内可以容纳多少并发请求?

用例和缺点

请求折叠应在承受高负载(请求频率很高)的系统中使用。 如果每个折叠时间窗口(在示例中为100毫秒)仅收到一个请求,则折叠只会增加开销。 这是因为每次您调用可折叠命令时,它都必须等待,以防万一其他命令想要加入并形成批处理。 仅当至少折叠了几个命令时,这才有意义。 节省网络等待时间和/或更好地利用合作者中的资源可以平衡浪费的等待时间(与单个呼叫相比,批处理请求通常要快得多)。 但是请记住,折叠是一把双刃剑,在特定情况下很有用。

最后要记住的一件事–为了使用请求折叠,您需要在try-finally块中使用HystrixRequestContext.initializeContext()shutdown()

HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {//...
} finally {context.shutdown();
}

崩溃与缓存

您可能会认为可以通过适当的缓存来代替崩溃。 这不是真的。 在以下情况下使用缓存:

  1. 资源可能会被多次访问
  2. 我们可以安全地使用先前的值,它会在一段时间内保持有效, 或者我们确切地知道如何使它无效
  3. 我们可以提供对同一资源的并发请求以多次计算

另一方面,折叠不会强制数据的局部性(1),它总是命中实际服务,并且永远不会返回陈旧的数据(2)。 最后,如果我们从多个线程中请求相同的资源,我们将仅调用一次备份服务(3)。 在进行缓存的情况下,除非您的缓存真的很聪明,否则两个线程将独立地发现缓存中没有给定的资源,并两次请求支持服务。 但是,折叠可以与缓存一起使用-通过在运行可折叠命令之前咨询缓存。

摘要

请求折叠是一个有用的工具,但是用例非常有限。 它可以显着提高我们系统的吞吐量,并限制外部服务的负载。 崩溃可以神奇地平抑流量高峰,而不是将其散布到各处。 只要确保将其用于以极高频率运行的命令即可。

翻译自: https://www.javacodegeeks.com/2014/11/batching-collapsing-requests-in-hystrix.html

Hystrix中的批量(折叠)请求相关推荐

  1. Hystrix面试 - 基于 request cache 请求缓存技术优化批量商品数据查询接口

    Hystrix面试 - 基于 request cache 请求缓存技术优化批量商品数据查询接口 Hystrix command 执行时 8 大步骤第三步,就是检查 Request cache 是否有缓 ...

  2. 软件测试中的批量交易测试

    初次接触批量测试的小伙伴一定一头雾水,不知从何下手.批量交易是什么?它与联机交易有何不同?批量测试都要关注哪些内容?本文结合实际测试经验,详细介绍软件测试中的批量交易测试. 一.什么是批量交易? 本文 ...

  3. 记一次解决 Feign 提交批量添加请求收到 400 报错的经历

    上周在实现用 OpenFeign 提交批量添加请求时,遇到一个奇怪的问题,本来 Post 请求的 body 体,因为是批量请求,内容就比较多,准备起来就比较麻烦,仔细检查了很多次,提交时还是各种失败, ...

  4. JSP中的重定向和请求转发以及它们的区别

    我们先硬着头皮看一下重定向的定义: 重定向(Redirect): 客户端浏览器向Web应用服务器端发送一个请求,Web服务器端使用HttpServletResponse的sendRedirect()方 ...

  5. 爬虫之requests模块在headers参数中携带cookie发送请求

    爬虫之requests模块在headers参数中携带cookie发送请求 网站经常利用请求头中的Cookie字段来做用户访问状态的保持,那么我们可以在headers参数中添加Cookie,模拟普通用户 ...

  6. Python中的http网络请求,用它就对了

    软硬件环境 windows 10 64bits anaconda with python 3.7 requests 2.25.0 简介 requests是用来在Python中进行标准HTTP请求的第三 ...

  7. php循环输出多个网络地址图片,php中curl循环往请求多个URL和多线程去请求多个URL的方法...

    php 中curl 循环去请求多个URL和多线程去请求多个URL的方法 第一种:循环请求$sr=array(url_1,url_2,url_3); foreach ($sr as $k=>$v) ...

  8. Php如何发出请求,PHP中如何发送HTTP请求?

    PHP中如何发送HTTP请求? 在 HTML上 提交参数 A 和 B 到 send.php中,在send.php中接收到传过来的参数 A 和 B 并将这2个参数以 http的形式发送给目标地址:一下是 ...

  9. 请求体的方式传参_Angularjs中$http以post请求通过消息体传递参数的实现方法

    本文实例讲述了Angularjs中$http以post请求通过消息体传递参数的方法.分享给大家供大家参考,具体如下: Angularjs中,$http以post在消息体中传递参数,需要做以下修改,以确 ...

最新文章

  1. linux下打印机共享及监控
  2. 大名鼎鼎的红黑树,你get了么?2-3树 绝对平衡 右旋转 左旋转 颜色反转
  3. 编码调试:UnicodeDecodeError: ‘gbk‘ codec can‘t decode byte 0xaf in position 12: illegal multibyte sequen
  4. Codeforces Round #494 (Div. 3)【未完结】
  5. 粒子群PSO优化算法学习笔记 及其python实现(附讲解如何使用python语言sko.PSO工具包)
  6. (最优解法)46行代码AC_HDU1242 Rescue(DFS解法+BFS解法)
  7. 一个 js 中值传递和引用传递的坑。
  8. struts2 中文乱码问题,自定义过滤器通用解决方法
  9. linux 无法打开.ttf_win7系统ttf文件打不开怎么办【解决方法】
  10. python删除列表中的元素
  11. PythonGUI编程|使用Tkinter制作快递查询软件
  12. 【数据结构笔记44】线性探测的散列表的逆问题(拓扑排序的方法)
  13. html的字号txt的制作,font 文本颜色 字体 大小标签
  14. 从.NET未来趋势发展的两则PPT而来的乱语
  15. 在博客园cnblogs的博客内容之中显示地图(测试中)
  16. AWS DeepRacer 参数调优 Amazon SageMaker 和 Amazon RoboMaker
  17. 程序员30岁后怎么办
  18. plsql以及instantclient下载安装配置使用
  19. 【Keras中文文档】Layer Convolutional网址
  20. WinRAR去广告:只需六步,教你去除WinRAR的广告

热门文章

  1. struts+hibernate+oracle+easyui实现lazyout组件的简单案例——Action的实现类
  2. 你们也只剩下点赞的交情
  3. Android RaingBar评分条的使用
  4. GitLab创建项目 命令上传代码
  5. java快排原理_Java数据结构与算法——快速排序
  6. java阻塞队列小结
  7. rabbitmq-消息追踪rabbitmq_tracing
  8. Hibernate的关联映射--一对多、
  9. jpa映射json_如何使用JPA和Hibernate映射JSON集合
  10. java web源代码_检测Java Web应用程序而无需修改其源代码