我们通过分析代码可以看出,PyAlgoTrade分为六个组件:Strategies、Feeds、Brokers、DataSeries、Technicals、Optimizer。从业务流上看也是比较容易理解的:Feed(数据源)->DataSeries(数据结构化)->Technicals(指标计算)->Strategy(策略构建)->Optimizer(策略回测)->Broker(模拟/实盘交易);

PS.这套流程是否也可以用于实现投顾组合工具的系统搭建呢?我认为也是可以的。虽然在财富管理行业,对于客户而言可操作交易的资产是基金,而不是股票。但PyAlgoTrade从数据结构设计上就支持兼容不同的数据类型。下面,我们逐一分析下这六大组件:

Strategies:主要是定义交易逻辑,生成交易信号。什么时候买,什么时候卖

Feeds:这是一个抽象的概念。抽象出了数据获取的来源。比如csvfeed表示从csv文件获取行情数据,然后投喂给Strategies。Feeds不仅仅可以支持Bar数据(指一定时间周期内的开盘价、收盘价、最高价、最低价等)还有Tick数据(指每一次交易的价格、数量、时间等)。同时也支持自定义各类数据来源。一个强大的量化交易框架,势必需要支持各类数据来源以扩展策略的丰富度;比如如下的数据类型都可以通过继承feed数据结构来支持:

1. 新闻事件:某支股票相关的新闻发布信息,可以用于事件驱动型交易策略。

2. 财务估值:上市公司的财报信息,用于基本面量化策略。

3. 宏观经济:GDP、CPI等经济指标消息,用于宏观量化策略。

4. 社交媒体:微博、微信等平台上对某行业的热度与舆情数据,用于市场情绪分析

Brokers:定义经纪商交易接口;比如可以对接各家券商提供的交易接口、Binance的数字货币交易接口、IB的期货期权交易接口等。对于一些交易平台的限制进行backtesting模拟测试也是在这里。

DataSeries:这也是个抽象概念;主要是基于时间维度/时区用于管理价格数据;因为量化分析常常需要处理不同周期(日线、周线、月线)下的不同价格序列(开盘、收盘、均价等)。从功能上看,pandas的dataSeries比pyalgotrade的dataseries更为强大。我们完全可以用pandas来替换这部分的功能实现,并且支持的数据范围和操作也更丰富。

Technicals:技术指标往往是策略制定和评估的一部分。pyalgotrade主要基于DataSeries上用于计算各种技术指标;在设计上technicals也被设计成了dataseries的装饰器。说白了就是定义计算口径,基于价格、成交量、估值、财务、趋势等数据中捕捉到关键信息,从而判断当前的市场情况。比如SMA、EMA、MACD、RSI等指标计算。装饰器的意思就是在不改变dataseries中的原有序列数据的情况下,计算相关的技术指标;

Optimizer:有点类似深度学习中的Optimizer(主要是通过BP算法来对网络模型进行参数调优),pyalgotrade的Optimizer主要是用于在多台机器上进行backtesting任务的分发,目标就是实现并行计算,缩短策略回测所需要的时间。它主要由server和worker两部分组成。server主要负责提供bars、parameters、以及收集每个worker在跑策略时产生的结果。worker主要负责基于bars、parameters来运行策略;相当于构建实现了一个简单的分布式计算。

简答解释下,在制定交易策略时,往往需要多种技术指标的组合。比如RSI2策略是一种简单的均值回归交易策略:当股票的RSI2跌破10时,那么被认为是超卖,应该生成买入信号。当股票的RSI2涨破90时,被认为是超买,应该生成卖出信号。为了实现这个策略,就需要先计算指标:

1,设置入场移动平均线(150到250之间)和出场移动平均线(5到15之间)

2,设置超买阈值(75到95之间)和超卖阈值(5到25之间)。

而范围内的数字组合是40w,如果只有一台机器遍历所有的可能性势必是比较慢的,但是如果拆分成多个任务下发到多台机器中,那么策略回测的时间就能大大的缩短。

对策略进行回测的代码如下:

#生成策略所需要的技术指标
def parameters_generator():instrument = ["0000001.XSHE"]entrySMA = range(150, 251)exitSMA = range(5, 16)rsiPeriod = range(2, 11)overBoughtThreshold = range(75, 96)overSoldThreshold = range(5, 26)return itertools.product(instrument, entrySMA, exitSMA, rsiPeriod, overBoughtThreshold, overSoldThreshold)#构建csvfeeds,同parameters一同提供给serverfeed = quandlfeed.Feed()
feed.addBarsFromCSV(instrument, data_dir)# 启动5000端口,运行server
server.serve(feed, parameters_generator(), "localhost", 5000)#启动多个worker
worker.run(rsi2.RSI2, "localhost", 5000, workerName="localworker")

