一概述

Streams框架是管道和过滤构架模式的一种实现,主要应用于处理数据流的系统。其实现以Task框架为基础。Task框架有两个特性非常适用于Streams框架:一是Task框架可用于创建独立线程的并发环境,这适合应用于ACE Streams框架中的主动过滤器;二是Task框架有统一的数据传输结果——消息队列,这适用于Streams框架中的管道。

二ACE_Task类

这里主要介绍与Streams框架相关的部分。

成员变量

 1 Task_T.h
 2
 3 class ACE_Task : public ACE_Task_Base
 4
 5 {
 6
 7 /// Queue of messages on the ACE_Task..
 8
 9   ACE_Message_Queue<ACE_SYNCH_USE, TIME_POLICY> *msg_queue_;
10
11
12
13   /// true if should delete Message_Queue, false otherwise.
14
15   bool delete_msg_queue_;
16
17
18
19   /// Back-pointer to the enclosing module.
20
21   ACE_Module<ACE_SYNCH_USE, TIME_POLICY> *mod_;
22
23
24
25   /// Pointer to adjacent ACE_Task.
26
27   ACE_Task<ACE_SYNCH_USE, TIME_POLICY> *next_;
28
29 }

put函数

在ACE_Task框架中,put函数没有实际作用,在默认情况下,该函数没有执行任何操作,仅仅返回0.但是在Streams框架中,put函数与put_next函数结合起来可以实现数据在过滤器间的传输。如果put函数将数据保存在消息队列中,通过独立的线程来处理这些消息,那么它将成为一个主动过滤器;反之,如果put函数直接对数据进行处理,然后交给下一个过滤器,那么它就是一个被动过滤器。

 1 Task.cpp
 2
 3 /// Default ACE_Task put routine.
 4
 5 int
 6
 7 ACE_Task_Base::put (ACE_Message_Block *, ACE_Time_Value *)
 8
 9 {
10
11   ACE_TRACE ("ACE_Task_Base::put");
12
13   return 0;
14
15 }

put_next函数

如果在数据处理流水线有下一个过滤器,那么put_next函数用于将数据交给下一个过滤器处理。如下:

 1 Task_T.inl
 2
 3 // Transfer msg to the next ACE_Task.
 4
 5
 6
 7 template <ACE_SYNCH_DECL, class TIME_POLICY> ACE_INLINE int
 8
 9 ACE_Task<ACE_SYNCH_USE, TIME_POLICY>::put_next (ACE_Message_Block *msg, ACE_Time_Value *tv)
10
11 {
12
13   ACE_TRACE ("ACE_Task<ACE_SYNCH_USE, TIME_POLICY>::put_next");
14
15   return this->next_ == 0 ? -1 : this->next_->put (msg, tv);
16
17 }

next_指向的是有序过滤器的下一个过滤器。通过put_next函数,可以将数据交给下一个过滤器处理。

Streams框架应用示例

在这个示例中,我们将一个数据流的处理分为4步,在Streams框架中,将每个处理步骤称为一个Module:

  • Logrec_Reader:从文件中读取记录,然后交给下一个步骤。
  • Logrec_Timer:在记录尾部加上“format_data”
  • Logrec_Suffix:在记录尾部加上一个后缀——suffix
  • Logrec_Write:将记录显示在终端上

Logrec_Reader类

