程序员人生 网站导航

EMIPLIB库分析一

栏目:互联网时间:2016-06-04 14:32:00

通过分析feedbackexample例程来了解EMIPLIB库。feedbackexample例子可以实现在本机播放1个wav文件。但这类播放不是简单的调用本机播放API来实现,而是利用EMIPLIB库提供的RTP框架来完成。在不了解细节前,可以将这个进程简单理解成这样:读取本地的1个wav文件,然后将其打包成RTP,再将这些RTP发送到feedbackexample监听的1个UDP端口,feedbackexample再解析这些RTP包,并还原成可以播放的语音数据提交给本机声卡。我通常在这类场合下使用这个例程。其他程序通过SIP协议注册到1个语音交换平台,只实现了SIP协议交互部份并未实现RTP。这时候,就能够借用feedbackexample例程,只需将feedbackexample例程监听的端口号和对真个IP地址和端口号都设置成另外一个程序中SIP协议交互得到的端口号便可。也就是说,通过这样两个简单的程序可以实现1个非常简易的SIP软电话。1个程序负责SIP信令,另外一个程序负责RTP语音传输。

这个例程很简单,但也包括了足够的信息去了解EMIPLIB的运作细节。接下来看看这些代码究竟是如何做到的。

这个例程的代码非常简单,只有3个额外的辅助函数,和1个类。这3个辅助函数完全可以不用去了解,这个类也只是为了输出1些日志信息而已并没有增加其他功能性方面的代码。那关注点就能够落在main函数里了。进入到main函数内,去除那些注释后,真正有用的代码从变量定义开始。

MIPTime interval(0.020); // We'll use 20 millisecond intervals. MIPAverageTimer timer(interval); MIPWAVInput sndFileInput; MIPSamplingRateConverter sampConv, sampConv2; MIPSampleEncoder sampEnc, sampEnc2, sampEnc3; MIPULawEncoder uLawEnc; MIPRTPULawEncoder rtpEnc; MIPRTPComponent rtpComp; MIPRTPDecoder rtpDec; MIPRTPULawDecoder rtpULawDec; MIPULawDecoder uLawDec; MIPAudioMixer mixer; MyChain chain("Sound file player");

这些类型都是EMIPLIB库提供的类。由于我们只是探究如何发送,所以这里列出来的这么多类型,我们只会关注1部份:

MIPTime MIPAverageTimer MIPWAVInput MIPSamplingRateConverter MIPSampleEncoder MIPULawEncoder MIPRTPULawEncoder MIPRTPComponent

未列出的那些类型都是解码相干的类。还得再重点说说MyChain类。这是个继承类,父类是MIPComponentChain。名称中最后的单词是Chain。Chain是链、链条的意思。这个类是EMIPLIB运行时框架的核心类。是它将各个类组合在1块,记录像互关系,调和各个类的履行前后顺序等。稍后我们将详细研究这个类的源码。

接下来的代码就是初始化这些变量。中间会出现1些RTP开头的类:

RTPSession RTPUDPv4TransmissionParams RTPSessionParams

这些是另外一个库jrtplib提供的类。jrtplib库也是EMIPLIB库的底层基础。这3个类的目的就是为了在网络上传输接收RTP包。由于我们这次研究的是EMIPLIB,所以这些类就不细说了。
初始化完后,接着就是调用MIPComponentChain的addConnection操作这些变量。类似于这样:

// Next, we'll create the chain returnValue = chain.setChainStart(&timer); checkError(returnValue, chain); returnValue = chain.addConnection(&timer, &sndFileInput); checkError(returnValue, chain);

注释的意思是这是在创建链条。最后的启动是调用MIPComponentChain的start。

chain.start();

