merge dev

This commit is contained in:
tqcq 2024-09-04 10:32:25 +08:00
commit cf9baba428
19 changed files with 3170 additions and 872 deletions

5
.gitignore vendored
View File

@ -1,5 +1,6 @@
.gdb_histroy
docs/
build*
.gdb_history
.vscode/
build/

10
doxide.yaml Normal file
View File

@ -0,0 +1,10 @@
title: AW-Security-Media-Lib
description:
files:
- "src/HuaWei/*.h"
- "src/GB28181/*.h"
- "src/DecEnc/*.h"
defines:
API_EXPORT: ""

1682
include/sigslot.h Normal file

File diff suppressed because it is too large Load Diff

44
mkdocs.yaml Normal file
View File

@ -0,0 +1,44 @@
site_name: ASM
site_description:
theme:
name: material
# custom_dir: docs/overrides
features:
- navigation.indexes
palette:
# Palette toggle for light mode
- scheme: default
primary: red
accent: red
toggle:
icon: material/brightness-7
name: Switch to dark mode
# Palette toggle for dark mode
- scheme: slate
primary: red
accent: red
toggle:
icon: material/brightness-4
name: Switch to light mode
markdown_extensions:
- def_list
- attr_list
- admonition
- pymdownx.details
- pymdownx.superfences
- pymdownx.arithmatex:
generic: true
- pymdownx.emoji:
emoji_index: !!python/name:material.extensions.emoji.twemoji
emoji_generator: !!python/name:material.extensions.emoji.to_svg
plugins:
- search
extra_css:
- stylesheets/doxide.css
extra_javascript:
- javascripts/mathjax.js
- https://polyfill.io/v3/polyfill.min.js?features=es6
- https://cdn.jsdelivr.net/npm/mathjax@3/es5/tex-mml-chtml.js

View File

