深入昇腾AI开发

引言

在人工智能算力需求激增的今天,国产AI芯片正成为技术自主可控的关键一环。华为昇腾(Ascend)系列AI处理器及其配套软件栈,构建了一套完整的全栈全场景AI解决方案。作为开发者了解和使用昇腾技术的核心入口,hiascend.com(华为昇腾社区)提供了从文档、工具、模型到示例代码的一站式支持。

本文将围绕 hiascend.com 提供的官方资源,介绍昇腾AI开发的基本流程,并通过一段可运行的 Python 代码演示如何在昇腾设备上完成模型推理,帮助初学者快速上手这一国产AI生态。


昇腾AI开发基础架构

昇腾AI生态的核心组件包括:

  • Ascend AI 芯片:如 Ascend 310(推理)、Ascend 910(训练)。
    在这里插入图片描述在这里插入图片描述
  • CANN(Compute Architecture for Neural Networks):昇腾异构计算架构,提供底层驱动、编译器、运行时库等。
    在这里插入图片描述
  • MindSpore / TensorFlow / PyTorch 支持:通过插件或转换工具适配主流框架。
  • 在这里插入图片描述
  • ATC(Ascend Tensor Compiler):将通用模型(如 ONNX)转换为昇腾专用的 .om 模型格式。
  • 在这里插入图片描述
  • MindX SDK:面向行业应用的高层开发套件,简化部署流程。

实战:使用 ACL(Ascend Computing Language)进行图像分类推理

ACL图像分类推理示例
使用ResNet50模型进行图像分类
import numpy as np
import cv2
import os
import sys
import time
from typing import Dict, List, Tuple
import acl
import acllite_utils as utils

class ACLImageClassifier:
“”“ACL图像分类器类”“”

def __init__(self, model_path: str, device_id: int = 0):
    """
    初始化ACL环境和模型
    
    Args:
        model_path: 模型文件路径
        device_id: 设备ID
    """
    self.device_id = device_id
    self.context = None
    self.stream = None
    self.model_id = None
    self.model_desc = None
    self.input_data = None
    self.output_data = None
    self.input_buffer = None
    self.output_buffer = None
    
    # 初始化ACL
    self._init_acl()
    
    # 加载模型
    self._load_model(model_path)
    
    # 准备输入输出缓冲区
    self._prepare_buffers()
    
    # 初始化预处理参数
    self._init_preprocess_params()
    
def _init_acl(self):
    """初始化ACL环境"""
    print("初始化ACL环境...")
    
    # 初始化ACL
    ret = acl.init()
    if ret != 0:
        raise RuntimeError(f"ACL初始化失败,错误码:{ret}")
    
    # 打开设备
    ret = acl.rt.set_device(self.device_id)
    if ret != 0:
        raise RuntimeError(f"设置设备失败,错误码:{ret}")
    
    # 创建上下文
    self.context = acl.rt.create_context(self.device_id)
    if self.context is None:
        raise RuntimeError("创建上下文失败")
    
    # 创建流
    self.stream = acl.rt.create_stream()
    if self.stream is None:
        raise RuntimeError("创建流失败")
    
    print("ACL环境初始化成功")

def _load_model(self, model_path: str):
    """加载离线模型"""
    print(f"加载模型:{model_path}")
    
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"模型文件不存在:{model_path}")
    
    # 加载模型
    self.model_id, ret = acl.mdl.load_from_file(model_path)
    if ret != 0:
        raise RuntimeError(f"加载模型失败,错误码:{ret}")
    
    # 创建模型描述
    self.model_desc = acl.mdl.create_desc()
    if self.model_desc is None:
        raise RuntimeError("创建模型描述失败")
    
    # 获取模型描述
    ret = acl.mdl.get_desc(self.model_desc, self.model_id)
    if ret != 0:
        raise RuntimeError(f"获取模型描述失败,错误码:{ret}")
    
    print("模型加载成功")