main函数内的代码基本情况就是这样。分3个部份:初始化变量,构建1个链条,启动这个链条。现在虽然没有查看MIPTime和MIPWAVInput等类的声明,基于链条的概念,和MIPComponentChain类名的暗示,可以猜想出之条件到的那些MIP开头的类应当都是类似于MIPComponent类的子类。还有1个可以解释成链条、任务流的地方是这些类名。声明的顺序是MIPWAVInput、MIPSamplingRateConverter、MIPSampleEncoder、MIPULawEncoder、MIPRTPULawEncoder和MIPRTPComponent。根据这个顺序我们分明看到了1个清晰的将本地wav文件打包成RTP数据包的任务流程。这也是1个左证,MIPComponentChain是1个任务流履行体、任务链条。但这些毕竟是猜想,实际情况是否是正如猜想的那样,还得分析了实现源码再说。

MIPComponentChain

feedbackexample例程中使用了MIPComponentChain3个成员函数:setChainStart、addConnection和start。通过它们的名称我们可以立即知晓这3个函数的意图。第1个应当是设置1个出发点。最后1个是启动。第2个有点模糊,加1个connection。输入参数是两个Component(基于我们之前的猜想)。connection的意思是连接、联系。这么来理解的话,这第2个函数的作用就是建立两个component的关系。现在都是猜想,接下来看代码。

setChainStart

这个函数确切很简单。就是将输入参数赋值给m_pInputChainStart成员变量。

bool MIPComponentChain::setChainStart(MIPComponent *startComponent) { if (startComponent == 0) { setErrorString(MIPCOMPONENTCHAIN_ERRSTR_COMPONENTNULL); return false; } m_pInputChainStart = startComponent; return true; }

注意到1点,输入参数的类型是MIPComponent。猜想是对的,而且名称完全猜对了。

addConnection

这个也很简单,就是将两个MIPComponent放入1个list内。inputConnections的类型是个std::list。之所以说简单,是由于暂时疏忽后3个参数。

bool MIPComponentChain::addConnection(MIPComponent *pPullComponent, MIPComponent *pPushComponent, bool feedback, uint32_t allowedMessageTypes, uint32_t allowedSubmessageTypes) { if (pPullComponent == 0 || pPushComponent == 0) { setErrorString(MIPCOMPONENTCHAIN_ERRSTR_COMPONENTNULL); return false; } m_inputConnections.push_back(MIPConnection(pPullComponent, pPushComponent, feedback, allowedMessageTypes, allowedSubmessageTypes)); return true; }

start