其是ACE_Task的子类,是一个主动对象类,有独立的控制线程,线程处理函数是svc。在Streams框架中,Logrec_Reader类是一个主动过滤器,代码如下:

 1 class Logrec_Reader : public ACE_Task<ACE_MT_SYNCH>
 2 {
 3 private:
 4     ifstream fin;  //标准输入流
 5 public:
 6     Logrec_Reader(ACE_TString logfile)
 7     {
 8         fin.open(logfile.c_str()); //ACE_TString.c_str() 转换为char
 9     }
10     virtual int open (void *)
11     {
12         return activate();
13     }
14
15     virtual int svc()
16     {
17         ACE_Message_Block *mblk;
18         int len = 0;
19         const int LineSize = 256;
20         char file_buf[LineSize];
21
22         while(!fin.eof())
23         {
24             fin.getline(file_buf, LineSize);
25             len = ACE_OS::strlen(file_buf);
26             ACE_NEW_RETURN(mblk, ACE_Message_Block(len + 200), 0);
27             if (file_buf[len - 1] == '\r')
28             {
29                 len = len - 1;
30             }
31             mblk->copy(file_buf, len);
32             // 通过put_next函数,将消息传递给下一个过滤器
33             put_next(mblk);
34         }
35         //构造一个MB_STOP消息
36         ACE_NEW_RETURN(mblk, ACE_Message_Block (0, ACE_Message_Block::MB_STOP), 0);
37         put_next(mblk);
38         fin.close();
39         ACE_DEBUG((LM_DEBUG, "read svc return. \n"));
40         return 0;
41     }
42 };

Logrec_Timer类

也是ACE_Task的子类,但是它不是主动对象类,没有创建独立线程。其实现了put函数,这个函数被它的上一个过滤器(Logrec_Reader)调用,并且数据直接在这个函数中处理。

这里for循环用于处理消息链表,在示例中并没有使用链表因此for循环只会执行一次。

 1 class Logrec_Timer : public ACE_Task<ACE_SYNCH>
 2 {
 3 private:
 4     void format_data(ACE_Message_Block *mblk)
 5     {
 6         char *msg = mblk->data_block()->base();
 7         strcat(msg, "format_data");
 8     }
 9 public:
10     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
11     {
12         for (ACE_Message_Block *temp = mblk;
13             temp != 0; temp = temp->cont())
14         {
15             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
16             {
17                 format_data(temp);
18             }
19         }
20         return put_next(mblk);
21     }
22 };

Logrec_Suffix类

类似Logrec_Timer

 1 class Logrec_Suffix : public ACE_Task<ACE_SYNCH>
 2 {
 3 public:
 4     void suffix(ACE_Message_Block *mblk)
 5     {
 6         char *msg = mblk->data_block()->base();
 7         strcat(msg, "suffix\n");
 8     }
 9     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
10     {
11         for (ACE_Message_Block *temp = mblk;
12             temp != 0; temp = temp->cont())
13         {
14             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
15             {
16                 suffix(temp);
17             }
18         }
19         return put_next(mblk);
20     }
21 };

Logrec_Write类

这里put函数由上一个过滤器(Logrec_Suffix)调用,其并没有对数据进行实际处理,只是将数据放入队列中,由线程独立处理。

 1 class Logrec_Write : public ACE_Task<ACE_SYNCH>
 2 {
 3 public:
 4     virtual int open(void*)
 5     {
 6         ACE_DEBUG((LM_DEBUG, "Logrec_Writer.\n"));
 7         return activate();
 8     }
 9
10     virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to)
11     {
12         return putq(mblk, to);
13     }
14
15     virtual int svc()
16     {
17         int stop = 0;
18         for (ACE_Message_Block *mb; !stop && getq(mb) != -1;)
19         {
20             if (mb->msg_type() == ACE_Message_Block::MB_STOP)
21             {
22                 stop = 1;
23             }
24             else{
25                 ACE_DEBUG((LM_DEBUG, "%s", mb->base()));
26             }
27             put_next(mb);
28         }
29         return 0;
30     }
31 };

Main

这里讲ACE_Module放入Streams中,ACE_Module才是真正数据处理的Module。ACE_Streams类有两个数据成员:stream_head_和stream_tail_,它们指向ACE_Module链表的首尾。对于每个Streams,其默认带有首尾两个Module,而后可以通过push将数据处理的Module放入执行链表中。每个Module包含两个Task,分别为读Task和写Task。在示例中仅注册了写Task,这些Module和Task通过next指针构成一个有序的串。

