Skip to content
  • Категории
  • Главная
  • Документация
  • Последние
  • Пользователи
Collapse
  1. Главная
  2. Аппаратные модули
  3. ИИ ускорители - Hailo, IMX500
  4. Распознавание автомобилей с помощью Raspberry Pi AI Kit

Распознавание автомобилей с помощью Raspberry Pi AI Kit

Запланировано Прикреплена Закрыта Перенесена ИИ ускорители - Hailo, IMX500
1 Сообщения 1 Posters 190 Просмотры
  • Сначала старые
  • Сначала новые
  • По количеству голосов
Авторизуйтесь, чтобы ответить
Эта тема была удалена. Только пользователи с правом управления темами могут её видеть.
  • нейробизонН Не в сети
    нейробизонН Не в сети
    нейробизон
    написал в отредактировано нейробизон
    #1

    Сбор данных

    С пешеходного моста мы сняли на айфон несколько клипов с транспортным потоком. При помощи следующего Python скрипт убрали лишние кадры, оставив только каждый 20й кадр

    import cv2
    import sys
    
    videoCapture = cv2.VideoCapture('./data/videos/video.MOV')
        
    if not videoCapture.isOpened():
        logging.error("Cannot open video file")
        sys.exit(-1)
       
    i = 0
    j = 0
    while True:
        success, frame = videoCapture.read()
        if success:
            cv2.imshow('Display', frame)
            i += 1
            if i % 20 == 0:
                j += 1
                cv2.imwrite(f'./data/images/img_{j:04d}.jpg', frame)
        
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                cv2.destroyAllWindows()
                break
    

    Для обучения нейросети использовался облачный сервис Edge Impulse Studio, а для упрощения загрузки датасета в этот сервис было установлено консольное приложение Edge Impulse CLI (https://docs.edgeimpulse.com/docs/tools/edge-impulse-cli/cli-installation)

    $ edge-impulse-uploader --category split data/images/*.jpg
    

    эта команда загрузит изображения в сервис Edge Impulse Studio (далее я буду называть его просто EIS) и разделит их на два датасете - "Training" и "Testing". Сразу после успешного завершения загрузки датасеты будут видны в EIS в разделе Data Acquisition

    4d309c25-75ba-4fd3-bfc1-399c71685b30-image.png

    Теперь мы можем приступить к разметке датасета, используя инструмент "Прямоугольник" во вкладке "Labeling Queue"

    labelling.gif

    Из всех транспортных средств в потоке мы намеренно отметили только легковые автомобили, чтобы в дальнейшем обучить модель распознавать и идентифицировать их.

    Обучение модели

    • Перейдите в раздел "Impulse Design", и выберите пункт "Create Impulse".
      Так как ИИ-акселератор Hailo может работать с изображениями в большом разрешении без потери FPS, то выбираем оптимизацию изображений до размера 640x480 в блоке "Image Data".
    • Нажимаем "Add a processing block" и выбираем "Image". На этом этапе выполняется предварительная обработка и нормализация данных изображения, а также предоставляется возможность выбора глубины цвета.
    • Нажимаем "Add a learning block" и выбираем "Object Detection (Images)".
    • Нажмите кнопку "Save Impulse", чтобы завершить процесс.

    0798c373-2764-42a3-b190-0faa0ec76241-image.png

    В разделе "Image" выберите RGB в качестве формата цвета и нажмите кнопку "Save parameters", чтобы перейти в следующий раздел - "Generate Features"
    1956b467-3c32-4cc4-acd2-681aba289636-image.png

    Теперь мы можем запустить генерацию признаков, нажав кнопку "Generate features"
    После окончания генерации данные будут визуализированы на панели "Feature Explorer"

    5fd7e873-2aaa-4a44-870e-970915306396-image.png

    Переходим на страницу "Object Detection" и нажимаем кнопку "Choose a different model" и выбираем вариант YOLOv5. В выборе размера модели доступны 4 варианта - выбираем вариант Small с 7.2 миллионами параметров. После этого нажимаем кнопку "Start training", чтобы начать обучение. Процесс обучения займёт несколько минут.

    7e787070-2558-4d29-ab1f-d825ff95f174-image.png

    После завершения обучения мы увидим таблицу с оценкой точности и прочими параметрами

    71ee2219-c80d-4021-b2c5-ca9e003077bd-image.png

    8942ab42-e34d-4296-9ef5-72f9a732f1e6-image.png

    Проверка модели

    На странице проверки модели нажмите кнопку «Classify All», которая запустит тестирование модели с обученной моделью float32. Точность составила 97,5%

    e5764d10-bd92-434e-90d0-8b640be5bf0a-image.png

    Конвертация модели в формат HEF

    Чтобы модель могла выполняться на Raspberry Pi AI kit, сгенерированную модель нужно преобразовать в формат Hailo Executable Format (HEF).
    Сначала скачиваем модель в формате ONNX (на скриншоте выделено прямоугольником) из панели управления EIS.

    e71b2a6d-8d5d-4d57-91e1-b491d2da015c-image.png

    Для конвертации нам потребуется компьютер с Linux и процессором x86, на котором мы будем запускать конвертер от разработчиков Hailo.
    Чтобы скачать конвертер (он называется Dataflow Compiler), необходимо зарегистрироваться на сайте https://hailo.ai/developer-zone. Ссылка на скачивание Dataflow Compiler находится в разделе загрузок.
    Screenshot from 2024-11-30 00-01-58.png
    Конвертер устанавливается следующим образом:

    $ python3 -mvenv hailo_dfc && cd hailo_dfc
    $ source bin/activate
    $ pip3 install hailo_dataflow_compiler-3.27.0-py3-none-linux_x86_64.whl
    $ pip3 install opencv-python
    

    Далее нужно убедиться, что всё правильно установилось

    $ hailo -h
    [info] First time Hailo Dataflow Compiler is being used. Checking system requirements... (this might take a few seconds)
    [info] Current Time: 22:19:56, 07/14/24
    [info] System info: OS: Linux, Kernel: 5.15.0-113-generic
    [info] Hailo DFC Version: 3.27.0
    

    Процесс конвертации состоит нескольких шагов.

    1. Калибровка датасета

    Скачиваем изображения датасета Training из EIS
    Screenshot from 2024-11-30 00-14-36.png

    Пишем пайтон скрипт, который преобразует скаченный тренировочный датасет в NumPy-формат *npy

    import cv2
    import os
    import numpy as np
    
    w, h, c = 640, 640, 3
    
    data_dir = './data/training'
    
    images_list = os.listdir(data_dir)
    
    calib_dataset = np.zeros((len(images_list), w, h, c))
    
    for idx, filename in enumerate(os.listdir(data_dir)):
        if not filename.endswith('.jpg'):
            continue
        filepath = os.path.join(data_dir, filename)
        img = cv2.imread(filepath)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        x = img.shape[1]/2 - w/2
        y = img.shape[0]/2 - h/2
        crop_img = img[int(y):int(y+h), int(x):int(x+w)]
        calib_dataset[idx, :, :, :] = crop_img
    
    np.save('calib_dataset.npy', calib_dataset)   
    

    2. Преобразование модели

    Следующий скрипт преобразует модель ONNX во внутреннее представлении Hailo и генерирует файл архива Hailo (HAR).

    from hailo_sdk_client import ClientRunner
    
    model_name = 'ei-car-yolov5s'
    onnx_path = f'{model_name}.onnx'
    chosen_hw_arch = 'hailo8l'
    
    runner = ClientRunner(hw_arch=chosen_hw_arch)
    hn, npz = runner.translate_onnx_model(
        onnx_path,
        model_name,
        start_node_names=['images'],
        end_node_names=['/model.24/m.0/Conv', '/model.24/m.2/Conv', '/model.24/m.1/Conv'],
        net_input_shapes={'images': [1, 3, 640, 640]})
    
    runner.save_har(f'{model_name}.har')
    

    3. Оптимизация модели

    Следующий скрипт используется для оптимизации модели, преобразования ее из полной точности в целочисленное представление и генерации квантованного файла Hailo Archive (HAR). Этот скрипт включает нормализацию входных данных модели и подавление немаксимальных значений (NMS) на выходных данных модели.

    from hailo_sdk_client import ClientRunner
    import numpy as np
    
    model_name = 'ei-car-yolov5s'
    alls = [
      'normalization1 = normalization([0.0, 0.0, 0.0], [255.0, 255.0, 255.0])\n',
      'nms_postprocess(meta_arch=yolov5, engine=cpu, nms_scores_th=0.2, nms_iou_th=0.4, classes=1)\n',
    ]
    har_path = f'{model_name}.har'
    calib_dataset = np.load('./calib_dataset.npy')
    
    runner = ClientRunner(har=har_path)
    runner.load_model_script(''.join(alls))
    runner.optimize(calib_dataset)
    
    runner.save_har(f'{model_name}_quantized.har')
    

    4. Компиляция модели

    Этот скрит компилирует квантованный архив Hailo (HAR) и генерирует модель в формате HEF (Hailo Executable Format)

    from hailo_sdk_client import ClientRunner
    
    model_name = 'ei-car-yolov5s'
    quantized_model_har_path = f'{model_name}_quantized.har'
    runner = ClientRunner(har=quantized_model_har_path)
    
    hef = runner.compile()
    
    file_name = f'{model_name}.hef'
    with open(file_name, 'wb') as f:
        f.write(hef)  
    

    Теперь скопируем полученный HEF файл (в данном случае ei-car-yolov5s.hef) на Raspberry Pi с установленным ИИ-акселератором AI-kit. Можно оценить производительность модели следующими командами, которые показываеют результат 63 FPS, что весьма впечатляюще.

    $ hailortcli run ei-car-yolov5s.hef 
    
    Running streaming inference (ei-car-yolov5s.hef):
      Transform data: true
        Type:      auto
        Quantized: true
    Network ei-car-yolov5s/ei-car-yolov5s: 100% | 317 | FPS: 63.33 | ETA: 00:00:00
    > Inference result:
     Network group: ei-carhef-yolov5s
        Frames count: 317
        FPS: 63.33
        Send Rate: 622.54 Mbit/s
        Recv Rate: 77.33 Mbit/s
    

    Испытания

    Подключите и настройте AI kit согласно инструкции
    Настроим виртуальную среду окружения Python используя скрипт из базы примеров Hailo для raspberry pi

    $ git clone https://github.com/hailo-ai/hailo-rpi5-examples.git
    $ cd hailo-rpi5-examples
    $ source setup_env.sh 
    $./compile_postprocess.sh 
    $ pip3 install supervision 
    

    В папке hailo-rpi5-example/resources создаём конфигурационный файл yolov5.json следующего содержания:

    {
      "iou_threshold": 0.45,
      "detection_threshold": 0.7,
      "output_activation": "none",
      "label_offset":1,
      "max_boxes":200,
      "anchors": [
        [ 116, 90, 156, 198, 373, 326 ],
        [ 30, 61, 62, 45, 59, 119 ],
        [ 10, 13, 16, 30, 33, 23 ]
      ],
      "labels": [
        "unlabeled",
        "car"
      ]
    }
    

    В эту же папку копируем ранее созданную HEF модель.
    В качестве отправной точки будем использовать пример detection.py из hailo-rpi5-example, немного его модифицировав - меняем пайплайн GStreamer таким образом, чтобы включить элемент HailoTracker, чтобы после того как машина обнаружена прямоугольник с неё не убирался пока она не исчезнет из кадра.
    Для подсчёта машин используется библиотека Roboflow Supervision.
    Модифицированный detection.py выглядит так:

    import gi
    gi.require_version('Gst', '1.0')
    from gi.repository import Gst, GLib
    import os
    import argparse
    import multiprocessing
    import numpy as np
    import setproctitle
    import cv2
    import time
    import hailo
    import supervision as sv
    from hailo_rpi_common import (
        get_default_parser,
        QUEUE,
        get_caps_from_pad,
        get_numpy_from_buffer,
        GStreamerApp,
        app_callback_class,
    )
    
    # -----------------------------------------------------------------------------------------------
    # User-defined class to be used in the callback function
    # -----------------------------------------------------------------------------------------------
    # Inheritance from the app_callback_class
    class user_app_callback_class(app_callback_class):
        def __init__(self):
            super().__init__()
            self.new_variable = 42  # New variable example
        
        def new_function(self):  # New function example
            return "The meaning of life is: "
    
    # -----------------------------------------------------------------------------------------------
    # User-defined callback function
    # -----------------------------------------------------------------------------------------------
    
    # This is the callback function that will be called when data is available from the pipeline
    def app_callback(pad, info, user_data):
        # Get the GstBuffer from the probe info
        buffer = info.get_buffer()
        # Check if the buffer is valid
        if buffer is None:
            return Gst.PadProbeReturn.OK
    
        # Get the detections from the buffer
        roi = hailo.get_roi_from_buffer(buffer)
        hailo_detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
        n = len(hailo_detections)
    
        # Get the caps from the pad
        _, w, h = get_caps_from_pad(pad)
    
    
        boxes = np.zeros((n, 4))
        confidence = np.zeros(n)
        class_id = np.zeros(n)
        tracker_id = np.empty(n)
    
        for i, detection in enumerate(hailo_detections):
            class_id[i] = detection.get_class_id()
            confidence[i] = detection.get_confidence()
            tracker_id[i] = detection.get_objects_typed(hailo.HAILO_UNIQUE_ID)[0].get_id()
            bbox = detection.get_bbox()
            boxes[i] = [bbox.xmin() * w, bbox.ymin() * h, bbox.xmax() * w, bbox.ymax() * h]
        
        detections = sv.Detections(
                xyxy=boxes, 
                confidence=confidence, 
                class_id=class_id,
                tracker_id=tracker_id)
    
        #print(tracker_id, confidence, boxes)
        line_zone.trigger(detections)
        textoverlay  = app.pipeline.get_by_name("hailo_text")
        textoverlay.set_property('text', f'OUT: {line_zone.in_count}     |      IN: {line_zone.out_count}')
        textoverlay.set_property('font-desc', 'Sans 36')
    
        return Gst.PadProbeReturn.OK
        
    
    # -----------------------------------------------------------------------------------------------
    # User Gstreamer Application
    # -----------------------------------------------------------------------------------------------
    
    # This class inherits from the hailo_rpi_common.GStreamerApp class
    class GStreamerDetectionApp(GStreamerApp):
        def __init__(self, args, user_data):
            # Call the parent class constructor
            super().__init__(args, user_data)
            # Additional initialization code can be added here
            # Set Hailo parameters these parameters should be set based on the model used
            self.batch_size = 1
            self.network_width = 640
            self.network_height = 640
            self.network_format = "RGB"
            nms_score_threshold = 0.3 
            nms_iou_threshold = 0.45
            
            # Temporary code: new postprocess will be merged to TAPPAS.
            # Check if new postprocess so file exists
            new_postprocess_path = os.path.join(self.current_path, '/home/naveen/hailo-rpi5-examples/resources/libyolo_hailortpp_post.so')
            if os.path.exists(new_postprocess_path):
                self.default_postprocess_so = new_postprocess_path
            else:
                self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')
    
            if args.hef_path is not None:
                self.hef_path = args.hef_path
            # Set the HEF file path based on the network
            elif args.network == "yolov6n":
                self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef')
            elif args.network == "yolov8s":
                self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef')
            elif args.network == "yolox_s_leaky":
                self.hef_path = os.path.join(self.current_path, '../resources/yolox_s_leaky_h8l_mz.hef')
            else:
                assert False, "Invalid network type"
    
            # User-defined label JSON file
            if args.labels_json is not None:
                self.labels_config = f' config-path={args.labels_json} '
                # Temporary code
                if not os.path.exists(new_postprocess_path):
                    print("New postprocess so file is missing. It is required to support custom labels. Check documentation for more information.")
                    exit(1)
            else:
                self.labels_config = ''
    
            self.app_callback = app_callback
        
            self.thresholds_str = (
                f"nms-score-threshold={nms_score_threshold} "
                f"nms-iou-threshold={nms_iou_threshold} "
                f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
            )
    
            # Set the process title
            setproctitle.setproctitle("Hailo Detection App")
    
            self.create_pipeline()
    
        def get_pipeline_string(self):
            if self.source_type == "rpi":
                source_element = (
                    #"libcamerasrc name=src_0 auto-focus-mode=AfModeManual ! "
                    "libcamerasrc name=src_0 auto-focus-mode=2 ! "
                    f"video/x-raw, format={self.network_format}, width=1536, height=864  ! "
                    + QUEUE("queue_src_scale")
                    + "videoscale ! "
                    f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=60/1 ! "
                    #f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height} ! "
                )
            elif self.source_type == "usb":
                source_element = (
                    f"v4l2src device={self.video_source} name=src_0 ! "
                    "video/x-raw, width=640, height=480, framerate=30/1 ! "
                )
            else:
                source_element = (
                    f"filesrc location={self.video_source} name=src_0 ! "
                    + QUEUE("queue_dec264")
                    + " qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
                    " video/x-raw, format=I420 ! "
                )
            source_element += QUEUE("queue_scale")
            source_element += "videoscale n-threads=2 ! "
            source_element += QUEUE("queue_src_convert")
            source_element += "videoconvert n-threads=3 name=src_convert qos=false ! "
            source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "
    
            pipeline_string = (
                "hailomuxer name=hmux "
                + source_element
                + "tee name=t ! "
                + QUEUE("bypass_queue", max_size_buffers=20)
                + "hmux.sink_0 "
                + "t. ! "
                + QUEUE("queue_hailonet")
                + "videoconvert n-threads=3 ! "
                f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "
                + QUEUE("queue_hailofilter")
                + f"hailofilter so-path={self.default_postprocess_so} {self.labels_config} qos=false ! "
                + QUEUE("queue_hailotracker")
                + "hailotracker keep-tracked-frames=3 keep-new-frames=3 keep-lost-frames=3 ! "
                + QUEUE("queue_hmuc")
                + "hmux.sink_1 "
                + "hmux. ! "
                + QUEUE("queue_hailo_python")
                + QUEUE("queue_user_callback")
                + "identity name=identity_callback ! "
                + QUEUE("queue_hailooverlay")
                + "hailooverlay ! "
                + QUEUE("queue_videoconvert")
                + "videoconvert n-threads=3 qos=false ! "
                + QUEUE("queue_textoverlay")
                + "textoverlay name=hailo_text text='' valignment=top halignment=center ! "
                + QUEUE("queue_hailo_display")
                + f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
            )
            print(pipeline_string)
            return pipeline_string
    
    if __name__ == "__main__":
        # Create an instance of the user app callback class
        user_data = user_app_callback_class()
    
        START = sv.Point(0, 340)
        END = sv.Point(640, 340)
    
        line_zone = sv.LineZone(start=START, end=END, triggering_anchors=(sv.Position.BOTTOM_LEFT, sv.Position.BOTTOM_RIGHT))
    
        parser = get_default_parser()
        # Add additional arguments here
        parser.add_argument(
            "--network",
            default="yolov6n",
            choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'],
            help="Which Network to use, default is yolov6n",
        )
        parser.add_argument(
            "--hef-path",
            default=None,
            help="Path to HEF file",
        )
        parser.add_argument(
            "--labels-json",
            default=None,
            help="Path to costume labels JSON file",
        )
        args = parser.parse_args()
        app = GStreamerDetectionApp(args, user_data)
        app.run()
    

    Применение на записанных видеофайлах

    python3 detection.py --hef-path ../resources/ei-car-yolov5s.hef --input rpi --labels-json ../resources/yolov5.json --show-fps
    

    Применение на потоке с камеры

    $ python3 detection.py --hef-path ../resources/ei-car-yolov5s.hef --input rpi --labels-json ../resources/yolov5.json --show-fps
    
    1 ответ Последний ответ
    0
    • нейробизонН нейробизон удалил эту тему в
    • нейробизонН нейробизон восстановил эту тему в

    • Войти

    • Нет учётной записи? Зарегистрироваться

    • Login or register to search.
    • Первое сообщение
      Последнее сообщение
    0
    • Категории
    • Главная
    • Документация
    • Последние
    • Пользователи