@ -1,209 +1,210 @@
#include "L2PSstream.h"
#include "L2PSstream.h"
#include "StreamCodec.h"
#include "mpeg-ps.h"
#include "mpeg-ts-proto.h"
#include "StreamCodec.h"
static inline void nbo_w16(uint8_t* ptr, uint16_t val)
static inline void
nbo_w16(uint8_t *ptr, uint16_t val)
{
ptr[0] = (uint8_t)((val >> 8) & 0xFF);
ptr[1] = (uint8_t)(val & 0xFF);
ptr[0] = (uint8_t) ((val >> 8) & 0xFF);
ptr[1] = (uint8_t) (val & 0xFF);
}
L2PSstream::L2PSstream(sec_set_info* set_info):L2UdpStream(set_info)
L2PSstream::L2PSstream(sec_set_info *set_info) : L2UdpStream(set_info)
{
_ps_demuxer = ps_demuxer_create([](void* param,
int stream,
int codecid,
int flags,
int64_t pts,
int64_t dts,
const void* data,
size_t bytes){
L2PSstream *thiz = (L2PSstream *)param;
thiz->on_ps_decode(stream, codecid,thiz->_ps_demuxer, flags, pts, dts, data, bytes);
return 0;
},this);
_ps_demuxer = ps_demuxer_create(
[](void *param, int stream, int codecid, int flags, int64_t pts, int64_t dts, const void *data, size_t bytes) {
L2PSstream *thiz = (L2PSstream *) param;
thiz->on_ps_decode(stream, codecid, thiz->_ps_demuxer, flags, pts, dts, data, bytes);
return 0;
},
this);
}
L2PSstream::~L2PSstream()
{
ps_demuxer_destroy((struct ps_demuxer_t*)_ps_demuxer);
}
L2PSstream::~L2PSstream() { ps_demuxer_destroy((struct ps_demuxer_t *) _ps_demuxer); }
void L2PSstream::on_rtp_out(const uint8_t *packet, int bytes, uint32_t timestamp, int flags){
// TRACEL("timestamp:%u, bytes:%u, flag:%d\n",timestamp,bytes,flags);
ps_demuxer_input((struct ps_demuxer_t*)_ps_demuxer,packet,bytes);
void
L2PSstream::on_rtp_out(const uint8_t *packet, int bytes, uint32_t timestamp, int flags)
{
TRACEL("timestamp:%u, bytes:%u, flag:%d\n", timestamp, bytes, flags);
ps_demuxer_input((struct ps_demuxer_t *) _ps_demuxer, packet, bytes);
};
void L2PSstream::on_ps_decode(int stream, int codecid, void *ps, int flags, int64_t pts, int64_t dts, const void *data, int bytes){
void
L2PSstream::on_ps_decode(int stream,
int codecid,
void *ps,
int flags,
int64_t pts,
int64_t dts,
const void *data,
int bytes)
{
// TRACEL("dts:%lu, bytes:%d, codecid:%d\n",dts,bytes,codecid);
auto prefix=prefixSize((char*)data,bytes);
auto prefix = prefixSize((char *) data, bytes);
pts /= 90;
dts /= 90;
pts%=((int64_t)UINT32_MAX+1);
dts%=((int64_t)UINT32_MAX+1);
switch (codecid)
{
pts %= ((int64_t) UINT32_MAX + 1);
dts %= ((int64_t) UINT32_MAX + 1);
switch (codecid) {
case PSI_STREAM_H264:
_code_id=CodecId::CodecH264;
stream_split((char*)data,bytes,prefix,[&,dts](const char *ptr, int len, int prefix){
decodeH264(ptr,len,dts,prefix);
});
_code_id = CodecId::CodecH264;
stream_split((char *) data, bytes, prefix,
[&, dts](const char *ptr, int len, int prefix) { decodeH264(ptr, len, dts, prefix); });
break;
case PSI_STREAM_H265:
_code_id=CodecId::CodecH265;
stream_split((char*)data,bytes,prefix,[&](const char *ptr, int len, int prefix){
decodeH265(ptr,len,dts,prefix);
});
_code_id = CodecId::CodecH265;
stream_split((char *) data, bytes, prefix,
[&](const char *ptr, int len, int prefix) { decodeH265(ptr, len, dts, prefix); });
break;
default:
break;
}
}
bool L2PSstream::splitPES(const uint8_t * data, uint16_t len,const std::function<bool(const char *,const char *, int, int)> &cb){
auto prefix=prefixSize((char*)data,len);
uint8_t ned[]={0x00,0x00,0x01,0xe0};
auto start =(uint8_t*)memmem(data, len,ned,sizeof(ned));
char *pes_head=nullptr;
bool ret=false;
ret= stream_split_nd((char*)start,len-(start-data),prefix,ned,sizeof(ned),[&](const char *ptr, int len, int prefix){
pes_head=(char*)ptr;
stream_split((char*)ptr,len,prefix,[&](const char *ptr, int len, int prefix){
//INFOL("%02X %02X %02X %02X %02X \n",(uint8_t)ptr[0],(uint8_t)ptr[1],(uint8_t)ptr[2],(uint8_t)ptr[3],(uint8_t)ptr[4]);
if(cb){
ret=cb(ptr,pes_head,len,prefix);
}else
{
throw runtime_error("empty callback function");
}
bool
L2PSstream::splitPES(const uint8_t *data,
uint16_t len,
const std::function<bool(const char *, const char *, int, int)> &cb)
{
auto prefix = prefixSize((char *) data, len);
uint8_t ned[] = {0x00, 0x00, 0x01, 0xe0};
auto start = (uint8_t *) memmem(data, len, ned, sizeof(ned));
char *pes_head = nullptr;
bool ret = false;
ret = stream_split_nd(
(char *) start, len - (start - data), prefix, ned, sizeof(ned), [&](const char *ptr, int len, int prefix) {
pes_head = (char *) ptr;
stream_split((char *) ptr, len, prefix, [&](const char *ptr, int len, int prefix) {
//INFOL("%02X %02X %02X %02X %02X \n",(uint8_t)ptr[0],(uint8_t)ptr[1],(uint8_t)ptr[2],(uint8_t)ptr[3],(uint8_t)ptr[4]);
if (cb) {
ret = cb(ptr, pes_head, len, prefix);
} else {
throw runtime_error("empty callback function");
}
});
return ret;
});
return ret;
});
if(!ret && start){
record_PES_head(start,len-(start-data));
}
if (!ret && start) { record_PES_head(start, len - (start - data)); }
return ret;
}
void L2PSstream::rtp_post_process(const RTPcell::Ptr &rtp_cell){
const uint8_t h264key[]={0x00,0x00,0x00,0x01,0x65};
const uint8_t PEShead[]={0x00,0x00,0x01,0xe0};
const uint8_t h264Sei[]={0x00,0x00,0x00,0x01,0x06};
uint8_t * keyframe_pos,*PES_pos;
uint16_t payload_len=0;
switch (_code_id)
{
case CodecId::CodecH264:
keyframe_pos =(uint8_t*)memmem(rtp_cell->_PT_data, rtp_cell->_PT_len,h264key,sizeof(h264key));
if(keyframe_pos){ //the rtp contain key frame
auto ret=splitPES(rtp_cell->_PT_data, keyframe_pos-(rtp_cell->_PT_data),[&](const char *ptr,const char *pes, int len, int prefix){
if(prefix==4){
int type=H264_TYPE(((uint8_t *)ptr+prefix)[0]);
if (type==H264Nal::NAL_SEI)
{
uint8_t* sei_ptr=_sei_race_buf;
void
L2PSstream::rtp_post_process(const RTPcell::Ptr &rtp_cell)
{
const uint8_t h264key[] = {0x00, 0x00, 0x00, 0x01, 0x65};
const uint8_t PEShead[] = {0x00, 0x00, 0x01, 0xe0};
const uint8_t h264Sei[] = {0x00, 0x00, 0x00, 0x01, 0x06};
uint8_t *keyframe_pos, *PES_pos;
uint16_t payload_len = 0;
switch (_code_id) {
case CodecId::CodecH264:
keyframe_pos = (uint8_t *) memmem(rtp_cell->_PT_data, rtp_cell->_PT_len, h264key, sizeof(h264key));
if (keyframe_pos) {//the rtp contain key frame
INFOL("Find Key frame\n");
auto ret = splitPES(
rtp_cell->_PT_data, keyframe_pos - (rtp_cell->_PT_data),
[&](const char *ptr, const char *pes, int len, int prefix) {
if (prefix == 4) {
int type = H264_TYPE(((uint8_t *) ptr + prefix)[0]);
if (type == H264Nal::NAL_SEI) {
uint8_t *sei_ptr = _sei_race_buf;
// if(len>sizeof(_sei_race_buf)) return false;
// memcpy(sei_ptr,ptr,len);sei_ptr+=len;
auto gen_ptr=gen_sec_sei(sei_ptr);
if(gen_ptr-sei_ptr<=0) return false;
uint16_t pes_pt_len= (((uint16_t)pes[4]) << 8) | (uint8_t)pes[5];
pes_pt_len+=gen_ptr-sei_ptr;
uint16_t pes_pos=((uint8_t*)pes-rtp_cell->_buffer)+rtp_cell->_RTPpkg_head_pos;
if(pes_pos+2+4>rtp_cell->_RTPpkg_ptr->_buffer.size())
return false;
nbo_w16((uint8_t*)(rtp_cell->_RTPpkg_ptr->_buffer.data()+pes_pos+4),pes_pt_len);
auto gen_ptr = gen_sec_sei(sei_ptr);
if (gen_ptr - sei_ptr <= 0) return false;
uint16_t pes_pt_len = (((uint16_t) pes[4]) << 8) | (uint8_t) pes[5];
pes_pt_len += gen_ptr - sei_ptr;
uint16_t pes_pos = ((uint8_t *) pes - rtp_cell->_buffer) + rtp_cell->_RTPpkg_head_pos;
if (pes_pos + 2 + 4 > rtp_cell->_RTPpkg_ptr->_buffer.size()) return false;
nbo_w16((uint8_t *) (rtp_cell->_RTPpkg_ptr->_buffer.data() + pes_pos + 4), pes_pt_len);
///////////add rear rtp header/////////////////
sei_ptr=addRTPheader(rtp_cell,gen_ptr,sizeof(_sei_race_buf)-(gen_ptr-_sei_race_buf));
if(!sei_ptr) return false;
_sei_race_len=sei_ptr-_sei_race_buf;
rtp_cell->_RTPpkg_ptr->insertData(_sei_race_buf,_sei_race_len,rtp_cell->_RTPpkg_head_pos,
((uint8_t*)ptr-rtp_cell->_buffer)+len,sei_ptr-gen_ptr);
sei_ptr =
addRTPheader(rtp_cell, gen_ptr, sizeof(_sei_race_buf) - (gen_ptr - _sei_race_buf));
if (!sei_ptr) return false;
_sei_race_len = sei_ptr - _sei_race_buf;
rtp_cell->_RTPpkg_ptr->insertData(
_sei_race_buf, _sei_race_len, rtp_cell->_RTPpkg_head_pos,
((uint8_t *) ptr - rtp_cell->_buffer) + len, sei_ptr - gen_ptr);
// SUCSL("SUCESS SIGN!!!!!!!!!!!!!!\n");
return true;
}
}
return false;
});
// PES_pos =(uint8_t*)memmem(rtp_cell->_PT_data, keyframe_pos-(rtp_cell->_PT_data),PEShead,sizeof(PEShead));
// record_PES_head(PES_pos,PES_pos-(rtp_cell->_PT_data));
// if(_PES_head_len>0 ){
// auto sei_head_ptr=_sei_race_buf+_PES_head_len;
// auto pes_ptr=sei_head_ptr+sizeof(h264Sei);
// auto gen_ptr=gen_sec_sei(pes_ptr);
// if(gen_ptr-pes_ptr<=0) return;
// memcpy(sei_head_ptr,h264Sei,sizeof(h264Sei)); // add sei head
// memcpy(_sei_race_buf,_PES_head,_PES_head_len); // add pes head
// payload_len=gen_ptr-_sei_race_buf-6;
// nbo_w16(_sei_race_buf+4,payload_len);// modify pes len
// auto rtp_appen_ptr=addRTPheader(rtp_cell,gen_ptr,sizeof(_sei_race_buf)-(gen_ptr-_sei_race_buf));
// if(rtp_appen_ptr){
// _sei_race_len=rtp_appen_ptr-_sei_race_buf;
// rtp_cell->_RTPpkg_ptr->insertData(_sei_race_buf,_sei_race_len,rtp_cell->_RTPpkg_head_pos,
// rtp_cell->_RTPpkg_head_pos+(PES_pos-rtp_cell->_buffer),rtp_appen_ptr-gen_ptr);
// // SUCSL("SUCESS SIGN!!!!!!!!!!!!!!\n");
// }
// PES_pos =(uint8_t*)memmem(rtp_cell->_PT_data, keyframe_pos-(rtp_cell->_PT_data),PEShead,sizeof(PEShead));
// record_PES_head(PES_pos,PES_pos-(rtp_cell->_PT_data));
// if(_PES_head_len>0 ){
// auto sei_head_ptr=_sei_race_buf+_PES_head_len;
// auto pes_ptr=sei_head_ptr+sizeof(h264Sei);
// auto gen_ptr=gen_sec_sei(pes_ptr);
// if(gen_ptr-pes_ptr<=0) return;
// memcpy(sei_head_ptr,h264Sei,sizeof(h264Sei)); // add sei head
// memcpy(_sei_race_buf,_PES_head,_PES_head_len); // add pes head
// payload_len=gen_ptr-_sei_race_buf-6;
// nbo_w16(_sei_race_buf+4,payload_len);// modify pes len
// auto rtp_appen_ptr=addRTPheader(rtp_cell,gen_ptr,sizeof(_sei_race_buf)-(gen_ptr-_sei_race_buf));
// if(rtp_appen_ptr){
// _sei_race_len=rtp_appen_ptr-_sei_race_buf;
// rtp_cell->_RTPpkg_ptr->insertData(_sei_race_buf,_sei_race_len,rtp_cell->_RTPpkg_head_pos,
// rtp_cell->_RTPpkg_head_pos+(PES_pos-rtp_cell->_buffer),rtp_appen_ptr-gen_ptr);
// // SUCSL("SUCESS SIGN!!!!!!!!!!!!!!\n");
// }
// }
}
break;
case CodecId::CodecH265:
/* code */
break;
case CodecId::CodecInvalid:
break;
default:
WRNGL("unknown code id: %u \n ",_code_id);
break;
// }
} else {
INFOL("NO key frame\n");
}
break;
case CodecId::CodecH265:
/* code */
break;
case CodecId::CodecInvalid:
break;
default:
WRNGL("unknown code id: %u \n ", _code_id);
break;
}
}
void L2PSstream::decodeH264(const char *data, int bytes, int64_t dts, int prefix){
int type=H264_TYPE(((uint8_t *)data+prefix)[0]);
switch (type)
{
void
L2PSstream::decodeH264(const char *data, int bytes, int64_t dts, int prefix)
{
int type = H264_TYPE(((uint8_t *) data + prefix)[0]);
switch (type) {
case H264Nal::NAL_IDR:
TRACEL("NAL_IDR dts:%lu, bytes:%d\n",dts,bytes);
addSignData(data+prefix,bytes-prefix,dts,type);
TRACEL("NAL_IDR dts:%lu, bytes:%d\n", dts, bytes);
addSignData(data + prefix, bytes - prefix, dts, type);
break;
case H264Nal::NAL_SEI:
TRACEL("NAL_SEI dts:%lu, bytes:%d\n",dts,bytes);
break;
default:
break;
}
};
void L2PSstream::decodeH265(const char *data, int bytes, int64_t dts, int prefix){
int type=H265_TYPE(((uint8_t *)data+prefix)[0]);
switch (type)
{
case H265Nal::NAL_BLA_W_LP ... H265Nal::NAL_RSV_IRAP_VCL23: //keyframe
TRACEL("NAL_IDR dts:%lu, bytes:%d\n",dts,bytes);
addSignData(data+prefix,bytes-prefix,dts,type);
break;
case H265Nal::NAL_SEI_PREFIX :
TRACEL("NAL_SEI_PREFIX dts:%lu, bytes:%d\n",dts,bytes);
break;
case H265Nal::NAL_SEI_SUFFIX :
TRACEL("NAL_SEI_SUFFIX dts:%lu, bytes:%d\n",dts,bytes);
TRACEL("NAL_SEI dts:%lu, bytes:%d\n", dts, bytes);
break;
default:
break;
}
};
void
L2PSstream::decodeH265(const char *data, int bytes, int64_t dts, int prefix)
{
int type = H265_TYPE(((uint8_t *) data + prefix)[0]);
switch (type) {
case H265Nal::NAL_BLA_W_LP... H265Nal::NAL_RSV_IRAP_VCL23://keyframe
TRACEL("NAL_IDR dts:%lu, bytes:%d\n", dts, bytes);
addSignData(data + prefix, bytes - prefix, dts, type);
break;
case H265Nal::NAL_SEI_PREFIX:
TRACEL("NAL_SEI_PREFIX dts:%lu, bytes:%d\n", dts, bytes);
break;
case H265Nal::NAL_SEI_SUFFIX:
TRACEL("NAL_SEI_SUFFIX dts:%lu, bytes:%d\n", dts, bytes);
break;
default:
break;
}
};
///////////////////////////////////////////////////////////////////

View File

@ -2,204 +2,188 @@
#include "L2SecurityStream.h"
void printdata(uint8_t* out_buff,int frameLen,int type,int dts){
printf("\n");
printf("type: %d %d,%d\n", type, dts, frameLen);
for (int num = 0; num < 10; num++)
{
printf("%02X ", (uint8_t)out_buff[num]);
}
printf("---- ");
for (int num = frameLen/2; num < frameLen / 2+10; num++)
{
printf("%02X ", (uint8_t)out_buff[num]);
}
printf("---- ");
for (int num = 10; num >0; num--)
{
printf("%02X ", (uint8_t)out_buff[frameLen-num]);
}
printf("\n");
}
L2SecurityStream::L2SecurityStream(sec_set_info* set_info){
_keyframe_seq=-1;
_code_id=CodecInvalid;
sm3_init(&_sm3);
if(set_info){
_set_info=*set_info;
}else
{
throw runtime_error("L2SecurityStream initial error: empty sec_set_info");
}
uint8_t security_set_version[19] = "Ver 0.0.2";
memset(&sec_set,0,sizeof(sec_set));
memset(&(sec_set.evek),0xff,sizeof(sec_set.evek));
memset(&(sec_set.iv),0xff,sizeof(sec_set.iv));
sec_set.Flag.encryption_flag = 0;
sec_set.Flag.authnetication_flag = 1;
sec_set.Flag.vek_flag = 0;
sec_set.Flag.iv_flag = 0;
sec_set.Flag.hash_discard_p_picture = 1;
sec_set.Flag.reserved_flag = 0b001;
sec_set.Type.signature_type = (uint8_t) SecMedia::DecryptType::SM2_auth;
sec_set.Type.hash_type =(uint8_t) SecMedia::DecryptType::SM3;
sec_set.Type.encryption_type =(uint8_t) SecMedia::DecryptType::NONE;
sec_set.Type.vek_encryption_type = (uint8_t) SecMedia::DecryptType::NONE;
memcpy(sec_set.camera_id,_set_info.camera_id,sizeof(sec_set.camera_id) );
memcpy(sec_set.vkek_version,_set_info.vkek_version,sizeof(sec_set.vkek_version));
sec_set.vkek_version_length_minus1=sizeof(sec_set.vkek_version)-1;
sec_set.end_flag=0x80;
// /////////////////////ver set
memset(&ver_set,0,sizeof(ver_set));
memset(&(ver_set.sign),0xff,sizeof(ver_set.sign));
ver_set.end_flag=0x80;
memset(&err_ver_set,0,sizeof(ver_set));
err_ver_set.end_flag=0x80;
_thread_exit=0;
for(auto &th:_sign_thread_list){
th=new std::thread([&](){
ThreadSign();
});
}
for(auto &th:_sign_thread_list){
th->detach();
}
usleep(1000);
void
printdata(uint8_t *out_buff, int frameLen, int type, int dts)
{
printf("\n");
printf("type: %d %d,%d\n", type, dts, frameLen);
for (int num = 0; num < 10; num++) { printf("%02X ", (uint8_t) out_buff[num]); }
printf("---- ");
for (int num = frameLen / 2; num < frameLen / 2 + 10; num++) { printf("%02X ", (uint8_t) out_buff[num]); }
printf("---- ");
for (int num = 10; num > 0; num--) { printf("%02X ", (uint8_t) out_buff[frameLen - num]); }
printf("\n");
}
void L2SecurityStream::ThreadSign(){
uint8_t sha[128];
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(_set_info.cpu_core,&mask);
if(pthread_setaffinity_np(pthread_self(),sizeof(mask),&mask)<0){
ERROL("Sign Thread start fail");
}
SUCSL("sign thread created\n");
while (1)
{
{
std::unique_lock<std::mutex> lock(_sign_mtx);
_sign_start.wait(lock,[&]{ return _start_sign; });
_start_sign=false;
}
if(_thread_exit){
NOTICEL("sign thread release\n");
sleep(0);
return;
}
{
std::lock_guard<std::mutex> lock(_sign_process_mtx);
NOTICEL("Start sign cpu:%d dts:%u\n",sched_getcpu(),ver_set.head_frame_dts);
// _sign_start.notify_one();
// _on_sign_flag=true;
// _on_sign.notify_all();
sm3_init(&_sm3);
sm3_update(&_sm3,(uint8_t*)_keyframe_DATA.data(),_keyframe_DATA.size());
sm3_final(&_sm3,sha);
do_sm2_sign((char*)_set_info.prikey,(char*)_set_info.pubkey,(char *) sha, 32, (char *)ver_set.sign);
ver_set.sign_len=64;
// DEBUGL("\n&&&&&&&& sign: hash &&&&&&&&&&&&&&&&&&&&&&&&");
// print_data2((char*)sha,32,32);
// DEBUGL("\n&&&&&&&& sign: sign &&&&&&&&&&&&&&&&&&&&&&&&");
// print_data2((char*)ver_set.sign,ver_set.sign_len,ver_set.sign_len);
_keyframe_seq=-5;
NOTICEL("End sign\n");
}
}
L2SecurityStream::L2SecurityStream(sec_set_info *set_info)
{
_keyframe_seq = -1;
_code_id = CodecInvalid;
sm3_init(&_sm3);
if (set_info) {
_set_info = *set_info;
} else {
throw runtime_error("L2SecurityStream initial error: empty sec_set_info");
}
int L2SecurityStream::RtpOutput(char * buf, uint32_t * len, uint16_t* sei_tail_pos, uint16_t* extra_len, void ** param){
auto element= _package_queue.pop();
// *param=nullptr;
if (!element) return -1;
*len=(uint32_t) element->_buffer.size();
memcpy(buf,element->_buffer.data(),*len);
*sei_tail_pos=element->_insert_tail_pos;
*extra_len=element->_inserted_len;
if(param) *param=element->_usr_param;
element->clear_all();
_all_out++;
// DEBUGL("in param address: %p \n",element->_usr_param);
// DEBUGL("%p out>> in %u out %u que: %lu frame_len %u tail_pos %u ins_len %u\n",this,_all_in,_all_out, _package_queue.size(),*len,*sei_tail_pos,*extra_len);
// DEBUGL("OUT len %u\n", *len);
return 1;
uint8_t security_set_version[19] = "Ver 0.0.2";
memset(&sec_set, 0, sizeof(sec_set));
memset(&(sec_set.evek), 0xff, sizeof(sec_set.evek));
memset(&(sec_set.iv), 0xff, sizeof(sec_set.iv));
sec_set.Flag.encryption_flag = 0;
sec_set.Flag.authnetication_flag = 1;
sec_set.Flag.vek_flag = 0;
sec_set.Flag.iv_flag = 0;
sec_set.Flag.hash_discard_p_picture = 1;
sec_set.Flag.reserved_flag = 0b001;
sec_set.Type.signature_type = (uint8_t) SecMedia::DecryptType::SM2_auth;
sec_set.Type.hash_type = (uint8_t) SecMedia::DecryptType::SM3;
sec_set.Type.encryption_type = (uint8_t) SecMedia::DecryptType::NONE;
sec_set.Type.vek_encryption_type = (uint8_t) SecMedia::DecryptType::NONE;
memcpy(sec_set.camera_id, _set_info.camera_id, sizeof(sec_set.camera_id));
memcpy(sec_set.vkek_version, _set_info.vkek_version, sizeof(sec_set.vkek_version));
sec_set.vkek_version_length_minus1 = sizeof(sec_set.vkek_version) - 1;
sec_set.end_flag = 0x80;
// /////////////////////ver set
memset(&ver_set, 0, sizeof(ver_set));
memset(&(ver_set.sign), 0xff, sizeof(ver_set.sign));
ver_set.end_flag = 0x80;
memset(&err_ver_set, 0, sizeof(ver_set));
err_ver_set.end_flag = 0x80;
_thread_exit = 0;
for (auto &th : _sign_thread_list) {
th = new std::thread([&]() { ThreadSign(); });
}
for (auto &th : _sign_thread_list) { th->detach(); }
usleep(1000);
}
uint8_t * L2SecurityStream::gen_sec_sei(uint8_t *ptr){
std::lock_guard<std::mutex> lock(_sign_process_mtx);
if(_keyframe_complete && !_start_sign && !_keyframe_DATA.empty()){
{
// std::unique_lock<std::mutex> lock(_sign_process_mtx);
if(_keyframe_seq==-5){
ptr=SecMedia::appendSEIframe(ptr,SEC_SET_PT,(uint8_t*)SecMedia::Sec_set_UUID,(uint8_t*)&sec_set,sizeof(sec_set));
ptr=SecMedia::appendSEIframe(ptr,VER_SET_PT,(uint8_t*)SecMedia::Ver_set_UUID,(uint8_t*)&ver_set,sizeof(ver_set));
INFOL("gen_sec_sei dts:%u\n",ver_set.head_frame_dts);
}
else{
if(_keyframe_seq==-2){
WRNGL("Sign not complete\n");
err_ver_set.head_frame_type=SecMedia::ERR_incomplete_Sign;
}
else{
WRNGL("Sign loss\n");
err_ver_set.head_frame_type=SecMedia::ERR_Without_Sign_Data;
}
}
_keyframe_seq=0;
_keyframe_DATA.clear();
_keyframe_complete=false;
}
return ptr;
}else
void
L2SecurityStream::ThreadSign()
{
uint8_t sha[128];
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(_set_info.cpu_core, &mask);
if (pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask) < 0) { ERROL("Sign Thread start fail"); }
SUCSL("sign thread created\n");
while (1) {
{
WRNGL("Without Sign data\n");
err_ver_set.head_frame_type=SecMedia::ERR_Without_Frame_Data;
std::unique_lock<std::mutex> lock(_sign_mtx);
_sign_start.wait(lock, [&] { return _start_sign; });
_start_sign = false;
}
ptr=SecMedia::appendSEIframe(ptr,SEC_SET_PT,(uint8_t*)SecMedia::Sec_set_UUID,(uint8_t*)&sec_set,sizeof(sec_set));
ptr=SecMedia::appendSEIframe(ptr,VER_SET_PT,(uint8_t*)SecMedia::Ver_set_UUID,(uint8_t*)&err_ver_set,sizeof(err_ver_set));
if (_thread_exit) {
NOTICEL("sign thread release\n");
sleep(0);
return;
}
{
std::lock_guard<std::mutex> lock(_sign_process_mtx);
NOTICEL("Start sign cpu:%d dts:%u\n", sched_getcpu(), ver_set.head_frame_dts);
// _sign_start.notify_one();
// _on_sign_flag=true;
// _on_sign.notify_all();
sm3_init(&_sm3);
sm3_update(&_sm3, (uint8_t *) _keyframe_DATA.data(), _keyframe_DATA.size());
sm3_final(&_sm3, sha);
do_sm2_sign((char *) _set_info.prikey, (char *) _set_info.pubkey, (char *) sha, 32, (char *) ver_set.sign);
ver_set.sign_len = 64;
// DEBUGL("\n&&&&&&&& sign: hash &&&&&&&&&&&&&&&&&&&&&&&&");
// print_data2((char*)sha,32,32);
// DEBUGL("\n&&&&&&&& sign: sign &&&&&&&&&&&&&&&&&&&&&&&&");
// print_data2((char*)ver_set.sign,ver_set.sign_len,ver_set.sign_len);
_keyframe_seq = -5;
NOTICEL("End sign\n");
}
}
}
int
L2SecurityStream::RtpOutput(char *buf, uint32_t *len, uint16_t *sei_tail_pos, uint16_t *extra_len, void **param)
{
auto element = _package_queue.pop();
// *param=nullptr;
if (!element) return -1;
*len = (uint32_t) element->_buffer.size();
memcpy(buf, element->_buffer.data(), *len);
*sei_tail_pos = element->_insert_tail_pos;
*extra_len = element->_inserted_len;
if (param) *param = element->_usr_param;
element->clear_all();
_all_out++;
// DEBUGL("in param address: %p \n",element->_usr_param);
// DEBUGL("%p out>> in %u out %u que: %lu frame_len %u tail_pos %u ins_len %u\n",this,_all_in,_all_out, _package_queue.size(),*len,*sei_tail_pos,*extra_len);
// DEBUGL("OUT len %u\n", *len);
return 1;
}
uint8_t *
L2SecurityStream::gen_sec_sei(uint8_t *ptr)
{
std::lock_guard<std::mutex> lock(_sign_process_mtx);
if (_keyframe_complete && !_start_sign && !_keyframe_DATA.empty()) {
{
// std::unique_lock<std::mutex> lock(_sign_process_mtx);
if (_keyframe_seq == -5) {
ptr = SecMedia::appendSEIframe(
ptr, SEC_SET_PT, (uint8_t *) SecMedia::Sec_set_UUID, (uint8_t *) &sec_set, sizeof(sec_set));
ptr = SecMedia::appendSEIframe(
ptr, VER_SET_PT, (uint8_t *) SecMedia::Ver_set_UUID, (uint8_t *) &ver_set, sizeof(ver_set));
INFOL("gen_sec_sei dts:%u\n", ver_set.head_frame_dts);
} else {
if (_keyframe_seq == -2) {
WRNGL("Sign not complete\n");
err_ver_set.head_frame_type = SecMedia::ERR_incomplete_Sign;
} else {
WRNGL("Sign loss\n");
err_ver_set.head_frame_type = SecMedia::ERR_Without_Sign_Data;
}
}
_keyframe_seq = 0;
_keyframe_DATA.clear();
_keyframe_complete = false;
}
return ptr;
} else {
WRNGL("Without Sign data\n");
err_ver_set.head_frame_type = SecMedia::ERR_Without_Frame_Data;
}
ptr = SecMedia::appendSEIframe(
ptr, SEC_SET_PT, (uint8_t *) SecMedia::Sec_set_UUID, (uint8_t *) &sec_set, sizeof(sec_set));
ptr = SecMedia::appendSEIframe(
ptr, VER_SET_PT, (uint8_t *) SecMedia::Ver_set_UUID, (uint8_t *) &err_ver_set, sizeof(err_ver_set));
return ptr;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
int L2UdpStream::RtpInput(const uint8_t* data,uint16_t len, void *param){
RTPpackage::rtp_cb_f rtp_cell_cb=[&](const RTPcell::Ptr &rtp_cell){
rtp_cell->precessRTP();
if( (uint32_t)(_prv_seq+1)%(uint32_t)(UINT16_MAX+1) == rtp_cell->seq || _prv_seq==0){
// INFOL("PT: %u Seq: %u PT_len: %u mark: %u\n",rtp_cell->PT,rtp_cell->seq,rtp_cell->_PT_len,(uint8_t)rtp_cell->mark);
_rtp_decoder.decodeRtp(rtp_cell->_buffer,rtp_cell->_RTP_len);
// on_rtp_out(rtp_cell->_PT_data,rtp_cell->_PT_len,rtp_cell->timestamp,(int)rtp_cell->mark);
// rtp_cell->_prefix
incrementRTPseq(rtp_cell);
rtp_post_process(rtp_cell);
}else
{
WRNGL("Disorder rtp Seq: %u+1 != %u \n",_prv_seq,rtp_cell->seq);
}
_prv_seq=rtp_cell->seq;
};
RTPpackage::Ptr RTPpkg=make_shared<RTPpackage>(rtp_cell_cb,true);
RTPpkg->assignRTPpackage(data,len,param);
RTPpkg->insertData();
_package_queue.push(RTPpkg);
return 0;
}
int
L2UdpStream::RtpInput(const uint8_t *data, uint16_t len, void *param)
{
RTPpackage::rtp_cb_f rtp_cell_cb = [&](const RTPcell::Ptr &rtp_cell) {
rtp_cell->precessRTP();
if ((uint32_t) (_prv_seq + 1) % (uint32_t) (UINT16_MAX + 1) == rtp_cell->seq || _prv_seq == 0) {
// INFOL("PT: %u Seq: %u PT_len: %u mark: %u\n",rtp_cell->PT,rtp_cell->seq,rtp_cell->_PT_len,(uint8_t)rtp_cell->mark);
_rtp_decoder.decodeRtp(rtp_cell->_buffer, rtp_cell->_RTP_len);
// on_rtp_out(rtp_cell->_PT_data,rtp_cell->_PT_len,rtp_cell->timestamp,(int)rtp_cell->mark);
// rtp_cell->_prefix
incrementRTPseq(rtp_cell);
rtp_post_process(rtp_cell);
} else {
WRNGL("Disorder rtp Seq: (prev)%u+1 != (cur)%u \n", _prv_seq, rtp_cell->seq);
}
_prv_seq = rtp_cell->seq;
};
RTPpackage::Ptr RTPpkg = make_shared<RTPpackage>(rtp_cell_cb, true);
RTPpkg->assignRTPpackage(data, len, param);
RTPpkg->insertData();
_package_queue.push(RTPpkg);
return 0;
}

View File

@ -6,6 +6,8 @@
#include "HuaWei/HWsign.h"
#include "HuaWei/RTP.h"
#include "RtpDecoder.h"
#include "layer2/l2_manager.h"
#include "layer4/rtp_manager.h"
#include <array>
#include <atomic>
#include <fstream>
@ -23,7 +25,7 @@ public:
_keyframe_seq = -2;
_sign_start.notify_all();
usleep(10000);
};
}
void ThreadSign();
@ -55,7 +57,7 @@ protected:
// }
usleep(1000);
// sleep(1);
};
}
private:
const char _class_name[17] = "L2SecurityStream";
@ -67,7 +69,8 @@ protected:
uint32_t _all_out = 0;
bool _keyframe_complete = false;
uint16_t _sei_race_len;
HWsec_queue _package_queue;
// HWsec_queue _package_queue;
L2Manager _l2_manager;
public:
uint8_t _sei_race_buf[4096];
@ -88,10 +91,13 @@ public:
bool _start_sign = false;
};
class L2UdpStream : public L2SecurityStream {
class L2UdpStream : public L2SecurityStream, public sigslot::has_slots<> {
protected:
uint16_t _seq_increment = 0;
RtpDecoder _rtp_decoder;
RTPManager::Ptr _rtp_manager;
L2Manager _l2_manager;
ThreadsafeQueue<RTPpackage::Ptr> _package_queue;
private:
uint16_t _prv_seq = 0;
@ -102,7 +108,11 @@ public:
_rtp_decoder.setOnDecode([&](const uint8_t *packet, int bytes, uint32_t timestamp, int flags) {
on_rtp_out(packet, bytes, timestamp, flags);
});
};
_rtp_manager = std::shared_ptr<RTPManager>(new RTPManager());
_l2_manager.OnFrameInput.connect(_rtp_manager.get(), &RTPManager::OnFrame);
_rtp_manager->OnPSFrame.connect(this, &L2UdpStream::on_rtp_out);
}
int RtpInput(const uint8_t *data, uint16_t len, void *param = nullptr) override;
virtual void on_rtp_out(const uint8_t *packet, int bytes, uint32_t timestamp, int flags) = 0;

View File

@ -1,48 +1,48 @@
#include "RtpDecoder.h"
#include "rtp-payload.h"
RtpDecoder::~RtpDecoder() {
if(_rtp_decoder){
RtpDecoder::~RtpDecoder()
{
if (_rtp_decoder) {
rtp_payload_decode_destroy(_rtp_decoder);
_rtp_decoder = nullptr;
}
}
void RtpDecoder::decodeRtp(const void *data, int bytes) {
if(!_rtp_decoder){
static rtp_payload_t s_func= {
[](void* param, int bytes){
RtpDecoder *obj = (RtpDecoder *)param;
obj->_buffer.resize(bytes);
if (obj->_buffer.capacity()>2*bytes && bytes>2048)
{
obj->_buffer.shrink_to_fit();
}
// obj->_buffer->setCapacity(bytes);
// return (void *)obj->_buffer->data();
return (void*) obj->_buffer.data();
},
[](void* param, void* packet){
//do nothing
},
[](void* param, const void *packet, int bytes, uint32_t timestamp, int flags){
RtpDecoder *obj = (RtpDecoder *)param;
obj->onDecode((uint8_t*)packet,bytes,timestamp,flags);
}
};
void
RtpDecoder::decodeRtp(const void *data, int bytes)
{
if (bytes < 2) { return; }
const uint8_t cur_rtp_type = 0x7F & ((uint8_t *) data)[1];
_rtp_type = 0x7F & ((uint8_t *) data)[1];
INFOL("rtp type:%d\n" ,(int) _rtp_type);
if (!_rtp_decoder) {
static rtp_payload_t s_func = {
[](void *param, int bytes) {
RtpDecoder *obj = (RtpDecoder *) param;
obj->_buffer.resize(bytes);
if (obj->_buffer.capacity() > 2 * bytes && bytes > 2048) { obj->_buffer.shrink_to_fit(); }
// obj->_buffer->setCapacity(bytes);
// return (void *)obj->_buffer->data();
return (void *) obj->_buffer.data();
},
[](void *param, void *packet) {
//do nothing
},
[](void *param, const void *packet, int bytes, uint32_t timestamp, int flags) {
RtpDecoder *obj = (RtpDecoder *) param;
obj->onDecode((uint8_t *) packet, bytes, timestamp, flags);
}};
_rtp_type = cur_rtp_type;
INFOL("rtp type:%d\n", (int) _rtp_type);
_rtp_decoder = rtp_payload_decode_create(_rtp_type, _codec.data(), &s_func, this);
if (!_rtp_decoder) {
WRNGL("unsupported rtp type:%d ,size:%u\n",(int) _rtp_type,bytes);
}
if (!_rtp_decoder) { WRNGL("unsupported rtp type:%d ,size:%u\n", (int) _rtp_type, bytes); }
}
if(_rtp_decoder){
if(( 0x7F & ((uint8_t *) data)[1])==_rtp_type){ // 直接收一种类型了
rtp_payload_decode_input(_rtp_decoder,data,bytes);
if (_rtp_decoder) {
if (cur_rtp_type == _rtp_type) {// 直接收一种类型了
rtp_payload_decode_input(_rtp_decoder, data, bytes);
}
}
}
}

View File

@ -10,13 +10,12 @@ public:
typedef function<void(const uint8_t *packet, int bytes, uint32_t timestamp, int flags)> on_decode_t;
public:
RtpDecoder() {
RtpDecoder(const std::string &codec_name = "MP2P") : _codec(codec_name) {}
};
~RtpDecoder();
void decodeRtp(const void *data, int bytes);
void setOnDecode(on_decode_t cb) { _on_decoded = cb; };
void setOnDecode(on_decode_t cb) { _on_decoded = cb; }
private:
void onDecode(const uint8_t *packet, int bytes, uint32_t timestamp, int flags)
@ -27,7 +26,7 @@ private:
private:
void *_rtp_decoder = nullptr;
std::vector<uint8_t> _buffer;
std::string _codec = "MP2P";
std::string _codec;
uint8_t _rtp_type;
on_decode_t _on_decoded;
};

View File

@ -1,148 +1,149 @@
#ifndef _HWsec_H
#define _HWsec_H
#include <thread>
#include <chrono>
#include <fstream>
#include "HWsign.h"
#include "RTP.h"
#include "queue.h"
#include "HWsign.h"
class HWsec_queue: public ThreadsafeQueue<RTPpackage::Ptr>
{
public:
HWsec_queue(){};
~HWsec_queue(){};
RTPpackage::Ptr pop(){
std::lock_guard<decltype(mutex_)> lock(mutex_);
if (queue_.empty()) {
return nullptr;
}
RTPpackage::Ptr tmp = queue_.front();
if (tmp->isPause())
{
WRNGL("pause@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@pause@@@@@@@@pause@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@pause@@@@@@@@pause@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@pause@@@@@@@@ \n");
return nullptr;
}
queue_.pop();
return tmp;
}
};
#include <chrono>
#include <fstream>
#include <thread>
class HWsec
class HWsec_queue : public ThreadsafeQueue<RTPpackage::Ptr> {
public:
HWsec_queue() {}
~HWsec_queue() {}
RTPpackage::Ptr pop()
{
typedef std::chrono::high_resolution_clock Clock;
private:
CodecId _code_id;
RTPcell::Ptr _SEI_rtp;
RTPcell::Ptr _prv_cell=nullptr;
HWsec_queue _RTPpackage_queue;
uint32_t _all_in=0;
uint32_t _all_out=0;
const char _class_name[6]="HWsec";
bool _keyframe_complete=false;
uint16_t _sei_race_len;
#ifdef DUMP_FILE
shared_ptr<ofstream> _data_file;
shared_ptr<ofstream> _len_file;
#endif
std::thread * _sign_thread;
public:
uint8_t _sei_race_buf[4096];
uint8_t _thread_exit;
int _keyframe_seq;
string _keyframe_DATA;
sm3_ctx _sm3;
sec_set_info _set_info;
SecMedia::NALUdecodeInfo sec_set;
SecMedia::VerificationSet ver_set;
std::mutex _sign_mtx;
std::condition_variable _sign_start;
bool _start_sign=false;
public:
HWsec(sec_set_info* set_info);
~HWsec();
bool check_class(){
return strcmp(_class_name,"HWsec")==0;
std::lock_guard<decltype(mutex_)> lock(mutex_);
if (queue_.empty()) { return nullptr; }
RTPpackage::Ptr tmp = queue_.front();
if (tmp->isPause()) {
WRNGL("pause@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@pause@@@@@@@@pause@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@pause@@@@"
"@@@@pause@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@pause@@@@@@@@ \n");
return nullptr;
}
void HWsec_input(const uint8_t* data,uint16_t len, void *param=nullptr){
RTPpackage::rtp_cb_f rtp_cell_cb=[&](const RTPcell::Ptr &rtp_cell){
if(rtp_cell){
queue_.pop();
return tmp;
}
};
class HWsec {
typedef std::chrono::high_resolution_clock Clock;
private:
CodecId _code_id;
RTPcell::Ptr _SEI_rtp;
RTPcell::Ptr _prv_cell = nullptr;
HWsec_queue _RTPpackage_queue;
uint32_t _all_in = 0;
uint32_t _all_out = 0;
const char _class_name[6] = "HWsec";
bool _keyframe_complete = false;
uint16_t _sei_race_len;
#ifdef DUMP_FILE
shared_ptr<ofstream> _data_file;
shared_ptr<ofstream> _len_file;
#endif
std::thread *_sign_thread;
public:
uint8_t _sei_race_buf[4096];
uint8_t _thread_exit;
int _keyframe_seq;
string _keyframe_DATA;
sm3_ctx _sm3;
sec_set_info _set_info;
SecMedia::NALUdecodeInfo sec_set;
SecMedia::VerificationSet ver_set;
std::mutex _sign_mtx;
std::condition_variable _sign_start;
bool _start_sign = false;
public:
HWsec(sec_set_info *set_info);
~HWsec();
bool check_class() { return strcmp(_class_name, "HWsec") == 0; }
void HWsec_input(const uint8_t *data, uint16_t len, void *param = nullptr)
{
RTPpackage::rtp_cb_f rtp_cell_cb = [&](const RTPcell::Ptr &rtp_cell) {
if (rtp_cell) {
rtp_cell->precessRTP();
switch (rtp_cell->PT)
{
case 99: //h264
if (_code_id!=CodecId::CodecH264)
{
_code_id=CodecId::CodecH264;
switch (rtp_cell->PT) {
case 99://h264
if (_code_id != CodecId::CodecH264) {
_code_id = CodecId::CodecH264;
INFOL("Track type is changed to H.264\n");
}
// decode_264( rtp_cell);
break;
case 103 ... 108: //h265
if (_code_id!=CodecId::CodecH265)
{
_code_id=CodecId::CodecH265;
case 103 ... 108://h265
if (_code_id != CodecId::CodecH265) {
_code_id = CodecId::CodecH265;
INFOL("Track type is changed to H.265\n");
}
decode_265( rtp_cell);
decode_265(rtp_cell);
break;
case 0 ... 95: // standard type
case 0 ... 95:// standard type
break;
default:
WRNGL("Error !!!!!!!!!! Unsupport track type: %u \n",rtp_cell->PT);
WRNGL("Error !!!!!!!!!! Unsupport track type: %u \n", rtp_cell->PT);
break;
}
}else
{
} else {
WRNGL("error");
}
};
#ifdef DUMP_FILE
_data_file->write((char*)data,len);
*_len_file <<len<<endl;
_data_file->flush();
_len_file->flush();
#endif
// DEBUGL("in param address: %p \n",param);
_all_in++;
RTPpackage::Ptr RTPpkg=make_shared<RTPpackage>(rtp_cell_cb,_prv_cell);
};
#ifdef DUMP_FILE
_data_file->write((char *) data, len);
*_len_file << len << endl;
_data_file->flush();
_len_file->flush();
#endif
// DEBUGL("in param address: %p \n",param);
_all_in++;
RTPpackage::Ptr RTPpkg = make_shared<RTPpackage>(rtp_cell_cb, _prv_cell);
// auto t1=Clock::now();
_prv_cell=RTPpkg->assignRTPpackage(data,len,param);
// auto t2=Clock::now();
RTPpkg->insertData();
_RTPpackage_queue.push(RTPpkg);
// auto delta=std::chrono::duration_cast<std::chrono::microseconds>(t2-t1);
// TRACEL("time:%ld size: %lu \n",delta.count(),RTPpkg->_buffer.size());
}
// auto t1=Clock::now();
_prv_cell = RTPpkg->assignRTPpackage(data, len, param);
// auto t2=Clock::now();
RTPpkg->insertData();
_RTPpackage_queue.push(RTPpkg);
// auto delta=std::chrono::duration_cast<std::chrono::microseconds>(t2-t1);
// TRACEL("time:%ld size: %lu \n",delta.count(),RTPpkg->_buffer.size());
}
int HWsec_output(char * buf, uint32_t * len, uint16_t* sei_tail_pos, uint16_t* extra_len, void ** param){
int HWsec_output(char *buf, uint32_t *len, uint16_t *sei_tail_pos, uint16_t *extra_len, void **param)
{
auto element= _RTPpackage_queue.pop();
// *param=nullptr;
if (!element) return -1;
*len=(uint32_t) element->_buffer.size();
memcpy(buf,element->_buffer.data(),*len);
*sei_tail_pos=element->_insert_tail_pos;
*extra_len=element->_inserted_len;
if(param) *param=element->_usr_param;
element->clear_all();
_all_out++;
// DEBUGL("in param address: %p \n",element->_usr_param);
DEBUGL("%p out>> in %u out %u que: %lu frame_len %u tail_pos %u ins_len %u\n",this,_all_in,_all_out, _RTPpackage_queue.size(),*len,*sei_tail_pos,*extra_len);
// DEBUGL("OUT len %u\n", *len);
return 1;
}
private:
void gen_sec_data();
void sign_data(const char * buf, const uint32_t len);
int decode_265(RTPcell::Ptr rtp_cell);
int decode_264(RTPcell::Ptr rtp_cell);
void setfile();
};
auto element = _RTPpackage_queue.pop();
// *param=nullptr;
if (!element) return -1;
*len = (uint32_t) element->_buffer.size();
memcpy(buf, element->_buffer.data(), *len);
*sei_tail_pos = element->_insert_tail_pos;
*extra_len = element->_inserted_len;
if (param) *param = element->_usr_param;
element->clear_all();
_all_out++;
// DEBUGL("in param address: %p \n",element->_usr_param);
DEBUGL("%p out>> in %u out %u que: %lu frame_len %u tail_pos %u ins_len %u\n", this, _all_in, _all_out,
_RTPpackage_queue.size(), *len, *sei_tail_pos, *extra_len);
// DEBUGL("OUT len %u\n", *len);
return 1;
}
private:
void gen_sec_data();
void sign_data(const char *buf, const uint32_t len);
int decode_265(RTPcell::Ptr rtp_cell);
int decode_264(RTPcell::Ptr rtp_cell);
void setfile();
};
#endif //_HWSIGN_TCP_H
#endif//_HWSIGN_TCP_H

View File

@ -1,401 +1,416 @@
#ifndef _RTP_H
#define _RTP_H
#include <string>
#include <functional>
#include <memory>
#include "HWcommon.h"
#include "string.h"
#include <arpa/inet.h>
#include "HWcommon.h"
#include <functional>
#include <memory>
#include <string>
using namespace std;
// extern uint32_t RTPcell_total_count;
// extern uint32_t RTPpackage_total_count;
class RTPpackage;
class RTPcell
// extern uint32_t RTPcell_total_count;
// extern uint32_t RTPpackage_total_count;
class RTPpackage;
class RTPcell {
public:
typedef shared_ptr<RTPcell> Ptr;
public:
RTPcell(uint16_t len, uint8_t prefix, uint16_t buff_pos, shared_ptr<RTPpackage> RTPpkg_ptr)
{
public:
typedef shared_ptr<RTPcell> Ptr;
public:
RTPcell(uint16_t len, uint8_t prefix, uint16_t buff_pos,shared_ptr<RTPpackage> RTPpkg_ptr){
_assign_len=0;
_RTPpkg_head_pos=buff_pos;
_prefix=prefix;
_missing_head=false;
_buffer=new uint8_t[(len/2048+1)*2048];
_RTPpkg_ptr=RTPpkg_ptr;
if(_prefix==16) _tcp_prefix=4;
else _tcp_prefix=0;
_RTP_len=len+_tcp_prefix;
// RTPcell_total_count++;
// SUCSL("create RTPcell %u\n",RTPcell_total_count);
}
RTPcell(const uint8_t *head_d,uint16_t len, uint8_t prefix, uint16_t buff_pos,shared_ptr<RTPpackage> RTPpkg_ptr){
_missing_head=true;
_RTPpkg_ptr=RTPpkg_ptr;
_assign_len=len;
_RTPpkg_head_pos=buff_pos;
_prefix=prefix;
_RTP_len=prefix;
_buffer=new uint8_t[(len/2048+1)*2048];
memcpy(_buffer,head_d,len);
if(_prefix==16) _tcp_prefix=4;
else _tcp_prefix=0;
_assign_len = 0;
_RTPpkg_head_pos = buff_pos;
_prefix = prefix;
_missing_head = false;
_buffer = new uint8_t[(len / 2048 + 1) * 2048];
_RTPpkg_ptr = RTPpkg_ptr;
if (_prefix == 16)
_tcp_prefix = 4;
else
_tcp_prefix = 0;
_RTP_len = len + _tcp_prefix;
// RTPcell_total_count++;
// SUCSL("create RTPcell %u\n",RTPcell_total_count);
}
// RTPcell(const uint8_t *data,uint16_t len, uint8_t prefix){
// _buffer=new uint8_t[(len/2048+1)*2048];
// _RTP_len=len;
// _assign_len=0;
// if(_prefix==16) _tcp_prefix=4;
// else _tcp_prefix=0;
// }
~RTPcell(){
// RTPcell_total_count--;
// WRNGL("~RTPcell %u\n",RTPcell_total_count);
delete _buffer;
}
void setRTPlen(){
if(_tcp_prefix>=4){
_RTP_len=(_buffer[2]<<8) &0xff00 | (_buffer[3])+_tcp_prefix;
_buffer= (uint8_t*) realloc(_buffer,(_RTP_len/2048+1)*2048);
}
_missing_head=false;
}
int append_data(const uint8_t * data, uint16_t len){
if(_RTP_len>=_assign_len+len){
memcpy(_assign_len+_buffer,data,len);
_assign_len+=len;
return 1;
}
return -1;
}
int getLackLen(){
return (int)_RTP_len-(int)_assign_len;
}
void precessRTP(){
uint8_t * ptr=_buffer+_tcp_prefix;
mark=ptr[1]>>7;
PT=ptr[1] & 0x7F;
memcpy(&seq, ptr + 2, 2);
seq = ntohs(seq);
memcpy(&timestamp, ptr + 4, 4);
timestamp = ntohl(timestamp);
memcpy(&ssrc, ptr + 8, 4);
ssrc = ntohl(ssrc);
// RTPcell_total_count++;
// SUCSL("create RTPcell %u\n",RTPcell_total_count);
}
_PT_header=ptr+12;
_PT_len=_RTP_len-12-_tcp_prefix;
_PT_data=ptr+12+_tcp_prefix;
}
public:
uint8_t _tcp_prefix=0;
uint8_t _prefix=0;
uint16_t _RTP_len=0;
uint16_t _assign_len=0;
uint16_t _RTPpkg_head_pos=0;
bool _missing_head=false;
uint8_t* _buffer=NULL;
shared_ptr<RTPpackage> _RTPpkg_ptr;
uint8_t *_PT_header=nullptr;
uint16_t _PT_len=0;
uint8_t * _PT_data=nullptr;
////////////////////////////////
bool mark;
uint8_t PT;
uint16_t seq;
uint32_t timestamp;
uint32_t ssrc;
};
class RTPpackage: public enable_shared_from_this<RTPpackage>
RTPcell(const uint8_t *head_d, uint16_t len, uint8_t prefix, uint16_t buff_pos, shared_ptr<RTPpackage> RTPpkg_ptr)
{
public:
typedef shared_ptr<RTPpackage> Ptr;
typedef function<void(const RTPcell::Ptr &rtp_cell)> rtp_cb_f;
private:
int isRTP(const uint8_t * data, uint16_t len,bool tail=false){
if(len==0) return tail?0:-1;
if(len>=_prefix)
if((data[0]==0x24 && data[4]==0x80) || (_is_UDP && data[0]==0x80))
return 1; // strong true
else
return -1; // strong false
else
return 0; // I don't no
}
uint16_t to_uint16(uint8_t H, uint8_t L){
return (H<<8) &0xff00 | (L);
}
RTPcell::Ptr makeRTPcell(const uint8_t * data, uint16_t len, uint16_t * buff_pos){
auto ret=isRTP(data,len);
RTPcell::Ptr rtpcell;
if(ret==1){
if (!_is_UDP)
{
uint16_t length=to_uint16(data[2],data[3]);
rtpcell = make_shared<RTPcell>(length,_prefix,*buff_pos,shared_from_this());
}else
{
rtpcell = make_shared<RTPcell>(len,_prefix,*buff_pos,shared_from_this());
}
auto apd_len=min(rtpcell->_RTP_len,len);
rtpcell->append_data(data,apd_len);
*buff_pos+=apd_len;
return rtpcell;
}else if (ret==0)
{
auto rtpcell = make_shared<RTPcell>(data,len,_prefix,*buff_pos,shared_from_this());
*buff_pos+=len;
return rtpcell;
}else
{
return nullptr;
}
}
public:
RTPpackage(const rtp_cb_f & cb,const RTPcell::Ptr & prev_rtp_cell){
_rtp_cb=cb;
_first_start=true;
_Rtp_cell=prev_rtp_cell;
_prefix=16;
_insert_len=0;
_inserted_len=0;
_insert_tail_pos=0;
// RTPpackage_total_count++;
// WRNGL("create RTPpackage %u\n", RTPpackage_total_count);
};
RTPpackage(const rtp_cb_f & cb,bool is_udp){
_rtp_cb=cb;
_first_start=true;
_Rtp_cell=nullptr;
if (is_udp) _prefix=12;
else _prefix=16;
_is_UDP=is_udp;
_insert_len=0;
_inserted_len=0;
_insert_tail_pos=0;
// RTPpackage_total_count++;
// WRNGL("create RTPpackage %u\n", RTPpackage_total_count);
};
~RTPpackage(){
// RTPpackage_total_count--;
// WRNGL("~RTPpackage %u\n", RTPpackage_total_count);
};
void clear_all(){
_Rtp_cell.reset();
// _buffer.clear();
}
void insertData(const uint8_t *data, uint16_t len, uint16_t head_pos, uint16_t insert_pos,uint16_t tail_remain=0){
uint8_t * head_ptr=(uint8_t *)_buffer.data()+head_pos;
size_t tail_pos=_buffer.size();
if(insert_pos<head_pos) return;
if(tail_pos<insert_pos) return;
if(isRTP(head_ptr,_prefix)<1) return;
if(len>sizeof(_insert_buff)) return;
memcpy(_insert_buff,data,len);
_insert_len=len;
_insert_pos=insert_pos;
_head_pos=head_pos;
_tail_remain=tail_remain;
}
_missing_head = true;
_RTPpkg_ptr = RTPpkg_ptr;
_assign_len = len;
_RTPpkg_head_pos = buff_pos;
_prefix = prefix;
_RTP_len = prefix;
_buffer = new uint8_t[(len / 2048 + 1) * 2048];
memcpy(_buffer, head_d, len);
if (_prefix == 16)
_tcp_prefix = 4;
else
_tcp_prefix = 0;
void insertData(){
if(_insert_len<=0) return;
_buffer.insert(_insert_pos,(char*)_insert_buff,_insert_len);
uint8_t *head_ptr=(uint8_t *)_buffer.data()+_head_pos;
if(!_is_UDP){
uint16_t length=to_uint16(head_ptr[2],head_ptr[3]);
length+=_insert_len;
head_ptr[2] = (length) >> 8;
head_ptr[3] = (length) & 0x00FF;
}
SUCSL("!!!!!!!!!! sucess insert sign data len:%u ,buf size %lu, head_pos:%d, insert_pos:%d\n",_insert_len,_buffer.size(),_head_pos,_insert_pos);
_inserted_len+=_insert_len;
_insert_tail_pos=_insert_pos+_insert_len-_tail_remain;
_insert_len=0;
}
// RTPcell_total_count++;
// SUCSL("create RTPcell %u\n",RTPcell_total_count);
}
int modifyData(uint16_t pos,uint8_t *data, size_t len){
if(pos+len>_buffer.size()) return -1;
auto ptr=_buffer.data()+pos;
memcpy((char*)ptr,data,len);
// RTPcell(const uint8_t *data,uint16_t len, uint8_t prefix){
// _buffer=new uint8_t[(len/2048+1)*2048];
// _RTP_len=len;
// _assign_len=0;
// if(_prefix==16) _tcp_prefix=4;
// else _tcp_prefix=0;
// }
~RTPcell()
{
// RTPcell_total_count--;
// WRNGL("~RTPcell %u\n",RTPcell_total_count);
delete _buffer;
}
void setRTPlen()
{
if (_tcp_prefix >= 4) {
_RTP_len = (_buffer[2] << 8) & 0xff00 | (_buffer[3]) + _tcp_prefix;
_buffer = (uint8_t *) realloc(_buffer, (_RTP_len / 2048 + 1) * 2048);
}
_missing_head = false;
}
int append_data(const uint8_t *data, uint16_t len)
{
if (_RTP_len >= _assign_len + len) {
memcpy(_assign_len + _buffer, data, len);
_assign_len += len;
return 1;
}
return -1;
}
int getLackLen() { return (int) _RTP_len - (int) _assign_len; }
void setPause(bool enable){
_pop_pause=enable;
}
bool isPause(){
return _pop_pause;
}
void precessRTP()
{
int findStartHead(const uint8_t * package, uint16_t len){
const uint8_t needle[]={0x24,0x00};
uint8_t * sus_head=(uint8_t*)package;
uint16_t rest_len=len;
do{
sus_head=(uint8_t*)memmem(sus_head,rest_len-5,needle,2);
if(sus_head){
if( sus_head[4]==0x80){
return sus_head-package;
}
rest_len-=sus_head-package;
}
return -1;
}while(rest_len<5);
return -1;
}
uint8_t *ptr = _buffer + _tcp_prefix;
mark = ptr[1] >> 7;
PT = ptr[1] & 0x7F;
memcpy(&seq, ptr + 2, 2);
seq = ntohs(seq);
memcpy(&timestamp, ptr + 4, 4);
timestamp = ntohl(timestamp);
memcpy(&ssrc, ptr + 8, 4);
ssrc = ntohl(ssrc);
RTPcell::Ptr assignRTPpackage(const uint8_t * package, uint16_t len,void* usr_param){
int ret=0;
_buffer.assign((char*)package,len);
_usr_param=usr_param;
if(!_Rtp_cell && isRTP(package,len)<1){
if(_is_UDP) ret=0;
else ret=findStartHead(package,len);
if(ret<0) return nullptr;
}
auto buf_ptr=(uint8_t*)_buffer.data();
_PT_header = ptr + 12;
_PT_len = _RTP_len - 12 - _tcp_prefix;
_PT_data = ptr + 12 + _tcp_prefix;
}
if(_Rtp_cell && _Rtp_cell->_missing_head){
auto lacklen=_Rtp_cell->getLackLen();
if(lacklen<=len){
_Rtp_cell->append_data(package,lacklen);
if(isRTP(_Rtp_cell->_buffer,_Rtp_cell->_assign_len)!=1) {
_Rtp_cell=nullptr;
assignRTPpackage(package,len,usr_param);
}else
{
ret+=lacklen;
_Rtp_cell->setRTPlen();
// lacklen=_Rtp_cell->getLackLen();
// _Rtp_cell->append_data(package,lacklen);
// offeset+=lacklen;
}
public:
uint8_t _tcp_prefix = 0;
uint8_t _prefix = 0;
uint16_t _RTP_len = 0;
uint16_t _assign_len = 0;
uint16_t _RTPpkg_head_pos = 0;
bool _missing_head = false;
uint8_t *_buffer = NULL;
shared_ptr<RTPpackage> _RTPpkg_ptr;
uint8_t *_PT_header = nullptr;
uint16_t _PT_len = 0;
uint8_t *_PT_data = nullptr;
////////////////////////////////
bool mark;
uint8_t PT;
uint16_t seq;
uint32_t timestamp;
uint32_t ssrc;
};
}
else{
_Rtp_cell=nullptr;
assignRTPpackage(package,len,usr_param);
}
class RTPpackage : public enable_shared_from_this<RTPpackage> {
public:
typedef shared_ptr<RTPpackage> Ptr;
typedef function<void(const RTPcell::Ptr &rtp_cell)> rtp_cb_f;
private:
int isRTP(const uint8_t *data, uint16_t len, bool tail = false)
{
auto IsRtpHeader = [](uint8_t c) {
// check version == 2
return (c & 0xC0) == 0x80 && (c & 0xF) == 0;
};
if (len == 0) return tail ? 0 : -1;
if (len >= _prefix)
if ((data[0] == 0x24 && IsRtpHeader(data[0]))
|| (_is_UDP && (data[0] == 0x80 || data[0] == 0xa0 || data[0] == 0x90)))
return 1;// strong true
else
return -1;// strong false
else
return 0;// I don't no
}
uint16_t to_uint16(uint8_t H, uint8_t L) { return (H << 8) & 0xff00 | (L); }
RTPcell::Ptr makeRTPcell(const uint8_t *data, uint16_t len, uint16_t *buff_pos)
{
auto ret = isRTP(data, len);
RTPcell::Ptr rtpcell;
if (ret == 1) {
if (!_is_UDP) {
uint16_t length = to_uint16(data[2], data[3]);
rtpcell = make_shared<RTPcell>(length, _prefix, *buff_pos, shared_from_this());
} else {
rtpcell = make_shared<RTPcell>(len, _prefix, *buff_pos, shared_from_this());
}
return splitRTP(buf_ptr,len,ret);
auto apd_len = min(rtpcell->_RTP_len, len);
rtpcell->append_data(data, apd_len);
*buff_pos += apd_len;
return rtpcell;
} else if (ret == 0) {
auto rtpcell = make_shared<RTPcell>(data, len, _prefix, *buff_pos, shared_from_this());
*buff_pos += len;
return rtpcell;
} else {
return nullptr;
}
}
public:
RTPpackage(const rtp_cb_f &cb, const RTPcell::Ptr &prev_rtp_cell)
{
_rtp_cb = cb;
_first_start = true;
_Rtp_cell = prev_rtp_cell;
_prefix = 16;
_insert_len = 0;
_inserted_len = 0;
_insert_tail_pos = 0;
// RTPpackage_total_count++;
// WRNGL("create RTPpackage %u\n", RTPpackage_total_count);
};
RTPpackage(const rtp_cb_f &cb, bool is_udp)
{
_rtp_cb = cb;
_first_start = true;
_Rtp_cell = nullptr;
if (is_udp)
_prefix = 12;
else
_prefix = 16;
_is_UDP = is_udp;
_insert_len = 0;
_inserted_len = 0;
_insert_tail_pos = 0;
// RTPpackage_total_count++;
// WRNGL("create RTPpackage %u\n", RTPpackage_total_count);
};
~RTPpackage() {
// RTPpackage_total_count--;
// WRNGL("~RTPpackage %u\n", RTPpackage_total_count);
};
void clear_all()
{
_Rtp_cell.reset();
// _buffer.clear();
}
void insertData(const uint8_t *data, uint16_t len, uint16_t head_pos, uint16_t insert_pos, uint16_t tail_remain = 0)
{
uint8_t *head_ptr = (uint8_t *) _buffer.data() + head_pos;
size_t tail_pos = _buffer.size();
if (insert_pos < head_pos) return;
if (tail_pos < insert_pos) return;
if (isRTP(head_ptr, _prefix) < 1) return;
if (len > sizeof(_insert_buff)) return;
memcpy(_insert_buff, data, len);
_insert_len = len;
_insert_pos = insert_pos;
_head_pos = head_pos;
_tail_remain = tail_remain;
}
void insertData()
{
if (_insert_len <= 0) return;
_buffer.insert(_insert_pos, (char *) _insert_buff, _insert_len);
uint8_t *head_ptr = (uint8_t *) _buffer.data() + _head_pos;
if (!_is_UDP) {
uint16_t length = to_uint16(head_ptr[2], head_ptr[3]);
length += _insert_len;
head_ptr[2] = (length) >> 8;
head_ptr[3] = (length) & 0x00FF;
}
private:
// RTPcell::Ptr missingRTP(const uint8_t * data, uint16_t len, uint16_t * buff_pos){
// const uint8_t needle[]={0x24,0x00};
// uint8_t * sus_head=(uint8_t*)memmem(data,len-5,needle,2);
// if(sus_head[4]==0x80){
// *buff_pos=data-sus_head;
// }else
// {
// /* code */
// }
SUCSL("!!!!!!!!!! sucess insert sign data len:%u ,buf size %lu, head_pos:%d, insert_pos:%d\n", _insert_len,
_buffer.size(), _head_pos, _insert_pos);
_inserted_len += _insert_len;
_insert_tail_pos = _insert_pos + _insert_len - _tail_remain;
_insert_len = 0;
}
int modifyData(uint16_t pos, uint8_t *data, size_t len)
{
if (pos + len > _buffer.size()) return -1;
auto ptr = _buffer.data() + pos;
memcpy((char *) ptr, data, len);
return 1;
}
// }
RTPcell::Ptr splitRTP(const uint8_t * package, uint16_t len,uint16_t ofst=0){
int lacklen=0;
uint16_t offeset=ofst;
void setPause(bool enable) { _pop_pause = enable; }
bool isPause() { return _pop_pause; }
do
{
if(!_Rtp_cell){
_Rtp_cell=makeRTPcell(package+offeset,len-offeset,&offeset);
if(!_Rtp_cell) {
WRNGL("NOT RTP PACKAGE \n");
int findStartHead(const uint8_t *package, uint16_t len)
{
// '$' = 0x24
const uint8_t needle[] = {0x24, 0x00};
uint8_t *sus_head = (uint8_t *) package;
uint16_t rest_len = len;
do {
sus_head = (uint8_t *) memmem(sus_head, rest_len - 5, needle, 2);
if (sus_head) {
if (sus_head[4] == 0x80) { return sus_head - package; }
rest_len -= sus_head - package;
}
return -1;
} while (rest_len < 5);
return -1;
}
RTPcell::Ptr assignRTPpackage(const uint8_t *package, uint16_t len, void *usr_param)
{
int ret = 0;
_buffer.assign((char *) package, len);
_usr_param = usr_param;
if (!_Rtp_cell && isRTP(package, len) < 1) {
if (_is_UDP)
ret = 0;
else
ret = findStartHead(package, len);
if (ret < 0) return nullptr;
}
auto buf_ptr = (uint8_t *) _buffer.data();
if (_Rtp_cell && _Rtp_cell->_missing_head) {
auto lacklen = _Rtp_cell->getLackLen();
if (lacklen <= len) {
_Rtp_cell->append_data(package, lacklen);
if (isRTP(_Rtp_cell->_buffer, _Rtp_cell->_assign_len) != 1) {
_Rtp_cell = nullptr;
assignRTPpackage(package, len, usr_param);
} else {
ret += lacklen;
_Rtp_cell->setRTPlen();
// lacklen=_Rtp_cell->getLackLen();
// _Rtp_cell->append_data(package,lacklen);
// offeset+=lacklen;
}
} else {
_Rtp_cell = nullptr;
assignRTPpackage(package, len, usr_param);
}
}
return splitRTP(buf_ptr, len, ret);
}
private:
// RTPcell::Ptr missingRTP(const uint8_t * data, uint16_t len, uint16_t * buff_pos){
// const uint8_t needle[]={0x24,0x00};
// uint8_t * sus_head=(uint8_t*)memmem(data,len-5,needle,2);
// if(sus_head[4]==0x80){
// *buff_pos=data-sus_head;
// }else
// {
// /* code */
// }
// }
RTPcell::Ptr splitRTP(const uint8_t *package, uint16_t len, uint16_t ofst = 0)
{
int lacklen = 0;
uint16_t offeset = ofst;
do {
if (!_Rtp_cell) {
_Rtp_cell = makeRTPcell(package + offeset, len - offeset, &offeset);
if (!_Rtp_cell) {
WRNGL("NOT RTP PACKAGE \n");
return nullptr;
}
if (_Rtp_cell->_missing_head) return _Rtp_cell;
}
lacklen = _Rtp_cell->getLackLen();
if (lacklen <= 0) {
_rtp_cb(_Rtp_cell);
_Rtp_cell.reset();
if ((int) len - (int) offeset == 0) return nullptr;
} else {
if ((int) len - (int) offeset - lacklen < 0) return _Rtp_cell;
auto ret = isRTP(package + offeset + lacklen, len - offeset - lacklen, true);
if (ret >= 0) {
_Rtp_cell->append_data(package + offeset, lacklen);
offeset += lacklen;
} else {
ret = findStartHead(package, len);
if (ret < 0) {
WRNGL("NOT CONTINUE RTP PACKAGE \n");
return nullptr;
}
if(_Rtp_cell->_missing_head) return _Rtp_cell;
NOTICEL("NOT CONTINUE RTP PACKAGE BUT START NEW \n");
_Rtp_cell = nullptr;
offeset += ret;
}
}
lacklen=_Rtp_cell->getLackLen();
if(lacklen<=0) {
_rtp_cb(_Rtp_cell);
_Rtp_cell.reset();
if((int)len-(int)offeset==0) return nullptr;
}
else{
if((int)len-(int)offeset-lacklen<0) return _Rtp_cell;
} while (1);
}
auto ret=isRTP(package+offeset+lacklen,len-offeset-lacklen,true);
if(ret>=0){
_Rtp_cell->append_data(package+offeset,lacklen);
offeset+=lacklen;
}
else{
ret=findStartHead(package,len);
if(ret<0) {
WRNGL("NOT CONTINUE RTP PACKAGE \n");
return nullptr;
}
NOTICEL("NOT CONTINUE RTP PACKAGE BUT START NEW \n");
_Rtp_cell=nullptr;
offeset+=ret;
}
}
private:
bool _pop_pause = false;
rtp_cb_f _rtp_cb;
bool _first_start = true;
RTPcell::Ptr _Rtp_cell = NULL;
uint8_t _prefix = 0;
uint8_t _insert_buff[2048];
uint16_t _insert_len;
uint16_t _head_pos;
uint16_t _insert_pos;
uint16_t _tail_remain;
bool _is_UDP = false;
} while (1);
}
public:
string _buffer;
uint16_t _insert_tail_pos;
uint16_t _inserted_len;
void *_usr_param = nullptr;
};
private:
bool _pop_pause=false;
rtp_cb_f _rtp_cb;
bool _first_start=true;
RTPcell::Ptr _Rtp_cell=NULL;
uint8_t _prefix=0;
uint8_t _insert_buff[2048];
uint16_t _insert_len;
uint16_t _head_pos;
uint16_t _insert_pos;
uint16_t _tail_remain;
bool _is_UDP=false;
public:
string _buffer;
uint16_t _insert_tail_pos;
uint16_t _inserted_len;
void* _usr_param=nullptr;
class RTPHeader {
private:
bool _mark;
uint8_t _PT;
uint16_t _seq;
uint32_t _timestamp;
uint32_t _ssrc;
uint8_t _header_buff[16];
};
public:
RTPHeader() {};
~RTPHeader() {};
class RTPHeader
{
private:
bool _mark;
uint8_t _PT;
uint16_t _seq;
uint32_t _timestamp;
uint32_t _ssrc;
uint8_t _header_buff[16];
public:
RTPHeader(){};
~RTPHeader(){};
void setRawHeader(uint8_t* data,uint8_t len){
memcpy(_header_buff,data,len);
}
};
void setRawHeader(uint8_t *data, uint8_t len) { memcpy(_header_buff, data, len); }
};
#endif
#endif

39
src/layer2/frame.cpp Normal file
View File

@ -0,0 +1,39 @@
#include "layer2/frame.h"
Frame::Ptr
Frame::CreateUDPFrame(const uint8_t *const data, std::size_t len)
{
return std::shared_ptr<Frame>(new Frame(kUDP, data, len));
}
Frame::Ptr
Frame::CreateTCPFrame(const uint8_t *const data, std::size_t len)
{
return std::shared_ptr<Frame>(new Frame(kTCP, data, len));
}
Frame::Type
Frame::type() const
{
return _type;
}
void *
Frame::user_param() const
{
return _user_param;
}
void
Frame::set_user_param(void *param)
{
_user_param = param;
}
const std::vector<uint8_t> &
Frame::data() const
{
return _data;
}
Frame::Frame(Type type, const uint8_t *const data, std::size_t len) : _type(type), _data(data, data + len) {}

37
src/layer2/frame.h Normal file
View File

@ -0,0 +1,37 @@
#ifndef AW_SECURITY_MEDIA_LIB_LAYER2_FRAME_H
#define AW_SECURITY_MEDIA_LIB_LAYER2_FRAME_H
#pragma once
#include <cstdint>
#include <memory>
#include <vector>
class Frame {
public:
using Ptr = std::shared_ptr<Frame>;
enum Type {
kTCP,
kUDP,
};
static Frame::Ptr CreateUDPFrame(const uint8_t *const data, std::size_t len);
static Frame::Ptr CreateTCPFrame(const uint8_t *const data, std::size_t len);
Type type() const;
const std::vector<uint8_t> &data() const;
void *user_param() const;
void set_user_param(void *param);
private:
Frame(Type type, const uint8_t *const data, std::size_t len);
private:
Type _type;
std::vector<uint8_t> _data;
void *_user_param = nullptr;
};
#endif// AW_SECURITY_MEDIA_LIB_LAYER2_FRAME_H

37
src/layer2/l2_manager.cpp Normal file
View File

@ -0,0 +1,37 @@
#include "l2_manager.h"
#include "HuaWei/HWcommon.h"
void
L2Manager::FrameInput(Frame::Ptr frame)
{
std::lock_guard<std::mutex> _(_mutex);
OnFrameInput(frame);
_frame_queue.push(frame);
}
Frame::Ptr
L2Manager::FrameOutput()
{
std::lock_guard<std::mutex> _(_mutex);
auto frame = _frame_queue.front();
_frame_queue.pop();
OnFrameOutput(_frame_queue.front());
return frame;
}
std::size_t
L2Manager::size() const
{
std::lock_guard<std::mutex> _(_mutex);
return _frame_queue.size();
}
bool
L2Manager::empty() const
{
std::lock_guard<std::mutex> _(_mutex);
return _frame_queue.empty();
}

25
src/layer2/l2_manager.h Normal file
View File

@ -0,0 +1,25 @@
#ifndef AW_SECURITY_MEDIA_LIB_LAYER2_L2_MANAGER_H
#define AW_SECURITY_MEDIA_LIB_LAYER2_L2_MANAGER_H
#pragma once
#include "HuaWei/queue.h"
#include "layer2/frame.h"
#include "sigslot.h"
#include <queue>
class L2Manager {
public:
sigslot::signal1<Frame::Ptr> OnFrameInput;
sigslot::signal1<Frame::Ptr> OnFrameOutput;
void FrameInput(Frame::Ptr frame);
Frame::Ptr FrameOutput();
std::size_t size() const;
bool empty() const;
private:
std::queue<Frame::Ptr> _frame_queue;
mutable std::mutex _mutex;
};
#endif// AW_SECURITY_MEDIA_LIB_LAYER2_L2_MANAGER_H

View File

@ -0,0 +1,77 @@
#include "layer4/rtp_manager.h"
#include "HuaWei/HWcommon.h"
RTPManager::RTPManager() : _stopped(false), _cv(), _mutex(), _packet_queue(), _rtp_decoder()
{
_worker = new std::thread([this] { WorkProc(); });
_rtp_decoder.setOnDecode([this](const uint8_t *packet, int bytes, uint32_t timestamp, int flags) {
OnDecodeCB(packet, bytes, timestamp, flags);
});
}
RTPManager::~RTPManager()
{
_stopped.store(true);
_worker->join();
delete _worker;
}
void
RTPManager::OnFrame(Frame::Ptr frame)
{
RTPPacket::Ptr packet = RTPPacket::CreateUDP(frame->data().data(), frame->data().size());
if (packet->packet_type() == RTPPacket::kTCP) {
WRNGL("Unsupport TCP\n");
} else if (packet->packet_type() == RTPPacket::kUDP) {
if (!packet->PacketIsComplete()) {
INFOL("Incomplete RTP Packet\n");
return;
}
Enqueue(packet);
}
}
void
RTPManager::Enqueue(RTPPacket::Ptr packet)
{
std::lock_guard<std::mutex> _(this->_mutex);
_packet_queue.push(packet);
_cv.notify_all();
}
RTPPacket::Ptr
RTPManager::Dequeue(bool wait)
{
std::unique_lock<std::mutex> _(this->_mutex);
if (wait) {
_cv.wait(_, [this] { return !_packet_queue.empty(); });
} else if (_packet_queue.empty()) {
return nullptr;
}
auto packet = _packet_queue.front();
_packet_queue.pop();
return packet;
}
void
RTPManager::OnDecodeCB(const uint8_t *packet, int bytes, uint32_t timestamp, int flags)
{
TRACEL("RTP Packet: %p, %d, %u, %d\n", packet, bytes, timestamp, flags);
OnPSFrame(packet, bytes, timestamp, flags);
}
void
RTPManager::WorkProc()
{
while (!_stopped.load(std::memory_order_relaxed)) {
RTPPacket::Ptr packet = Dequeue(true);
if (!packet) {
ERROL("RTP Packet is nullptr\n");
continue;
}
TRACEL("RTP Pcket seq: %u , ssrc=%u, len=%lu\n", packet->sequence_number(), packet->ssrc(),
packet->rtp_packet_size());
_rtp_decoder.decodeRtp(packet->rtp_packet(), packet->rtp_packet_size());
}
}

45
src/layer4/rtp_manager.h Normal file
View File

@ -0,0 +1,45 @@
#ifndef AW_SECURITY_MEDIA_LIB_LAYER3_RTP_MANAGER_H
#define AW_SECURITY_MEDIA_LIB_LAYER3_RTP_MANAGER_H
#pragma once
#include "GB28181/RtpDecoder.h"
#include "HuaWei/queue.h"
#include "layer2/frame.h"
#include "layer4/rtp_packet.h"
#include "sigslot.h"
#include <atomic>
class RTPManager : public sigslot::has_slots<> {
public:
// packet, bytes, timestamp, flags
sigslot::signal4<const uint8_t *, int, uint32_t, int> OnPSFrame;
using Ptr = std::shared_ptr<RTPManager>;
public:
RTPManager();
~RTPManager();
void OnFrame(Frame::Ptr frame);
private:
void Enqueue(RTPPacket::Ptr packet);
RTPPacket::Ptr Dequeue(bool wait = true);
void WorkProc();
// typedef function<void(const uint8_t *packet, int bytes, uint32_t timestamp, int flags)> on_decode_t;
void OnDecodeCB(const uint8_t *packet, int bytes, uint32_t timestamp, int flags);
private:
std::atomic<bool> _stopped;
std::thread *_worker;
std::mutex _mutex;
std::condition_variable _cv;
std::queue<RTPPacket::Ptr> _packet_queue;
RtpDecoder _rtp_decoder;
};
#endif// AW_SECURITY_MEDIA_LIB_LAYER3_RTP_MANAGER_H

216
src/layer4/rtp_packet.cpp Normal file
View File

@ -0,0 +1,216 @@
#include "layer4/rtp_packet.h"
#include "HuaWei/HWcommon.h"
#include <arpa/inet.h>
#include <assert.h>
#include <limits>
RTPPacket::Ptr
RTPPacket::CreateUDP(const uint8_t *data, std::size_t len)
{
auto p = std::shared_ptr<RTPPacket>(new RTPPacket(kUDP));
p->Append(data, len);
return p;
}
RTPPacket::Ptr
RTPPacket::CreateTCP(const uint8_t *data, std::size_t len)
{
auto p = std::shared_ptr<RTPPacket>(new RTPPacket(kUDP));
p->Append(data, len);
return p;
}
std::size_t
RTPPacket::GetHeaderSize(PacketType type)
{
switch (type) {
case RTPPacket::kTCP:
return 16;
case RTPPacket::kUDP:
return 12;
default:
ERROL("Unknown packet type.\n");
return std::numeric_limits<std::size_t>::max();
}
}
RTPPacket::~RTPPacket() {}
bool
RTPPacket::CanBeParsed() const
{
if (_data.empty()) { return true; }
if (packet_type() == kTCP) {
if (_data[0] != '$') { return false; }
if (_data.size() >= 5 && version() != 2) { return false; }
} else if (packet_type() == kUDP) {
if (version() != 2) { return false; }
} else {
return false;
}
return true;
}
bool
RTPPacket::FixedHeaderIsComplete() const
{
if (!CanBeParsed()) { return false; }
if (GetHeaderSize(packet_type()) > _data.size()) { return false; }
return true;
}
bool
RTPPacket::HeaderIsComplete() const
{
if (!FixedHeaderIsComplete()) {
TRACEL("FixedHeader not is complete.\n");
return false;
}
const std::size_t base_header_len = GetHeaderSize(packet_type());
// 首先检查固定头长度
const std::size_t fixed_header_len = base_header_len + csrc_count() * 4;
if (_data.size() < fixed_header_len) { return false; }
// 检查扩展头长度
if (has_extension()) {
if (_data.size() < fixed_header_len + 4) { return false; }
const std::size_t header_len = fixed_header_len + 4 + extension_length();
if (_data.size() < header_len) { return false; }
}
return true;
}
bool
RTPPacket::PacketIsComplete() const
{
if (!HeaderIsComplete()) { return false; }
// tcp 需要检查后续长度是否满足 tcp_payload_size
if (packet_type() == kTCP && tcp_payload_size() + 4 > _data.size()) { return false; }
// 如果已经是整个rtp packet检查 padding
// if (has_padding()) {
// const std::size_t padding_len = _data.back();
// if (padding_len > _data.size() - rtp_fixed_header_size()) { return false; }
// }
return true;
}
void
RTPPacket::Append(const uint8_t *const data, std::size_t len)
{
std::copy(data, data + len, std::back_inserter(_data));
}
std::size_t
RTPPacket::lack_len() const
{
if (packet_type() == kTCP) {
if (FixedHeaderIsComplete()) {}
}
return 0;
}
std::uint8_t
RTPPacket::version() const
{
return *rtp_header() >> 6;
}
bool
RTPPacket::has_padding() const
{
return *rtp_header() & 0x20;
}
bool
RTPPacket::has_extension() const
{
return *rtp_header() & 0x10;
}
std::uint8_t
RTPPacket::csrc_count() const
{
uint8_t cnt = *rtp_header() & 0x0f;
return cnt;
}
bool
RTPPacket::has_marker() const
{
return *(rtp_header() + 1) & 0x80;
}
std::uint8_t
RTPPacket::payload_type() const
{
return *(rtp_header() + 1) & 0x7f;
}
std::uint16_t
RTPPacket::sequence_number() const
{
uint16_t *p = (uint16_t *) (rtp_header() + 2);
return ntohs(*p);
}
std::uint32_t
RTPPacket::timestamp() const
{
uint32_t *p = (uint32_t *) (rtp_header() + 4);
return ntohl(*p);
}
std::uint32_t
RTPPacket::ssrc() const
{
uint32_t *p = (uint32_t *) (rtp_header() + 8);
return ntohl(*p);
}
std::size_t
RTPPacket::tcp_payload_size() const
{
assert(_type == kTCP && "Must be TCP packet.\n");
if (CanBeParsed() && _data.size() >= 4) { return ntohs(*(uint16_t *) (_data.data() + 2)); }
return 0;
}
std::uint8_t *
RTPPacket::rtp_packet() const
{
return rtp_header();
}
std::size_t
RTPPacket::rtp_packet_size() const
{
return _data.size() - _rtp_header_offset;
}
std::uint8_t *
RTPPacket::rtp_header() const
{
return const_cast<std::uint8_t *>(_data.data()) + _rtp_header_offset;
}
std::size_t
RTPPacket::rtp_fixed_header_size() const
{
return 12 + csrc_count() * 4;
}
std::size_t
RTPPacket::extension_length() const
{
if (!has_extension()) { return 0; }
auto extension_header = rtp_header() + rtp_fixed_header_size();
uint16_t *p = (uint16_t *) (extension_header + 2);
return ntohs(*p);
}
RTPPacket::RTPPacket(PacketType type) : _type(type), _rtp_header_offset(type == kTCP ? 4 : 0) {}

75
src/layer4/rtp_packet.h Normal file
View File

@ -0,0 +1,75 @@
#ifndef AW_SECURITY_MEDIA_LIB_LAYER3_RTP_PACKET_H
#define AW_SECURITY_MEDIA_LIB_LAYER3_RTP_PACKET_H
#pragma once
#include <cstdint>
#include <memory>
#include <vector>
class RTPPacket {
public:
using Ptr = std::shared_ptr<RTPPacket>;
enum PacketType {
kTCP,
kUDP,
};
class RTPHeaderExtension {
public:
RTPHeaderExtension(RTPPacket::Ptr rtp_packet);
std::uint16_t profile();
std::uint16_t length();
std::uint16_t data();
};
public:
static RTPPacket::Ptr CreateUDP(const uint8_t *data, std::size_t len);
static RTPPacket::Ptr CreateTCP(const uint8_t *data, std::size_t len);
static std::size_t GetHeaderSize(PacketType type);
~RTPPacket();
bool CanBeParsed() const;
bool FixedHeaderIsComplete() const;
bool HeaderIsComplete() const;
bool PacketIsComplete() const;
void Append(const uint8_t *const data, std::size_t len);
std::size_t lack_len() const;
PacketType packet_type() const { return _type; }
// Header Info
std::uint8_t version() const;
bool has_padding() const;
bool has_extension() const;
std::uint8_t csrc_count() const;
bool has_marker() const;
std::uint8_t payload_type() const;
std::uint16_t sequence_number() const;
std::uint32_t timestamp() const;
std::uint32_t ssrc() const;
std::size_t tcp_payload_size() const;
std::uint8_t *rtp_packet() const;
std::size_t rtp_packet_size() const;
std::size_t rtp_fixed_header_size() const;
std::size_t extension_length() const;
std::size_t padding_length() const;
private:
RTPPacket(PacketType type);
std::uint8_t *rtp_header() const;
private:
PacketType _type;
std::vector<std::uint8_t> _data;
// for tcp: offset = 4
// for udp: offset = 0
std::size_t _rtp_header_offset;
};
#endif// AW_SECURITY_MEDIA_LIB_LAYER3_RTP_PACKET_H