ACE-Streams架构简介及应用
一概述
Streams框架是管道和过滤构架模式的一种实现,主要应用于处理数据流的系统。其实现以Task框架为基础。Task框架有两个特性非常适用于Streams框架:一是Task框架可用于创建独立线程的并发环境,这适合应用于ACE Streams框架中的主动过滤器;二是Task框架有统一的数据传输结果——消息队列,这适用于Streams框架中的管道。
二ACE_Task类
这里主要介绍与Streams框架相关的部分。
成员变量
1 Task_T.h 2 3 class ACE_Task : public ACE_Task_Base 4 5 { 6 7 /// Queue of messages on the ACE_Task.. 8 9 ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> *msg_queue_; 10 11 12 13 /// true if should delete Message_Queue, false otherwise. 14 15 bool delete_msg_queue_; 16 17 18 19 /// Back-pointer to the enclosing module. 20 21 ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod_; 22 23 24 25 /// Pointer to adjacent ACE_Task. 26 27 ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *next_; 28 29 }
put函数
在ACE_Task框架中,put函数没有实际作用,在默认情况下,该函数没有执行任何操作,仅仅返回0.但是在Streams框架中,put函数与put_next函数结合起来可以实现数据在过滤器间的传输。如果put函数将数据保存在消息队列中,通过独立的线程来处理这些消息,那么它将成为一个主动过滤器;反之,如果put函数直接对数据进行处理,然后交给下一个过滤器,那么它就是一个被动过滤器。
1 Task.cpp 2 3 /// Default ACE_Task put routine. 4 5 int 6 7 ACE_Task_Base::put (ACE_Message_Block *, ACE_Time_Value *) 8 9 { 10 11 ACE_TRACE ("ACE_Task_Base::put"); 12 13 return 0; 14 15 }
put_next函数
如果在数据处理流水线有下一个过滤器,那么put_next函数用于将数据交给下一个过滤器处理。如下:
1 Task_T.inl 2 3 // Transfer msg to the next ACE_Task. 4 5 6 7 template <ACE_SYNCH_DECL, class TIME_POLICY> ACE_INLINE int 8 9 ACE_Task<ACE_SYNCH_USE, TIME_POLICY>::put_next (ACE_Message_Block *msg, ACE_Time_Value *tv) 10 11 { 12 13 ACE_TRACE ("ACE_Task<ACE_SYNCH_USE, TIME_POLICY>::put_next"); 14 15 return this->next_ == 0 ? -1 : this->next_->put (msg, tv); 16 17 }
next_指向的是有序过滤器的下一个过滤器。通过put_next函数,可以将数据交给下一个过滤器处理。
Streams框架应用示例
在这个示例中,我们将一个数据流的处理分为4步,在Streams框架中,将每个处理步骤称为一个Module:
- Logrec_Reader:从文件中读取记录,然后交给下一个步骤。
- Logrec_Timer:在记录尾部加上“format_data”
- Logrec_Suffix:在记录尾部加上一个后缀——suffix
- Logrec_Write:将记录显示在终端上
Logrec_Reader类
其是ACE_Task的子类,是一个主动对象类,有独立的控制线程,线程处理函数是svc。在Streams框架中,Logrec_Reader类是一个主动过滤器,代码如下:
1 class Logrec_Reader : public ACE_Task<ACE_MT_SYNCH> 2 { 3 private: 4 ifstream fin; //标准输入流 5 public: 6 Logrec_Reader(ACE_TString logfile) 7 { 8 fin.open(logfile.c_str()); //ACE_TString.c_str() 转换为char 9 } 10 virtual int open (void *) 11 { 12 return activate(); 13 } 14 15 virtual int svc() 16 { 17 ACE_Message_Block *mblk; 18 int len = 0; 19 const int LineSize = 256; 20 char file_buf[LineSize]; 21 22 while(!fin.eof()) 23 { 24 fin.getline(file_buf, LineSize); 25 len = ACE_OS::strlen(file_buf); 26 ACE_NEW_RETURN(mblk, ACE_Message_Block(len + 200), 0); 27 if (file_buf[len - 1] == '\r') 28 { 29 len = len - 1; 30 } 31 mblk->copy(file_buf, len); 32 // 通过put_next函数,将消息传递给下一个过滤器 33 put_next(mblk); 34 } 35 //构造一个MB_STOP消息 36 ACE_NEW_RETURN(mblk, ACE_Message_Block (0, ACE_Message_Block::MB_STOP), 0); 37 put_next(mblk); 38 fin.close(); 39 ACE_DEBUG((LM_DEBUG, "read svc return. \n")); 40 return 0; 41 } 42 };
Logrec_Timer类
也是ACE_Task的子类,但是它不是主动对象类,没有创建独立线程。其实现了put函数,这个函数被它的上一个过滤器(Logrec_Reader)调用,并且数据直接在这个函数中处理。
这里for循环用于处理消息链表,在示例中并没有使用链表因此for循环只会执行一次。
1 class Logrec_Timer : public ACE_Task<ACE_SYNCH> 2 { 3 private: 4 void format_data(ACE_Message_Block *mblk) 5 { 6 char *msg = mblk->data_block()->base(); 7 strcat(msg, "format_data"); 8 } 9 public: 10 virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *) 11 { 12 for (ACE_Message_Block *temp = mblk; 13 temp != 0; temp = temp->cont()) 14 { 15 if (temp->msg_type() != ACE_Message_Block::MB_STOP) 16 { 17 format_data(temp); 18 } 19 } 20 return put_next(mblk); 21 } 22 };
Logrec_Suffix类
类似Logrec_Timer
1 class Logrec_Suffix : public ACE_Task<ACE_SYNCH> 2 { 3 public: 4 void suffix(ACE_Message_Block *mblk) 5 { 6 char *msg = mblk->data_block()->base(); 7 strcat(msg, "suffix\n"); 8 } 9 virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *) 10 { 11 for (ACE_Message_Block *temp = mblk; 12 temp != 0; temp = temp->cont()) 13 { 14 if (temp->msg_type() != ACE_Message_Block::MB_STOP) 15 { 16 suffix(temp); 17 } 18 } 19 return put_next(mblk); 20 } 21 };
Logrec_Write类
这里put函数由上一个过滤器(Logrec_Suffix)调用,其并没有对数据进行实际处理,只是将数据放入队列中,由线程独立处理。
1 class Logrec_Write : public ACE_Task<ACE_SYNCH> 2 { 3 public: 4 virtual int open(void*) 5 { 6 ACE_DEBUG((LM_DEBUG, "Logrec_Writer.\n")); 7 return activate(); 8 } 9 10 virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to) 11 { 12 return putq(mblk, to); 13 } 14 15 virtual int svc() 16 { 17 int stop = 0; 18 for (ACE_Message_Block *mb; !stop && getq(mb) != -1;) 19 { 20 if (mb->msg_type() == ACE_Message_Block::MB_STOP) 21 { 22 stop = 1; 23 } 24 else{ 25 ACE_DEBUG((LM_DEBUG, "%s", mb->base())); 26 } 27 put_next(mb); 28 } 29 return 0; 30 } 31 };
Main
这里讲ACE_Module放入Streams中,ACE_Module才是真正数据处理的Module。ACE_Streams类有两个数据成员:stream_head_和stream_tail_,它们指向ACE_Module链表的首尾。对于每个Streams,其默认带有首尾两个Module,而后可以通过push将数据处理的Module放入执行链表中。每个Module包含两个Task,分别为读Task和写Task。在示例中仅注册了写Task,这些Module和Task通过next指针构成一个有序的串。
这里注意push有顺序要求,最后push即栈顶的为先执行的Module。
1 int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) 2 { 3 if (argc != 2) 4 { 5 ACE_ERROR_RETURN((LM_ERROR, "usage: %s logfile\n", argv[0]), 1); 6 } 7 8 ACE_TString logfile (argv[1]); 9 10 ACE_Stream<ACE_SYNCH> stream; 11 12 13 ACE_Module<ACE_MT_SYNCH> *module[4]; 14 module[0] = new ACE_Module<ACE_MT_SYNCH>("Reader", new Logrec_Reader(logfile), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER); 15 module[1] = new ACE_Module<ACE_MT_SYNCH>("Formatter", new Logrec_Timer(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER); 16 module[2] = new ACE_Module<ACE_MT_SYNCH>("Separator", new Logrec_Suffix(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER); 17 module[3] = new ACE_Module<ACE_MT_SYNCH>("Writer", new Logrec_Write(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER); 18 19 for ( int i = 3; i >= 0; --i ) 20 { 21 if (stream.push(module[i]) == -1) 22 { 23 ACE_ERROR_RETURN((LM_ERROR, "push %s module into stream failed.\n", module[i]->name()), 1); 24 } 25 ACE_DEBUG((LM_DEBUG, "push %s module into stream success. \n", module[i]->name())); 26 } 27 ACE_Thread_Manager::instance()->wait(); 28 }
附完整代码及结果:https://github.com/ShiningZhang/ACE_Learning/tree/master/streams
1 /************************************************************************* 2 > File Name: logrec.cpp 3 > Author: 4 > Mail: 5 > Created Time: Fri 13 Oct 2017 04:19:39 PM CST 6 ************************************************************************/ 7 #include <fstream> 8 #include <ace/Synch.h> 9 #include <ace/Task.h> 10 #include <ace/Message_Block.h> 11 #include <ace/Stream.h> 12 #include "ace/Thread_Manager.h" 13 #include <ace/Time_Value.h> 14 #include <ace/Module.h> 15 16 using namespace std; 17 18 class Logrec_Reader : public ACE_Task<ACE_MT_SYNCH> 19 { 20 private: 21 ifstream fin; //标准输入流 22 public: 23 Logrec_Reader(ACE_TString logfile) 24 { 25 fin.open(logfile.c_str()); //ACE_TString.c_str() 转换为char 26 } 27 virtual int open (void *) 28 { 29 return activate(); 30 } 31 32 virtual int svc() 33 { 34 ACE_Message_Block *mblk; 35 int len = 0; 36 const int LineSize = 256; 37 char file_buf[LineSize]; 38 39 while(!fin.eof()) 40 { 41 fin.getline(file_buf, LineSize); 42 len = ACE_OS::strlen(file_buf); 43 ACE_NEW_RETURN(mblk, ACE_Message_Block(len + 200), 0); 44 if (file_buf[len - 1] == '\r') 45 { 46 len = len - 1; 47 } 48 mblk->copy(file_buf, len); 49 // 通过put_next函数,将消息传递给下一个过滤器 50 put_next(mblk); 51 } 52 //构造一个MB_STOP消息 53 ACE_NEW_RETURN(mblk, ACE_Message_Block (0, ACE_Message_Block::MB_STOP), 0); 54 put_next(mblk); 55 fin.close(); 56 ACE_DEBUG((LM_DEBUG, "read svc return. \n")); 57 return 0; 58 } 59 }; 60 61 class Logrec_Timer : public ACE_Task<ACE_SYNCH> 62 { 63 private: 64 void format_data(ACE_Message_Block *mblk) 65 { 66 char *msg = mblk->data_block()->base(); 67 strcat(msg, "format_data"); 68 } 69 public: 70 virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *) 71 { 72 for (ACE_Message_Block *temp = mblk; 73 temp != 0; temp = temp->cont()) 74 { 75 if (temp->msg_type() != ACE_Message_Block::MB_STOP) 76 { 77 format_data(temp); 78 } 79 } 80 return put_next(mblk); 81 } 82 }; 83 84 class Logrec_Suffix : public ACE_Task<ACE_SYNCH> 85 { 86 public: 87 void suffix(ACE_Message_Block *mblk) 88 { 89 char *msg = mblk->data_block()->base(); 90 strcat(msg, "suffix\n"); 91 } 92 virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *) 93 { 94 for (ACE_Message_Block *temp = mblk; 95 temp != 0; temp = temp->cont()) 96 { 97 if (temp->msg_type() != ACE_Message_Block::MB_STOP) 98 { 99 suffix(temp); 100 } 101 } 102 return put_next(mblk); 103 } 104 }; 105 106 class Logrec_Write : public ACE_Task<ACE_SYNCH> 107 { 108 public: 109 virtual int open(void*) 110 { 111 ACE_DEBUG((LM_DEBUG, "Logrec_Writer.\n")); 112 return activate(); 113 } 114 115 virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to) 116 { 117 return putq(mblk, to); 118 } 119 120 virtual int svc() 121 { 122 int stop = 0; 123 for (ACE_Message_Block *mb; !stop && getq(mb) != -1;) 124 { 125 if (mb->msg_type() == ACE_Message_Block::MB_STOP) 126 { 127 stop = 1; 128 } 129 else{ 130 ACE_DEBUG((LM_DEBUG, "%s", mb->base())); 131 } 132 put_next(mb); 133 } 134 return 0; 135 } 136 }; 137 138 int ACE_TMAIN(int argc, ACE_TCHAR *argv[]) 139 { 140 if (argc != 2) 141 { 142 ACE_ERROR_RETURN((LM_ERROR, "usage: %s logfile\n", argv[0]), 1); 143 } 144 145 ACE_TString logfile (argv[1]); 146 147 ACE_Stream<ACE_SYNCH> stream; 148 149 150 ACE_Module<ACE_MT_SYNCH> *module[4]; 151 module[0] = new ACE_Module<ACE_MT_SYNCH>("Reader", new Logrec_Reader(logfile), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER); 152 module[1] = new ACE_Module<ACE_MT_SYNCH>("Formatter", new Logrec_Timer(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER); 153 module[2] = new ACE_Module<ACE_MT_SYNCH>("Separator", new Logrec_Suffix(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER); 154 module[3] = new ACE_Module<ACE_MT_SYNCH>("Writer", new Logrec_Write(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER); 155 156 for ( int i = 3; i >= 0; --i ) 157 { 158 if (stream.push(module[i]) == -1) 159 { 160 ACE_ERROR_RETURN((LM_ERROR, "push %s module into stream failed.\n", module[i]->name()), 1); 161 } 162 ACE_DEBUG((LM_DEBUG, "push %s module into stream success. \n", module[i]->name())); 163 } 164 ACE_Thread_Manager::instance()->wait(); 165 }
test:
转载于:https://www.cnblogs.com/zl1991/p/7662993.html
ACE-Streams架构简介及应用相关推荐
- 大数据分类和架构简介
第 1 部分: 大数据分类和架构简介 概述 大数据可通过许多方式来存储.获取.处理和分析.每个大数据来源都有不同的特征,包括数据的频率.量.速度.类型和真实性.处理并存储大数据时,会涉及到更多维度,比 ...
- LoadRunner系统架构简介与运行原理
1.LoadRunner系统架构简介 LoadRunner是通过创建虚拟用户来代替真实实际用户来操作客户端软件比如Internet Explorer,来向IIS.Apache等Web服务器发送HTTP ...
- 单线程与多线程网络程序架构简介
文章目录 1 单线程与多线程网络程序架构简介 1.1 服务端单线程处理多客户端 1.2 服务端多进程多端口处理多客户端 1.3 服务端多线程单端口分组处理多客户端 1.4 服务端多线程多端口分组处理多 ...
- MVC架构简介及其测试策略
MVC架构简介及其测试策略 https://www.cnblogs.com/rd-ddddd/p/6959232.html 转载于:https://www.cnblogs.com/highpoint ...
- Mysql逻辑架构简介
Mysql逻辑架构简介 整体架构图 和其它数据库相比,MySQL有点与众不同,它的架构可以在多种不同场景中应用并发挥良好作用.主要体现在存储引擎的架构上,插件式的存储引擎架构将查询处理和其它的系统任务 ...
- mme 服务器位置,NB-IOT的网络架构简介
原标题:NB-IOT的网络架构简介 一.NB-iot网络架构 整个NB-IoT网络架构分为五个部分:终端,无线网络,核心网络(EPC),IoT支持平台和应用服务器. 二.NB-iot网络架构简介 1. ...
- Xtensa处理器架构基础-架构简介与常见寄存器
1.架构简介 应用的发展对处理器的需求越来越多样化.与通用处理器架构相比,Xtensa架构的特色在于它是可配置可扩展的微处理器架构.通俗地讲,将其与常见的ARM架构相比,Xtensa架构的特色在于可以 ...
- ARMv8-a架构简介
1. 前言 ARMv8(当前只有A系列,即ARMv8-A)架构,是ARM公司为满足新需求而重新设计的一个架构,是近20年来,ARM架构变动最大的一次.它引入的Execution State.Excep ...
- a55计算机主板,A55架构简介与A55主板赏析
A55架构简介与主板赏析: A55 FCH芯片架构 FCH芯片的角色大概相当于以往的南桥芯片,自身并没有整合显示核心,而是通过UMI接口(而不是HT总线)连接APU整合的显示核心进行输出.A55 FC ...
最新文章
- 【职场】面试中最难回答频率最高的70个问题
- SAP ABAP 客户退出
- modelandview 可以返回html么_Python: 爬虫网页解析工具lxml.html(一)
- 携程elong相继牵手支付宝转“危”为“机”
- 操作系统复习题+最终版
- 三招快速重新打开被关闭的Chrome标签页
- 03-linux下离线安装R环境
- php执行cmd/shell命令 木马小后门
- CCF201903-4 消息传递接口(100分)【模拟】
- 创建对象 --- 构造函数模式
- fn:startsWith()函数
- 大规模业务技术架构设计与战术(架构师必看)
- 如何将html转化成mp4,怎么把mov转换成mp4格式?方法很简单,1分钟完成转换
- 计算机 及其 应用系统
- 2020伊始,我结束了人生的两个轮回
- 经典怀旧FCgame红白机小游戏在线网页合集版畅玩HTML网站源码
- Basler Blaze-101开发实践(1)——实时采图
- 大数据开发第一站ODS篇
- 四边形围栅栏c语言长度,不同结构围油栏拦油特性数值的研究.pdf
- java未来三年的工作计划_未来三年工作计划精选.doc
热门文章
- python语言入门n-Python基础语法学习笔记
- python怎么安装matplotlib-[Python]一步步安装numpy,matplotlib
- python在线工具-在线 Python运行工具
- python之父去面试-Python面试题之Python的Super方法
- python浪漫代码-Python打造浪漫的心形,助你情人节表白成功!
- python打开界面是什么样的-python学习笔记(图形用户界面)
- python3安装pip3-python3安装pip3的实例步骤
- 学会python之后-python学会后做什么
- 2020年python工资一般多少钱-2020年Python发展前景如何呢?
- python if语句多个条件-python – if / elif语句的多个条件