diff options
| author | Jules Laplace <julescarbon@gmail.com> | 2019-01-11 20:38:36 +0100 |
|---|---|---|
| committer | Jules Laplace <julescarbon@gmail.com> | 2019-01-11 20:38:36 +0100 |
| commit | 334ea5a2a91da853dc6faf7f48aaa12599201218 (patch) | |
| tree | 24c469d58ae7c15d09f0dc76836756af2fa4ff51 /megapixels/app/server | |
| parent | 3202498b5c195f103a7cd13609785c2bb97c2da6 (diff) | |
enable celery tasks
Diffstat (limited to 'megapixels/app/server')
| -rw-r--r-- | megapixels/app/server/api_task.py | 187 | ||||
| -rw-r--r-- | megapixels/app/server/create.py | 5 | ||||
| -rw-r--r-- | megapixels/app/server/tasks/__init__.py | 47 | ||||
| -rw-r--r-- | megapixels/app/server/tasks/blur.py | 91 | ||||
| -rw-r--r-- | megapixels/app/server/tasks/sleep.py | 38 |
5 files changed, 368 insertions, 0 deletions
diff --git a/megapixels/app/server/api_task.py b/megapixels/app/server/api_task.py new file mode 100644 index 00000000..acdb5b7d --- /dev/null +++ b/megapixels/app/server/api_task.py @@ -0,0 +1,187 @@ +import os +import re +import uuid +import time +import dlib +import simplejson as json +import numpy as np +from flask import Blueprint, request, jsonify +from PIL import Image # todo: try to remove PIL dependency + +# from app.models.sql_factory import load_sql_datasets, list_datasets, get_dataset, get_table +# from app.utils.im_utils import pil2np + +from celery.result import AsyncResult +from app.server.tasks import celery + +api_task = Blueprint('api_task', __name__) + +@api_task.route('/') +def index(): + """Dummy index""" + return jsonify({}) + +# from flask import render_template, redirect, url_for, send_from_directory +# from flask import request, make_response, jsonify +# from . import main, utils +from app.server.tasks import task_lookup, list_active_tasks +# from PIL import Image, ImageOps +# import cv2 as cv + +# import imutils + +@api_task.route('/<task_name>/<task_id>') +def task_status(task_name, task_id): + """Return celery image processing status""" + if task_name in task_lookup: + task = task_lookup[task_name]['task'].AsyncResult(task_id) + # task = AsyncResult(task_id, app=celery) + else: + return jsonify({ + 'state': 'error', + 'percent': 100, + 'message': 'Unknown task' + }) + + # app.logger.info('task state: {}'.format(task.state)) + if task.state == 'PENDING': + response = { + 'state': task.state, + 'percent': 0, + 'message': 'Pending...' + } + elif task.state != 'FAILURE': + response = { + 'state': task.state, + 'percent': task.info.get('percent', 0), + 'uuid': task.info.get('uuid', 0), + 'message': task.info.get('message', '') + } + if 'result' in task.info: + response['result'] = task.info['result'] + else: + # something went wrong in the background job + response = { + 'state': task.state, + 'percent': 100, + 'message': str(task.info), # this is the exception raised + } + return jsonify(response) + +@api_task.route('/upload/sleep', methods=['GET', 'POST']) +def sleep_test(): + async_task = task_lookup['sleep']['task'].apply_async(args=['sleep_test']) + task_url = '/task/{}/{}'.format('sleep', async_task.id) + return jsonify({ + 'result': True, + 'task_url': task_url, + }) + +# @api_task.route('/upload', methods=['POST']) +# def upload(): + +# style = request.form['style'] +# print('style',style) +# if style in task_lookup: +# task = task_lookup[style]['task'] +# print('task',task) +# else: +# return jsonify({ +# 'result': False, +# 'error': 'Unknown task', +# }) + +# file = request.files['user_image'] +# agree = bool(request.form['agree']) +# ext = request.form['ext'] +# if ext is None: +# ext = request.files['ext'] + +# uuid_name = str(uuid.uuid4()) + +# app.logger.info('[+] style: {}'.format(style)) +# app.logger.info('[+] ext: {}'.format(ext)) +# app.logger.info('[+] uuid_name: {}'.format(uuid_name)) +# app.logger.info('[+] agreed: {}'.format(agree)) + +# # convert PNG to JPG +# print('[+] Resizing image') + +# # LOL MaskRCNN needs to be run outside of the Celery Task +# im = Image.open(file.stream).convert('RGB') +# im = ImageOps.fit(im,(512,512)) +# if agree: +# upload_folder = app.config['UPLOADS'] +# else: +# upload_folder = app.config['UPLOADS_PRIVATE'] + +# fpath = os.path.join(upload_folder, uuid_name + '.jpg') + +# # Save image to disk +# print('[+] Save image to {}'.format(fpath)) +# im.save(fpath, 'JPEG', quality=100) +# im_pil_256 = im.resize((256,256)) + +# print('[+] ensure_np...') +# im_np = imx.ensure_np(im_pil_256) +# #print('[+] resize np...') +# #im_np = imutils.resize(im_np,width=256) + +# upload_dir, render_dir, json_dir, upload_uri, render_uri = get_paths(agree) + +# print('[+] Run mrcnn...') +# try: +# result = mask_rcnn.create_segmentations(im_np,concat=True) +# except: +# print('[-] Error. Could not run mask_rcnn') +# result = [] + +# if len(result) > 0: +# result = result[0] + +# # save data, then pass to celery task +# print('[+] Save masks') +# seg_mask = result['seg_mask'] +# fpath_seg_mask = os.path.join(render_dir, uuid_name + '_seg_mask.jpg') +# #cv.imwrite(fpath_seg_mask,cv.cvtColor(seg_mask,cv.COLOR_BGR2RGB)) +# #seg_mask = seg_mask[:,:,::-1] +# seg_mask_pil = imx.ensure_pil(seg_mask) +# seg_mask_pil.save(fpath_seg_mask, 'JPEG', quality=100) + +# im_mask = result['im_mask'] +# fpath_im_mask = os.path.join(render_dir, uuid_name + '_im_mask.jpg') +# #im_mask = im_mask[:,:,::-1] +# im_mask_pil = imx.ensure_pil(im_mask) +# im_mask_pil.save(fpath_im_mask, 'JPEG',quality=100) +# #cv.imwrite(fpath_im_mask,cv.cvtColor(im_mask,cv.COLOR_BGR2RGB)) + +# celery_result = { +# 'score':str(result['score']), +# 'name':str(result['name']), +# 'class_index':str(result['class_index']), +# 'color':str(result['color']), +# 'fp_im_mask':fpath_im_mask, +# 'fp_seg_mask':fpath_seg_mask, +# 'valid':True +# } +# else: +# print('[-] no reults. process background only') +# celery_result = { +# 'score':None, +# 'name':None, +# 'class_index':None, +# 'color':None, +# 'fp_im_mask':None, +# 'fp_seg_mask':None, +# 'valid':False +# } + +# print('[+] Start celery') +# async_task = task.apply_async(args=[uuid_name, agree, celery_result]) +# task_url = url_for('main.task_status', task_name=style, task_id=async_task.id) + +# return jsonify({ +# 'result': True, +# 'task_url': task_url, +# 'uuid': uuid_name +# }) diff --git a/megapixels/app/server/create.py b/megapixels/app/server/create.py index 4b1333b9..f46bb2a0 100644 --- a/megapixels/app/server/create.py +++ b/megapixels/app/server/create.py @@ -2,7 +2,9 @@ from flask import Flask, Blueprint, jsonify, send_from_directory from flask_sqlalchemy import SQLAlchemy from app.models.sql_factory import connection_url, load_sql_datasets +from app.settings import app_cfg as cfg from app.server.api import api +from app.server.api_task import api_task db = SQLAlchemy() @@ -13,11 +15,14 @@ def create_app(script_info=None): app = Flask(__name__, static_folder='static', static_url_path='') app.config['SQLALCHEMY_DATABASE_URI'] = connection_url app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False + app.config['CELERY_BROKER_URL'] = cfg.CELERY_BROKER_URL + app.config['CELERY_RESULT_BACKEND'] = cfg.CELERY_RESULT_BACKEND db.init_app(app) datasets = load_sql_datasets(replace=False, base_model=db.Model) app.register_blueprint(api, url_prefix='/api') + app.register_blueprint(api_task, url_prefix='/task') app.add_url_rule('/<path:file_relative_path_to_root>', 'serve_page', serve_page, methods=['GET']) @app.route('/', methods=['GET']) diff --git a/megapixels/app/server/tasks/__init__.py b/megapixels/app/server/tasks/__init__.py new file mode 100644 index 00000000..bac7309f --- /dev/null +++ b/megapixels/app/server/tasks/__init__.py @@ -0,0 +1,47 @@ +import simplejson as json +from app.settings import app_cfg as cfg +from celery import Celery + +celery = Celery(__name__, backend=cfg.CELERY_RESULT_BACKEND, broker=cfg.CELERY_BROKER_URL) + +from app.server.tasks.sleep import sleep_task +# from app.server.tasks.blur import blur_task + +def list_active_tasks(): + dropdown = {} + for k,v in task_lookup.items(): + if 'active' not in v or v['active'] is not False: + is_default = 'default' in v and v['default'] is True + task = { + 'name': k, + 'title': v['title'], + 'selected': is_default, + } + dropdown[k] = task + return dropdown + +################################################################### +# Add all valid tasks to this lookup. +# Set 'active': False to disable a task +# Set 'default': True to define the default task + +task_lookup = { + 'sleep': { + 'title': 'Sleep Test', + 'task': sleep_task, + 'active': True, + 'default': True, + }, + # 'blur': { + # 'title': 'Blur', + # 'task': blur_task, + # 'active': False, + # }, + # 'task_dull': { + # 'title': 'DullDream V2', + # 'task': task_dull, + # 'active': True, + # 'default': True, + # } +} + diff --git a/megapixels/app/server/tasks/blur.py b/megapixels/app/server/tasks/blur.py new file mode 100644 index 00000000..508de477 --- /dev/null +++ b/megapixels/app/server/tasks/blur.py @@ -0,0 +1,91 @@ +import os +import sys +import time +import datetime +import json +from PIL import Image, ImageFilter +import cv2 as cv +import numpy as np +from . import main, utils +from .. import basemodels +from flask import current_app as app +from .paths import get_paths +celery = basemodels.celery +from celery.utils.log import get_task_logger +celery_logger = get_task_logger(__name__) +import imutils + +@celery.task(bind=True) +def blur_task(self, uuid_name, agree, extra): + """Process image and update during""" + celery_logger.debug('process_image_task, uuid: {}'.format(uuid_name)) + + upload_dir, render_dir, json_dir, upload_uri, render_uri = get_paths(agree) + + files = [] + + im = Image.open(os.path.join(upload_dir, uuid_name + '.jpg')).convert('RGB') + im = im.resize((256,256)) + files.append({ + 'title': 'Original image', + 'fn': upload_uri + uuid_name + '.jpg' + }) + + self.update_state( + state = 'PROCESSING', + meta = { + 'percent': 0.25, + 'message': 'Applying blur', + 'uuid': uuid_name + }) + + im_np = utils.ensure_np(im) + im_blur = cv.blur(im_np, (5,5), 1.0) + im_blur_pil = utils.ensure_pil(im_blur) + + fn = uuid_name + '_blur.jpg' + fpath = os.path.join(render_dir, fn) + im_blur_pil.save(fpath, 'JPEG', quality=95) + + files.append({ + 'title': 'Blurred image', + 'fn': render_uri + uuid_name + '_blur.jpg' + }) + + time.sleep(3) + + self.update_state( + state = 'PROCESSING', + meta = { + 'percent': 0.50, + 'message': 'Sleeping for some reason', + 'uuid': uuid_name + }) + time.sleep(2) + + self.update_state( + state = 'PROCESSING', + meta = { + 'percent': 0.75, + 'message': 'Sleeping some more', + 'uuid': uuid_name + }) + time.sleep(2) + + data = { + 'uuid': uuid_name, + 'date': str(datetime.datetime.now()), + 'files': files + } + + json_path = os.path.join(json_dir, uuid_name + '.json') + with open(json_path, 'w') as json_file: + json.dump(data, json_file) + + celery_logger.debug('ok') + + return { + 'percent': 100, + 'state': 'complete', + 'uuid': uuid_name, + } diff --git a/megapixels/app/server/tasks/sleep.py b/megapixels/app/server/tasks/sleep.py new file mode 100644 index 00000000..9b91cc52 --- /dev/null +++ b/megapixels/app/server/tasks/sleep.py @@ -0,0 +1,38 @@ +import time + +# from .. import basemodels +# celery = basemodels.celery + +from celery.utils.log import get_task_logger +celery_logger = get_task_logger(__name__) + +from app.server.tasks import celery + +import imutils + +@celery.task(bind=True) +def sleep_task(self, uuid_name): + celery_logger.debug('sleep_task'.format(uuid_name)) + msgs = [ + {'msg':'Uploaded OK','time':.1}, + {'msg':'Segmenting Image...','time':2}, + {'msg':'Found: Person, Horse','time':1}, + {'msg':'Creating Pix2Pix','time':2} + ] + for i,m in enumerate(msgs): + percent = int(float(i)/float(len(msgs))*100.0) + self.update_state( + state = 'PROCESSING', + meta = { + 'percent': percent, + 'message': m['msg'], + 'uuid': uuid_name + }) + celery_logger.debug(m['msg']) + time.sleep(m['time']) + + return { + 'percent': 100, + 'state': 'complete', + 'uuid': uuid_name + } |
