Сбор данных
С пешеходного моста мы сняли на айфон несколько клипов с транспортным потоком. При помощи следующего Python скрипт убрали лишние кадры, оставив только каждый 20й кадр
import cv2
import sys
videoCapture = cv2.VideoCapture('./data/videos/video.MOV')
if not videoCapture.isOpened():
logging.error("Cannot open video file")
sys.exit(-1)
i = 0
j = 0
while True:
success, frame = videoCapture.read()
if success:
cv2.imshow('Display', frame)
i += 1
if i % 20 == 0:
j += 1
cv2.imwrite(f'./data/images/img_{j:04d}.jpg', frame)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
cv2.destroyAllWindows()
break
Для обучения нейросети использовался облачный сервис Edge Impulse Studio, а для упрощения загрузки датасета в этот сервис было установлено консольное приложение Edge Impulse CLI (https://docs.edgeimpulse.com/docs/tools/edge-impulse-cli/cli-installation)
$ edge-impulse-uploader --category split data/images/*.jpg
эта команда загрузит изображения в сервис Edge Impulse Studio (далее я буду называть его просто EIS) и разделит их на два датасете - "Training" и "Testing". Сразу после успешного завершения загрузки датасеты будут видны в EIS в разделе Data Acquisition
4d309c25-75ba-4fd3-bfc1-399c71685b30-image.png
Теперь мы можем приступить к разметке датасета, используя инструмент "Прямоугольник" во вкладке "Labeling Queue"
labelling.gif
Из всех транспортных средств в потоке мы намеренно отметили только легковые автомобили, чтобы в дальнейшем обучить модель распознавать и идентифицировать их.
Обучение модели
Перейдите в раздел "Impulse Design", и выберите пункт "Create Impulse".
Так как ИИ-акселератор Hailo может работать с изображениями в большом разрешении без потери FPS, то выбираем оптимизацию изображений до размера 640x480 в блоке "Image Data".
Нажимаем "Add a processing block" и выбираем "Image". На этом этапе выполняется предварительная обработка и нормализация данных изображения, а также предоставляется возможность выбора глубины цвета.
Нажимаем "Add a learning block" и выбираем "Object Detection (Images)".
Нажмите кнопку "Save Impulse", чтобы завершить процесс.
0798c373-2764-42a3-b190-0faa0ec76241-image.png
В разделе "Image" выберите RGB в качестве формата цвета и нажмите кнопку "Save parameters", чтобы перейти в следующий раздел - "Generate Features"
1956b467-3c32-4cc4-acd2-681aba289636-image.png
Теперь мы можем запустить генерацию признаков, нажав кнопку "Generate features"
После окончания генерации данные будут визуализированы на панели "Feature Explorer"
5fd7e873-2aaa-4a44-870e-970915306396-image.png
Переходим на страницу "Object Detection" и нажимаем кнопку "Choose a different model" и выбираем вариант YOLOv5. В выборе размера модели доступны 4 варианта - выбираем вариант Small с 7.2 миллионами параметров. После этого нажимаем кнопку "Start training", чтобы начать обучение. Процесс обучения займёт несколько минут.
7e787070-2558-4d29-ab1f-d825ff95f174-image.png
После завершения обучения мы увидим таблицу с оценкой точности и прочими параметрами
71ee2219-c80d-4021-b2c5-ca9e003077bd-image.png
8942ab42-e34d-4296-9ef5-72f9a732f1e6-image.png
Проверка модели
На странице проверки модели нажмите кнопку «Classify All», которая запустит тестирование модели с обученной моделью float32. Точность составила 97,5%
e5764d10-bd92-434e-90d0-8b640be5bf0a-image.png
Конвертация модели в формат HEF
Чтобы модель могла выполняться на Raspberry Pi AI kit, сгенерированную модель нужно преобразовать в формат Hailo Executable Format (HEF).
Сначала скачиваем модель в формате ONNX (на скриншоте выделено прямоугольником) из панели управления EIS.
e71b2a6d-8d5d-4d57-91e1-b491d2da015c-image.png
Для конвертации нам потребуется компьютер с Linux и процессором x86, на котором мы будем запускать конвертер от разработчиков Hailo.
Чтобы скачать конвертер (он называется Dataflow Compiler), необходимо зарегистрироваться на сайте https://hailo.ai/developer-zone. Ссылка на скачивание Dataflow Compiler находится в разделе загрузок.
Screenshot from 2024-11-30 00-01-58.png
Конвертер устанавливается следующим образом:
$ python3 -mvenv hailo_dfc && cd hailo_dfc
$ source bin/activate
$ pip3 install hailo_dataflow_compiler-3.27.0-py3-none-linux_x86_64.whl
$ pip3 install opencv-python
Далее нужно убедиться, что всё правильно установилось
$ hailo -h
[info] First time Hailo Dataflow Compiler is being used. Checking system requirements... (this might take a few seconds)
[info] Current Time: 22:19:56, 07/14/24
[info] System info: OS: Linux, Kernel: 5.15.0-113-generic
[info] Hailo DFC Version: 3.27.0
Процесс конвертации состоит нескольких шагов.
1. Калибровка датасета
Скачиваем изображения датасета Training из EIS
Screenshot from 2024-11-30 00-14-36.png
Пишем пайтон скрипт, который преобразует скаченный тренировочный датасет в NumPy-формат *npy
import cv2
import os
import numpy as np
w, h, c = 640, 640, 3
data_dir = './data/training'
images_list = os.listdir(data_dir)
calib_dataset = np.zeros((len(images_list), w, h, c))
for idx, filename in enumerate(os.listdir(data_dir)):
if not filename.endswith('.jpg'):
continue
filepath = os.path.join(data_dir, filename)
img = cv2.imread(filepath)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
x = img.shape[1]/2 - w/2
y = img.shape[0]/2 - h/2
crop_img = img[int(y):int(y+h), int(x):int(x+w)]
calib_dataset[idx, :, :, :] = crop_img
np.save('calib_dataset.npy', calib_dataset)
2. Преобразование модели
Следующий скрипт преобразует модель ONNX во внутреннее представлении Hailo и генерирует файл архива Hailo (HAR).
from hailo_sdk_client import ClientRunner
model_name = 'ei-car-yolov5s'
onnx_path = f'{model_name}.onnx'
chosen_hw_arch = 'hailo8l'
runner = ClientRunner(hw_arch=chosen_hw_arch)
hn, npz = runner.translate_onnx_model(
onnx_path,
model_name,
start_node_names=['images'],
end_node_names=['/model.24/m.0/Conv', '/model.24/m.2/Conv', '/model.24/m.1/Conv'],
net_input_shapes={'images': [1, 3, 640, 640]})
runner.save_har(f'{model_name}.har')
3. Оптимизация модели
Следующий скрипт используется для оптимизации модели, преобразования ее из полной точности в целочисленное представление и генерации квантованного файла Hailo Archive (HAR). Этот скрипт включает нормализацию входных данных модели и подавление немаксимальных значений (NMS) на выходных данных модели.
from hailo_sdk_client import ClientRunner
import numpy as np
model_name = 'ei-car-yolov5s'
alls = [
'normalization1 = normalization([0.0, 0.0, 0.0], [255.0, 255.0, 255.0])\n',
'nms_postprocess(meta_arch=yolov5, engine=cpu, nms_scores_th=0.2, nms_iou_th=0.4, classes=1)\n',
]
har_path = f'{model_name}.har'
calib_dataset = np.load('./calib_dataset.npy')
runner = ClientRunner(har=har_path)
runner.load_model_script(''.join(alls))
runner.optimize(calib_dataset)
runner.save_har(f'{model_name}_quantized.har')
4. Компиляция модели
Этот скрит компилирует квантованный архив Hailo (HAR) и генерирует модель в формате HEF (Hailo Executable Format)
from hailo_sdk_client import ClientRunner
model_name = 'ei-car-yolov5s'
quantized_model_har_path = f'{model_name}_quantized.har'
runner = ClientRunner(har=quantized_model_har_path)
hef = runner.compile()
file_name = f'{model_name}.hef'
with open(file_name, 'wb') as f:
f.write(hef)
Теперь скопируем полученный HEF файл (в данном случае ei-car-yolov5s.hef) на Raspberry Pi с установленным ИИ-акселератором AI-kit. Можно оценить производительность модели следующими командами, которые показываеют результат 63 FPS, что весьма впечатляюще.
$ hailortcli run ei-car-yolov5s.hef
Running streaming inference (ei-car-yolov5s.hef):
Transform data: true
Type: auto
Quantized: true
Network ei-car-yolov5s/ei-car-yolov5s: 100% | 317 | FPS: 63.33 | ETA: 00:00:00
> Inference result:
Network group: ei-carhef-yolov5s
Frames count: 317
FPS: 63.33
Send Rate: 622.54 Mbit/s
Recv Rate: 77.33 Mbit/s
Испытания
Подключите и настройте AI kit согласно инструкции
Настроим виртуальную среду окружения Python используя скрипт из базы примеров Hailo для raspberry pi
$ git clone https://github.com/hailo-ai/hailo-rpi5-examples.git
$ cd hailo-rpi5-examples
$ source setup_env.sh
$./compile_postprocess.sh
$ pip3 install supervision
В папке hailo-rpi5-example/resources создаём конфигурационный файл yolov5.json следующего содержания:
{
"iou_threshold": 0.45,
"detection_threshold": 0.7,
"output_activation": "none",
"label_offset":1,
"max_boxes":200,
"anchors": [
[ 116, 90, 156, 198, 373, 326 ],
[ 30, 61, 62, 45, 59, 119 ],
[ 10, 13, 16, 30, 33, 23 ]
],
"labels": [
"unlabeled",
"car"
]
}
В эту же папку копируем ранее созданную HEF модель.
В качестве отправной точки будем использовать пример detection.py из hailo-rpi5-example, немного его модифицировав - меняем пайплайн GStreamer таким образом, чтобы включить элемент HailoTracker, чтобы после того как машина обнаружена прямоугольник с неё не убирался пока она не исчезнет из кадра.
Для подсчёта машин используется библиотека Roboflow Supervision.
Модифицированный detection.py выглядит так:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import hailo
import supervision as sv
from hailo_rpi_common import (
get_default_parser,
QUEUE,
get_caps_from_pad,
get_numpy_from_buffer,
GStreamerApp,
app_callback_class,
)
# -----------------------------------------------------------------------------------------------
# User-defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
# Inheritance from the app_callback_class
class user_app_callback_class(app_callback_class):
def __init__(self):
super().__init__()
self.new_variable = 42 # New variable example
def new_function(self): # New function example
return "The meaning of life is: "
# -----------------------------------------------------------------------------------------------
# User-defined callback function
# -----------------------------------------------------------------------------------------------
# This is the callback function that will be called when data is available from the pipeline
def app_callback(pad, info, user_data):
# Get the GstBuffer from the probe info
buffer = info.get_buffer()
# Check if the buffer is valid
if buffer is None:
return Gst.PadProbeReturn.OK
# Get the detections from the buffer
roi = hailo.get_roi_from_buffer(buffer)
hailo_detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
n = len(hailo_detections)
# Get the caps from the pad
_, w, h = get_caps_from_pad(pad)
boxes = np.zeros((n, 4))
confidence = np.zeros(n)
class_id = np.zeros(n)
tracker_id = np.empty(n)
for i, detection in enumerate(hailo_detections):
class_id[i] = detection.get_class_id()
confidence[i] = detection.get_confidence()
tracker_id[i] = detection.get_objects_typed(hailo.HAILO_UNIQUE_ID)[0].get_id()
bbox = detection.get_bbox()
boxes[i] = [bbox.xmin() * w, bbox.ymin() * h, bbox.xmax() * w, bbox.ymax() * h]
detections = sv.Detections(
xyxy=boxes,
confidence=confidence,
class_id=class_id,
tracker_id=tracker_id)
#print(tracker_id, confidence, boxes)
line_zone.trigger(detections)
textoverlay = app.pipeline.get_by_name("hailo_text")
textoverlay.set_property('text', f'OUT: {line_zone.in_count} | IN: {line_zone.out_count}')
textoverlay.set_property('font-desc', 'Sans 36')
return Gst.PadProbeReturn.OK
# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------
# This class inherits from the hailo_rpi_common.GStreamerApp class
class GStreamerDetectionApp(GStreamerApp):
def __init__(self, args, user_data):
# Call the parent class constructor
super().__init__(args, user_data)
# Additional initialization code can be added here
# Set Hailo parameters these parameters should be set based on the model used
self.batch_size = 1
self.network_width = 640
self.network_height = 640
self.network_format = "RGB"
nms_score_threshold = 0.3
nms_iou_threshold = 0.45
# Temporary code: new postprocess will be merged to TAPPAS.
# Check if new postprocess so file exists
new_postprocess_path = os.path.join(self.current_path, '/home/naveen/hailo-rpi5-examples/resources/libyolo_hailortpp_post.so')
if os.path.exists(new_postprocess_path):
self.default_postprocess_so = new_postprocess_path
else:
self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')
if args.hef_path is not None:
self.hef_path = args.hef_path
# Set the HEF file path based on the network
elif args.network == "yolov6n":
self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef')
elif args.network == "yolov8s":
self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef')
elif args.network == "yolox_s_leaky":
self.hef_path = os.path.join(self.current_path, '../resources/yolox_s_leaky_h8l_mz.hef')
else:
assert False, "Invalid network type"
# User-defined label JSON file
if args.labels_json is not None:
self.labels_config = f' config-path={args.labels_json} '
# Temporary code
if not os.path.exists(new_postprocess_path):
print("New postprocess so file is missing. It is required to support custom labels. Check documentation for more information.")
exit(1)
else:
self.labels_config = ''
self.app_callback = app_callback
self.thresholds_str = (
f"nms-score-threshold={nms_score_threshold} "
f"nms-iou-threshold={nms_iou_threshold} "
f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
)
# Set the process title
setproctitle.setproctitle("Hailo Detection App")
self.create_pipeline()
def get_pipeline_string(self):
if self.source_type == "rpi":
source_element = (
#"libcamerasrc name=src_0 auto-focus-mode=AfModeManual ! "
"libcamerasrc name=src_0 auto-focus-mode=2 ! "
f"video/x-raw, format={self.network_format}, width=1536, height=864 ! "
+ QUEUE("queue_src_scale")
+ "videoscale ! "
f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=60/1 ! "
#f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height} ! "
)
elif self.source_type == "usb":
source_element = (
f"v4l2src device={self.video_source} name=src_0 ! "
"video/x-raw, width=640, height=480, framerate=30/1 ! "
)
else:
source_element = (
f"filesrc location={self.video_source} name=src_0 ! "
+ QUEUE("queue_dec264")
+ " qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
" video/x-raw, format=I420 ! "
)
source_element += QUEUE("queue_scale")
source_element += "videoscale n-threads=2 ! "
source_element += QUEUE("queue_src_convert")
source_element += "videoconvert n-threads=3 name=src_convert qos=false ! "
source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "
pipeline_string = (
"hailomuxer name=hmux "
+ source_element
+ "tee name=t ! "
+ QUEUE("bypass_queue", max_size_buffers=20)
+ "hmux.sink_0 "
+ "t. ! "
+ QUEUE("queue_hailonet")
+ "videoconvert n-threads=3 ! "
f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "
+ QUEUE("queue_hailofilter")
+ f"hailofilter so-path={self.default_postprocess_so} {self.labels_config} qos=false ! "
+ QUEUE("queue_hailotracker")
+ "hailotracker keep-tracked-frames=3 keep-new-frames=3 keep-lost-frames=3 ! "
+ QUEUE("queue_hmuc")
+ "hmux.sink_1 "
+ "hmux. ! "
+ QUEUE("queue_hailo_python")
+ QUEUE("queue_user_callback")
+ "identity name=identity_callback ! "
+ QUEUE("queue_hailooverlay")
+ "hailooverlay ! "
+ QUEUE("queue_videoconvert")
+ "videoconvert n-threads=3 qos=false ! "
+ QUEUE("queue_textoverlay")
+ "textoverlay name=hailo_text text='' valignment=top halignment=center ! "
+ QUEUE("queue_hailo_display")
+ f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
)
print(pipeline_string)
return pipeline_string
if __name__ == "__main__":
# Create an instance of the user app callback class
user_data = user_app_callback_class()
START = sv.Point(0, 340)
END = sv.Point(640, 340)
line_zone = sv.LineZone(start=START, end=END, triggering_anchors=(sv.Position.BOTTOM_LEFT, sv.Position.BOTTOM_RIGHT))
parser = get_default_parser()
# Add additional arguments here
parser.add_argument(
"--network",
default="yolov6n",
choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'],
help="Which Network to use, default is yolov6n",
)
parser.add_argument(
"--hef-path",
default=None,
help="Path to HEF file",
)
parser.add_argument(
"--labels-json",
default=None,
help="Path to costume labels JSON file",
)
args = parser.parse_args()
app = GStreamerDetectionApp(args, user_data)
app.run()
Применение на записанных видеофайлах
python3 detection.py --hef-path ../resources/ei-car-yolov5s.hef --input rpi --labels-json ../resources/yolov5.json --show-fps
Применение на потоке с камеры
$ python3 detection.py --hef-path ../resources/ei-car-yolov5s.hef --input rpi --labels-json ../resources/yolov5.json --show-fps