然后就可以在server端看到RSI2策略跑出来的最优参数组合以及最优结果了。

策略构建是一个永远都在迭代的话题,私募二级/公募的产品之所以这么多,也是因为在策略构建模块上各家机构都存在不同的差异。

PyAlgotrade是基于事件驱动的框架,在事件回调的设计上实现的很优雅。我们继续深入学习一下。其实,所有的事件驱动机制基本上都遵循一个套路模式,就是subject/dispatcher/observer模式。简单来说,subject发布事件,dispatcher收集事件然后分发,observer对事件再进行处理。我们在设计App端架构时也常常会使用这个模式,比如NetParser模块收到API的返回结果,解析完成就可以分发该消息到controller层,然后controller层根据业务逻辑处理数据model后,最后更新view呈现到界面。

我们先看看这个Event的数据结构和处理方式,它既支持发布事件,也支持处理事件:

1. 用__handlers列表维护所有的事件处理器(handler就是表示订阅者,相当于回调函数指针)。

2. 提供subscribe()方法用于订阅事件,将事件处理器添加到__handlers列表。

3. 提供unsubscribe()方法用于取消订阅,从__handlers列表移除事件处理器。

4. 提供emit()方法发布事件,会调用__handlers列表中的所有事件处理器,传入事件参数。

5. 当事件触发时(__emitting > 0),subscribe()与unsubscribe()方法的调用会被暂存到__deferred列表。等 emit()调用结束后会统一应用__deferred列表中的订阅与取消订阅操作。这个设计避免了在发布事件过程中发生操作__handlers列表的情况(很优秀的实现,避免了多线程需要加锁的设计)

6. 使用__applyChanges()方法来统一应用__deferred列表中的操作。

class Event(object):def __init__(self):self.__handlers = []self.__deferred = []self.__emitting = 0def __subscribeImpl(self, handler):assert not self.__emittingif handler not in self.__handlers:self.__handlers.append(handler)def __unsubscribeImpl(self, handler):assert not self.__emittingself.__handlers.remove(handler)def __applyChanges(self):assert not self.__emittingfor action, param in self.__deferred:action(param)self.__deferred = []def subscribe(self, handler):if self.__emitting:self.__deferred.append((self.__subscribeImpl, handler))elif handler not in self.__handlers:self.__subscribeImpl(handler)def unsubscribe(self, handler):if self.__emitting:self.__deferred.append((self.__unsubscribeImpl, handler))else:self.__unsubscribeImpl(handler)def emit(self, *args, **kwargs):try:self.__emitting += 1for handler in self.__handlers:handler(*args, **kwargs)finally:self.__emitting -= 1if not self.__emitting:self.__applyChanges()

假设我们定义了一个策略,这个策略需要订阅处理多个事件。那么我们就可以声明一个dispatcher来实现事件驱动的循环。我们看看Dispatcher的代码实现,它定义了一个subject列表用于管理事件生产者,并且通过addSubject支持将Subject加入到Dispatcher,并根据调度优先级插入到相应位置。比如交易下单成功/失败事件的优先级更高。最后在run()方法内循环分发Subjects产生的事件。

从代码上看,这个Dispatcher为策略提供了多个事件来源的管理与同步功能。策略只需添加各个Subject,然后启动Dispatcher,即可在一个线程内接收来自所有Subject的事件。


