# ******************************************************************************
# Copyright (c) 2021-2022. Kneron Inc. All rights reserved.                    *
# ******************************************************************************
import argparse
import os
import platform
import sys
import threading
import time
import queue

PWD = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(1, os.path.join(PWD, '..'))
sys.path.insert(1, os.path.join(PWD, '../example/'))

from utils.ExampleHelper import get_device_usb_speed_by_port_id
from utils.ExamplePostProcess import post_process_yolo_v5
import kp
import cv2
import joblib
import numpy as np
import math

# Variables for FPS measurement
prev_time = time.time()
fps = 0

LOOP_TIME = 1
SCPU_FW_PATH = os.path.join(PWD, '../../res/firmware/KL630/kp_firmware.tar')

# Face recognition target
target_name = 'hj'
# target_name = 'ys'

# Face landmark model
FACE_LANDMARK_MODEL_FILE_PATH = os.path.join(PWD, '../../res/models/face_landmark.nef')
# FACE_LANDMARK_MODEL_FILE_PATH = os.path.join(PWD, '../../res/models/sample.nef')

# Face detection model
FACE_DETECTION_MODEL_FILE_PATH = os.path.join(PWD, '../../res/models/face_detection.nef')

# Combined landmark + detection model
COMBINE_MODEL_FILE_PATH = os.path.join(PWD, '../../res/models/combine_lm_dt.nef')

# Emotion recognition model
EMOTION_MODEL_FILE_PATH = os.path.join(PWD, '../../res/weight/svm_JAFFE_LG_gram.pkl')

# Face recognition (PCA) model
PCA_MODEL_FILE_PATH = os.path.join(PWD, f'../../res/weight/{target_name}_pca_model.npz')

_LOCK = threading.Lock()
_SEND_RUNNING = True
_RECEIVE_RUNNING = True
_image_to_inference = None
_image_to_show = None
landmark_image_to_inference = None
yolo_result = None
_generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor()
landmark_generic_inference_input_descriptor = kp.GenericDataInferenceDescriptor()
detection_device_group = None
cropped_x1 = 0
cropped_y1 = 0
cropped_x2 = 0
cropped_y2 = 0
cropped_image = None


class CameraThread(threading.Thread):
    def __init__(self, device_group: kp.DeviceGroup):
        super().__init__(daemon=True)
        self.device_group = device_group
        self.cap = None
        self.running = True

    def run(self):
        global _SEND_RUNNING, _image_to_inference

        try:
            # Initialize the camera (the DirectShow backend is Windows-only)
            if platform.system() == 'Windows':
                self.cap = cv2.VideoCapture(0, cv2.CAP_DSHOW)
            else:
                self.cap = cv2.VideoCapture(1)

            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

            while self.running and _SEND_RUNNING:
                if not self.cap.isOpened():
                    print("Error: Camera is not opened.")
                    break

                ret, frame = self.cap.read()
                if not ret:
                    print("Error: Failed to read frame from camera.")
                    break

                with _LOCK:
                    _image_to_inference = frame

                try:
                    inference_image = cv2.cvtColor(src=_image_to_inference, code=cv2.COLOR_BGR2BGR565)

                    _generic_inference_input_descriptor.input_node_image_list = [
                        kp.GenericInputNodeImage(
                            image=inference_image,
                            resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
                            padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
                            normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
                        )
                    ]

                    print("send result",
                          kp.inference.generic_image_inference_send(
                              device_group=self.device_group,
                              generic_inference_input_descriptor=_generic_inference_input_descriptor))
                except kp.ApiKPException as exception:
                    print(f"Error during inference: {exception}")
                    break
        finally:
            if self.cap:
                self.cap.release()
            cv2.destroyAllWindows()

    def stop(self):
        self.running = False


def drawbbox_pose(image, obj, landmarkcolor=(0, 255, 255), resized_image_width=0, resized_image_height=0,
                  yolo_x1=0, yolo_y1=0, yolo_x2=0, yolo_y2=0):
    global prev_time

    # Get the crop region origin and width/height from the bbox
    bbox = obj[0]  # [x1, y1, w, h]
    detection_x1, detection_y1 = bbox[0], bbox[1]
    detection_w, detection_h = bbox[2], bbox[3]

    # Landmark coordinates (relative to the resized image)
    landmarks = obj[1]

    # Scale factors from the resized image back to the crop region
    scale_x = detection_w / resized_image_width
    scale_y = detection_h / resized_image_height

    # Convert landmark coordinates and draw the points
    landmarks = np.array(landmarks).reshape((18, 2))  # 18 landmark points
    # print(landmarks)
    for i in range(18):
        lx, ly = landmarks[i]             # landmark coordinates in the resized image
        lx = detection_x1 + lx * scale_x  # convert to original image coordinates (x)
        ly = detection_y1 + ly * scale_y  # convert to original image coordinates (y)

        # Draw the landmark point
        cv2.circle(image, (int(lx), int(ly)), 1, landmarkcolor, -1, lineType=cv2.LINE_AA)

    # # Draw the YOLO detection box
    # cv2.rectangle(img=image,
    #               pt1=(int(yolo_x1), int(yolo_y1)),
    #               pt2=(int(yolo_x2), int(yolo_y2)),
    #               color=(0, 0, 255),
    #               thickness=3)

    # Compute FPS
    current_time = time.time()
    fps = 1 / (current_time - prev_time)
    prev_time = current_time

    # Draw FPS on the frame
    cv2.putText(image, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2,
                lineType=cv2.LINE_AA)

    return image
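
# A worked example of the landmark mapping in drawbbox_pose() above, with
# hypothetical numbers (not taken from a real run): for a detection bbox of
# [x1, y1, w, h] = [100, 80, 120, 160] and a resized image of 192x256,
# scale_x = 120 / 192 = 0.625 and scale_y = 160 / 256 = 0.625, so a landmark
# at (96, 128) in resized-image space maps back to
# (100 + 96 * 0.625, 80 + 128 * 0.625) = (160, 160) in the original frame.
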
def _get_max_preds(heatmaps):
    """Get keypoint predictions from score maps.

    Note:
        batch_size: N
        num_keypoints: K
        heatmap height: H
        heatmap width: W

    Args:
        heatmaps (np.ndarray[N, H, W, K]): model predicted heatmaps.

    Returns:
        tuple: A tuple containing aggregated results.

        - preds (np.ndarray[N, K, 2]): Predicted keypoint location.
        - maxvals (np.ndarray[N, K, 1]): Scores (confidence) of the keypoints.
    """
    assert isinstance(heatmaps, np.ndarray), 'heatmaps should be numpy.ndarray'
    assert heatmaps.ndim == 4, 'batch_images should be 4-ndim'

    N, _, W, K = heatmaps.shape
    heatmaps_reshaped = heatmaps.reshape((N, -1, K))
    idx = np.argmax(heatmaps_reshaped, 1).reshape((N, K, 1))
    maxvals = np.amax(heatmaps_reshaped, 1).reshape((N, K, 1))

    preds = np.tile(idx, (1, 1, 2)).astype(np.float32)
    preds[:, :, 0] = preds[:, :, 0] % W   # column (x) of the flat argmax
    preds[:, :, 1] = preds[:, :, 1] // W  # row (y) of the flat argmax
    preds = np.where(np.tile(maxvals, (1, 1, 2)) > 0.0, preds, -1)
    return preds, maxvals
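
# A quick sanity check of the flat-index decode above, with hypothetical
# numbers: for a 64x48 heatmap (H=64, W=48, i.e. the quarter-resolution maps
# of a 256x192 input), a flat argmax index of 1234 decodes to
# x = 1234 % 48 = 34 and y = 1234 // 48 = 25.
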
def postprocess_(heatmaps, number_of_keypoints=68, post_process='megvii', body=False, **kwargs):
    """
    Input:
        heatmaps (np.ndarray[N, H, W, K]): model predicted heatmaps.

    Note:
        batch_size: N
        num_keypoints: K
        heatmap height: H
        heatmap width: W
    """
    # heatmaps: 115,200 values in total
    N, H, W, K = heatmaps.shape
    preds, maxvals = _get_max_preds(heatmaps)
    # preds (np.ndarray[N, K, 2]): Predicted keypoint location.
    # maxvals (np.ndarray[N, K, 1]): Scores (confidence) of the keypoints.

    # add +/-0.25 shift to the predicted locations for higher acc.
    for n in range(N):
        for k in range(K):  # k = keypoint index
            heatmap = heatmaps[n, :, :, k]
            px = int(preds[n][k][0])
            py = int(preds[n][k][1])
            if 1 < px < W - 1 and 1 < py < H - 1:
                diff = np.array([
                    heatmap[py][px + 1] - heatmap[py][px - 1],
                    heatmap[py + 1][px] - heatmap[py - 1][px]
                ])
                preds[n][k] += np.sign(diff) * .25
                if post_process in ['megvii', 'megvii-mpii']:
                    preds[n][k] += 0.5
                else:
                    raise NotImplementedError

    preds *= 4  # scale back to input size

    if post_process in ['megvii', 'megvii-mpii']:
        # normalization on the confidence score
        maxvals = maxvals / 255.0 + 0.5

    keypoint = []
    scores = []
    preds = preds[0]
    maxvals = maxvals[0]
    for i in range(number_of_keypoints):
        if post_process == 'megvii' and body == True:
            if i > 0:
                # index flip
                if i % 2 == 0:
                    index = i - 1
                else:
                    index = i + 1
            else:
                index = i
        else:
            index = i
        keypoint_pair = preds[index].tolist()
        keypoint.extend(keypoint_pair)
        scores.append([float(maxvals[index][0]), i])

    return keypoint, scores


def convert_numpy_to_rgba_and_width_align_4(data):
    """Converts the numpy data into RGBA with the width 4-byte aligned, as the NPU requires."""
    height, width, channel = data.shape

    width_aligned = 4 * math.ceil(width / 4.0)
    aligned_data = np.zeros((height, width_aligned, 4), dtype=np.int8)
    aligned_data[:height, :width, :channel] = data
    aligned_data = aligned_data.flatten()

    return aligned_data.tobytes()
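
# A worked example of the 4-byte width alignment above, with a hypothetical
# input size: a 256x190x3 array is zero-padded to width ceil(190 / 4) * 4 = 192
# and expanded to 4 channels, so the returned buffer holds
# 256 * 192 * 4 = 196,608 bytes with the original pixels in the top-left
# 256x190 region and zeros elsewhere.
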
def face_detection_image_send_function(_device_group: kp.DeviceGroup) -> None:
    global _SEND_RUNNING, _RECEIVE_RUNNING

    camera_thread = CameraThread(device_group=_device_group)

    try:
        camera_thread.start()
        while _SEND_RUNNING:
            time.sleep(0.1)  # Main thread can do other tasks or monitor status
    except KeyboardInterrupt:
        print("Stopping threads due to user interrupt.")
    finally:
        camera_thread.stop()
        camera_thread.join()
        _SEND_RUNNING = False
        _RECEIVE_RUNNING = False


def face_detection_result_receive_function(_device_group: kp.DeviceGroup) -> None:
    global _image_to_show, _RECEIVE_RUNNING, landmark_generic_inference_input_descriptor

    # nef_radix = face_landmark_model_nef_descriptor.models[0].input_nodes[0].quantization_parameters.quantized_fixed_point_descriptor_list[0].radix  # only support single model NEF
    # nef_model_width = face_landmark_model_nef_descriptor.models[0].input_nodes[0].shape_npu[3]
    # nef_model_height = face_landmark_model_nef_descriptor.models[0].input_nodes[0].shape_npu[2]

    # Model parameters
    nef_radix = 8
    nef_model_width = 192
    nef_model_height = 256

    while _RECEIVE_RUNNING:
        try:
            generic_raw_result = kp.inference.generic_image_inference_receive(device_group=_device_group)
            print("receive result ", generic_raw_result)
        except kp.ApiKPException as exception:
            print(' - Error: inference failed, error = {}'.format(exception))
            break

        with _LOCK:
            temp_image = _image_to_inference.copy()

        inf_node_output_list = []
        for node_idx in range(generic_raw_result.header.num_output_node):
            inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(
                node_idx=node_idx,
                generic_raw_result=generic_raw_result,
                channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
            )
            inf_node_output_list.append(inference_float_node_output)

        # do post-process
        _yolo_result = post_process_yolo_v5(inference_float_node_output_list=inf_node_output_list,
                                            hardware_preproc_info=generic_raw_result.header.hw_pre_proc_info_list[0],
                                            thresh_value=0.15,
                                            with_sigmoid=False)
        print("yolo box : ", _yolo_result)

        for yolo_box in _yolo_result.box_list:
            x1, y1, x2, y2 = map(int, [yolo_box.x1, yolo_box.y1, yolo_box.x2, yolo_box.y2])
            cropped_image = temp_image[y1:y2, x1:x2].copy()
            # cv2.imwrite(f'asd{x1,y1,x2,y2}.jpg', cropped_image)

            # resize to model input size
            height, width = cropped_image.shape[:2]
            img_input = cv2.resize(cropped_image, (nef_model_width, nef_model_height),
                                   interpolation=cv2.INTER_AREA)
            # cv2.imwrite(f'middle_{x1,y1,x2,y2}_{height,width}.jpg', img_input)

            # draw the detection box on the full frame
            cv2.rectangle(img=temp_image,
                          pt1=(x1, y1),
                          pt2=(x2, y2),
                          color=(0, 0, 255),
                          thickness=3)

            # this model was trained with normalize method: (data - 128) / 256
            img_input = img_input / 256
            img_input -= 0.5

            # The toolchain calculates the radix value from the input data (after normalization)
            # and stores it in the NEF model. The NPU divides the input data by 2^radix
            # automatically, so we have to scale the input data here to compensate.
            img_input *= pow(2, nef_radix)

            # convert rgb to rgba and width-align to 4 bytes, due to npu requirement.
            img_buffer = convert_numpy_to_rgba_and_width_align_4(img_input)

            landmark_generic_inference_input_descriptor = kp.GenericDataInferenceDescriptor(
                model_id=1,
                inference_number=1,
            )

            landmark_generic_inference_input_descriptor.input_node_data_list = [
                # changed 241217 11:26
                kp.GenericInputNodeData(buffer=img_buffer)
            ]

            try:
                # FACE LANDMARK INFERENCE DATA SEND
                print(' - Landmark image input prepared')

                # send the generic data inference input descriptor and receive the result
                for i in range(10):
                    try:
                        kp.inference.generic_data_inference_send(
                            device_group=detection_device_group,
                            generic_inference_input_descriptor=landmark_generic_inference_input_descriptor)
                        landmark_result = kp.inference.generic_data_inference_receive(
                            device_group=detection_device_group)
                    except kp.ApiKPException as exception:
                        print(' - Error: inference failed, error = {}'.format(exception))
                        exit(0)

                with _LOCK:
                    temp_image = _image_to_inference.copy()

                print('[Retrieve Inference Node Output]')
                inf_node_output_list = []
                for node_idx in range(landmark_result.header.num_output_node):
                    inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(
                        node_idx=node_idx,
                        generic_raw_result=landmark_result,
                        channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
                    )
                    inf_node_output_list.append(inference_float_node_output)

                out_data = inf_node_output_list[0].ndarray.transpose((0, 2, 3, 1))  # NCHW -> NHWC
                print(out_data.shape)

                lm, score = postprocess_(out_data, number_of_keypoints=18)
                print("landmark : ", lm)

                temp_image = drawbbox_pose(temp_image, [[x1, y1, x2 - x1, y2 - y1], lm, score],
                                           resized_image_width=width, resized_image_height=height,
                                           yolo_x1=x1, yolo_y1=y1, yolo_x2=x2, yolo_y2=y2)
            except kp.ApiKPException as exception:
                print(' - Error: landmark inference failed, error = {}'.format(exception))

        with _LOCK:
            _image_to_show = np.array(temp_image, dtype=np.uint8)  # ensure a uint8 NumPy array

    _RECEIVE_RUNNING = False
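
# A worked example of the radix scaling in the receive loop above, with a
# hypothetical pixel value: for nef_radix = 8, a raw pixel of 200 is normalized
# to 200 / 256 - 0.5 = 0.28125 and then scaled by 2^8 to 72; the NPU's
# automatic divide-by-2^radix restores the normalized value on-device.
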
if __name__ == '__main__':
    detection_usb_port_id = 9

    try:
        if kp.UsbSpeed.KP_USB_SPEED_HIGH != get_device_usb_speed_by_port_id(usb_port_id=detection_usb_port_id):
            print('\033[91m' + '[Error] Device is not running at high speed.' + '\033[0m')
            exit(0)
    except kp.ApiKPException as exception:
        print('Error: check device USB speed fail, port ID = \'{}\', error msg: [{}]'.format(detection_usb_port_id,
                                                                                             str(exception)))
        exit(0)

    try:
        print(f'[Connect Device] : {detection_usb_port_id}')
        detection_device_group = kp.core.connect_devices(usb_port_ids=[detection_usb_port_id])
        # print(detection_device_group)
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: connect device fail, port ID = \'{}\', error msg: [{}]'.format(detection_usb_port_id,
                                                                                     str(exception)))
        exit(0)

    print(f'[Set Device Timeout] : {detection_usb_port_id}')
    kp.core.set_timeout(device_group=detection_device_group, milliseconds=10000)
    print(' - Success')

    try:
        print(f'[Upload Firmware] : {detection_usb_port_id}')
        kp.core.load_firmware_from_file(device_group=detection_device_group,
                                        scpu_fw_path=SCPU_FW_PATH,
                                        ncpu_fw_path="")
        print(' - Success')
    except kp.ApiKPException as exception:
        print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
        exit(0)

    try:
        print(f'[Upload Model] : {detection_usb_port_id}')
        model_nef_descriptor = kp.core.load_model_from_file(device_group=detection_device_group,
                                                            file_path=COMBINE_MODEL_FILE_PATH)
        print(' - Success')
        print('[Model NEF Information]')
        print(model_nef_descriptor)
    except kp.ApiKPException as exception:
        print('Error: upload model failed, error = \'{}\''.format(str(exception)))
        exit(0)

    _generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
        model_id=32769,  # detection model
        inference_number=0,
    )

    print("[DEBUG] detection device group : ", detection_device_group)
    print('[Starting Detection Inference Work]')
    detection_send_thread = threading.Thread(target=face_detection_image_send_function,
                                             args=(detection_device_group,),
                                             daemon=True)
    detection_receive_thread = threading.Thread(target=face_detection_result_receive_function,
                                                args=(detection_device_group,),
                                                daemon=True)

    detection_send_thread.start()
    detection_receive_thread.start()

    cv2.namedWindow('Generic Inference', cv2.WINDOW_GUI_EXPANDED)
    cv2.setWindowProperty('Generic Inference', cv2.WND_PROP_ASPECT_RATIO, cv2.WINDOW_KEEPRATIO)

    try:
        while True:
            with _LOCK:
                if _image_to_show is not None:
                    cv2.imshow('Generic Inference', _image_to_show)

            # exit on ESC or when either worker thread has stopped
            if (27 == cv2.waitKey(10)) or (not _SEND_RUNNING) or (not _RECEIVE_RUNNING):
                break
    except KeyboardInterrupt:
        pass

    _SEND_RUNNING = False
    detection_send_thread.join()
    detection_receive_thread.join()
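
# Usage note (a minimal sketch; the values below come from the hard-coded
# settings above and will likely differ on your machine): the script expects a
# KL630 device on USB port ID 9, the combined detection + landmark NEF at
# COMBINE_MODEL_FILE_PATH, and a camera at index 0 (Windows) or 1 (other
# platforms). Run it directly, e.g.:
#
#   python face_landmark_demo.py   # hypothetical file name
#
# and press ESC in the preview window to exit.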