def _prepare_buffers(self):
    """准备输入输出缓冲区"""
    print("准备输入输出缓冲区...")
    
    # 获取输入数量
    input_num = acl.mdl.get_num_inputs(self.model_desc)
    if input_num <= 0:
        raise RuntimeError("模型没有输入")
    
    # 获取输出数量
    output_num = acl.mdl.get_num_outputs(self.model_desc)
    if output_num <= 0:
        raise RuntimeError("模型没有输出")
    
    # 为第一个输入准备缓冲区
    input_desc = acl.mdl.get_input_desc(self.model_desc, 0)
    input_size = acl.mdl.get_input_size_by_index(self.model_desc, 0)
    
    # 创建输入数据结构
    self.input_data = acl.mdl.create_dataset()
    self.input_buffer = acl.rt.malloc(input_size, acl.rt.MEM_MALLOC_HUGE_FIRST)
    
    if self.input_buffer is None:
        raise RuntimeError("创建输入缓冲区失败")
    
    # 添加输入缓冲区到数据集
    input_data_buffer = acl.create_data_buffer(self.input_buffer, input_size)
    if input_data_buffer is None:
        raise RuntimeError("创建输入数据缓冲区失败")
    
    ret = acl.mdl.add_dataset_buffer(self.input_data, input_data_buffer)
    if ret != 0:
        raise RuntimeError(f"添加输入缓冲区失败,错误码:{ret}")
    
    # 为第一个输出准备缓冲区
    output_desc = acl.mdl.get_output_desc(self.model_desc, 0)
    output_size = acl.mdl.get_output_size_by_index(self.model_desc, 0)
    
    # 创建输出数据结构
    self.output_data = acl.mdl.create_dataset()
    self.output_buffer = acl.rt.malloc(output_size, acl.rt.MEM_MALLOC_HUGE_FIRST)
    
    if self.output_buffer is None:
        raise RuntimeError("创建输出缓冲区失败")
    
    # 添加输出缓冲区到数据集
    output_data_buffer = acl.create_data_buffer(self.output_buffer, output_size)
    if output_data_buffer is None:
        raise RuntimeError("创建输出数据缓冲区失败")
    
    ret = acl.mdl.add_dataset_buffer(self.output_data, output_data_buffer)
    if ret != 0:
        raise RuntimeError(f"添加输出缓冲区失败,错误码:{ret}")
    
    print(f"输入缓冲区大小:{input_size} bytes")
    print(f"输出缓冲区大小:{output_size} bytes")
    print("缓冲区准备完成")

def _init_preprocess_params(self):
    """初始化预处理参数"""
    # ResNet50输入尺寸
    self.input_height = 224
    self.input_width = 224
    
    # ImageNet均值和标准差
    self.mean = np.array([123.675, 116.28, 103.53], dtype=np.float32)
    self.std = np.array([58.395, 57.12, 57.375], dtype=np.float32)
    
    # RGB转BGR(OpenCV读取的是BGR格式)
    self.channel_swap = (2, 1, 0)

def preprocess(self, image_path: str) -> np.ndarray:
    """
    图像预处理
    
    Args:
        image_path: 图像文件路径
    
    Returns:
        预处理后的图像数据
    """
    # 读取图像
    img = cv2.imread(image_path)
    if img is None:
        raise ValueError(f"无法读取图像:{image_path}")
    
    # 调整大小
    img_resized = cv2.resize(img, (self.input_width, self.input_height))
    
    # 转换为float32
    img_float = img_resized.astype(np.float32)
    
    # 转换通道顺序 BGR -> RGB
    img_rgb = cv2.cvtColor(img_float, cv2.COLOR_BGR2RGB)
    
    # 减去均值
    img_normalized = img_rgb - self.mean
    
    # 除以标准差
    img_normalized = img_normalized / self.std
    
    # 转换为CHW格式
    img_chw = np.transpose(img_normalized, (2, 0, 1))
    
    # 添加batch维度
    img_batch = np.expand_dims(img_chw, axis=0)
    
    # 转换为连续数组
    img_contiguous = np.ascontiguousarray(img_batch)
    
    return img_contiguous

