import os import re import time import dlib import numpy as np import operator from flask import Blueprint, request, jsonify from PIL import Image # todo: try to remove PIL dependency from app.processors import face_recognition from app.processors import face_detector from app.processors.faiss import load_faiss_databases from app.models.sql_factory import load_sql_datasets, list_datasets, get_dataset, get_table from app.utils.im_utils import pil2np sanitize_re = re.compile('[\W]+') valid_exts = ['.gif', '.jpg', '.jpeg', '.png'] LIMIT = 9 THRESHOLD = 0.3 api = Blueprint('api', __name__) faiss_datasets = load_faiss_databases() @api.route('/') def index(): """List the datasets and their fields""" return jsonify({ 'datasets': list_datasets() }) @api.route('/dataset/') def show(dataset_name): """Show the data that a dataset will return""" dataset = get_dataset(dataset_name) if dataset: return jsonify(dataset.describe()) else: return jsonify({ 'status': 404 }) @api.route('/dataset//face', methods=['POST']) def upload(dataset_name): """Query an image against FAISS and return the matching identities""" start = time.time() dataset = get_dataset(dataset_name) if dataset_name not in faiss_datasets: return jsonify({ 'error': 'bad_dataset' }) faiss_dataset = faiss_datasets[dataset_name] file = request.files['query_img'] fn = file.filename if fn.endswith('blob'): # FIX PNG IMAGES? fn = 'filename.jpg' basename, ext = os.path.splitext(fn) # print("got {}, type {}".format(basename, ext)) if ext.lower() not in valid_exts: return jsonify({ 'error': 'not_an_image' }) im = Image.open(file.stream).convert('RGB') im_np = pil2np(im) # Face detection detector = face_detector.DetectorDLIBHOG() # get detection as BBox object bboxes = detector.detect(im_np, largest=True) if not bboxes or not len(bboxes): return jsonify({ 'error': 'bbox' }) bbox = bboxes[0] if not bbox: return jsonify({ 'error': 'bbox' }) dim = im_np.shape[:2][::-1] bbox = bbox.to_dim(dim) # convert back to real dimensions # print("got bbox") if not bbox: return jsonify({ 'error': 'bbox' }) # extract 128-D vector recognition = face_recognition.RecognitionDLIB(gpu=-1) vec = recognition.vec(im_np, bbox) query = np.array([ vec ]).astype('float32') # query FAISS distances, indexes = faiss_dataset.search(query, LIMIT) if len(indexes) == 0 or len(indexes[0]) == 0: return jsonify({ 'error': 'nomatch' }) # get the results for this single query... distances = distances[0] indexes = indexes[0] dists = [] ids = [] for _d, _i in zip(distances, indexes): if _d <= THRESHOLD: dists.append(round(float(_d), 2)) ids.append(_i) identities = [ dataset.get_identity(int(_i)) for _i in ids ] identities = list(filter(None, identities)) # print(distances) # print(ids) # 'bbox': str(bboxes[0]), # 'bbox_dim': str(bbox), # print(bboxes[0]) # print(bbox) query = { 'timing': round(time.time() - start, 3), 'bbox': str(bbox), } # print(results) return jsonify({ 'query': query, 'results': identities, 'distances': dists, }) @api.route('/dataset//name', methods=['GET','POST']) def name_lookup(dataset_name): """Find a name in the dataset""" start = time.time() dataset = get_dataset(dataset_name) q = request.args.get('q') q = re.sub('[^a-zA-Z. ]+', '*', q) terms = q.split(' ') query = { 'q': q, 'timing': time.time() - start, } if len(terms) == 0: return jsonify({ 'query': query, 'results': [] }) lookup = {} results_lookup = {} names = dataset.search_name(q + '%') for name in names: if name.id in lookup: print(name.fullname) lookup[name.id] += 4 else: print(name.fullname) lookup[name.id] = 4 results_lookup[name.id] = name for i, term in enumerate(terms[0:5]): search_term = '%' + term + '%' names = dataset.search_name(search_term) if len(term) > 0 else [] descriptions = dataset.search_description(search_term) if len(term) > 0 else [] for name in names: if name.id in lookup: print(name.fullname) lookup[name.id] += 2 else: print(name.fullname) lookup[name.id] = 2 results_lookup[name.id] = name for name in descriptions: if name.id in lookup: print(name.fullname) lookup[name.id] += 1 else: print(name.fullname) lookup[name.id] = 1 results_lookup[name.id] = name sorted_names = sorted(lookup.items(), key=operator.itemgetter(1), reverse=True)[0:10] top_names = [results_lookup[item[0]] for item in sorted_names] results = dataset.get_file_records_for_identities(top_names) print(results) return jsonify({ 'query': query, 'results': results, })