// // Created by 倪路朋 on 10/11/24. // #include "live_pusher.h" #include "../audio_channel.h" #include "../video_channel.h" #include "../server_global.h" extern "C"{ #include #include #include } #define FLV_TAG_HEAD_LEN 11 #define FLV_PRE_TAG_LEN 4 void releasePackets(RTMPPacket **packet) { if(packet && *packet){ RTMPPacket_Free(*packet); //LOGE("释放资源"); //DELETE(packet); } packet = 0; } void *task_push_connect(void *args){ LivePusher *pusher = static_cast(args); int ret = 0; do{ LOGI("rtmp pusher 开始连接:%s",pusher->url); pusher->pRtmp = RTMP_Alloc(); if(!pusher->pRtmp){ LOGE("RTMP pusher 初始化失败"); pusher->pusherCall->onConnectFailed(ret); break; } RTMP_Init(pusher->pRtmp); pusher->pRtmp->Link.timeout = 5;//设置连接的超时时间 if(!RTMP_SetupURL(pusher->pRtmp,pusher->url)){ LOGE("rtmp pusher 设置地址失败:%d, url: %s", ret, pusher->url); pusher->pusherCall->onConnectFailed(ret); break; } // 启用 Live 模式(低延迟模式) pusher->pRtmp->Link.lFlags |= RTMP_LF_LIVE; // 设置可写 RTMP_EnableWrite(pusher->pRtmp); if(!RTMP_Connect(pusher->pRtmp,NULL)){ LOGE("rtmp pusher 建立连接失败 %d", ret); pusher->pusherCall->onConnectFailed(1000); break; } if(!RTMP_ConnectStream(pusher->pRtmp,0)){ LOGE("rtmp pusher 连接流失败 %d", ret); pusher->pusherCall->onConnectFailed(1001); break; } //后面要对安全队列进行取数据的操作了 pusher->streamPackets.setWork(1); LOGI("task_push_connect safequeue packets:%d",&pusher->streamPackets); pusher->isLive = 1; LOGI("rtmp pusher 准备完毕"); pusher->pusherCall->onConnected(); pusher->streamPackets.push(audioChannel->getAudioSeqHeader()); pusher->streamPackets.push(videoChannel->getSpsPps()); } while (0); return 0; } void task_push_data(LivePusher *pusher,SafeQueue *packets){ RTMPPacket *packet = 0; //LOGI("task_push_data safequeue pusher:%s ",pusher); //LOGI("task_push_data safequeue videoPackets:%d ",&pusher->videoPackets); //LOGI("task_push_data safequeue packets:%d ",packets); while (pusher->isLive >= 0){ //LOGE("RTMP1 result:%d isLive:%d %d",result,pusher->isLive,packets); if(pusher->isLive == -1){ break; } if(pusher->isLive == 0){ continue; } int result = packets->pop(packet); if(!result || !packet){ LOGE("RTMP1 提取消息失败 %d %d",packets,result); continue; } //准备发送 pusher->pushState = 0; if(packet->m_packetType == RTMP_PACKET_TYPE_AUDIO && packet->m_body[1] == 0x00){ LOGE("RTMP1 发送音频头信息 %d",packets); } packet->m_nInfoField2 = pusher->pRtmp->m_stream_id; //LOGI("RTMP 发送 %d type:%d",pusher,packet->m_packetType); int64_t t = getCurrentTimestamp(); result = RTMP_SendPacket(pusher->pRtmp,packet,1); pusher->pushState = 1; //LOGI("RTMP 发送 result:%d m_body:%d m_packetType:%d",result,packet->m_body ,packet->m_packetType); //LOGI("RTMP 发送完成 %d type:%d",pusher,packet->m_packetType); if(packet->m_packetType == 0x09 ){ //LOGI("RTMP 发给 m_nBodySize:%d 耗时:%d",packet->m_nBodySize,getCurrentTimestamp() - t); } if(result <= 0){ //发送失败 LOGE("RTMP pusher 发送失败 %d type:%d m_nBodySize:%d",result,packet->m_packetType,packet->m_nBodySize); if(packet->m_packetType == 0x09 ){ if(pusher->pushFailedCount > 10*30){ pusher->isLive = -1; pusher->pusherCall->onSendFailed(1003); } pusher->pushFailedCount++; } if(!RTMP_IsConnected(pusher->pRtmp)){ LOGE("RTMP pusher 网络断连"); pusher->isLive = -1; pusher->pusherCall->onConnectFailed(1001); } }else{ //LOGI("RTMP 发给 m_nBodySize:%d 耗时:%d",packet->m_nBodySize,getCurrentTimestamp() - t); pusher->pushFailedCount = 0; } //LOGI("url:%s,packets:%d",pusher->url,packets->size()); RTMPPacket_Free(packet);//释放内存 delete packet; } packets->setWork(0); packets->clear(); LOGE("RTMP1 pusher releasePackets isLive:%d",pusher->isLive); } void *task_push_stream(void *args){ LivePusher *pusher = static_cast(args); //LOGI("task_push_audio safequeue pusher:%d videoPackets:%d",pusher,&pusher->videoPackets); task_push_data(pusher,&pusher->streamPackets); return 0; } LivePusher::LivePusher(JNIEnv *env_, jobject instance_) { pusherCall = new LivePusherCall(env_,instance_); //准备一个安全队列 把数据放入队列,在统一的线程当中取出数据 再发送给服务器 streamPackets.setReleaseCallback(releasePackets); LOGI("LivePusher safequeue pusher:%d ",this); LOGI("LivePusher safequeue videoPackets:%d",&streamPackets); pthread_mutex_init(&mutex,0); pthread_mutex_init(&mutexConnect,0); } LivePusher::~LivePusher() { isLive = -1; DELETE(pusherCall); DELETE(url); // 释放资源 pthread_mutex_destroy(&mutex); pthread_mutex_destroy(&mutexConnect); } void LivePusher::connectRtmp(char *url) { this->url = url; isLive = 0; int re2 = pthread_create(&connect_start,0,task_push_connect,this); LOGI("connectRtmp safequeue re:%d packets:%d",re2,&streamPackets); pthread_create(&stream_start,0,task_push_stream,this); //LOGI("connectRtmp safequeue re:%d packets:%d",re,&audioPackets); } void LivePusher::reConnect() { releaseLive(); isLive = 0; int re2 = pthread_create(&connect_start,0,task_push_connect,this); LOGI("connectRtmp safequeue re:%d packets:%d",re2,&streamPackets); pthread_create(&stream_start,0,task_push_stream,this); } void LivePusher::pushYUV(RTMPPacket *packet) { int64_t t = getCurrentTimestamp(); //LOGI("RTMP 发给x264解码"); if(isLive != 1){ return; } RTMPPacket *dstPacket = new RTMPPacket ; int result = CopyRTMPPacket(dstPacket,packet); //LOGE("result:%d",result); streamPackets.push(dstPacket); //LOGI("RTMP 发给x264解码 耗时:%d",getCurrentTimestamp() - t); } void LivePusher::pushPCM(RTMPPacket *packet) { if( isLive != 1){ return; } RTMPPacket *dstPacket= new RTMPPacket ; CopyRTMPPacket(dstPacket,packet); streamPackets.push(dstPacket); } void LivePusher::releaseLive() { //等待线程退出 pthread_join(connect_start, 0); streamPackets.setWork(0); pthread_join(stream_start, 0); if(pRtmp){ RTMP_Close(pRtmp); } } void LivePusher::closeLive() { isLive = -1; releaseLive(); RTMP_Free(pRtmp); pusherCall->onClosed(); LOGI("直播关闭"); if(recycleCallback){ recycleCallback(this); } }