新闻  |   论坛  |   博客  |   在线研讨会
C++部署的性能优化方法
地平线开发者 | 2025-04-28 18:32:59    阅读:13   发布文章

一、使用结构体提前存放常用变量

在编写前后处理函数时,通常会多次用到一些变量,比如模型输入 tensor 的 shape,count 等等,若在每个处理函数中都重复计算一次,会增加部署时的计算量。对于这种情况,可以考虑使用结构体,并定义一个初始化函数。先计算好需要的值,之后需要用到该变量的时候直接引用(&)传递即可。


// 定义结构体
struct ModelInfo {
   hbDNNPackedHandle_t packed_handle;
   hbDNNHandle_t       model_handle;
   const char *        model_path;
   const char **       model_name_list;
   int model_count;
   int input_count;
   int output_count;
};
// 函数声明
int init_model(ModelInfo &model_info);
int other_function(ModelInfo &model_info, ...);
//主函数
int main(){
   // 初始化
   ModelInfo prefill_model = {0};
   prefill_model.model_path = drobotics_model_path_prefill.c_str();
   init_model(prefill_model);
   // 在其他函数中使用引用传递相关参数
   other_function(prefill_model, ...);
   return 0;
}
// 初始化函数的完整定义
int init_model(ModelInfo &model_info) {
   hbDNNInitializeFromFiles(&model_info.packed_handle, &model_info.model_path, 1);
   HB_CHECK_SUCCESS(hbDNNGetModelNameList(&model_info.model_name_list, &model_info.model_count, model_info.packed_handle),
           "hbDNNGetModelNameList failed");
   HB_CHECK_SUCCESS(hbDNNGetModelHandle(&model_info.model_handle, model_info.packed_handle, model_info.model_name_list[0]),
       "hbDNNGetModelHandle failed");
   HB_CHECK_SUCCESS(hbDNNGetInputCount(&model_info.input_count, model_info.model_handle), "hbDNNGetInputCount failed");
   HB_CHECK_SUCCESS(hbDNNGetOutputCount(&model_info.output_count, model_info.model_handle), "hbDNNGetOutputCount failed");
   return 0;
}
// 其他函数参数中使用引用传递
int other_function(ModelInfo &model_info, ...){
   ...
}


二、函数使用引用代替值传递

考虑到 C++的特性,函数的参数建议使用引用 (&) 来代替值传递,有这几个显著优点:

    只将原对象的引用传递给函数,避免不必要的拷贝,降低计算耗时

    因为不会复制数据,所以引用相比值传递可以避免内存的重复开销,降低内存占用

但需要注意,引用会允许函数修改原始数据,因此若不希望原始数据被修改,请不要使用引用方法。



三、量化/反量化融合

3.1 在前后处理的循环中融合

在前后处理中通常会遍历数据,而量化/反量化也会遍历数据,因此可以考虑合并计算,以减少数据遍历耗时。这是最常见的量化/反量化融合思路,可以直接参考 ai benchmark 中的大量源码示例。

3.2 将数据存进 tensor 时融合

如果在前处理中没找到融合的机会,那么也可以在数据复制进 input tensor 的时候做量化计算。

int64_t kv_count = 0;
int8_t* input_ptr = reinterpret_cast<int8_t*>(model_info.input_tensors[i].sysMem.virAddr);
for (int n = 0; n < total_count; n++) {
   input_ptr[n] = quantize_int8(kv_decode[kv_count++], cur_scale, cur_zero_point);
}


3.3 填充初始值时,提前计算量化后的值

有时我们想给模型准备特定的输入,比如生成一个全 0 数组,再为数组的特定区域填充某个固定的浮点值。在这种情况下,如果先生成完整的浮点数组,再遍历整个数组做量化,会产生不必要的遍历耗时,常见的优化思路是先提前计算好填充值量化后的结果,填充的时候直接填入定点值,这样就可以避免多余的量化耗时。

