diff options
Diffstat (limited to 'megapixels/app/server/tasks/demo.py')
| -rw-r--r-- | megapixels/app/server/tasks/demo.py | 244 |
1 files changed, 244 insertions, 0 deletions
diff --git a/megapixels/app/server/tasks/demo.py b/megapixels/app/server/tasks/demo.py new file mode 100644 index 00000000..acc5dbac --- /dev/null +++ b/megapixels/app/server/tasks/demo.py @@ -0,0 +1,244 @@ + +import app.settings.app_cfg as cfg +from app.server.tasks import celery + +from celery.utils.log import get_task_logger +log = get_task_logger(__name__) + +opt_size = (256, 256,) + +@celery.task(bind=True) +def demo_task(self, uuid_name, fn): + + import sys + import os + from os.path import join + from pathlib import Path + import time + + import numpy as np + import cv2 as cv + import dlib + from PIL import Image + import matplotlib.pyplot as plt + + from app.utils import logger_utils, file_utils, im_utils, display_utils, draw_utils + from app.utils import plot_utils + from app.processors import face_detector, face_landmarks + from app.models.data_store import DataStore + + # TODO add selective testing + opt_gpu = -1 + opt_run_pose = True + opt_run_2d_68 = True + opt_run_3d_68 = True + opt_run_3d_68 = True + paths + + meta = { + 'step': 0, + 'total': 3, + 'message': 'Starting', + 'uuid': uuid_name, + 'data': {}, + } + paths = [] + + def step(msg, step=0): + meta['step'] += step + meta['message'] = msg + log.debug('> {}'.format(msg)) + self.update_state(state='PROCESSING', meta=meta) + + step('Loading image') + self.update_state(state='PROCESSING', meta=meta) + + # os.path.join('/user_content/', fn) + + # ------------------------------------------------- + # init here + + # load image + im = cv.imread(fn) + im_resized = im_utils.resize(im, width=opt_size[0], height=opt_size[1]) + + # ---------------------------------------------------------------------------- + # detect face + + face_detector_instance = face_detector.DetectorDLIBCNN(gpu=opt_gpu) # -1 for CPU + step('Detecting face') + st = time.time() + bboxes = face_detector_instance.detect(im_resized, largest=True) + bbox = bboxes[0] + dim = im_resized.shape[:2][::-1] + bbox_dim = bbox.to_dim(dim) + if not bbox: + log.error('No face detected') + meta['error'] = 'No face detected' + self.update_state(state='FAILURE', meta=meta) + return meta + else: + log.info(f'Detected face in {(time.time() - st):.2f}s') + + + # ---------------------------------------------------------------------------- + # detect 3D landmarks + + step('Generating 3D Landmarks') + log.info('loading 3D landmark generator files...') + landmark_detector_3d_68 = face_landmarks.FaceAlignment3D_68(gpu=opt_gpu) # -1 for CPU + log.info('generating 3D landmarks...') + st = time.time() + points_3d_68 = landmark_detector_3d_68.landmarks(im_resized, bbox_dim.to_xyxy()) + log.info(f'generated 3D landmarks in {(time.time() - st):.2f}s') + log.info('') + + # draw 3d landmarks + im_landmarks_3d_68 = im_resized.copy() + draw_utils.draw_landmarks3D(im_landmarks_3d_68, points_3d_68) + draw_utils.draw_bbox(im_landmarks_3d_68, bbox_dim) + + save_image('landmarks_3d_68', '3D Landmarks', im_landmarks_3d_68) + + def save_image(key, title, data): + fn = '{}_{}.jpg'.format(uuid_name, key) + fpath = os.path.join(cfg.DIR_SITE_USER_CONTENT, fn) + paths.append(fpath) + cv.imwrite(fpath, im_landmarks_3d_68) + + meta['data']['landmarks_3d_68'] = { + 'title': '3D Landmarks', + 'url': os.path.join('/user_content/', fn), + } + step('Generated 3D Landmarks', step=0) + + # ---------------------------------------------------------------------------- + # generate 3D GIF animation + + # step('Generating GIF Animation') + # log.info('generating 3D animation...') + # if not opt_fp_out: + # fpp_im = Path(opt_fp_in) + # fp_out = join(fpp_im.parent, f'{fpp_im.stem}_anim.gif') + # else: + # fp_out = opt_fp_out + # st = time.time() + # plot_utils.generate_3d_landmark_anim(np.array(points_3d_68), fp_out, + # size=opt_gif_size, num_frames=opt_gif_frames) + # log.info(f'Generated animation in {(time.time() - st):.2f}s') + # log.info(f'Saved to: {fp_out}') + # log.info('') + + + # # ---------------------------------------------------------------------------- + # # generate face vectors, only to test if feature extraction works + + # step('Generating face vectors') + # log.info('initialize face recognition model...') + # from app.processors import face_recognition + # face_rec = face_recognition.RecognitionDLIB() + # st = time.time() + # log.info('generating face vector...') + # vec = face_rec.vec(im_resized, bbox_dim) + # log.info(f'generated face vector in {(time.time() - st):.2f}s') + # log.info('') + + + # # ---------------------------------------------------------------------------- + # # generate 68 point landmarks using dlib + + # step('Generating 2D 68PT landmarks') + # log.info('initializing face landmarks 68 dlib...') + # from app.processors import face_landmarks + # landmark_detector_2d_68 = face_landmarks.Dlib2D_68() + # log.info('generating 2D 68PT landmarks...') + # st = time.time() + # points_2d_68 = landmark_detector_2d_68.landmarks(im_resized, bbox_dim) + # log.info(f'generated 2D 68PT face landmarks in {(time.time() - st):.2f}s') + # log.info('') + + + # # ---------------------------------------------------------------------------- + # # generate pose from 68 point 2D landmarks + + # if opt_run_pose: + # step('Generating pose') + # log.info('initialize pose...') + # from app.processors import face_pose + # pose_detector = face_pose.FacePoseDLIB() + # log.info('generating pose...') + # st = time.time() + # pose_data = pose_detector.pose(points_2d_68, dim) + # log.info(f'generated pose {(time.time() - st):.2f}s') + # log.info('') + + + # # ---------------------------------------------------------------------------- + # # generate pose from 68 point 2D landmarks + + step('Done') + + # done + # self.log.debug('Add age real') + # self.log.debug('Add age apparent') + # self.log.debug('Add gender') + + + # # 3DDFA + # self.log.debug('Add depth') + # self.log.debug('Add pncc') + + # # TODO + # self.log.debug('Add 3D face model') + # self.log.debug('Add face texture flat') + # self.log.debug('Add ethnicity') + + # display + # draw bbox + + # # draw 2d landmarks + # im_landmarks_2d_68 = im_resized.copy() + # draw_utils.draw_landmarks2D(im_landmarks_2d_68, points_2d_68) + # draw_utils.draw_bbox(im_landmarks_2d_68, bbox_dim) + + # # draw pose + # if opt_run_pose: + # im_pose = im_resized.copy() + # draw_utils.draw_pose(im_pose, pose_data['point_nose'], pose_data['points']) + # draw_utils.draw_degrees(im_pose, pose_data) + + # # draw animated GIF + # im = Image.open(fp_out) + # im_frames = [] + # duration = im.info['duration'] + # try: + # while True: + # im.seek(len(im_frames)) + # mypalette = im.getpalette() + # im.putpalette(mypalette) + # im_jpg = Image.new("RGB", im.size) + # im_jpg.paste(im) + # im_np = im_utils.pil2np(im_jpg.copy()) + # im_frames.append(im_np) + # except EOFError: + # pass # end of GIF sequence + + # n_frames = len(im_frames) + # frame_number = 0 + + # # show all images here + # cv.imshow('Original', im_resized) + # cv.imshow('2D 68PT Landmarks', im_landmarks_2d_68) + # cv.imshow('3D 68PT Landmarks', im_landmarks_3d_68) + # cv.imshow('Pose', im_pose) + # cv.imshow('3D 68pt GIF', im_frames[frame_number]) + + log.debug('done!!') + + for path in paths: + if os.path.exists(path): + os.remove(path) + + meta['step'] = meta['total'] + meta['state'] = 'SUCCESS' + return meta |