# This class is responsible for dispatching events from multiple subjects, synchronizing them if necessary.
class Dispatcher(object):def __init__(self):self.__subjects = []self.__stop = Falseself.__startEvent = observer.Event()self.__idleEvent = observer.Event()self.__currDateTime = None# Returns the current event datetime. It may be None for events from realtime subjects.def getCurrentDateTime(self):return self.__currDateTimedef getStartEvent(self):return self.__startEventdef getIdleEvent(self):return self.__idleEventdef stop(self):self.__stop = Truedef getSubjects(self):return self.__subjectsdef addSubject(self, subject):# Skip the subject if it was already added.if subject in self.__subjects:return# If the subject has no specific dispatch priority put it right at the end.if subject.getDispatchPriority() is dispatchprio.LAST:self.__subjects.append(subject)else:# Find the position according to the subject's priority.pos = 0for s in self.__subjects:if s.getDispatchPriority() is dispatchprio.LAST or subject.getDispatchPriority() < s.getDispatchPriority():breakpos += 1self.__subjects.insert(pos, subject)subject.onDispatcherRegistered(self)# Return True if events were dispatched.def __dispatchSubject(self, subject, currEventDateTime):ret = False# Dispatch if the datetime is currEventDateTime of if its a realtime subject.if not subject.eof() and subject.peekDateTime() in (None, currEventDateTime):ret = subject.dispatch() is Truereturn ret# Returns a tuple with booleans# 1: True if all subjects hit eof# 2: True if at least one subject dispatched events.def __dispatch(self):smallestDateTime = Noneeof = TrueeventsDispatched = False# Scan for the lowest datetime.for subject in self.__subjects:if not subject.eof():eof = FalsesmallestDateTime = utils.safe_min(smallestDateTime, subject.peekDateTime())# Dispatch realtime subjects and those subjects with the lowest datetime.if not eof:self.__currDateTime = smallestDateTimefor subject in self.__subjects:if self.__dispatchSubject(subject, smallestDateTime):eventsDispatched = Truereturn eof, eventsDispatcheddef run(self):try:for subject in self.__subjects:subject.start()self.__startEvent.emit()while not self.__stop:eof, eventsDispatched = self.__dispatch()if eof:self.__stop = Trueelif not eventsDispatched:self.__idleEvent.emit()finally:# There are no more events.self.__currDateTime = Nonefor subject in self.__subjects:subject.stop()for subject in self.__subjects:subject.join()

在EventProfiler这个文件中,PyAlgoTrade实现了四个类,分别是Event、Profiler、Result、Predicate,这个主要是用来评估事件对策略的影响。

Predicate类是一个事件判断类,用于判断事件是否发生。Event类是一个事件类,用于存储事件的信息,包括事件发生前后的时间长度,以及事件发生前后的收益率。Profiler类是一个事件分析类,用于分析事件发生前后的收益率,以及事件发生前后的收益率的累积收益率。Results类是一个事件分析结果类,用于存储事件分析的结果,包括事件发生前后的收益率,以及事件发生前后的收益率的累积收益率。

Event类实现的也很巧妙:

1. 在初始化时,Event接受lookBack与lookForward两个参数,代表向前与向后获取数据的范围。

2. 创建一个大小为lookBack + lookForward + 1的数组来保存数据。初始时设为NaN(空值)。

3. 提供setValue方法设置指定时刻t的value值。将值保存至数组中正确的位置。

4. 提供getValue方法获取指定时刻t的值。从数组中取出正确位置的值。

举个例子,如果我们希望评估某个交易事件对收益率带来影响,或者基于某个事件的发生需要对策略进行调整。那么我们可以设置lookBack为5天,lookForward为3天。然后可以调用setValue方法设置过去5天与未来3天内该事件每天的值。之后可以调用getValues获得过去5天与未来3天这段时间内该事件的所有值进行分析,判断趋势等。

class Event(object):def __init__(self, lookBack, lookForward):assert(lookBack > 0)assert(lookForward > 0)self.__lookBack = lookBackself.__lookForward = lookForwardself.__values = np.empty((lookBack + lookForward + 1))self.__values[:] = np.NANdef __mapPos(self, t):assert(t >= -1*self.__lookBack and t <= self.__lookForward)return t + self.__lookBackdef isComplete(self):return not any(np.isnan(self.__values))def getLookBack(self):return self.__lookBackdef getLookForward(self):return self.__lookForwarddef setValue(self, t, value):if value is not None:pos = self.__mapPos(t)self.__values[pos] = valuedef getValue(self, t):pos = self.__mapPos(t)return self.__values[pos]def getValues(self):return self.__values

相比于我们如果要搭建一个基金组合的量化系统,pyAlgoTrade提供的框架基本已经足够了。我看了看火富牛提供的智能投研平台,因为不需要和股票一样有实时行情的数据,再加上策略定制、效果回测都更侧重于长期表现,所以系统的实现难度也会大大降低。尤其是对私募基金而言,拿到的披露数据更少。所以在技术指标这块基本都是固定的。指标主要分两块:基金的业绩和风险评估指标(滚动收益率、区间胜率、年化波动率、夏普比率、最大回撤和回补期等等)、以及基于客户持仓计算组合净值曲线(考虑到加仓、减仓、红利再投等场景,虽然听起来很复杂,但梳理后其实还好,主要难点还是在于对净值缺失,净值频率不一致的情况处理)。这一块我就不赘述了。