std::vector<int16_t> prepare_decode_attention_mask(ModelInfo &model_info,
   DecodeInfo &decode_info, PrefillInfo &prefill_info, int decode_infer_num){

   // 初始化全 0 数组

   std::vector<int16_t> decode_attention_mask_int(decode_info.kv_cache_len, 0);
   // 提前计算填充值量化后的结果
   hbDNNQuantiScale scale = model_info.input_tensors[1].properties.scale;
   auto cur_scale = scale.scaleData[0];
   auto cur_zero_point = scale.zeroPointData[0];
   int16_t pad_value_int = quantize_s16(-2048.0, cur_scale, cur_zero_point);
   // 将量化后的填充值填充到数组中特定区域
   for(int i = 0; i < decode_info.kv_cache_len - prefill_info.tokens_len
       - decode_infer_num -1; i++){
       decode_attention_mask_int[i] = pad_value_int;
   }

   // 返回相当于已经量化了的数组

   return decode_attention_mask_int;
}


3.4 根据后处理的实际作用,跳过反量化

在某些情况下,比如后处理只做 argmax 时,完全没有必要做反量化,直接使用整型数据做 argmax 即可。需要用户根据后处理的具体原理来判断是否使用这种优化方法。

// 直接对模型输出的 int16_t 数据做 argmax 计算
int logits_argmax(std::vector<hbDNNTensor> &output_tensor) {
   auto data_tensor = reinterpret_cast<int16_t *>(output_tensor[0].sysMem.virAddr);
   int maxIndex = -1;
   int maxValue = -32768;
   for (int i = 0; i < 151936; ++i) {
       if (data_tensor[i] > maxValue) {
           maxValue = data_tensor[i];
           maxIndex = i;
       }
   }
   return maxIndex;
}


四、循环推理同个模型时,输出数据直接存进输入 tensor

在某些情况下,我们希望 C++程序能重复推理同一个模型,并且模型上一帧的输出可以作为下一帧的输入。如果按照常规手段,我们可能会将输出 tensor 的内容保存到特定数组,再把这个数组拷贝到输入 tensor,这样一来一回就产生了两次数据拷贝的耗时,也占用了更多内存。实际上,我们可以将模型的输出 tensor 地址直接指向输入 tensor,这样模型第一帧的推理结果会直接写在输入 tensor 上,推理第二帧的时候就可以直接利用这份数据,不需要再单独准备输入,可以节省大量耗时。

如果想使用该方法,需要模型输入输出对应节点的 shape/stride 等信息完全相同。此外,如果模型删除了量化/反量化算子,并且对应的 scale 完全相同,那么重复利用的这部分 tensor 是不需要 flush 的(因为不涉及 CPU 操作),还可进一步节约耗时。

这里举个例子详细说明一下。

假设我们有一个模型,这个模型有 59 个输入节点(0-58),57 个输出节点(0-56),量化/反量化算子均已删除,且输入输出最后 56 个节点对应的 scale/shape/stride 等信息均相同。在第一帧推理完成后,输出节点 1-56 的值需要传递给输入节点的 3-58,那么我们在分配模型输入输出 tensor 的时候,输出 tensor 只需要为 1 分配即可,在分配输入 tensor 时,3-58 的 tensor 可以同时 push_back 给输出 tensor。具体来说,可以这样写:


int prepare_tensor(std::vector<hbDNNTensor> & input_tensor, std::vector<hbDNNTensor> & output_tensor,
                  hbDNNHandle_t dnn_handle) {
   int input_count  = 0;
   int output_count = 0;
   hbDNNGetInputCount(&input_count, dnn_handle);
   hbDNNGetOutputCount(&output_count, dnn_handle);
   for (int i = 0; i < 1; i++) {
       hbDNNTensor output;
       HB_CHECK_SUCCESS(hbDNNGetOutputTensorProperties(&output.properties, dnn_handle, i),
                        "hbDNNGetOutputTensorProperties failed");
       int output_memSize = output.properties.alignedByteSize;
       HB_CHECK_SUCCESS(hbUCPMallocCached(&output.sysMem, output_memSize, 0), "hbUCPMallocCached failed");
       output_tensor.push_back(output);
   }

   for (int i = 0; i < input_count; i++) {
       hbDNNTensor input;
       HB_CHECK_SUCCESS(hbDNNGetInputTensorProperties(&input.properties, dnn_handle, i),
                        "hbDNNGetInputTensorProperties failed");
       int input_memSize = input.properties.alignedByteSize;
       HB_CHECK_SUCCESS(hbUCPMallocCached(&input.sysMem, input_memSize, 0), "hbUCPMallocCached failed");
       input_tensor.push_back(input);
       if(i > 2){
           output_tensor.push_back(input);
       }
   }
   return 0;
}


在模型推理时,重复利用的这部分 tensor 不需要再 flush,因此只需要给 output_tensor 的 0,以及 input_tensor 的 0/1/2 进行 flush 操作即可(这几个 tensor 和 CPU 产生了交互)。


while(1){
   hbUCPTaskHandle_t task_handle_decode{nullptr};
   hbDNNTensor *output_decode = decode_model.output_tensors.data();
   HB_CHECK_SUCCESS(hbDNNInferV2(&task_handle_decode, output_decode,
       decode_model.input_tensors.data(), decode_model.model_handle), "hbDNNInferV2 failed");
   hbUCPSchedParam ctrl_param_decode;
   HB_UCP_INITIALIZE_SCHED_PARAM(&ctrl_param_decode);
   ctrl_param_decode.backend = HB_UCP_BPU_CORE_ANY;
   HB_CHECK_SUCCESS(hbUCPSubmitTask(task_handle_decode, &ctrl_param_decode), "hbUCPSubmitTask failed");
   HB_CHECK_SUCCESS(hbUCPWaitTaskDone(task_handle_decode, 0), "hbUCPWaitTaskDone failed");
   // 只刷新一部分输出内存(output_tensor 0)
   hbUCPMemFlush(&decode_model.output_tensors[0].sysMem, HB_SYS_MEM_CACHE_INVALIDATE);
   HB_CHECK_SUCCESS(hbUCPReleaseTask(task_handle_decode), "hbUCPReleaseTask failed");
   // 后处理(只针对 output_tensor 0)
   decode_argmax_id = logits_argmax(decode_model.output_tensors);
   // 准备下一帧推理的 input_tensor 0/1/2 输入数据
   prepare_input_tensor(...);
   // 只刷新一部分输入内存(input_tensor 0/1/2)
   for (int i = 0; i < 3; i++) {
       hbUCPMemFlush(&decode_model.input_tensors[i].sysMem, HB_SYS_MEM_CACHE_CLEAN);
   }


此外,如果使用了这种优化方法,那么在模型推理结束释放内存时,要避免同一块内存的重复释放。对于该案例,input_tensor 全部释放完毕后,output_tensor 只需要释放 output_tensor 0。


for (int i = 0; i < decode_model.input_count; i++) {
   HB_CHECK_SUCCESS(hbUCPFree(&(decode_model.input_tensors[i].sysMem)), "hbUCPFree decode_model.input_tensors failed");
}
for (int i = 0; i < 1; i++) {
   HB_CHECK_SUCCESS(hbUCPFree(&(decode_model.output_tensors[i].sysMem)), "hbUCPFree decode_model.output_tensors failed");
}



五、多线程后处理

对于 yolo v5 这种有三个输出头的模型,可以考虑使用三个线程同时对三个输出头做后处理,以显著提升性能。


*博客内容为网友个人发布,仅代表博主个人观点,如有侵权请联系工作人员删除。

参与讨论
登录后参与讨论
推荐文章
最近访客