这里注意push有顺序要求,最后push即栈顶的为先执行的Module。

 1 int ACE_TMAIN(int argc, ACE_TCHAR *argv[])
 2 {
 3     if (argc != 2)
 4     {
 5         ACE_ERROR_RETURN((LM_ERROR, "usage: %s logfile\n", argv[0]), 1);
 6     }
 7
 8     ACE_TString logfile (argv[1]);
 9
10     ACE_Stream<ACE_SYNCH> stream;
11
12
13     ACE_Module<ACE_MT_SYNCH> *module[4];
14     module[0] = new ACE_Module<ACE_MT_SYNCH>("Reader", new Logrec_Reader(logfile), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
15     module[1] = new ACE_Module<ACE_MT_SYNCH>("Formatter", new Logrec_Timer(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
16     module[2] = new ACE_Module<ACE_MT_SYNCH>("Separator", new Logrec_Suffix(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
17     module[3] = new ACE_Module<ACE_MT_SYNCH>("Writer", new Logrec_Write(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
18
19     for ( int i = 3; i >= 0; --i )
20     {
21         if (stream.push(module[i]) == -1)
22         {
23             ACE_ERROR_RETURN((LM_ERROR, "push %s module into stream failed.\n", module[i]->name()), 1);
24         }
25         ACE_DEBUG((LM_DEBUG, "push %s module into stream success. \n", module[i]->name()));
26     }
27     ACE_Thread_Manager::instance()->wait();
28 }

附完整代码及结果:https://github.com/ShiningZhang/ACE_Learning/tree/master/streams

  1 /*************************************************************************
  2     > File Name: logrec.cpp
  3     > Author:
  4     > Mail:
  5     > Created Time: Fri 13 Oct 2017 04:19:39 PM CST
  6  ************************************************************************/
  7  #include <fstream>
  8  #include <ace/Synch.h>
  9  #include <ace/Task.h>
 10  #include <ace/Message_Block.h>
 11  #include <ace/Stream.h>
 12  #include "ace/Thread_Manager.h"
 13  #include <ace/Time_Value.h>
 14  #include <ace/Module.h>
 15
 16 using namespace std;
 17
 18 class Logrec_Reader : public ACE_Task<ACE_MT_SYNCH>
 19 {
 20 private:
 21     ifstream fin;  //标准输入流
 22 public:
 23     Logrec_Reader(ACE_TString logfile)
 24     {
 25         fin.open(logfile.c_str()); //ACE_TString.c_str() 转换为char
 26     }
 27     virtual int open (void *)
 28     {
 29         return activate();
 30     }
 31
 32     virtual int svc()
 33     {
 34         ACE_Message_Block *mblk;
 35         int len = 0;
 36         const int LineSize = 256;
 37         char file_buf[LineSize];
 38
 39         while(!fin.eof())
 40         {
 41             fin.getline(file_buf, LineSize);
 42             len = ACE_OS::strlen(file_buf);
 43             ACE_NEW_RETURN(mblk, ACE_Message_Block(len + 200), 0);
 44             if (file_buf[len - 1] == '\r')
 45             {
 46                 len = len - 1;
 47             }
 48             mblk->copy(file_buf, len);
 49             // 通过put_next函数,将消息传递给下一个过滤器
 50             put_next(mblk);
 51         }
 52         //构造一个MB_STOP消息
 53         ACE_NEW_RETURN(mblk, ACE_Message_Block (0, ACE_Message_Block::MB_STOP), 0);
 54         put_next(mblk);
 55         fin.close();
 56         ACE_DEBUG((LM_DEBUG, "read svc return. \n"));
 57         return 0;
 58     }
 59 };
 60
 61 class Logrec_Timer : public ACE_Task<ACE_SYNCH>
 62 {
 63 private:
 64     void format_data(ACE_Message_Block *mblk)
 65     {
 66         char *msg = mblk->data_block()->base();
 67         strcat(msg, "format_data");
 68     }
 69 public:
 70     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
 71     {
 72         for (ACE_Message_Block *temp = mblk;
 73             temp != 0; temp = temp->cont())
 74         {
 75             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
 76             {
 77                 format_data(temp);
 78             }
 79         }
 80         return put_next(mblk);
 81     }
 82 };
 83
 84 class Logrec_Suffix : public ACE_Task<ACE_SYNCH>
 85 {
 86 public:
 87     void suffix(ACE_Message_Block *mblk)
 88     {
 89         char *msg = mblk->data_block()->base();
 90         strcat(msg, "suffix\n");
 91     }
 92     virtual int put(ACE_Message_Block *mblk, ACE_Time_Value *)
 93     {
 94         for (ACE_Message_Block *temp = mblk;
 95             temp != 0; temp = temp->cont())
 96         {
 97             if (temp->msg_type() != ACE_Message_Block::MB_STOP)
 98             {
 99                 suffix(temp);
100             }
101         }
102         return put_next(mblk);
103     }
104 };
105
106 class Logrec_Write : public ACE_Task<ACE_SYNCH>
107 {
108 public:
109     virtual int open(void*)
110     {
111         ACE_DEBUG((LM_DEBUG, "Logrec_Writer.\n"));
112         return activate();
113     }
114
115     virtual int put (ACE_Message_Block *mblk, ACE_Time_Value *to)
116     {
117         return putq(mblk, to);
118     }
119
120     virtual int svc()
121     {
122         int stop = 0;
123         for (ACE_Message_Block *mb; !stop && getq(mb) != -1;)
124         {
125             if (mb->msg_type() == ACE_Message_Block::MB_STOP)
126             {
127                 stop = 1;
128             }
129             else{
130                 ACE_DEBUG((LM_DEBUG, "%s", mb->base()));
131             }
132             put_next(mb);
133         }
134         return 0;
135     }
136 };
137
138 int ACE_TMAIN(int argc, ACE_TCHAR *argv[])
139 {
140     if (argc != 2)
141     {
142         ACE_ERROR_RETURN((LM_ERROR, "usage: %s logfile\n", argv[0]), 1);
143     }
144
145     ACE_TString logfile (argv[1]);
146
147     ACE_Stream<ACE_SYNCH> stream;
148
149
150     ACE_Module<ACE_MT_SYNCH> *module[4];
151     module[0] = new ACE_Module<ACE_MT_SYNCH>("Reader", new Logrec_Reader(logfile), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
152     module[1] = new ACE_Module<ACE_MT_SYNCH>("Formatter", new Logrec_Timer(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
153     module[2] = new ACE_Module<ACE_MT_SYNCH>("Separator", new Logrec_Suffix(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
154     module[3] = new ACE_Module<ACE_MT_SYNCH>("Writer", new Logrec_Write(), 0, 0, ACE_Module<ACE_SYNCH>::M_DELETE_READER);
155
156     for ( int i = 3; i >= 0; --i )
157     {
158         if (stream.push(module[i]) == -1)
159         {
160             ACE_ERROR_RETURN((LM_ERROR, "push %s module into stream failed.\n", module[i]->name()), 1);
161         }
162         ACE_DEBUG((LM_DEBUG, "push %s module into stream success. \n", module[i]->name()));
163     }
164     ACE_Thread_Manager::instance()->wait();
165 }

test:

转载于:https://www.cnblogs.com/zl1991/p/7662993.html

ACE-Streams架构简介及应用相关推荐

  1. 大数据分类和架构简介

    第 1 部分: 大数据分类和架构简介 概述 大数据可通过许多方式来存储.获取.处理和分析.每个大数据来源都有不同的特征,包括数据的频率.量.速度.类型和真实性.处理并存储大数据时,会涉及到更多维度,比 ...

  2. LoadRunner系统架构简介与运行原理

    1.LoadRunner系统架构简介 LoadRunner是通过创建虚拟用户来代替真实实际用户来操作客户端软件比如Internet Explorer,来向IIS.Apache等Web服务器发送HTTP ...

  3. 单线程与多线程网络程序架构简介

    文章目录 1 单线程与多线程网络程序架构简介 1.1 服务端单线程处理多客户端 1.2 服务端多进程多端口处理多客户端 1.3 服务端多线程单端口分组处理多客户端 1.4 服务端多线程多端口分组处理多 ...

  4. MVC架构简介及其测试策略

    MVC架构简介及其测试策略  https://www.cnblogs.com/rd-ddddd/p/6959232.html 转载于:https://www.cnblogs.com/highpoint ...

  5. Mysql逻辑架构简介

    Mysql逻辑架构简介 整体架构图 和其它数据库相比,MySQL有点与众不同,它的架构可以在多种不同场景中应用并发挥良好作用.主要体现在存储引擎的架构上,插件式的存储引擎架构将查询处理和其它的系统任务 ...

  6. mme 服务器位置,NB-IOT的网络架构简介

    原标题:NB-IOT的网络架构简介 一.NB-iot网络架构 整个NB-IoT网络架构分为五个部分:终端,无线网络,核心网络(EPC),IoT支持平台和应用服务器. 二.NB-iot网络架构简介 1. ...

  7. Xtensa处理器架构基础-架构简介与常见寄存器

    1.架构简介 应用的发展对处理器的需求越来越多样化.与通用处理器架构相比,Xtensa架构的特色在于它是可配置可扩展的微处理器架构.通俗地讲,将其与常见的ARM架构相比,Xtensa架构的特色在于可以 ...

  8. ARMv8-a架构简介

    1. 前言 ARMv8(当前只有A系列,即ARMv8-A)架构,是ARM公司为满足新需求而重新设计的一个架构,是近20年来,ARM架构变动最大的一次.它引入的Execution State.Excep ...

  9. a55计算机主板,A55架构简介与A55主板赏析

    A55架构简介与主板赏析: A55 FCH芯片架构 FCH芯片的角色大概相当于以往的南桥芯片,自身并没有整合显示核心,而是通过UMI接口(而不是HT总线)连接APU整合的显示核心进行输出.A55 FC ...

最新文章

  1. 【职场】面试中最难回答频率最高的70个问题
  2. SAP ABAP 客户退出
  3. modelandview 可以返回html么_Python: 爬虫网页解析工具lxml.html(一)
  4. 携程elong相继牵手支付宝转“危”为“机”
  5. 操作系统复习题+最终版
  6. 三招快速重新打开被关闭的Chrome标签页
  7. 03-linux下离线安装R环境
  8. php执行cmd/shell命令 木马小后门
  9. CCF201903-4 消息传递接口(100分)【模拟】
  10. 创建对象 --- 构造函数模式
  11. fn:startsWith()函数
  12. 大规模业务技术架构设计与战术(架构师必看)
  13. 如何将html转化成mp4,怎么把mov转换成mp4格式?方法很简单,1分钟完成转换
  14. 计算机 及其 应用系统
  15. 2020伊始,我结束了人生的两个轮回
  16. 经典怀旧FCgame红白机小游戏在线网页合集版畅玩HTML网站源码
  17. Basler Blaze-101开发实践(1)——实时采图
  18. 大数据开发第一站ODS篇
  19. 四边形围栅栏c语言长度,不同结构围油栏拦油特性数值的研究.pdf
  20. java未来三年的工作计划_未来三年工作计划精选.doc

热门文章

  1. python语言入门n-Python基础语法学习笔记
  2. python怎么安装matplotlib-[Python]一步步安装numpy,matplotlib
  3. python在线工具-在线 Python运行工具
  4. python之父去面试-Python面试题之Python的Super方法
  5. python浪漫代码-Python打造浪漫的心形,助你情人节表白成功!
  6. python打开界面是什么样的-python学习笔记(图形用户界面)
  7. python3安装pip3-python3安装pip3的实例步骤
  8. 学会python之后-python学会后做什么
  9. 2020年python工资一般多少钱-2020年Python发展前景如何呢?
  10. python if语句多个条件-python – if / elif语句的多个条件