量化交易框架开发实践(二)相关推荐

  1. ASP.NET MVC5 网站开发实践(二) Member区域 - 用户部分(2)用户登录、注销

    上次实现了用户注册,这次来实现用户登录,用到IAuthenticationManager的SignOut.SignIn方法和基于声明的标识.最后修改用户注册代码实现注册成功后直接登录. 目录: ASP ...

  2. 【AIOQuant量化交易框架】第1讲-高频交易介绍

    1.什么是高频交易 一提到高频交易,对于大部人来说,高频交易是比较神秘的.在大部分人的认知里,高频交易有超强的盈利能力,堪比印钞机:纯粹靠交易赚钱,有着神秘的数学模型和尖端科技,精准的预测市场走势,带 ...

  3. PHP框架开发:二、从何处开始?URL

    PHP框架开发:二.从何处开始?URL 注:草稿性质的文章,会不断进行修改完善,故请抱着怀疑的态度阅读 所谓万事开头难,我们的框架应该从何处开始"着笔"呢? 让我们来分析一下... ...

  4. python量化交易的框架_python量化交易框架easyquant试用体会

    在github上发现一个python写的,看上去简单实用的量化交易框架easyquant,作者是在他写的easytrader上实现了自动读取行情和交易登入,初步试验了雪球登入,效果还不错. 安装这个框 ...

  5. 用 Python 写了个简单的股票量化交易框架

    原文链接:用python写了个简单的股票量化交易框架 摘抄他人的文章,方便存个底. 集成了以前写的 [easytrader]( http://github.com/shidenggui/easytra ...

  6. python 量化交易 框架 开源_Hikyuu首页、文档和下载 - 基于 C++/Python 的开源量化交易研究框架 - OSCHINA - 中文开源技术交流社区...

    Hikyuu Quant Framework是一款基于C++/Python的开源量化交易研究框架,用于策略分析及回测.其核心思想基于当前成熟的系统化交易方法,将整个系统化交易抽象为由市场环境判断策略. ...

  7. iOS-MultipeerConnectivity框架开发(二)

    原文地址:http://www.appcoda.com/intro-ios-multipeer-connectivity-programming/ 编者:在iOS-MultipeerConnectiv ...

  8. python股票量化交易入门到实践_量化资料学习《Python与量化投资从基础到实战》+《量化交易之路用Python做股票量化分析》+《组织与管理研究的实证方法第2版》...

    我们需要利用Python进行数据分析的指南,有大量的关于数据处理分析的应用,重点学习如何高效地利用Python解决投资策略问题,推荐学习<Python与量化投资从基础到实战>等电子资料. ...

  9. 股票自动交易python下单接口_用 Python 写了个简单的股票量化交易框架

    因为行情的获取用到了 async / await 所以暂时只支持 Python3.5+ 交易 支持 佣金宝 和 华泰 两家券商的自动登录和买卖. 行情 使用的是新浪的免费行情,大概一秒钟推送一次 所有 ...

最新文章

  1. Spring-Aop-注解实现
  2. PHP Deprecated: Function ereg_replace() is deprecated in 的解决方法
  3. Detection of Extraterrestrial KMP匹配 重复k次子串 好题
  4. Java 8中的默认方法(Defender方法)简介
  5. 实战06_SSM整合ActiveMQ支持多种类型消息
  6. 关于在 matlab 中使用 ode45 算出拉格朗日方程中的关节加速度
  7. MySQL slowlog 统计_mysql slow log 简单统计
  8. DeepStream插件Gstreamer(一):插件汇总
  9. Java开发框架——Struts框架
  10. app_offline.htm的作用
  11. python中列表的嵌套是指列表的元素是另一个列表_Python实现嵌套列表去重方法示例...
  12. Quartz 配置详解
  13. 转:MediaCoder H.264格式编码参数设置及详解
  14. Pickit 3 Programmer使用说明及 烧写程序步骤
  15. http状态404 vscode_HTTP常见状态码(404、400、500)等错误
  16. Tumblr 的架构演进过程
  17. u3d计算机获取键盘输入,Unity 中的键盘输入
  18. 白噪声校验matlab,白噪声的测试MATLAB程序.doc
  19. uniapp:uni_modules组件开发与发布
  20. 展讯平台 LCD(Mipi)移植步骤及问题归纳

热门文章

  1. DropWizard入门实践
  2. Docker 搭建「个人网盘」,放弃 Pandownload!
  3. win7一直卡在正在启动windows
  4. nCode:GlyphWorks案例教程一
  5. 高级程序员最爱用的8款代码编辑器,你用哪几个?
  6. java需求设计文档模板下载_设计Java
  7. HTML 访问本地 Markdown 文件
  8. Open-iscs源码分析之---iscsiadm.c
  9. CAN总线数据转成多路RS485信号方式,竣达技术来给您解析
  10. HTML进阶--如何使用Sublime Text来创建网页 -------16岁的小前端