#include "RtmpPuller.h" RtmpPuller::RtmpPuller() :mFrameIndex(0), mAudioIndex(-1),mVideoIndex(-1) , mAudioStream(nullptr),mVideoStream(nullptr) { av_register_all(); //Network avformat_network_init(); //Input } int RtmpPuller::ConnectServer(const char *p) { int ret = 0; if ((ret = avformat_open_input(&mIfmtCtx, p, 0, 0)) < 0) { printf("Could not open input file."); return -1; } if ((ret = avformat_find_stream_info(mIfmtCtx, 0)) < 0) { printf("Failed to retrieve input stream information"); return -1; } for (int i = 0; i < mIfmtCtx->nb_streams; i++) { if (mIfmtCtx->streams[i]->codec->codec_type == AVMEDIA_TYPE_VIDEO) { mVideoIndex = i; } if (mIfmtCtx->streams[i]->codec->codec_type == AVMEDIA_TYPE_AUDIO) { mAudioIndex = i; } } if(mAudioIndex > -1) this->mAudioStream = mIfmtCtx->streams[mAudioIndex]; if(mVideoIndex > -1) this->mVideoStream = mIfmtCtx->streams[mVideoIndex]; av_dump_format(mIfmtCtx, 0, p, 0); mH264bsfc = av_bitstream_filter_init("h264_mp4toannexb"); mStatus = RUNNING; if((mAudioIndex == -1 ) &&(mVideoIndex == -1)) mStatus = NOSOURCE; return 0; } int ThreadPull(RtmpPuller*p) { while (p->Status() == RtmpPuller::CAP_STATUS::RUNNING) { p->PullData(); } return 0; } int RtmpPuller::StartPull() { this->mThread = new std::thread(ThreadPull, this); this->mThread->get_id(); mStatus = RUNNING; return 0; } int RtmpPuller::PullData() { static int drop = 0; AVStream *in_stream; //Get an AVPacket int ret = av_read_frame(mIfmtCtx, &pkt); if (ret < 0) return -1; in_stream = mIfmtCtx->streams[pkt.stream_index]; /* copy packet */ //Convert PTS/DTS pkt.pts = av_rescale_q_rnd(pkt.pts, in_stream->time_base, in_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX)); pkt.dts = av_rescale_q_rnd(pkt.dts, in_stream->time_base, in_stream->time_base, (AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX)); pkt.duration = av_rescale_q(pkt.duration, in_stream->time_base, in_stream->time_base); pkt.pos = -1; //Print to Screen if (drop < 100) { drop++; goto end; } if (pkt.stream_index == mVideoIndex) { printf("Receive %8d video frames from input URL\n", mFrameIndex); mFrameIndex++; av_bitstream_filter_filter(mH264bsfc, in_stream->codec, NULL, &pkt.data, &pkt.size, pkt.data, pkt.size, 0); if (mObserver.size() > 0) { for (auto itr = this->mObserver.begin(); itr != mObserver.end(); itr++) { RtmpPullObserver *p = (RtmpPullObserver *)*itr; if (p->mObserverType == RtmpPullObserver::Observer_Video) { p->OnRtmpFrame(pkt.data, pkt.size); } } } } if (pkt.stream_index == mAudioIndex) { if (mObserver.size() > 0) { for (auto itr = this->mObserver.begin(); itr != mObserver.end(); itr++) { RtmpPullObserver *p = (RtmpPullObserver *)*itr; if (p->mObserverType == RtmpPullObserver::Observer_Audio) { p->OnRtmpFrame(pkt.data, pkt.size); } } } } end: //printf("%02x %02x %02x %02x %02x\r\n", pkt.data[0], pkt.data[1], pkt.data[2], pkt.data[3], pkt.data[4]); av_free_packet(&pkt); } int RtmpPuller::SetObserver(RtmpPullObserver *p) { if (nullptr == p) return -1; mMux.lock(); for (auto itr = this->mObserver.begin(); itr != mObserver.end(); itr++) { if (p == *itr) return 0; } this->mObserver.push_back(p); mMux.unlock(); return 0; } RtmpPuller::CAP_STATUS RtmpPuller::Status() { return this->mStatus; } AVStream * RtmpPuller::AudioStream() { return this->mAudioStream; }