1,RTSP连接的建立过程 RTSPServer类用于构建一个RTSP服务器,该类同时在其内部定义了一个RTSPClientSession类,用于处理单独的客户会话。
首先创建RTSP服务器(具体实现类是DynamicRTSPServer),在创建过程中,先建立 Socket(ourSocket)在TCP的554端口进行监听,然后把连接处理函数句柄(RTSPServer:: incomingConnectionHandler)和socket句柄传给任务调度器(taskScheduler)。
任务调度器把socket句柄放入后面select调用中用到的socket句柄集(fReadSet)中,同时将socket句柄和incomingConnectionHandler句柄关联起来。接着,主程序开始进入任务调度器的主循环(doEventLoop),在主循环中调用系统函数select阻塞,等待网络连接。
当RTSP客户端输入(rtsp://192.168.1.109/1.mpg)连接服务器时,select返回对应的scoket,进而根据前面保存的对应关系,可找到对应处理函数句柄,这里就是前面提到的incomingConnectionHandler了。在incomingConnectionHandler中创建了RTSPClientSession,开始对这个客户端的会话进行处理。
具体分析如下:
DynamicRTSPServer::creatnew():
1.调用继承自RTPSever::setUpOurSocket: 调用GroupsockHelper 的setupStreamSocket创建一个socket连接,并绑定, 设置socket的发送缓存大小, 调用listen开始监听端口,设置同时最大能处理连接数LISTEN_BACKLOG_SIZE=20,如果达到这个上限则client端将收到ECONNERREFUSED的错误 测试绑定端口是否为0,为0的话重新绑定断口,并返回系统自己选择的新的端口。 返回建立的socket文件描述符 2.调用自己和RTPSever的构造函数: RTPSever构造函数:
用一个UsageEnvironment对象的引用构造其父类Medium 设置最大等待回收连接时间reclamationTestSeconds,超过这个时间从客户端没有RTSP命令或者RTSP的RR包则收回其RTSPClientSession 建立一个HashTable(实际上是一个BasicHashTable),fServerMediaSessions指向这个表。 调用参数UsageEnvironment对象env的成员,一个TaskScheduler指针所指对象(实际就是一个BasicTaskScheduler对象)的成员函数turnOnBackgroundReadHandling(): 1.调用一个HandlerSet::assignHandler()创建一个Handler,把socketNum【此处为服务器监听的socket描述符】和处理函数RTSPServer::incomingConnectionHandler(),还有指向RTSPSever的指针绑定在一起。 incomingConnectionHandler作用: 调用accept返回服务器与客户端连接的socket描述符 设置客户端描述符为非阻塞 增加客户端socket描述符的发送缓存为50*1024 为此客户端随机分配一个sessionId 用客户端socket描述符clientSocket,sessionId,和客户端地址clientAddr调用creatNewClientSession创建一个clientSession。 2,请求消息处理过程 上节我们谈到RTSP服务器收到客户端的连接请求,建立了RTSPClientSession类,处理单独的客户会话。在建立 RTSPClientSession的过程中,将新建立的socket句柄(clientSocket)和RTSP请求处理函数句柄RTSPClientSession::incomingRequestHandler传给任务调度器,由任务调度器对两者进行一对一关联。当客户端发出 RTSP请求后,服务器主循环中的select调用返回,根据socket句柄找到对应的incomingRequestHandler,开始消息处理。先进行消息的解析。
RTSPClientSession::RTSPClientSession()构造函数:
重置请求缓存 调用envir().taskScheduler().turnOnBackgroundReadHandling(),这次socketnumber为客户端socket描述符这次的处理函数是RTSPServer::RTSPClientSession::incomingRequestHandler() RTSPServer::RTSPClientSession::incomingRequestHandler(): 调用handleAlternativeRequestByte1(uint8_t requestByte): 1.fRequestBuffer[fRequestBytesAlreadySeen] =requestByte;把请求字符放入请求缓存fRequestBuffer 2.调用handleRequestBytes(1) 处理请求缓存 handleRequestBytes(int newBytesRead): 1.调用noteLiveness()查看请求是否到期,如果服务器的reclamationTestSeconds> 0,调用taskScheduler对象的rescheduleDelayedTask函数: 参数为(fLivenessCheckTask,fOurServer.fReclamationTestSeconds1000000,(TaskFunc)livenessTimeoutTask, this)
其中livenessTimeoutTask()函数作用是删除new出来的clientSession.
调用unscheduleDelayedTask(TaskToken&prevTask):从DelayQueue中删除prevTask项, prevTask置空.
调用scheduleDelayedTask(int64_t microseconds, TaskFunc* proc, void*clientData):
1.创建一个DelayInterval对象timeToDelay,用microseconds初始化。 2.创建一个AlarmHandler对象,用proc, clientData, timeToDelay初始化 3.调用fDelayQueue.addEntry(),把这个AlarmHandler对象加入到延迟队列中 4.返回AlarmHandler对象的token[long类型]的指针
如果请求的的长度超过请求缓存可读长度fRequestBufferBytesLeft,结束这个连接。
找到请求消息的结尾:。
如果找到消息结尾,调用RTSPServer::RTSPClientSession::handleRequestBytes()[值得关注此函数]把请求字符串转换成命令各部分包括:cmdName[方法],urlPreSuffix[url地址],urlSuffix[要读取的文件名],sceq[消息的Cseq],否则函数返回需要继续从连接中读取请求。分别存入对应的数组。
如果转换成功,调用handleCmd_xxx()处理对应的cmdName: xxx[此处实现了:OPTIONS,DESCRIBE,SETUP,TEARDOWN,PLAY,PAUSE,GET_PARAMETER,SET_PARAMETER],其中PLAY,PAUSE,GET_PARAMETER,SET_PARAMETER调用handleCmd_withinSession (cmdName,urlPreSuffix, urlSuffix,cseq,(char const*)fRequestBuffer);
liveMedia项目的源代码包括四个基本的库,各种测试代码以及Media Server。四个基本的库分别是:
UsageEnvironment&TaskScheduler, groupsock, liveMedia和BasicUsageEnvironment。
1,基础类介绍:
BasicUsageEnvironment和UsageEnvironment中的类都是用于整个系统的基础功能类.用于事件的调度,实现异步读取事件的句柄的设置以及错误信息的输出。比如UsageEnvironment代表了整个系统运行的环境,它提供了错误记录和错误报告的功能,无论哪一个类要输出错误,就需要保存UsageEnvironment的指针.而TaskScheduler则提供了任务调度功能.整个程序的运行发动机就是它,它调度任务,执行任务(任务就是一个函数).TaskScheduler由于在全局中只有一个,所以保存在了UsageEnvironment中.而所有的类又都保存了UsageEnvironment的指针,所以谁想把自己的任务加入调度中,那是很容易的.在此还看到一个结论:整个live555(服务端)只有一个线程.
类HashTable:实现了哈稀表.定义了一个通用的hash表,其它代码要用到这个表。
liveMedia库中有一系列类,基类是Medium,这些类针对不同的流媒体类型和编码。
基于liveMedia的程序,需要通过继承UsageEnvironment抽象类和TaskScheduler抽象类,定义相应的类来处理事件调度,数据读写以及错误处理。live项目的源代码里有这些类的一个基本实现,这就是“BasicUsageEnvironment”库。BasicUsageEnvironment主要是针对简单的控制台应用程序,利用select实现事件获取和处理。这个库利用Unix或者Windows的控制台作为输入输出,出于应用程序原形或者调试的目的,用户可以用这个库开发传统的运行与控制台的应用。
类DelayQueue:译为"延迟队列",它是一个队列,每一项代表了一个要调度的任务(在它的fToken变量中保存).同时保存了这个任务离执行时间点的剩余时间.可以预见,它就是在TaskScheduler中用于管理调度任务的东西.注意,此队列中的任务只被执行一次!执行完后这一项即被无情抛弃!
类HandlerSet:Handler集合.Handler是什么呢?它是一种专门用于执行socket操作的任务(函数),HandlerSet被TaskScheduler用来管理所有的socket任务(增删改查).所以TaskScheduler中现在已调度两种任务了:socket任务(handlerSet)和延迟任务(DelayQueue).其实TaskScheduler还调度第三种任务:Event,这个后面再说.
类Groupsock:这个是放在单独的库Groupsock中。它封装了socket操作,增加了多播支持和一对多单播的功能.但好像不支持TCP。它管理着一个本地socket和多个目的地址,因为是UDP,所以只需知道对方地址和端口即可发送数据。Groupsock的构造函数有一个参数是struct in_addr const& groupAddr,在构造函数中首先会调用父类构造函数创建socket对象,然后判断这个地址,若是多播地址,则加入多播组。Groupsock的两个成员变量destRecord* fDests和DirectedNetInterfaceSet fMembers都表示目的地址集和,但貌似这个变量DirectedNetInterfaceSet fMembers没有用到,且DirectedNetInterfaceSet是一个没有被继承的虚类,看起来fMembers没有什么用。仅fDesk也够用了,在addDestination()和removeDestination()函数中就是操作fDesk,添加或删除目的地址。
2,基本概念 先来熟悉在liveMedia库中Source,Sink以及Filter等概念。Sink就是消费数据的对象,比如把接收到的数据存储到文件,这个文件就是一个Sink。Source就是生产数据的对象,比如通过RTP读取数据。数据流经过多个’source’和’sinks’,下面是一个示例: source1’ -> ‘source2’ (a filter) -> ‘source3’ (a filter) -> ‘sink’ 从其它Source接收数据的source也叫做"filters"。Module是一个sink或者一个filter。数据接收的终点是Sink类,MediaSink是所有Sink类的基类。Sink类实现对数据的处理是通过实现纯虚函数continuePlaying(),通常情况continuePlaying调用fSource -> getNextFrame来为Source设置数据缓冲区,处理数据的回调函数等,fSource是MediaSink的类型为FramedSource*的类成员。
3,计划任务(TaskScheduler)深入探讨
我们且把三种任务命名为:socket handler,event handler,delay task。
这三种任务的特点是,前两个加入执行队列后会一直存在,而delay task在执行完一次后会立即弃掉。
socket handler保存在队列BasicTaskScheduler0::HandlerSet* fHandlers中;
event handler保存在数组BasicTaskScheduler0::TaskFunc *
fTriggeredEventHandlers[MAX_NUM_EVENT_TRIGGERS]中;
delay task保存在队列BasicTaskScheduler0::DelayQueue fDelayQueue中。
下面看一下三种任务的执行函数的定义: socket handler为 typedef void BackgroundHandlerProc(void* clientData, int mask); event handler为 typedef void TaskFunc(void* clientData); delay task 为 typedef void TaskFunc(void* clientData);//跟event handler一样。
这里主要分析一下,live555中关于RTP打包发送的部分。在处理完PLAY命令之后,就开始发送RTP数据包了(其实在发送PLAY命令的response包之前,就会发送一个RTP包,这里传输就已经开始了)
先介绍下主要的流程:RTP包的发送是从MediaSink::startPlaying函数调用开始的,在startPlaying函数的最后会调用函数continuePlaying。continuePlaying函数是定义在MediaSink类中的纯虚函数,需要到特定媒体的sink子类中实现,对于H264来讲是在H264VideoRTPSink中实现的。H264VideoRTPSink:continuePlaying中会创建一个辅助类H264FUAFragmenter,用于H264的RTP打包。因为H264的RTP包,有些特殊需要进一步处理。接着调用其父类MultiFramedRTPSink::continuePlaying实现。
在函数MultiFramedRTPSink::continuePlaying中主要是调MultiFramedRTPSink::buildAndSendPacket函数。
看名字就知道MultiFramedRTPSink::buildAndSendPacket()函数将完成打包并发送工作。传递了一个True参数,表示这是第一个packet。在该函数的最后调用了MultiFramedRTPSink::packFrame(),进一步的工作,在packFrame函数中进行,MultiFramedRTPSink::packFrame()将为RTP包填充数据。
packFrame函数需要处理两种情况:
buffer中存在未发送的数据(overflow data),这时可以将调用afterGettingFrame1函数进行后续处理工作。 buffer不存在数据,这时需要调用source上的getNextFrame函数获取数据。getNextFrame调用时,参数中有两个回调用函数:afterGettingFrame函数将在获取到数据后调用,其中只是简单的调用了afterGettingFrame1函数而已;ourHandleClosure函数将在数据已经处理完毕时调用,如文件结束。 可以看出,两种情况下最后都是要调用afterGettingFrame1函数,afterGettingFrame1函数的复杂之处在于处理frame的分片,若一个frame大于TCP/UDP有效载荷(程序中定义为1448个字节),就必需分片了。最简单的情况就是一个packet(RTP包)中最多只充许一个frame,即一个RTP包中存在一个frame或者frame的一个分片,H264就是这样处理的:方法是将剩余的数据记录为buffer的溢出部分。下次调用packFrame函数时,直接从溢出部分复制到packet中。不过应该注意,一个frame的大小不能超过buffer的大小(默认为60000),否则会真的溢出, 那就应该考虑增加buffer大小了。
处理完相关分片信息,将会调用函数MultiFramedRTPSink::sendPacketIfNecessary()来发送数据包。sendPacketIfNecessary()中函数处理一些发送的细节,如packet中含有Frame则调用RTPInterface::sendPacket函数发送packet。并将下一次RTP的发送操作加入到任务调度器中:nextTask() = envir().taskScheduler().
scheduleDelayedTask。函数参数中传递了sendNext函数指针。sendNext函数又调用了buildAndSendPacket函数,轮回了。。。!
最后来看下RTPInterface::sendPacket函数。若是使用UDP方式发送,将调用Groupsock::output函数,可以实现组播功能。groupsock只实现了UDP发送功能,当用TCP方式传送时调用sendRTPOverTcP函数,这个函数中直接调用socket的send函数。至此关于RTP打包发送的主要流程分析就差不多了,具体细节实现,可参考源代码。
原文地址:
Live555中RTP包的打包与发送过程分析
Medium live555几乎所有的处理单元都继承自Medium类;该类抽象了基本的接口,包括环境,task和媒体名和媒体查找函数(lookupByName)以及一些辅助函数。也包括返回当前的环境类UsageEnvironment,以及环境指向下一个TaskToken的指针nextTask等。
ServerMediaSession 对象的创建函数在文件DynamicRTSPServer.cpp中。DynamicRTSPServer的继承关系是
DynamicRTSPServer::RTSPServerSupportingHTTPStreaming::RTSPServer::Medium
DynamicRTSPServer从RTSPServer继承过来,仅仅添加了构造器和查找函数,没有添加其他成员。构造器是创建socket然后传给 RTSPServer,查找是如果没有已经打开的流服务,那么根据参数创建流服务。RTSPServer的属性有socket,端口号,Session点号,认证机制和ServerMediaSession表。
当接收到带有URI的请求后,会首先创建SMS,调用 ServerMediaSession的构造函数,除了创建时间戳,拷贝文件名外都是使用的缺省值,并且将初始化子会话链表。当处理describe命令时,ServerMediaSession通过调用generateSDPDescription函数生成。 在NEW_SMS()中创建ServerMediaSession对象,然后创建相应的ServerMediaSubsession并将这个子会话对象添加到添加到会话对象中。
如子回话是MPEG4,创建MPEG4VideoFileServerMediaSubsession对象,对象的继承关系:
MPEG4VideoFileServerMediaSubsession::FileServerMediaSubsession::OnDemandServerMediaSubsession::ServerMediaSubsession::Medium
相关类介绍:
ServerMediaSession:添加了子会话链表,SDP描述以及一些媒体相关处理函数。
ServerMediaSubsession:定义了指向ServerMediaSession的父指针,指向下个一个对象的指针。该媒体的SDP信息,该媒体的读取定位函数等。 ServerMediaSubsession类和具体的流播放相关,是个纯虚类。其中startStream和getStreamParameter是纯虚函数。
OnDemandServerMediaSubsession:添加了流source处理和RTPSink处理函数以及经典命名属性等。封装seek,pause等处理,把这些接口中clientSessionid号到这里转换成了FramedSource。 该类的成员函数大部分和ServerMediaSubsession相似,在流媒体完成定位等处理。createNewStreamSource和createNewRTPSink是两个纯虚函数,在子类中必须实现。类中getStreamParameters方法会创建streamState。这个方法在处理RTP的Setup命令时被调用。
FileServerMediaSubsession:增加了文件名和文件大小属性。
MPEG4VideoFileServerMediaSubsession:添加了RTPSink属性,并且实现了OnDemandServerMediaSubsession中定义的两个纯虚函数,即创建了source和sink对象。这个source是MPEG4VideoStreamFramer。该类中还定义了StreamState的内部类。
StreamState:包含了指向OnDemandServerMediaSubsession的引用,RTPSink指针,BasicUDPSink指针,RTCPInstance指针FramedSource指针,fRTPgs和fRTCPgs(groupsock).
StreamState类可以用OnDemandServerMediaSubsession的fLastStreamToken属性指向。
类streamState的属性:
OnDemandServerMediaSubsession& fMaster; Boolean fAreCurrentlyPlaying;
unsigned fReferenceCount; Port fServerRTPPort, fServerRTCPPort;
RTPSink* fRTPSink; BasicUDPSink* fUDPSink;
float fStreamDuration; unsigned fTotalBW; RTCPInstance* fRTCPInstance;
FramedSource* fMediaSource; Groupsock* fRTPgs; Groupsock* fRTCPgs;
Sink Sink类提供了总的媒体播放接口。sink有两种,一个是BasicUDPSink,一个是RTPSink,如果协商时没有RTP信息,那么创建BasicUDPSink。Source和Sink通过函数createNewRTPSink和createNewStreamSource。这两个函数在类 OnDemandServerMeidaSubsession中定义为纯虚函数,如果媒体类型是mpeg4videofileserver,那么对应的函数定义在类MPEG4VideoFileServerMediaSubsession中。
MPEG4ESVideoRTPSink::VideoRTPSink::MultiFramedRTPSink::RTPSink::MediaSink::Medium
MediaSink定义中有一个媒体源指针,主要处理函数有startplaying(),stopplaying()和 afterPlayingFunc函数指针。
RTPSink类定义了RTP相关的处理和属性。包含Socket组对象,时间处理系列,统计计数处理等相关属性。
MultiFramedRTPSink是RTPSink的子类,处理buffer中的多个RTP包。类中添加了辅助SDP处理和VOPIsPresent属性和一个判断性处理函数。
MultiFramedRTPSink类完成多帧组包处理主要函数有buildAndSendPacket,packFrame,sendNext, afterGettingFrame,这几个函数之间有相互调用。内部有OutPacketBuffer属性,在创建时设定为(1000(希望),1448(最大))大小,其他是统计或者标识属性。这个发送数据包是通过 fRTPInterface.sendPacket(fOutBuf->packet(), fOutBuf->curPacketSize());实现。这个fRTPInterface是父类RTPSink的属性。
VideoRTPSink仅仅添加了sdpMediaType处理函数, 返回SDP类型是“video”
MPEG4ESVideoRTPSink中的处理函数doSpecialFrameHandling:首先检测开头的四个字节看是否是 VOP_START_CODE,该函数处理RTP的起始/中止标识和添加时间戳。其他处理包括是否允许分片,是否是起始包判断以及辅助SDP处理。
Source createNewStreamSource调用的是MPEG4VideoFileServerMediaSubsession中的定义。在类 OnDemandServerMediaSubsession中的createNewStreamSource定义是一个纯虚函数。
创建的source是:
MPEG4VideoStreamFramer:MPEGVideoStreamFramer:FramedFilter:FramedSource:MediaSource:Medium
MediaSource在Medium类的基础上添加了更多媒体类型判断,比如是H264,mpeg还是jpeg。此外还有一个MIME类型。
1、什么是 Redis? Redis 是完全开源免费的,遵守 BSD 协议,是一个高性能的 key-value 数据库。
Redis 与其他 key - value 缓存产品有以下三个特点:
(1)Redis 支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用。
(2)Redis 不仅仅支持简单的 key-value 类型的数据,同时还提供 list,set,zset,hash 等数据结构的存储。
(3)Redis 支持数据的备份,即 master-slave 模式的数据备份。
Redis 优势
(1)性能极高 – Redis 能读的速度是 110000 次/s,写的速度是 81000 次/s 。
(2)丰富的数据类型 – Redis 支持二进制案例的 Strings, Lists, Hashes, Sets 及Ordered Sets 数据类型操作。
(3)原子 – Redis 的所有操作都是原子性的,意思就是要么成功执行要么失败完全不执行。单个操作是原子性的。多个操作也支持事务,即原子性,通过 MULTI 和 EXEC指令包起来。
(4)丰富的特性 – Redis 还支持 publish/subscribe, 通知, key 过期等等特性。
Redis 与其他 key-value 存储有什么不同?
(1)Redis 有着更为复杂的数据结构并且提供对他们的原子性操作,这是一个不同于其他数据库的进化路径。Redis 的数据类型都是基于基本数据结构的同时对程序员透明,无需进行额外的抽象。