Source:数据源、获取数据数据传递功能
Sink:消耗数据,发送数据
Source->Sink:Source获取网络数据,Sink消耗数据写入文件
注意:live555两个循环一个是发送RTP数据,一个就是读取RTP即
SingleStep();
-
void
BasicTaskScheduler0::doEventLoop(
char
* watchVariable) - {
-
// Repeatedly loop, handling readble sockets and timed events:
-
while
(1) - {
-
if
(watchVariable != NULL && *watchVariable != 0)
break
; - SingleStep();
- }
- }
从这里可知,live555在客户端处理数据实际上是单线程的程序,不断执行SingleStep()函数中的代码。通过查看该函数代码里,下面一句代码为重点
- (*handler->handlerProc)(handler->clientData, resultConditionSet);
其中该条代码出现了两次,通过调试跟踪它的执行轨迹,第一次出现调用的函数是为了处理和RTSP服务器的通信协议的商定,而第二次出现调用的函数才是处理真正的视频和音频数据。对于RTSP通信协议的分析我们暂且不讨论,而直接进入第二次调用该函数的部分。
在我们的调试过程中在执行到上面的函数时就直接调用到livemedia目录下的如下函数
-
void
MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source,
int
/*mask*/
) - {
- source->networkReadHandler1();
- }
//下面这个函数实现的主要功能就是从socket端读取数据并存储数据
-
void
MultiFramedRTPSource::networkReadHandler1() - {
- BufferedPacket* bPacket = fPacketReadInProgress;
-
if
(bPacket == NULL) - {
-
// Normal case: Get a free BufferedPacket descriptor to hold the new network packet:
-
//分配一块新的存储空间来存储从socket端读取的数据
-
bPacket = fReorderingBuffer->getFreePacket(
this
); - }
-
// Read the network packet, and perform sanity checks on the RTP header:
- Boolean readSuccess = False;
-
do
- {
- Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL;
-
//fillInData()函数封装了从socket端获取数据的过程,到此函数执行完已经将数据保存到了bPacket对象中
-
if
(!bPacket->fillInData(fRTPInterface, packetReadWasIncomplete)) - {
-
if
(bPacket->bytesAvailable() == 0) - {
-
envir() <<
“MultiFramedRTPSource error: Hit limit when reading incoming packet over TCP. Increase \”MAX_PACKET_SIZE\”\n”
; - }
-
break
; - }
-
if
(packetReadWasIncomplete) - {
-
// We need additional read(s) before we can process the incoming packet:
- fPacketReadInProgress = bPacket;
-
return
; -
}
else
- {
- fPacketReadInProgress = NULL;
- }
-
//省略关于RTP包的处理
- …
- …
- …
-
//fReorderingBuffer为MultiFramedRTPSource类中的对象,该对象建立了一个存储Packet数据包对象的链表
-
//下面的storePacket()函数即将上面获取的数据包存储在链表中
-
if
(!fReorderingBuffer->storePacket(bPacket))
break
; - readSuccess = True;
-
}
while
(0); -
if
(!readSuccess) fReorderingBuffer->freePacket(bPacket); - doGetNextFrame1();
-
// If we didn’t get proper data this time, we’ll get another chance
- }
//下面的这个函数则实现从上面函数中介绍的存储数据包链表的对象(即fReorderingBuffer)中取出数据包并调用相应函数使用它
//代码1.1
-
void
MultiFramedRTPSource::doGetNextFrame1() - {
-
while
(fNeedDelivery) - {
-
// If we already have packet data available, then deliver it now.
- Boolean packetLossPrecededThis;
-
//从fReorderingBuffer对象中取出一个数据包
- BufferedPacket* nextPacket
- = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
-
if
(nextPacket == NULL)
break
; - fNeedDelivery = False;
-
if
(nextPacket->useCount() == 0) - {
-
// Before using the packet, check whether it has a special header
-
// that needs to be processed:
- unsigned specialHeaderSize;
-
if
(!processSpecialHeader(nextPacket, specialHeaderSize)) - {
-
// Something’s wrong with the header; reject the packet:
- fReorderingBuffer->releaseUsedPacket(nextPacket);
- fNeedDelivery = True;
-
break
; - }
- nextPacket->skip(specialHeaderSize);
- }
-
// Check whether we’re part of a multi-packet frame, and whether
-
// there was packet loss that would render this packet unusable:
-
if
(fCurrentPacketBeginsFrame) - {
-
if
(packetLossPrecededThis || fPacketLossInFragmentedFrame) - {
-
// We didn’t get all of the previous frame.
-
// Forget any data that we used from it:
- fTo = fSavedTo; fMaxSize = fSavedMaxSize;
- fFrameSize = 0;
- }
- fPacketLossInFragmentedFrame = False;
-
}
else
if
(packetLossPrecededThis) - {
-
// We’re in a multi-packet frame, with preceding packet loss
- fPacketLossInFragmentedFrame = True;
- }
-
if
(fPacketLossInFragmentedFrame) - {
-
// This packet is unusable; reject it:
- fReorderingBuffer->releaseUsedPacket(nextPacket);
- fNeedDelivery = True;
-
break
; - }
-
// The packet is usable. Deliver all or part of it to our caller:
- unsigned frameSize;
-
//将上面取出的数据包拷贝到fTo指针所指向的地址
- nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
- fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
- fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
- fCurPacketMarkerBit);
- fFrameSize += frameSize;
-
if
(!nextPacket->hasUsableData()) - {
-
// We’re completely done with this packet now
- fReorderingBuffer->releaseUsedPacket(nextPacket);
- }
-
if
(fCurrentPacketCompletesFrame)
//如果完整的取出了一帧数据,则可调用需要该帧数据的函数去处理它
- {
-
// We have all the data that the client wants.
-
if
(fNumTruncatedBytes > 0) - {
-
envir() <<
“MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client’s buffer size (”
-
<< fSavedMaxSize <<
“). ”
-
<< fNumTruncatedBytes <<
” bytes of trailing data will be dropped!\n”
; - }
-
// Call our own ‘after getting’ function, so that the downstream object can consume the data:
-
if
(fReorderingBuffer->isEmpty()) - {
-
// Common case optimization: There are no more queued incoming packets, so this code will not get
-
// executed again without having first returned to the event loop. Call our ‘after getting’ function
-
// directly, because there’s no risk of a long chain of recursion (and thus stack overflow):
-
afterGetting(
this
);
//调用函数去处理取出的数据帧
-
}
else
- {
-
// Special case: Call our ‘after getting’ function via the event loop.
- nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
-
(TaskFunc*)FramedSource::afterGetting,
this
); - }
- }
-
else
- {
-
// This packet contained fragmented data, and does not complete
-
// the data that the client wants. Keep getting data:
- fTo += frameSize; fMaxSize -= frameSize;
- fNeedDelivery = True;
- }
- }
- }
//下面这个函数即开始调用执行需要该帧数据的函数
-
void
FramedSource::afterGetting(FramedSource* source) - {
- source->fIsCurrentlyAwaitingData = False;
-
// indicates that we can be read again
-
// Note that this needs to be done here, in case the “fAfterFunc”
-
// called below tries to read another frame (which it usually will)
-
if
(source->fAfterGettingFunc != NULL)
- {
- (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
- source->fFrameSize, source->fNumTruncatedBytes,
- source->fPresentationTime,
- source->fDurationInMicroseconds);
- }
- }
上面的fAfterGettingFunc为我们自己注册的函数,如果运行的是testProgs中的openRTSP实例,则该函数指向下列代码中通过调用getNextFrame()注册的afterGettingFrame()函数
- Boolean FileSink::continuePlaying()
- {
-
if
(fSource == NULL)
return
False; - fSource->getNextFrame(fBuffer, fBufferSize,
-
afterGettingFrame,
this
, -
onSourceClosure,
this
); -
return
True; - }
如果运行的是testProgs中的testRTSPClient中的实例,则该函数指向这里注册的afterGettingFrame()函数
- Boolean DummySink::continuePlaying()
- {
-
if
(fSource == NULL)
return
False;
// sanity check (should not happen)
-
// Request the next frame of data from our input source. “afterGettingFrame()” will get called later, when it arrives:
- fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE,
-
afterGettingFrame,
this
, -
onSourceClosure,
this
); -
return
True; - }
从上面的代码中可以看到getNextFrame()函数的第一个参数为分别在各自类中定义的buffer,我们继续以openRTSP为运行程序来分析,fBuffer为FileSink类里定义的指针:unsigned char* fBuffer;
这里我们先绕一个弯,看看getNextFrame()函数里做了什么
-
void
FramedSource::getNextFrame(unsigned
char
* to, unsigned maxSize, - afterGettingFunc* afterGettingFunc,
-
void
* afterGettingClientData, - onCloseFunc* onCloseFunc,
-
void
* onCloseClientData) - {
-
// Make sure we’re not already being read:
-
if
(fIsCurrentlyAwaitingData) - {
-
envir() <<
“FramedSource[”
<<
this
<<
“]::getNextFrame(): attempting to read more than once at the same time!\n”
; - envir().internalError();
- }
- fTo = to;
- fMaxSize = maxSize;
-
fNumTruncatedBytes = 0;
// by default; could be changed by doGetNextFrame()
-
fDurationInMicroseconds = 0;
// by default; could be changed by doGetNextFrame()
- fAfterGettingFunc = afterGettingFunc;
- fAfterGettingClientData = afterGettingClientData;
- fOnCloseFunc = onCloseFunc;
- fOnCloseClientData = onCloseClientData;
- fIsCurrentlyAwaitingData = True;
- doGetNextFrame();
- }
从代码可以知道上面getNextFrame()中传入的第一个参数fBuffer指向了指针fTo,而我们在前面分析代码1.1中的void MultiFramedRTPSource::doGetNextFrame1()函数中有下面一段代码:
-
//将上面取出的数据包拷贝到fTo指针所指向的地址
- nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
- fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
- fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
- fCurPacketMarkerBit);
实际上现在应该明白了,从getNextFrame()函数中传入的第一个参数fBuffer最终存储的即是从数据包链表对象中取出的数据,并且在调用上面的use()函数后就可以使用了。
而在void MultiFramedRTPSource::doGetNextFrame1()函数中代码显示的最终调用我们注册的void FileSink::afterGettingFrame()正好是在use()函数调用之后的afterGetting(this)中调用。我们再看看afterGettingFrame()做了什么处理:
-
void
FileSink::afterGettingFrame(
void
* clientData, unsigned frameSize, - unsigned numTruncatedBytes,
-
struct
timeval presentationTime, -
unsigned
/*durationInMicroseconds*/
) - {
- FileSink* sink = (FileSink*)clientData;
- sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime);
- }
-
void
FileSink::afterGettingFrame(unsigned frameSize, - unsigned numTruncatedBytes,
-
struct
timeval presentationTime) - {
-
if
(numTruncatedBytes > 0) - {
-
envir() <<
“FileSink::afterGettingFrame(): The input frame data was too large for our buffer size (”
-
<< fBufferSize <<
“). ”
-
<< numTruncatedBytes <<
” bytes of trailing data was dropped! Correct this by increasing the \”bufferSize\” parameter in the \”createNew()\” call to at least ”
-
<< fBufferSize + numTruncatedBytes <<
“\n”
; - }
- addData(fBuffer, frameSize, presentationTime);
-
if
(fOutFid == NULL || fflush(fOutFid) == EOF) - {
-
// The output file has closed. Handle this the same way as if the
-
// input source had closed:
-
onSourceClosure(
this
); - stopPlaying();
-
return
; - }
-
if
(fPerFrameFileNameBuffer != NULL) - {
-
if
(fOutFid != NULL) { fclose(fOutFid); fOutFid = NULL; } - }
-
// Then try getting the next frame:
- continuePlaying();
- }
从上面代码可以看到调用了addData()函数将数据保存到文件中,然后继续continuePlaying()又去获取下一帧数据然后处理,直到遇到循环结束然后依次退出调用函数。最后看看addData()函数的实现即可知:
-
void
FileSink::addData(unsigned
char
const
* data, unsigned dataSize, -
struct
timeval presentationTime) - {
-
if
(fPerFrameFileNameBuffer != NULL) - {
-
// Special case: Open a new file on-the-fly for this frame
-
sprintf(fPerFrameFileNameBuffer,
“%s-%lu.%06lu”
, fPerFrameFileNamePrefix, - presentationTime.tv_sec, presentationTime.tv_usec);
- fOutFid = OpenOutputFile(envir(), fPerFrameFileNameBuffer);
- }
-
// Write to our file:
-
#ifdef TEST_LOSS
-
static
unsigned
const
framesPerPacket = 10; -
static
unsigned
const
frameCount = 0; -
static
Boolean
const
packetIsLost; -
if
((frameCount++)%framesPerPacket == 0) - {
-
packetIsLost = (our_random()%10 == 0);
// simulate 10% packet loss #####
- }
-
if
(!packetIsLost) -
#endif
-
if
(fOutFid != NULL && data != NULL) - {
- fwrite(data, 1, dataSize, fOutFid);
- }
- }
最后调用系统函数fwrite()实现写入文件功能。
总结:从上面的分析可知,如果要取得从RTSP服务器端接收并保存的数据帧,我们只需要定义一个类并实现如下格式两个的函数,并声明一个指针地址buffer用于指向数据帧,再在continuePlaying()函数中调用getNextFrame(buffer,…)即可。
-
typedef
void
(afterGettingFunc)(
void
* clientData, unsigned frameSize, - unsigned numTruncatedBytes,
-
struct
timeval presentationTime, - unsigned durationInMicroseconds);
-
typedef
void
(onCloseFunc)(
void
* clientData);
然后再在afterGettingFunc的函数中即可使用buffer。.