def inference(self, image_data: np.ndarray) -> np.ndarray:
    """
    执行推理
    
    Args:
        image_data: 预处理后的图像数据
    
    Returns:
        推理结果
    """
    # 将数据复制到设备
    input_size = image_data.size * image_data.itemsize
    ret = acl.rt.memcpy(self.input_buffer, input_size,
                       image_data.ctypes.data, input_size,
                       acl.rt.MEMCPY_HOST_TO_DEVICE)
    if ret != 0:
        raise RuntimeError(f"数据拷贝到设备失败,错误码:{ret}")
    
    # 执行推理
    ret = acl.mdl.execute(self.model_id, self.input_data, self.output_data)
    if ret != 0:
        raise RuntimeError(f"执行推理失败,错误码:{ret}")
    
    # 等待推理完成
    ret = acl.rt.synchronize_stream(self.stream)
    if ret != 0:
        raise RuntimeError(f"同步流失败,错误码:{ret}")
    
    # 获取输出数据
    output_size = acl.mdl.get_output_size_by_index(self.model_desc, 0)
    output_host = np.zeros(output_size // 4, dtype=np.float32)
    
    ret = acl.rt.memcpy(output_host.ctypes.data, output_size,
                       self.output_buffer, output_size,
                       acl.rt.MEMCPY_DEVICE_TO_HOST)
    if ret != 0:
        raise RuntimeError(f"数据拷贝到主机失败,错误码:{ret}")
    
    return output_host

def postprocess(self, output_data: np.ndarray, top_k: int = 5) -> List[Tuple[int, float]]:
    """
    后处理:计算softmax并获取top-k结果
    
    Args:
        output_data: 推理输出
        top_k: 返回前k个结果
    
    Returns:
        top-k结果列表,每个元素为(类别ID, 概率)
    """
    # 计算softmax
    exp_output = np.exp(output_data - np.max(output_data))
    probabilities = exp_output / np.sum(exp_output)
    
    # 获取top-k索引
    top_indices = np.argsort(probabilities)[-top_k:][::-1]
    
    # 返回结果
    results = []
    for idx in top_indices:
        results.append((idx, float(probabilities[idx])))
    
    return results

def predict(self, image_path: str, top_k: int = 5) -> List[Tuple[int, float]]:
    """
    完整的预测流程
    
    Args:
        image_path: 图像文件路径
        top_k: 返回前k个结果
    
    Returns:
        top-k预测结果
    """
    # 预处理
    print(f"处理图像:{image_path}")
    start_time = time.time()
    
    try:
        image_data = self.preprocess(image_path)
        preprocess_time = time.time() - start_time
        print(f"预处理时间:{preprocess_time:.4f}秒")
        
        # 推理
        inference_start = time.time()
        output = self.inference(image_data)
        inference_time = time.time() - inference_start
        print(f"推理时间:{inference_time:.4f}秒")
        
        # 后处理
        postprocess_start = time.time()
        results = self.postprocess(output, top_k)
        postprocess_time = time.time() - postprocess_start
        print(f"后处理时间:{postprocess_time:.4f}秒")
        
        # 总时间
        total_time = time.time() - start_time
        print(f"总处理时间:{total_time:.4f}秒")
        
        return results
        
    except Exception as e:
        print(f"预测失败:{str(e)}")
        raise

def batch_predict(self, image_paths: List[str], top_k: int = 5) -> Dict[str, List[Tuple[int, float]]]:
    """
    批量预测
    
    Args:
        image_paths: 图像路径列表
        top_k: 返回前k个结果
    
    Returns:
        预测结果字典
    """
    results = {}
    
    for image_path in image_paths:
        try:
            predictions = self.predict(image_path, top_k)
            results[image_path] = predictions
        except Exception as e:
            print(f"处理图像 {image_path} 失败:{str(e)}")
            results[image_path] = []
    
    return results

def __del__(self):
    """资源释放"""
    print("释放资源...")
    
    # 释放输入缓冲区
    if self.input_buffer:
        acl.rt.free(self.input_buffer)
    
    # 释放输出缓冲区
    if self.output_buffer:
        acl.rt.free(self.output_buffer)
    
    # 销毁数据集
    if self.input_data:
        acl.mdl.destroy_dataset(self.input_data)
    
    if self.output_data:
        acl.mdl.destroy_dataset(self.output_data)
    
    # 卸载模型
    if self.model_id:
        acl.mdl.unload(self.model_id)
    
    # 销毁模型描述
    if self.model_desc:
        acl.mdl.destroy_desc(self.model_desc)
    
    # 销毁流
    if self.stream:
        acl.rt.destroy_stream(self.stream)
    
    # 销毁上下文
    if self.context:
        acl.rt.destroy_context(self.context)
    
    # 重置设备
    acl.rt.reset_device(self.device_id)
    
    # 最终化ACL
    acl.finalize()
    
    print("资源释放完成")

def load_imagenet_labels(label_path: str = “imagenet_classes.txt”) -> List[str]:
“”"
加载ImageNet标签

Args:
    label_path: 标签文件路径

Returns:
    标签列表
"""
if not os.path.exists(label_path):
    print(f"标签文件不存在:{label_path}")
    print("使用默认类别ID")
    return [f"Class_{i}" for i in range(1000)]

with open(label_path, 'r', encoding='utf-8') as f:
    labels = [line.strip() for line in f.readlines()]

return labels

def main():
“”“主函数”“”
# 配置参数
MODEL_PATH = “./resnet50.om” # 离线模型路径
TEST_IMAGE = “./test.jpg” # 测试图像路径
DEVICE_ID = 0 # 设备ID
TOP_K = 5 # 返回前5个结果

# 检查模型文件
if not os.path.exists(MODEL_PATH):
    print(f"错误:模型文件不存在 - {MODEL_PATH}")
    print("请先使用ATC工具转换模型:")
    print("atc --model=resnet50.onnx --framework=5 --output=resnet50 --input_format=NCHW --input_shape='input:1,3,224,224' --device_id=0")
    sys.exit(1)

# 检查测试图像
if not os.path.exists(TEST_IMAGE):
    print(f"错误:测试图像不存在 - {TEST_IMAGE}")
    sys.exit(1)

try:
    # 创建分类器
    print("=" * 60)
    print("创建ACL图像分类器...")
    classifier = ACLImageClassifier(MODEL_PATH, DEVICE_ID)
    print("分类器创建成功")
    print("=" * 60)
    
    # 加载标签
    labels = load_imagenet_labels()
    
    # 单张图片预测
    print("

" + “=” * 60)
print(“开始单张图片预测…”)
results = classifier.predict(TEST_IMAGE, TOP_K)

    # 打印结果
    print("

预测结果:“)
print(”-" * 60)
for i, (class_id, prob) in enumerate(results, 1):
label = labels[class_id] if class_id < len(labels) else f"Unknown_{class_id}"
print(f"第{i}名: {label} (ID:{class_id}), 置信度: {prob:.4f}“)
print(”-" * 60)

    # 批量预测示例
    print("

" + “=” * 60)
print(“开始批量预测…”)
image_list = [TEST_IMAGE] * 3 # 重复测试3次
batch_results = classifier.batch_predict(image_list, TOP_K)

    for img_path, predictions in batch_results.items():
        print(f"

{img_path}:“)
for i, (class_id, prob) in enumerate(predictions[:3], 1): # 只显示前3个
label = labels[class_id] if class_id < len(labels) else f"Unknown_{class_id}”
print(f" Top-{i}: {label}, 置信度: {prob:.4f}")

    print("

" + “=” * 60)
print(“推理完成!”)

except Exception as e:
    print(f"

错误:{str(e)}")
sys.exit(1)

if name == “main”:
main()
结语
通过 hiascend.com,华为为开发者构建了一个开放、透明且持续演进的AI开发生态。无论是学术研究还是工业部署,昇腾平台都提供了从底层硬件到上层应用的完整支撑。尽管学习曲线略陡,但其在能效比、安全可控和国产化替代方面的战略价值日益凸显。
对于希望投身信创与AI融合领域的工程师而言,掌握昇腾开发技能不仅是技术能力的拓展,更是参与国家科技自立自强的重要一步。建议读者结合昇腾社区的官方教程、样例工程和论坛问答,逐步深入实践,共同推动国产AI生态繁荣发展。

2025年昇腾CANN训练营第二季,基于CANN开源开放全场景,推出0基础入门系列、码力全开特辑、开发者案例等专题课程,助力不同阶段开发者快速提升算子开发技能。获得Ascend C算子中级认证,即可领取精美证书,完成社区任务更有机会赢取华为手机,平板、开发板等大奖。
报名链接:https://www.hiascend.com/developer/activities/cann20252
**

Logo

CANN开发者社区旨在汇聚广大开发者,围绕CANN架构重构、算子开发、部署应用优化等核心方向,展开深度交流与思想碰撞,携手共同促进CANN开放生态突破!

更多推荐