"""Flask blueprint exposing endpoints to launch Celery tasks and poll them."""

import os
import re
import uuid
import time
import dlib
import tempfile
import simplejson as json
import numpy as np
from flask import Blueprint, request, jsonify
from PIL import Image, ImageOps  # todo: try to remove PIL dependency

from celery.result import AsyncResult
from app.server.tasks import celery
from app.server.tasks import task_lookup, list_active_tasks
# from app.models.sql_factory import load_sql_datasets, list_datasets, get_dataset, get_table

api_task = Blueprint('task', __name__)


@api_task.route('/')
def index():
    """List active tasks."""
    return jsonify(list_active_tasks)


# The URL variables must be declared in the rule: Flask passes them to the
# view as keyword arguments, and the status URLs built by ``sleep_test`` and
# ``process`` have the form ``/task/status/<task_name>/<task_id>``.
@api_task.route('/status/<task_name>/<task_id>')
def task_status(task_name, task_id):
    """Return the Celery processing status of a task as JSON.

    Args:
        task_name: Key into ``task_lookup`` identifying the task type.
        task_id: Celery task id, as embedded in the status URL handed out
            when the task was started.

    Returns:
        A JSON response with ``state``, ``percent`` and ``message`` keys.
        Responses for known tasks also carry ``data`` (the raw ``task.info``);
        in-progress/finished responses additionally carry ``uuid`` and, when
        present in ``task.info``, ``result``.
    """
    unknown_task = {
        'state': 'error',
        'percent': 100,
        'message': 'Unknown task',
    }

    # Guard clauses: an unregistered task name, or a task for which the
    # result backend has no info at all, are both reported as unknown.
    if task_name not in task_lookup:
        return jsonify(unknown_task)
    task = task_lookup[task_name]['task'].AsyncResult(task_id)
    if task.info is None:
        return jsonify(unknown_task)

    if task.state == 'PENDING':
        response = {
            'state': task.state,
            'percent': 0,
            'message': 'Pending...',
            'data': task.info,
        }
    elif task.state != 'FAILURE':
        # NOTE(review): assumes task.info is a dict of progress metadata for
        # every non-failure state -- confirm against the task implementations.
        response = {
            'state': task.state,
            'percent': task.info.get('percent', 0),
            'uuid': task.info.get('uuid', 0),
            'message': task.info.get('message', ''),
            'data': task.info,
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        # something went wrong in the background job
        response = {
            'state': task.state,
            'percent': 100,
            'message': str(task.info),  # this is the exception raised
            'data': task.info,
        }
    return jsonify(response)


@api_task.route('/upload/sleep', methods=['GET', 'POST'])
def sleep_test():
    """Test the Celery system using a task that sleeps.

    Returns:
        A JSON response with ``result`` set to True and ``task_url``, the
        status URL for the newly queued task.
    """
    async_task = task_lookup['sleep']['task'].apply_async(args=['sleep_test'])
    task_url = '/task/status/{}/{}'.format('sleep', async_task.id)
    return jsonify({
        'result': True,
        'task_url': task_url,
    })


@api_task.route('/upload/blur', methods=['POST'])
def upload():
    """Process the uploaded image with the 'blur' task."""
    return process('blur')


@api_task.route('/upload/demo', methods=['POST'])
def demo():
    """Process the uploaded image with the 'demo' task."""
    return process('demo')


def process(style):
    """Process an image in a particular style.

    Reads the uploaded file from the ``query_img`` form field, converts it to
    RGB, fits it to 256x256, writes it to a temporary JPEG file and queues
    the Celery task registered under ``style`` with a fresh UUID and the
    temporary file's path.

    Args:
        style: Key into ``task_lookup`` selecting the task to run.

    Returns:
        A JSON response. On success it contains ``result`` (True),
        ``taskURL`` (status URL for the queued task) and ``uuid``. If
        ``style`` is not registered it contains ``result`` (False) and
        ``error``.
    """
    print('style: {}'.format(style))
    if style not in task_lookup:
        return jsonify({
            'result': False,
            'error': 'Unknown task',
        })
    task = task_lookup[style]['task']
    print('task', task)

    print('get file...')
    upload_file = request.files['query_img']

    uuid_str = str(uuid.uuid4())

    print('[+] style: {}'.format(style))
    print('[+] uuid_name: {}'.format(uuid_str))

    im = Image.open(upload_file.stream).convert('RGB')
    im = ImageOps.fit(im, (256, 256,))

    # Save image to disk. The context manager flushes and closes the handle
    # before the path is handed to the worker, so the JPEG is complete on
    # disk and no file descriptor is leaked. delete=False keeps the file
    # after closing; presumably the task removes it when done -- TODO confirm.
    print('[+] Save image to temporary file')
    with tempfile.NamedTemporaryFile(delete=False) as tmpfile:
        im.save(tmpfile, 'JPEG', quality=80)
    tmp_path = tmpfile.name

    print('[+] Start celery')
    async_task = task.apply_async(args=[uuid_str, tmp_path])
    task_url = '/task/status/{}/{}'.format(style, async_task.id)

    return jsonify({
        'result': True,
        'taskURL': task_url,
        'uuid': uuid_str
    })