bool MIPComponentChain::start() { if (JThread::IsRunning()) //如果已在运行,不做其它操作,返回false。 {setErrorString(MIPCOMPONENTCHAIN_ERRSTR_THREADRUNNING);return false;} if (m_pInputChainStart == 0) //如果没设置起始节点,不做其它操作,返回false。 {setErrorString(MIPCOMPONENTCHAIN_ERRSTR_NOSTARTCOMPONENT);return false;} std::list<MIPConnection> orderedList; std::list<MIPComponent *> feedbackChain; if (!orderConnections(orderedList)) return false; if (!buildFeedbackList(orderedList, feedbackChain)) return false; copyConnectionInfo(orderedList, feedbackChain); m_stopLoop = false; if (JThread::Start() < 0) {setErrorString(MIPCOMPONENTCHAIN_ERRSTR_CANTSTARTTHREAD);return false;} return true; }

起始处调用了JThread的IsRunning函数。难不成MIPComponentChain类继承自JThread?查看这个类的头文件。果然继承自jthread类。JThread类也是由jrtplib库提供。

class EMIPLIB_IMPORTEXPORT MIPComponentChain : private jthread::JThread, public MIPErrorBase

起始处的两个判断很容易理解。代码中我已添加了注释。接着是定义两个std::list。然后是顺次调用orderConnections、buildFeedbackLis和copyConnectionInfo这3个成员函数。最后是调用JThread::Start()函数,这个函数很明显目的就是启动1个线程。在没有具体分析orderConnections、buildFeedbackLis和copyConnectionInfo这3个成员函数前,对MIPComponnetChain::start函数的粗略认识就是终究要启动1个线程,但在启动线程前要做1些准备工作。那末接下来就看看到底在启动线程前都做了些甚么。先看看orderConnections。

orderConnections

这个函数内的第1步是遍历m_inputConnections链表。之前这个链表在addConnection函数内出现过。现在可以回过头再去看看,那是将两个MIPComponent变量组合成1个MIPConnection变量,再放入m_inputConnections链表内。在这里调用了MIPConnection的setMark函数,应当是将内部mark属性置为false。具体有何目的现在还不知道。

for (it = m_inputConnections.begin() ; it != m_inputConnections.end() ; it++) (*it).setMark(false);

接着往下看。componentLayer是在函数起初处定义的,类型是std::list<MIPComponent *>。这是1个存储MIPComponent指针的链表。m_pInputChainStart之前在setChainStart函数内出现过,这是存储起始节点的变量。现在它被第1个放入了componentLayer链表内。

componentLayer.push_back(m_pInputChainStart);

然后是1个while循环。为了看着方便,下面的代码片断去除源码中具有的日志输出语句。

while (!componentLayer.empty()) { std::list<MIPComponent *> newLayer; std::list<MIPComponent *>::const_iterator compit; for (compit = componentLayer.begin() ; compit != componentLayer.end() ; compit++) { for (it = m_inputConnections.begin() ; it != m_inputConnections.end() ; it++) { if (!(*it).isMarked()) // check that we haven't processed this connection yet { if ((*it).getPullComponent() == (*compit)) // check that this connection starts from the component under consideration { // mark the connection as processed (*it).setMark(true); // copy the connection in the ordered list orderedList.push_back(*it); // get the other end of the connection and add that // component to the new layer // we'll make sure that the component isn't already // in the list bool found = false; MIPComponent *component = (*it).getPushComponent(); std::list<MIPComponent *>::const_iterator compit2; for (compit2 = newLayer.begin() ; !found && compit2 != newLayer.end() ; compit2++) { if ((*compit2) == component) found = true; } if (!found) newLayer.push_back(component); } } } } componentLayer = newLayer; }

结束上面这个while循环的条件是componentLayer链表为空。这个循环之前的1句是将起始节点放入这个链表内。也就是说,进入循环前链表的初始状态是有1个元素在链表内。接着是两个嵌套的for循环。循环的目的是将每一个componentLayer链表内的元素取出后,再在m_inputConnections链表内遍历1次做些操作。第2层for循环的第1句的注释很清楚表明了目的,检查这个MIPConnection是不是被处理过。这也说明了这个函数内第1步的目的:确保m_inputConnections链表内所有的MIPConnection在处理前的处理标志变量值都是“未处理”,也就是将mark置为false。

如果这个MIPConnection未被处理过,继续下面的判断,检查这个MIPConnection的“pull componnet”是否是当前正在检查的MIPComponent。这个被检查的MIPComponent是从componentLayer链表内取出来的。由于初始时componentLayer内只有初始节点,也就是说这两个for循环的第1次迭代就是找到那个pull component是初始节点的MIPConnection。找到这个MIPConnection后将它的mark标志置为true,也就是说这个MIPConnection已处理过了,后续再迭代时不要再处理它。然后将这个MIPConnection放入orderedList链表内。orderedList也是在函数开始处定义的,类型是std::list<MIPConnection>,也是个链表,存储的是MIPConnection变量。然后取出这个MIPConnection内的“push component”,又进入另外一个for循环,目的是在newLayer链表内寻觅是不是存在这个“push component”。如果newLayer内不存在这个“push component”,那末将这个“push component”放入newLayer。履行完第1次两个for循环的迭代后,再将newLayer赋值给componentLayer。此时,指的是第1次迭代,componentLayer内应当只有1个元素,就是与起始节点组合成1个MIPConnection的“push component”。由于componentLayer不为空,所以while循环不会结束,继续下1次迭代。第2次迭代的目的就是找到以这个“push component”为“pull component”的MIPConnection,然后设置mark标志,再取出另外一个“push component”。至此,应当可以明白这个函数的目的了,就是以起始节点为开端找到这个任务流内MIPConnection的履行顺序,并顺次放入orderedList内。最后还有1个for循环,目的是确保不再存在未被处理过的MIPConnection,如果有肯定是哪出错了。最后将排过序的MIPConnection链表输出到函数外。

由于最外围的while循环肯定要在某种条件下结束。代码里显示结束的标志是componentLayer链表为空,而每次循环最后1条语句是将newLayer变量赋值给componentLayer。也就是说while每次循环后如果newLayer为空则while循环结束。换种说法就是每次whille循环如果没有找到下1个push component那末循环就结束了。依照这类思路倒推,在建立处理流时最后加入的MIPConnection变量里的push component1定应当是个空指针。这样的1个MIPConnection就是在告知处理逻辑,处理流要结束了。

现在再回过头去看start函数内调用这1函数的目的就是排序m_inputConnections链表:从起始的MIPComponent节点开始,依照MIPConnection类给出的前后依承关系,理清这个任务流的顺序。现在已弄明白了start函数内被调用的第1个函数,接着看下1个buildFeedbackList。

buildFeedbackList

这个函数需要两个参数。1个是之前分析过的orderConnections函数生成的排过序的MIPConnection链表。另外一个是输出参数,是个存储MIPComponent指针的链表。
函数起始处是个for循环。目的依然是设置orderedList链表内MIPConnection元素的已处理标志。如果MIPConnection的成员函数giveFeedback返回false,那末设置MIPConnection已处理过,或说这个MIPConnection可以不用再处理了。想详细看看这个giveFeedback的具体实现如何,所有就去看看了。MIPConnection类定义居然是在MIPComponentChain内部。giveFeedback只是简单的返回内部成员变量m_feedback的值。这个成员变量值是在MIPConnection类生成时被赋的值。MIPConnection类都是在MIPComponentChain的addConnection函数内被创建的。追溯到这里可以看到MIPConnection的m_feedback值的根源来自addConnection函数的feedback输入参数。再检查feedbackexample例程的代码,几近所有编码进程相干的MIPComponent的addConnection操作均没有设置这个值,使用的是函数的缺省参数false。

chain.addConnection(&timer, &sndFileInput); chain.addConnection(&sndFileInput, &sampConv); chain.addConnection(&sampConv, &sampEnc); chain.addConnection(&sampEnc, &uLawEnc); chain.addConnection(&uLawEnc, &rtpEnc); chain.addConnection(&rtpEnc, &rtpComp);

但大部份所解码进程相干的MIPComponent的addConnection操作均设置了feedback输入参数,并且使用的都是true。

chain.addConnection(&rtpComp, &rtpDec); // This is where the feedback chain is specified: we want // feedback from the mixer to reach the RTP audio decoder, // so we'll specify that over the links in between, feedback // should be transferred. chain.addConnection(&rtpDec, &uLawDec, true); chain.addConnection(&uLawDec, &sampEnc2, true); chain.addConnection(&sampEnc2, &sampConv2, true); chain.addConnection(&sampConv2, &mixer, true); chain.addConnection(&mixer, &sampEnc3); chain.addConnection(&sampEnc3, &sndCardOutput);

中间还夹着着1小段注释。感觉是解释feedback机制的,但看不太懂。到现在还是不知道什么时候和为什么设置feedback标志。接着看下面的代码。接下来是1大段while。

while (!done) { bool found = false; it = orderedList.begin(); while (!found && it != orderedList.end()) { if (!(*it).isMarked() && (*it).giveFeedback()) found = true; else it++; } ............ }

这段while的最前部是又1个while循环。遍历检查orderedList链表,找到第1个未被处理过、且giveFeedback返回true的MIPConnection。如果找到了,就立即结束这部份处理。从大的while循环角度看,每次都得再做这么1次处理。也就是说,每次大的循环开始时,都得在orderedList链表内找出1个未被处理过、且giveFeedback返回true的MIPConnection。如果有1次遍历找不到满足条件的MIPConnection,那末遍历结束。

现在我们找到了1个需要处理的MIPConnection。下面列出的代码去除1些注释,重新做了些排布工作。首先是设置这个MIPConnection的处理标志为true,下次就不会再找到它了。然后是清空subChain,这是个链表,存储的是指向MIPComponent的指针。然后将这个正在处理的MIPConnection的“pull component”和“push component”顺次放入这个subChain。

然后是另外一个while循环。这次遍历是从刚才在orderedList中找到的那个MIPConnection的后1个MIPConnection开始。如果排在后面的MIPConnection的“pull component”等于这个被找到的MIPConnection的“push component”,这个排在后面的MIPConnection的处理标志被置为true,且它的“push component”被放入subChain链表内。但如果后面有两个满足这样条件的MIPConnection,说明有错。如果还需遍历,会从在这次遍历中找到的MIPConnection的后面1个MIPConnection开始,且startIt存的是这次遍历中找到的MIPConnection。也就是说,subChain存的是那些MIPConnection feedback标志为true的所有MIPComponent,且先放入“pull component”再放入“push component”。

while (!done) { ............ if (found) // ok, found a starting point, build the subchain { (*it).setMark(true); subChain.clear();subChain.push_back((*it).getPullComponent());subChain.push_back((*it).getPushComponent()); (*it).setMark(true); std::list<MIPConnection>::iterator startIt = it;bool done2 = false; while (!done2) { std::list<MIPConnection>::iterator nextStartIt; bool foundNextStartIt = false; it = startIt; it++; while (it != orderedList.end()) { if ( (*it).giveFeedback() ) { if ((*it).getPullComponent() == (*startIt).getPushComponent()) { if (foundNextStartIt) {setErrorString(MIPCOMPONENTCHAIN_ERRSTR_CANTMERGEFEEDBACK);return false;} foundNextStartIt = true; nextStartIt = it; subChain.push_back((*it).getPushComponent()); (*it).setMark(true); } } it++; } if ( !foundNextStartIt ) done2 = true; else startIt = nextStartIt; } // add the subchain to the feedbacklist in reverse if (!feedbackChain.empty()) feedbackChain.push_front(0); // mark new subchain std::list<MIPComponent *>::const_iterator it2; for (it2 = subChain.begin() ; it2 != subChain.end() ; it2++) feedbackChain.push_front(*it2); } else done = true;

在每次大的循环迭代处理过后,subChain存的是那些在这次迭代开始时在orderedList中找到的第1个feedback为true的MIPConnection的“pull component”,且以这个MIPComponent为出发点的子任务链。这个子任务链是全部任务链的1小部份,它是整体的子集。这个子任务链的结束标志有两个,1个是到达了链尾,另外一个是遇到了1个feedback标志为false的MIPConnection。我想这个处理进程应当是想找到所有这样的子链。这样的存在有它本身的意义,现在还不得而知。

最后1部份有点意思。将某次迭代中找到的subChain内元素按倒序方式放入feedbackChain链表内。两两子链以1个0做为分割。最后将feedbackChain内元素拷贝给函数的第2个参数feedbackComponentChain。

这个函数的目的略微清晰了点。找出1些子链,这些子链再按倒序放入1个链表内,子链间以0分隔。现在可以继续start函数内出现的最后1个函数了,copyConnectionInfo。

copyConnectionInfo 

可以说这个函数非常简单1目了然。就是将之前两个函数orderConnections和buildFeedbackList处理的结果保存在成员变量中。

void MIPComponentChain::copyConnectionInfo(const std::list<MIPConnection> &orderedList, const std::list<MIPComponent *> &feedbackChain) { std::list<MIPConnection>::const_iterator it; std::list<MIPComponent *>::const_iterator it2; m_chainMutex.Lock(); m_orderedConnections.clear(); m_feedbackChain.clear(); for (it = orderedList.begin() ; it != orderedList.end() ; it++) m_orderedConnections.push_back(*it); for (it2 = feedbackChain.begin() ; it2 != feedbackChain.end() ; it2++) m_feedbackChain.push_back(*it2); m_pInternalChainStart = m_pInputChainStart; m_chainMutex.Unlock(); }


 

至此,start函数内牵涉到的3个成员函数都看了1遍。大致了解了他们的作用。这些都只是为了启动线程前的准备工作。我们知道start函数的终究目的是启动1个线程。那末真正线程内的处理都是些甚么呢,哪一个函数是做这件事的呢?扫1遍mipcomponentchain.cpp文件后发现Thread函数有点像。再进入这个函数内部看到起始处有这么1句日志输出:

#ifdef MIPDEBUG std::cout << "MIPComponentChain::Thread started" << std::endl; #endif // MIPDEBUG

这应当毫无疑问就是线程的入口函数了。接下来就看看它到底做了些甚么。

 

Thread

大致看了1眼,再加上之前有过线程编码的经历,可以推测出这个函数内最主要的部份应当就是1个循环。但不多是个无穷循环,肯定有退出机制。进入循环前应当有1些初始化操作。

bool done = false;//感觉这句赋值没必要,下面有1句赋值语句。 bool error = false;//初始值为false。 int64_t iteration = 1; std::string errorComponent, errorString; m_loopMutex.Lock(); done = m_stopLoop;//在启动线程前,这个值被赋值为false。 m_loopMutex.Unlock(); JThread::ThreadStarted();//调用父类的函数,这应当是JThread类的使用规范。 MIPSystemMessage startMsg(MIPSYSTEMMESSAGE_TYPE_WAITTIME);

上面这些初始化操作大部份都很好理解。只是又多了1个之前未遇到过的类:MIPSystemMessage。既然出现了,那就看看他的定义。这个类的头文件里有1段描写这个类作用的文字。

/** A system message. * This kind of message is used to instruct a component to wait until messages can be * distributed in the chain or to inform a component that an interval has elapsed. */

大致的意思是说,这个类的作用是消耗掉特定时长。有点类似于多线程编程中常常使用的Sleep函数,只不过在这将其封装成了类。同时还发现,MIPSystemMessage继承自MIPMessage。MIPMessage类也有1段解释文字。从中可以看出,MIPMessage是个基类。MIPMessage消息将会从1个MIPComponent传递给另外一个MIPComponent。

/** Base class of messages passed in a MIPComponentChain instance. * This is the base class of messages distributed in a MIPComponentChain * instance. Messages are distributed from component to component using * the MIPComponent::pull and MIPComponent::push functions. The * message type numbers can be found in mipmessage.h */

这同时也说明了链条中各个MIPComponent之间是如何通讯的。同时,也应当注意到MIPComponentChain会生成1个子类型是MIPSYSTEMMESSAGE_TYPE_WAITTIME的MIPSystemMessage对象,并提供给链条中的第1个MIPComponent。也就是说,任何1个可以充当链条中第1个元素的MIPComponent必须可以处理子类型是MIPSYSTEMMESSAGE_TYPE_WAITTIME的MIPSystemMessage对象。以上就是循环前的初始化操作。接下来进入循环内部。此循环大致由3部份组成。1,针对链条初始节点的操作。2,遍历m_orderedConnections链表。3,遍历m_feedbackChain链表。这3步中任何1步履行完后都会检查毛病标志,用来判断是不是需要立即结束循环。

第1阶段。

第1步是调用初始节点的push函数。向其传入初始化阶段生成的startMsg变量。其他加锁操作就不做分析了。

MIPTime::wait(MIPTime(0,0)); m_chainMutex.Lock(); m_pInternalChainStart->lock(); if (!m_pInternalChainStart->push(*this, iteration, &startMsg)) { error = true; errorComponent = m_pInternalChainStart->getComponentName(); errorString = m_pInternalChainStart->getErrorString(); m_pInternalChainStart->unlock(); m_chainMutex.Unlock(); break; } m_pInternalChainStart->unlock();

push操作如果失败就结束全部循环,也就是说结束线程。这1步最关键的1点就是push操作都做了哪些事。必须以1个具体的实例来做说明才能对这1步有清晰的认识。正好再回到feedbackexample例程中代码结合着来分析。在例程中设置的初始节点类型是MIPAverageTimer。打开mipaveragetimer.h文件看看这个类是如何定义的。下面这段内容很有帮助。

/** A simple timing component. * This is a simple timing component which accepts MIPSYSTEMMESSAGE_WAITTIME system * messages. It generates a MIPSYSTEMMESSAGE_ISTIME system message each time the * specified interval has elapsed. Note that this is only on average after each interval: * fluctuation will be present. */

意思是接收MIPSYSTEMMESSAGE_WAITTIME消息,产生MIPSYSTEMMESSAGE_ISTIME消息。产生消息前必须是经过了多少秒后。再看看push和pull两个函数的实现。由于这个类也是继承自MIPComponent。所谓的“接收MIPSYSTEMMESSAGE_WAITTIME消息”是在push函数内实现的。push函数内会判断输入的MIPMessage变量类型是不是满足要求。其次,push函数内还实现了“经过了多少秒后”,可以在源码中看到下面这句:

MIPTime::wait(MIPTime(diff));

diff的值由下面方法生成:

MIPTime curTime = MIPTime::getCurrentTime(); real_t diff = (m_startTime.getValue()+((real_t)iteration)*m_interval.getValue())-curTime.getValue();

m_interval的值在创建MIPAverageTimer时指定,例程中给出的值是0.02。m_startTime的值在创建MIPAverageimer时指定,实际值是创建时的系统时间。iteration是迭代值,每次Thread的for循环履行1次累加这个值,并传给所有的MIPComponent对象。curTime是MIPAverageTimer的push函数被调用时的时间。diff值的含义是,如果每次迭代都消耗了0.02秒,那末diff值就是这次push被调用时的时间与理想情况下应当被消耗的时间的差值。如果这个值大于零,说明实际情况是之前的处理有些快需要减慢,所以就休眠1段时间。
现在再回到第1部份的处理场景中。加上之前针对MIPAverageTimer类的分析。现在能够明白这第1部份都做了些甚么:履行了MIPAverageTimer的push函数。startMsg变量正是push函数所需要的类类型。由于满足了这些条件,所以push函数内还履行了休眠操作。也就是说,这第1部份的真正作用就是休眠了固定时长。也就是说每次迭代的第1步都是休眠固定时长。

第2阶段/
接着看第2步:遍历m_orderedConnections。现在我们知道这个链表存储的是1个有序的MIPConnectiont集合。其实也是有序的MIPComponent集合。由于前1个MIPConnection的pull component就是后1个MIPConnection的push component。

for (it = m_orderedConnections.begin() ; !error && it != m_orderedConnections.end() ; it++) { MIPComponent *pPullComp = (*it).getPullComponent(); MIPComponent *pPushComp = (*it).getPushComponent(); uint32_t mask1 = (*it).getMask1(); uint32_t mask2 = (*it).getMask2(); pPullComp->lock(); pPushComp->lock(); MIPMessage *msg = 0; do { if (!pPullComp->pull(*this, iteration, &msg)) {error = true;errorComponent = pPullComp->getComponentName();errorString = pPullComp->getErrorString();} else { if ( msg ) { uint32_t msgType = msg->getMessageType();uint32_t msgSubtype = msg->getMessageSubtype(); if ( ( msgType&mask1 ) && ( msgSubtype&mask2 ) ) { if ( !pPushComp->push(*this, iteration, msg) ) {error = true;errorComponent = pPushComp->getComponentName();errorString = pPushComp->getErrorString();} } } } } while (!error && msg); pPullComp->unlock(); if (pPushComp->getComponentPointer() != pPullComp->getComponentPointer()) pPushComp->unlock(); }

每次遍历都会取出1个MIPConnection,目的是得到这个MIPConnection的pull component和push component。然后每次从pull component中取出1个MIPMessage再提供给push component。结束某1个MIPConnection的处理条件是没法再从pull component取出MIPMessage,或出错了。我们模糊能感觉到消息在全部处理链条中被传递的进程。

第3阶段。
现在进入第3步,feedback链条。处理feedback链条会用到1个类MIPFeedback。应当还记得,m_feedbackChain存储的是多个feedback链条,链条间以空指针分隔。所以在遍历处理进程中也看到了针对这个情况的处理。每次重新1个新的子链条的处理前要重置feedback变量。除此以外每次遍用时,都是调用链条中MIPComponent的processFeedback函数。processFeedback函数接收的参数中就包括MIPFeedback类型的变量。之前已提到过,每次1个新的子链条处理前都会重置MIPFeedback变量。也就是说,每一个子链条内MIPComponent间传递消息是通过MIPFeedback类。目前为止还只是猜想。接着看MIPFeedback类的说明。

/** Message passed through a feedback chain. * A MIPFeedback object is used in a feedback chain of a MIPComponentChain instance. * Each object in the same chain can inspect and/or modify the information in the * MIPFeedback instance. */

这段关于类用处的注释左证了我们之前的猜想。第3部份的框架其实很简单。处理m_feedbackChain内保存的每一个feedback链条。链条内MIPComponent间通过MIPFeedback传递消息。但是,EMIPLIB库设计这个feedback链条的目的是甚么依然不清晰。

通过分析例程代码,和库源码,大致了解了该如何使用EMIPLIB库和EMIPLIB库内部实现。现在知道了为了向网络的1个端点发送语音,该使用哪些MIPCompnent组件,该如何建立这些组件间在运行期间的关系。同时,也知晓在运行期EMIPLIB将会创建1个线程在后台履行语音数据的转换和发送。基本的进程是这样的。先创建1些语音数据转换和发送用的MIPCompnent组件。然后初始化这些组件实例。接着创建1个MIPComponentChain类实例。指定1个初始MIPComponent节点。接下来依照顺序加入MIPComponeng组件实例。最后1步就是调用MIPComponentChain类的start函数。

经过上述的分析,现在知道了该如何启动1个将本地文件打包成RTP并发送给特定网络地址端口的进程。可以在任什么时候间暂停或停止这样1个进程吗?我们再看看MIPComponentChain类的源码,是不是存在1个这样的成员函数。发现还有这样几个成员函数未仔细分析:

<pre class="cpp" name="code"><pre class="cpp" name="code">stop() rebuild() clearChain() deleteConnection


从函数名称中可以大致知晓这几个成员函数的用处。没有找到暂停这样1个进程的函数,只是找到了结束这样1个进程的函数,stop。这4个函数的内部逻辑都非常简单,就不逐一罗列了。除stop函数外,其他3个函数都未在类内部被调用过。

 

现在已了解到,MIPComponentChain类在EMIPLIB库中所处的核心位置。这个类实现了EMIPLIB库的处理框架。它负责协同调用各个MIPComponent组件。MIPComponent组件间通过MIPMessage消息类传递信息。这篇分析文章只是揭露了EMIPLIB在高层是如何运作的。接下来有必要深入到具体的MIPComponent类源码中1探究竟。





 

------分隔线----------------------------
------分隔线----------------------------

最新技术推荐