используй нормальные карты памяти и не будет ничего повреждаться при сбоях по питанию.
Недавние сообщения нейробизон
-
RE: Управляемы ИБП для Raspberry pi
-
RE: Точное управление GPIO
для точного софтового ШИМа на Raspberry Pi 1-4 можно использовать GPIO либу, в которой тайминги ШИМа, задаются при помощи DMA, например pigpio
-
RE: Подключаем видеокарту к Raspberry Pi 5
жалко NVidia нельзя подключить, чтобы использовать CUDA.
или у каких-нибудь умельцев уже получилось видеокарты NVIdia подулючить к RasPi? -
Распознавание автомобилей с помощью Raspberry Pi AI Kit
Сбор данных
С пешеходного моста мы сняли на айфон несколько клипов с транспортным потоком. При помощи следующего Python скрипт убрали лишние кадры, оставив только каждый 20й кадр
import cv2 import sys videoCapture = cv2.VideoCapture('./data/videos/video.MOV') if not videoCapture.isOpened(): logging.error("Cannot open video file") sys.exit(-1) i = 0 j = 0 while True: success, frame = videoCapture.read() if success: cv2.imshow('Display', frame) i += 1 if i % 20 == 0: j += 1 cv2.imwrite(f'./data/images/img_{j:04d}.jpg', frame) key = cv2.waitKey(1) & 0xFF if key == ord('q'): cv2.destroyAllWindows() break
Для обучения нейросети использовался облачный сервис Edge Impulse Studio, а для упрощения загрузки датасета в этот сервис было установлено консольное приложение Edge Impulse CLI (https://docs.edgeimpulse.com/docs/tools/edge-impulse-cli/cli-installation)
$ edge-impulse-uploader --category split data/images/*.jpg
эта команда загрузит изображения в сервис Edge Impulse Studio (далее я буду называть его просто EIS) и разделит их на два датасете - "Training" и "Testing". Сразу после успешного завершения загрузки датасеты будут видны в EIS в разделе Data Acquisition
Теперь мы можем приступить к разметке датасета, используя инструмент "Прямоугольник" во вкладке "Labeling Queue"
Из всех транспортных средств в потоке мы намеренно отметили только легковые автомобили, чтобы в дальнейшем обучить модель распознавать и идентифицировать их.
Обучение модели
- Перейдите в раздел "Impulse Design", и выберите пункт "Create Impulse".
Так как ИИ-акселератор Hailo может работать с изображениями в большом разрешении без потери FPS, то выбираем оптимизацию изображений до размера 640x480 в блоке "Image Data". - Нажимаем "Add a processing block" и выбираем "Image". На этом этапе выполняется предварительная обработка и нормализация данных изображения, а также предоставляется возможность выбора глубины цвета.
- Нажимаем "Add a learning block" и выбираем "Object Detection (Images)".
- Нажмите кнопку "Save Impulse", чтобы завершить процесс.
В разделе "Image" выберите RGB в качестве формата цвета и нажмите кнопку "Save parameters", чтобы перейти в следующий раздел - "Generate Features"
Теперь мы можем запустить генерацию признаков, нажав кнопку "Generate features"
После окончания генерации данные будут визуализированы на панели "Feature Explorer"Переходим на страницу "Object Detection" и нажимаем кнопку "Choose a different model" и выбираем вариант YOLOv5. В выборе размера модели доступны 4 варианта - выбираем вариант Small с 7.2 миллионами параметров. После этого нажимаем кнопку "Start training", чтобы начать обучение. Процесс обучения займёт несколько минут.
После завершения обучения мы увидим таблицу с оценкой точности и прочими параметрами
Проверка модели
На странице проверки модели нажмите кнопку «Classify All», которая запустит тестирование модели с обученной моделью float32. Точность составила 97,5%
Конвертация модели в формат HEF
Чтобы модель могла выполняться на Raspberry Pi AI kit, сгенерированную модель нужно преобразовать в формат Hailo Executable Format (HEF).
Сначала скачиваем модель в формате ONNX (на скриншоте выделено прямоугольником) из панели управления EIS.Для конвертации нам потребуется компьютер с Linux и процессором x86, на котором мы будем запускать конвертер от разработчиков Hailo.
Чтобы скачать конвертер (он называется Dataflow Compiler), необходимо зарегистрироваться на сайте https://hailo.ai/developer-zone. Ссылка на скачивание Dataflow Compiler находится в разделе загрузок.
Конвертер устанавливается следующим образом:$ python3 -mvenv hailo_dfc && cd hailo_dfc $ source bin/activate $ pip3 install hailo_dataflow_compiler-3.27.0-py3-none-linux_x86_64.whl $ pip3 install opencv-python
Далее нужно убедиться, что всё правильно установилось
$ hailo -h [info] First time Hailo Dataflow Compiler is being used. Checking system requirements... (this might take a few seconds) [info] Current Time: 22:19:56, 07/14/24 [info] System info: OS: Linux, Kernel: 5.15.0-113-generic [info] Hailo DFC Version: 3.27.0
Процесс конвертации состоит нескольких шагов.
1. Калибровка датасета
Скачиваем изображения датасета Training из EIS
Пишем пайтон скрипт, который преобразует скаченный тренировочный датасет в NumPy-формат *npy
import cv2 import os import numpy as np w, h, c = 640, 640, 3 data_dir = './data/training' images_list = os.listdir(data_dir) calib_dataset = np.zeros((len(images_list), w, h, c)) for idx, filename in enumerate(os.listdir(data_dir)): if not filename.endswith('.jpg'): continue filepath = os.path.join(data_dir, filename) img = cv2.imread(filepath) img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) x = img.shape[1]/2 - w/2 y = img.shape[0]/2 - h/2 crop_img = img[int(y):int(y+h), int(x):int(x+w)] calib_dataset[idx, :, :, :] = crop_img np.save('calib_dataset.npy', calib_dataset)
2. Преобразование модели
Следующий скрипт преобразует модель ONNX во внутреннее представлении Hailo и генерирует файл архива Hailo (HAR).
from hailo_sdk_client import ClientRunner model_name = 'ei-car-yolov5s' onnx_path = f'{model_name}.onnx' chosen_hw_arch = 'hailo8l' runner = ClientRunner(hw_arch=chosen_hw_arch) hn, npz = runner.translate_onnx_model( onnx_path, model_name, start_node_names=['images'], end_node_names=['/model.24/m.0/Conv', '/model.24/m.2/Conv', '/model.24/m.1/Conv'], net_input_shapes={'images': [1, 3, 640, 640]}) runner.save_har(f'{model_name}.har')
3. Оптимизация модели
Следующий скрипт используется для оптимизации модели, преобразования ее из полной точности в целочисленное представление и генерации квантованного файла Hailo Archive (HAR). Этот скрипт включает нормализацию входных данных модели и подавление немаксимальных значений (NMS) на выходных данных модели.
from hailo_sdk_client import ClientRunner import numpy as np model_name = 'ei-car-yolov5s' alls = [ 'normalization1 = normalization([0.0, 0.0, 0.0], [255.0, 255.0, 255.0])\n', 'nms_postprocess(meta_arch=yolov5, engine=cpu, nms_scores_th=0.2, nms_iou_th=0.4, classes=1)\n', ] har_path = f'{model_name}.har' calib_dataset = np.load('./calib_dataset.npy') runner = ClientRunner(har=har_path) runner.load_model_script(''.join(alls)) runner.optimize(calib_dataset) runner.save_har(f'{model_name}_quantized.har')
4. Компиляция модели
Этот скрит компилирует квантованный архив Hailo (HAR) и генерирует модель в формате HEF (Hailo Executable Format)
from hailo_sdk_client import ClientRunner model_name = 'ei-car-yolov5s' quantized_model_har_path = f'{model_name}_quantized.har' runner = ClientRunner(har=quantized_model_har_path) hef = runner.compile() file_name = f'{model_name}.hef' with open(file_name, 'wb') as f: f.write(hef)
Теперь скопируем полученный HEF файл (в данном случае ei-car-yolov5s.hef) на Raspberry Pi с установленным ИИ-акселератором AI-kit. Можно оценить производительность модели следующими командами, которые показываеют результат 63 FPS, что весьма впечатляюще.
$ hailortcli run ei-car-yolov5s.hef Running streaming inference (ei-car-yolov5s.hef): Transform data: true Type: auto Quantized: true Network ei-car-yolov5s/ei-car-yolov5s: 100% | 317 | FPS: 63.33 | ETA: 00:00:00 > Inference result: Network group: ei-carhef-yolov5s Frames count: 317 FPS: 63.33 Send Rate: 622.54 Mbit/s Recv Rate: 77.33 Mbit/s
Испытания
Подключите и настройте AI kit согласно инструкции
Настроим виртуальную среду окружения Python используя скрипт из базы примеров Hailo для raspberry pi$ git clone https://github.com/hailo-ai/hailo-rpi5-examples.git $ cd hailo-rpi5-examples $ source setup_env.sh $./compile_postprocess.sh $ pip3 install supervision
В папке hailo-rpi5-example/resources создаём конфигурационный файл yolov5.json следующего содержания:
{ "iou_threshold": 0.45, "detection_threshold": 0.7, "output_activation": "none", "label_offset":1, "max_boxes":200, "anchors": [ [ 116, 90, 156, 198, 373, 326 ], [ 30, 61, 62, 45, 59, 119 ], [ 10, 13, 16, 30, 33, 23 ] ], "labels": [ "unlabeled", "car" ] }
В эту же папку копируем ранее созданную HEF модель.
В качестве отправной точки будем использовать пример detection.py из hailo-rpi5-example, немного его модифицировав - меняем пайплайн GStreamer таким образом, чтобы включить элемент HailoTracker, чтобы после того как машина обнаружена прямоугольник с неё не убирался пока она не исчезнет из кадра.
Для подсчёта машин используется библиотека Roboflow Supervision.
Модифицированный detection.py выглядит так:import gi gi.require_version('Gst', '1.0') from gi.repository import Gst, GLib import os import argparse import multiprocessing import numpy as np import setproctitle import cv2 import time import hailo import supervision as sv from hailo_rpi_common import ( get_default_parser, QUEUE, get_caps_from_pad, get_numpy_from_buffer, GStreamerApp, app_callback_class, ) # ----------------------------------------------------------------------------------------------- # User-defined class to be used in the callback function # ----------------------------------------------------------------------------------------------- # Inheritance from the app_callback_class class user_app_callback_class(app_callback_class): def __init__(self): super().__init__() self.new_variable = 42 # New variable example def new_function(self): # New function example return "The meaning of life is: " # ----------------------------------------------------------------------------------------------- # User-defined callback function # ----------------------------------------------------------------------------------------------- # This is the callback function that will be called when data is available from the pipeline def app_callback(pad, info, user_data): # Get the GstBuffer from the probe info buffer = info.get_buffer() # Check if the buffer is valid if buffer is None: return Gst.PadProbeReturn.OK # Get the detections from the buffer roi = hailo.get_roi_from_buffer(buffer) hailo_detections = roi.get_objects_typed(hailo.HAILO_DETECTION) n = len(hailo_detections) # Get the caps from the pad _, w, h = get_caps_from_pad(pad) boxes = np.zeros((n, 4)) confidence = np.zeros(n) class_id = np.zeros(n) tracker_id = np.empty(n) for i, detection in enumerate(hailo_detections): class_id[i] = detection.get_class_id() confidence[i] = detection.get_confidence() tracker_id[i] = detection.get_objects_typed(hailo.HAILO_UNIQUE_ID)[0].get_id() bbox = detection.get_bbox() boxes[i] = [bbox.xmin() * w, bbox.ymin() * h, bbox.xmax() * w, bbox.ymax() * h] detections = sv.Detections( xyxy=boxes, confidence=confidence, class_id=class_id, tracker_id=tracker_id) #print(tracker_id, confidence, boxes) line_zone.trigger(detections) textoverlay = app.pipeline.get_by_name("hailo_text") textoverlay.set_property('text', f'OUT: {line_zone.in_count} | IN: {line_zone.out_count}') textoverlay.set_property('font-desc', 'Sans 36') return Gst.PadProbeReturn.OK # ----------------------------------------------------------------------------------------------- # User Gstreamer Application # ----------------------------------------------------------------------------------------------- # This class inherits from the hailo_rpi_common.GStreamerApp class class GStreamerDetectionApp(GStreamerApp): def __init__(self, args, user_data): # Call the parent class constructor super().__init__(args, user_data) # Additional initialization code can be added here # Set Hailo parameters these parameters should be set based on the model used self.batch_size = 1 self.network_width = 640 self.network_height = 640 self.network_format = "RGB" nms_score_threshold = 0.3 nms_iou_threshold = 0.45 # Temporary code: new postprocess will be merged to TAPPAS. # Check if new postprocess so file exists new_postprocess_path = os.path.join(self.current_path, '/home/naveen/hailo-rpi5-examples/resources/libyolo_hailortpp_post.so') if os.path.exists(new_postprocess_path): self.default_postprocess_so = new_postprocess_path else: self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so') if args.hef_path is not None: self.hef_path = args.hef_path # Set the HEF file path based on the network elif args.network == "yolov6n": self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef') elif args.network == "yolov8s": self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef') elif args.network == "yolox_s_leaky": self.hef_path = os.path.join(self.current_path, '../resources/yolox_s_leaky_h8l_mz.hef') else: assert False, "Invalid network type" # User-defined label JSON file if args.labels_json is not None: self.labels_config = f' config-path={args.labels_json} ' # Temporary code if not os.path.exists(new_postprocess_path): print("New postprocess so file is missing. It is required to support custom labels. Check documentation for more information.") exit(1) else: self.labels_config = '' self.app_callback = app_callback self.thresholds_str = ( f"nms-score-threshold={nms_score_threshold} " f"nms-iou-threshold={nms_iou_threshold} " f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32" ) # Set the process title setproctitle.setproctitle("Hailo Detection App") self.create_pipeline() def get_pipeline_string(self): if self.source_type == "rpi": source_element = ( #"libcamerasrc name=src_0 auto-focus-mode=AfModeManual ! " "libcamerasrc name=src_0 auto-focus-mode=2 ! " f"video/x-raw, format={self.network_format}, width=1536, height=864 ! " + QUEUE("queue_src_scale") + "videoscale ! " f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=60/1 ! " #f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height} ! " ) elif self.source_type == "usb": source_element = ( f"v4l2src device={self.video_source} name=src_0 ! " "video/x-raw, width=640, height=480, framerate=30/1 ! " ) else: source_element = ( f"filesrc location={self.video_source} name=src_0 ! " + QUEUE("queue_dec264") + " qtdemux ! h264parse ! avdec_h264 max-threads=2 ! " " video/x-raw, format=I420 ! " ) source_element += QUEUE("queue_scale") source_element += "videoscale n-threads=2 ! " source_element += QUEUE("queue_src_convert") source_element += "videoconvert n-threads=3 name=src_convert qos=false ! " source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! " pipeline_string = ( "hailomuxer name=hmux " + source_element + "tee name=t ! " + QUEUE("bypass_queue", max_size_buffers=20) + "hmux.sink_0 " + "t. ! " + QUEUE("queue_hailonet") + "videoconvert n-threads=3 ! " f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! " + QUEUE("queue_hailofilter") + f"hailofilter so-path={self.default_postprocess_so} {self.labels_config} qos=false ! " + QUEUE("queue_hailotracker") + "hailotracker keep-tracked-frames=3 keep-new-frames=3 keep-lost-frames=3 ! " + QUEUE("queue_hmuc") + "hmux.sink_1 " + "hmux. ! " + QUEUE("queue_hailo_python") + QUEUE("queue_user_callback") + "identity name=identity_callback ! " + QUEUE("queue_hailooverlay") + "hailooverlay ! " + QUEUE("queue_videoconvert") + "videoconvert n-threads=3 qos=false ! " + QUEUE("queue_textoverlay") + "textoverlay name=hailo_text text='' valignment=top halignment=center ! " + QUEUE("queue_hailo_display") + f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true " ) print(pipeline_string) return pipeline_string if __name__ == "__main__": # Create an instance of the user app callback class user_data = user_app_callback_class() START = sv.Point(0, 340) END = sv.Point(640, 340) line_zone = sv.LineZone(start=START, end=END, triggering_anchors=(sv.Position.BOTTOM_LEFT, sv.Position.BOTTOM_RIGHT)) parser = get_default_parser() # Add additional arguments here parser.add_argument( "--network", default="yolov6n", choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'], help="Which Network to use, default is yolov6n", ) parser.add_argument( "--hef-path", default=None, help="Path to HEF file", ) parser.add_argument( "--labels-json", default=None, help="Path to costume labels JSON file", ) args = parser.parse_args() app = GStreamerDetectionApp(args, user_data) app.run()
Применение на записанных видеофайлах
python3 detection.py --hef-path ../resources/ei-car-yolov5s.hef --input rpi --labels-json ../resources/yolov5.json --show-fps
Применение на потоке с камеры
$ python3 detection.py --hef-path ../resources/ei-car-yolov5s.hef --input rpi --labels-json ../resources/yolov5.json --show-fps
- Перейдите в раздел "Impulse Design", и выберите пункт "Create Impulse".
-
Конвертация моделей из формата ONNX в Hailo HEF
Есть потребность конвертнуть модель из ONNX в HEF
Кто пользовался Hailo Data Flow Compiler, нормально он конвертирует, какие подводные камни?