// // Created by 倪路朋 on 10/11/24. // #include "live_pusher.h" #include "../server_global.h" extern "C"{ #include #include #include } #define FLV_TAG_HEAD_LEN 11 #define FLV_PRE_TAG_LEN 4 void *task_push_connect(void *args){ LivePusher *pusher = static_cast(args); int ret = 0; do{ LOGI("rtmp pusher 开始连接:%s",pusher->url); pusher->pRtmp = RTMP_Alloc(); if(!pusher->pRtmp){ LOGE("RTMP pusher 初始化失败"); pusher->pusherCall->onConnectFailed(ret); break; } RTMP_Init(pusher->pRtmp); pusher->pRtmp->Link.timeout = 5;//设置连接的超时时间 if(!RTMP_SetupURL(pusher->pRtmp,pusher->url)){ LOGE("rtmp pusher 设置地址失败:%d, url: %s", ret, pusher->url); pusher->pusherCall->onConnectFailed(ret); break; } // 启用 Live 模式(低延迟模式) pusher->pRtmp->Link.lFlags |= RTMP_LF_LIVE; // 设置可写 RTMP_EnableWrite(pusher->pRtmp); if(!RTMP_Connect(pusher->pRtmp,NULL)){ LOGE("rtmp pusher 建立连接失败 %d", ret); pusher->pusherCall->onConnectFailed(1000); break; } if(!RTMP_ConnectStream(pusher->pRtmp,0)){ LOGE("rtmp pusher 连接流失败 %d", ret); pusher->pusherCall->onConnectFailed(1001); break; } pusher->isLive = 1; LOGI("rtmp pusher 准备完毕"); pusher->pusherCall->onConnected(); pusher->sendStreamHeader(); pusher->sendSpsPps(); pusher->sendAudioHeader(); } while (0); return 0; } LivePusher::LivePusher(JNIEnv *env_, jobject instance_) { pusherCall = new LivePusherCall(env_,instance_); //准备一个安全队列 把数据放入队列,在统一的线程当中取出数据 再发送给服务器 pthread_mutex_init(&mutex,0); pthread_mutex_init(&mutexConnect,0); } LivePusher::~LivePusher() { isLive = -1; DELETE(pusherCall); DELETE(url); // 释放资源 pthread_mutex_destroy(&mutex); pthread_mutex_destroy(&mutexConnect); } void LivePusher::connectRtmp(char *url) { this->url = url; isLive = 0; int re2 = pthread_create(&connect_start,0,task_push_connect,this); } void LivePusher::reConnect() { releaseLive(); isLive = 0; int re2 = pthread_create(&connect_start,0,task_push_connect,this); } void LivePusher::sendStreamHeader() { uint32_t offset = 0; char buffer[512]; char *output = buffer; char *outend = buffer + sizeof(buffer); char send_buffer[512]; output = AMF_EncodeString(output, outend, &av_onMetaData); *output++ = AMF_ECMA_ARRAY; output = AMF_EncodeInt32(output, outend, 5); output = AMF_EncodeNamedNumber(output, outend, &av_width, FRAME_WIDTH); output = AMF_EncodeNamedNumber(output, outend, &av_height, FRAME_HEIGHT); output = AMF_EncodeNamedNumber(output, outend, &av_framerate, FRAME_PS); output = AMF_EncodeNamedNumber(output, outend, &av_duration, 0.0); output = AMF_EncodeNamedNumber(output, outend, &av_videoframerate, FRAME_RATE); output = AMF_EncodeNamedNumber(output, outend, &av_videocodecid, 7);// 7 表示 H.264 output = AMF_EncodeNamedNumber(output, outend, &av_audiocodecid, 10);// 10 表示 AAC output = AMF_EncodeInt24(output, outend, AMF_OBJECT_END); int body_len = output - buffer; int output_len = body_len + FLV_TAG_HEAD_LEN + FLV_PRE_TAG_LEN; send_buffer[offset++] = 0x12; //tagtype scripte send_buffer[offset++] = (uint8_t) (body_len >> 16); //data len send_buffer[offset++] = (uint8_t) (body_len >> 8); //data len send_buffer[offset++] = (uint8_t) (body_len); //data len send_buffer[offset++] = 0; //time stamp send_buffer[offset++] = 0; //time stamp send_buffer[offset++] = 0; //time stamp send_buffer[offset++] = 0; //time stamp send_buffer[offset++] = 0x00; //stream id 0 send_buffer[offset++] = 0x00; //stream id 0 send_buffer[offset++] = 0x00; //stream id 0 memcpy(send_buffer + offset, buffer, body_len); RTMP_Write(pRtmp, send_buffer, output_len); } void LivePusher::sendSpsPps() { uint8_t *data;int size; mediaEncoderCall->getSpsPps(&data,&size); int length = 0 ; int sps_len = 0,pps_len = 0; uint8_t sps[100]; uint8_t pps[100]; for(int i = 0 ; i < size ; i ++){ if(data[i] == 0x00 && data[i+1] == 0x00 && data[i+2] == 0x00 && data[i+3] == 0x01){ if(i > 0){ sps_len = length; } i += 4; length = 0 ; } length ++ ; if(sps_len == 0){ sps[i-4] = data[i]; }else if(pps_len == 0){ pps[i - sps_len - 8] = data[i]; } } pps_len = length; LOGE("sps_len:%d,pps_len:%d,size:%d",sps_len,pps_len,size); char *output ; int body_size = 5 + 8 + 3 + sps_len + pps_len; uint32_t output_len = body_size + FLV_TAG_HEAD_LEN + FLV_PRE_TAG_LEN; output = (char *)malloc(output_len); uint32_t time = getCurrentTimestamp(); int i = 0; // flv tag header output[i++] = 0x09; //tagtype video addHeader(output,body_size,time); i = FLV_TAG_HEAD_LEN; //flv VideoTagHeader output[i++] = 0x17; //key frame, AVC output[i++] = 0x00; //avc sequence header output[i++] = 0x00; //composit time ?????????? output[i++] = 0x00; // composit time output[i++] = 0x00; //composit time //flv VideoTagBody --AVCDecoderCOnfigurationRecord output[i++] = 0x01; //configurationversion output[i++] = sps[1]; //avcprofileindication output[i++] = sps[2]; //profilecompatibilty output[i++] = sps[3]; //avclevelindication output[i++] = 0xff; //reserved + lengthsizeminusone output[i++] = 0xe1; //numofsequenceset output[i++] = (uint8_t)(sps_len >> 8); //sequence parameter set length high 8 bits output[i++] = (uint8_t)(sps_len); //sequence parameter set length low 8 bits memcpy(&output[i], sps, sps_len); //H264 sequence parameter set i += sps_len; output[i++] = 0x01; //numofpictureset output[i++] = (uint8_t)(pps_len >> 8); //picture parameter set length high 8 bits output[i++] = (uint8_t)(pps_len); //picture parameter set length low 8 bits memcpy(&output[i], pps, pps_len); //H264 picture parameter set i+= pps_len;//拷贝完pps数据 ,i移位 uint32_t fff = body_size + FLV_TAG_HEAD_LEN; addFooter(output,i,fff); int val = RTMP_Write(pRtmp, output, output_len); //RTMP Send out free(output); } void LivePusher::sendAudioHeader() { uint8_t *data;int size; mediaEncoderCall->getAudioSeqData(&data,&size); if(size == 0){ data = new uint8_t[2]; data[0] = 0x12; data[1] = 0x08; } pushAAC(data,2,RTMP_GetTime() - start_time); free(data); } void LivePusher::pushAAC(uint8_t *data, int size, uint32_t time) { //LOGE("sendFrame pushAAC 0x%02X 0x%02X 0x%02X 0x%02X %d",data[0],data[1],data[2],data[3],size); if(isLive != 1){ return; } pthread_mutex_lock(&mutex); int val = 0; uint32_t audio_ts = time; uint32_t body_len; uint32_t output_len; char *output ; body_len = 2 + size; //aac header + raw data size // adts_len - AAC_ADTS_HEADER_SIZE; // audito tag header + adts_len - remove adts header + AudioTagHeader output_len = body_len + FLV_TAG_HEAD_LEN + FLV_PRE_TAG_LEN; output = (char *)malloc(output_len); // flv tag header output[0] = 0x08; //tagtype audio addHeader(output,body_len,time); uint32_t offset = FLV_TAG_HEAD_LEN; //flv AudioTagHeader output[offset++] = 0xAF; // SoundFormat: AAC, SoundRate: 44.1kHz, SoundSize: 16-bit, SoundType: Stereo output[offset++] = data[0] == 0x12 ? 0x00:0x01; //aac raw data memcpy(output + offset, data, size); // data + AAC_ADTS_HEADER_SIZE -> data, // (adts_len - AAC_ADTS_HEADER_SIZE) -> size //previous tag size addFooter(output,offset += size,body_len + FLV_TAG_HEAD_LEN); val = RTMP_Write(pRtmp, output, output_len); free(output); pthread_mutex_unlock(&mutex); } void LivePusher::pushH264(uint8_t *data, int length,uint32_t time) { if(pRtmp && isLive == 1 && !RTMP_IsConnected(pRtmp)){ LOGE("RTMP pusher 网络断连"); isLive = -1; reConnect(); return ; } int64_t t = getCurrentTimestamp(); if(isLive != 1){ return; } //LOGI("LivePusher pushH264: length:%d flags:%d ",length,flags); pthread_mutex_lock(&mutex); uint8_t * buf; uint8_t * buf_offset; int val = 0; //int total; uint32_t nal_len; uint32_t nal_len_n; uint8_t *nal; uint8_t *nal_n; buf = data; buf_offset = data; //total = size; nal = get_nal(&nal_len, &buf_offset, buf, length); //LOGE("nal = %d,nal_len = %d",nal,nal_len); if (nal == NULL) { pthread_mutex_unlock(&mutex); return; } //int body_len = nal_len + 5 + 4; //flv VideoTagHeader + NALU length uint32_t body_len = length + 5; uint32_t output_len = body_len + FLV_TAG_HEAD_LEN + FLV_PRE_TAG_LEN; char *output = (char *)malloc(output_len); if (!output) { LOGE("Memory is not allocated..."); } // flv tag header output[0] = 0x09; //tagtype video addHeader(output,body_len,time); uint32_t offset = FLV_TAG_HEAD_LEN; //flv VideoTagHeader if ((nal[0] & 0x1f) == 0x05) // it can be 25,45,65 { output[offset++] = 0x17; //key frame, AVC } else if ((nal[0] & 0x1f) == 0x01) // itcan be 21,41,61 { output[offset++] = 0x27; //not key frame, AVC } output[offset++] = 0x01; //avc NALU unit output[offset++] = 0x00; //composit time ?????????? output[offset++] = 0x00; // composit time output[offset++] = 0x00; //composit time addFooter(output,offset,nal_len); offset += 4; memcpy(output + offset, nal, nal_len); offset += nal_len; while ((nal = get_nal(&nal_len, &buf_offset, buf, length)) != NULL) { addFooter(output,offset,nal_len); offset += 4; memcpy(output + offset, nal, nal_len); offset += nal_len; } addFooter(output,offset,body_len + FLV_TAG_HEAD_LEN); pushState = 0; int result = RTMP_Write(pRtmp, output, output_len); //RTMP Send out free(output); val += result; pushState = 1; pthread_mutex_unlock(&mutex); //LOGI("RTMP 发送h264 耗时:%d",getCurrentTimestamp() - t); if(val < 0 ){ isLive = -1; LOGE("发送数据流失败 %d",val); pusherCall->onSendFailed(1003); } } void LivePusher::addHeader(char *output,int body_size, uint32_t time) { int offset = 1; output[offset++] = (uint8_t) (body_size >> 16) & 0xFF; //data len output[offset++] = (uint8_t) (body_size >> 8) & 0xFF; //data len output[offset++] = (uint8_t) (body_size & 0xFF); //data len output[offset++] = (uint8_t) (time >> 16) & 0xFF; //time stamp output[offset++] = (uint8_t) (time >> 8) & 0xFF; //time stamp output[offset++] = (uint8_t) (time) & 0xFF; //time stamp output[offset++] = (uint8_t) (time >> 24) & 0xFF; //time stamp output[offset++] = 0; //stream id 0 output[offset++] = 0x00; //stream id 0 output[offset++] = 0x00; //stream id 0 } void LivePusher::addFooter(char *output, int index,int fff) { //previous tag size int offset = index; // (adts_len - AAC_ADTS_HEADER_SIZE); output[offset++] = (uint8_t) (fff >> 24) & 0xFF; //data len output[offset++] = (uint8_t) (fff >> 16) & 0xFF; //data len output[offset++] = (uint8_t) (fff >> 8) & 0xFF; //data len output[offset++] = (uint8_t) (fff & 0xFF); //data len } void LivePusher::releaseLive() { if(pRtmp){ RTMP_Close(pRtmp); } } void LivePusher::closeLive() { isLive = -1; releaseLive(); RTMP_Free(pRtmp); pusherCall->onClosed(); LOGI("直播关闭"); if(recycleCallback){ recycleCallback(this); } }