import os
import glob
import time
import pandas as pd

from sqlalchemy import create_engine, Table, Column, String, Integer, DateTime, Float
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

from app.utils.file_utils import load_recipe, load_csv_safe
from app.settings import app_cfg as cfg

# MySQL connection URL built from the DB_USER / DB_PASS / DB_HOST / DB_NAME
# environment variables. The charset is requested through the URL itself.
connection_url = "mysql+mysqlconnector://{}:{}@{}/{}?charset=utf8mb4".format(
    os.getenv("DB_USER"),
    os.getenv("DB_PASS"),
    os.getenv("DB_HOST"),
    os.getenv("DB_NAME")
)

# Module-level registry, filled once by load_sql_datasets().
datasets = {}    # dataset name -> SqlDataset
loaded = False   # True once load_sql_datasets() has completed
Session = None   # sessionmaker bound to the shared engine; None until loaded


def list_datasets():
    """Return the ``describe()`` dict of every registered dataset."""
    return [dataset.describe() for dataset in datasets.values()]


def get_dataset(name):
    """Return the registered dataset called ``name``, or None if unknown."""
    return datasets.get(name)


def get_table(name, table_name):
    """Return the model class ``table_name`` of dataset ``name``.

    Returns None when either the dataset or the table type is unknown.
    """
    dataset = get_dataset(name)
    return dataset.get_table(table_name) if dataset else None


def load_sql_datasets(replace=False, base_model=None):
    """Discover and register every dataset under ``cfg.DIR_FAISS_METADATA``.

    Each sub-directory of the metadata directory becomes one ``SqlDataset``
    named after the directory. The work is done only once per process;
    later calls return the already-populated registry.

    :param replace: when true, (re)load each dataset's CSV files into MySQL
    :param base_model: optional declarative base class for the models
    :returns: the module-level ``datasets`` dict (name -> SqlDataset)
    """
    global datasets, loaded, Session
    if loaded:
        return datasets
    # NOTE(review): the `encoding` keyword is kept for the SQLAlchemy version
    # this file was written against; confirm it is still accepted before
    # upgrading SQLAlchemy.
    engine = create_engine(connection_url, encoding="utf-8")
    Session = sessionmaker(bind=engine)
    for path in glob.iglob(os.path.join(cfg.DIR_FAISS_METADATA, "*")):
        # Only directories hold datasets; a stray file would otherwise be
        # registered as an empty dataset named after the file.
        if not os.path.isdir(path):
            continue
        dataset = load_sql_dataset(path, replace, engine, base_model)
        datasets[dataset.name] = dataset
    loaded = True
    return datasets


def load_sql_dataset(path, replace=False, engine=None, base_model=None):
    """Build the ``SqlDataset`` for one dataset directory.

    Every ``*.csv`` file in ``path`` whose basename matches a known table
    type gets its model registered on the dataset. When ``replace`` is true
    the CSV contents are also written to MySQL, replacing any existing table.

    :param path: dataset directory; its basename becomes the dataset name
    :param replace: when true, load the CSV rows into the database
    :param engine: SQLAlchemy engine used for the models and for writing
    :param base_model: optional declarative base class for the models
    :returns: the populated ``SqlDataset``
    :raises ValueError: if ``replace`` is true but no engine is available
    """
    # normpath so that a trailing separator does not yield an empty name
    name = os.path.basename(os.path.normpath(path))
    # Forward the engine so the dataset does not create one of its own.
    dataset = SqlDataset(name, engine=engine, base_model=base_model)
    con = engine if engine is not None else dataset.engine

    for fn in glob.iglob(os.path.join(path, "*.csv")):
        key = os.path.splitext(os.path.basename(fn))[0]
        table = dataset.get_table(key)
        if table is None:
            # CSV does not correspond to any known table type
            continue
        if replace:
            if con is None:
                raise ValueError(
                    "cannot load {} into SQL: no engine available".format(fn)
                )
            print('loading dataset {}'.format(fn))
            df = pd.read_csv(fn)
            # fix columns that are named "index", a sql reserved word
            # (columns are renamed positionally to the model's column names)
            df.columns = table.__table__.columns.keys()
            df.to_sql(name=table.__tablename__, con=con, if_exists='replace', index=False)
    return dataset


class SqlDataset:
    """
    Bridge between the facial information CSVs connected to the datasets, and MySQL
    - each dataset should have files that can be loaded into these database models
    - names will be fixed to work in SQL (index -> id)
    - we can then have more generic models for fetching this info after doing a FAISS query
    """

    def __init__(self, name, engine=None, base_model=None):
        """Create a dataset wrapper.

        :param name: dataset name, used as the prefix of every table name
        :param engine: engine to bind a freshly created declarative base to;
            only created from ``connection_url`` when neither ``engine`` nor
            ``base_model`` is supplied
        :param base_model: declarative base class the models derive from
        """
        self.name = name
        self.tables = {}
        if base_model is None:
            if engine is None:
                engine = create_engine(connection_url)
            base_model = declarative_base(engine)
        # Always defined (possibly None) so callers can rely on the attribute.
        self.engine = engine
        self.base_model = base_model

    def describe(self):
        """Return the dataset name and the table types instantiated so far."""
        return {
            'name': self.name,
            'tables': list(self.tables.keys()),
        }

    def get_identity(self, id):
        """Collect uuid, identity, roi and pose information for image ``id``.

        The identity is the row with the largest ``image_id`` that is less
        than or equal to ``id``. Any part that has no matching row is None.
        """
        table = self.get_table('identity_meta')
        # NOTE(review): `table.query` is not provided by a plain
        # declarative_base(); this presumably relies on `base_model`
        # supplying a `query` property - confirm against the caller.
        identity = (
            table.query
            .filter(table.image_id <= id)
            .order_by(table.image_id.desc())
            .first()
        )
        return {
            'uuid': self.select('uuids', id),
            'identity': identity.toJSON() if identity is not None else None,
            'roi': self.select('roi', id),
            'pose': self.select('pose', id),
        }

    def search_name(self, q):
        """Return up to 30 identities whose fullname is LIKE ``q``.

        Results are ordered by fullname, descending. Each entry holds the
        identity row and the uuid row whose id equals the identity's
        ``image_id`` (None when that uuid row does not exist).
        """
        table = self.get_table('identity_meta')
        uuid_table = self.get_table('uuids')
        # NOTE(review): relies on `base_model` providing `.query`, as in
        # get_identity - confirm against the caller.
        matches = (
            table.query
            .filter(table.fullname.like(q))
            .order_by(table.fullname.desc())
            .limit(30)
        )
        identities = []
        for row in matches:
            uuid = uuid_table.query.filter(uuid_table.id == row.image_id).first()
            identities.append({
                'uuid': uuid.toJSON() if uuid is not None else None,
                'identity': row.toJSON(),
            })
        return identities

    def select(self, table, id):
        """Return the JSON dict of the row with primary id ``id``.

        :param table: table type name (see ``get_table``)
        :param id: value matched against the model's ``id`` column
        :returns: the row's ``toJSON()`` dict, or None when the table type
            is unknown or no such row exists
        :raises RuntimeError: if ``load_sql_datasets()`` has not run yet
        """
        table = self.get_table(table)
        if table is None:
            return None
        if Session is None:
            raise RuntimeError(
                "SQL session is not configured; call load_sql_datasets() first"
            )
        session = Session()
        try:
            obj = session.query(table).filter(table.id == id).first()
            return obj.toJSON() if obj is not None else None
        finally:
            # Release the connection even if the query raises.
            session.close()

    def get_table(self, type):
        """Return (creating and caching on first use) the model for ``type``.

        Known types are 'uuids', 'roi', 'identity_meta' and 'pose'; any other
        value returns None.
        """
        if type in self.tables:
            return self.tables[type]
        factories = {
            'uuids': self.uuid_table,
            'roi': self.roi_table,
            'identity_meta': self.identity_table,
            'pose': self.pose_table,
        }
        factory = factories.get(type)
        if factory is None:
            return None
        self.tables[type] = factory()
        return self.tables[type]

    # NOTE(review): `convert_unicode` on the String columns below is kept for
    # the SQLAlchemy version this file targets; confirm before upgrading.

    # ==> uuids.csv <==
    # index,uuid
    # 0,f03fd921-2d56-4e83-8115-f658d6a72287
    def uuid_table(self):
        """Build the model class for the ``<name>_uuid`` table."""
        class UUID(self.base_model):
            __tablename__ = self.name + "_uuid"
            id = Column(Integer, primary_key=True)
            uuid = Column(String(36, convert_unicode=True), nullable=False)

            def toJSON(self):
                return {
                    'id': self.id,
                    'uuid': self.uuid,
                }
        return UUID

    # ==> roi.csv <==
    # index,h,image_height,image_index,image_width,w,x,y
    # 0,0.33000000000000007,250,0,250,0.32999999999999996,0.33666666666666667,0.35
    def roi_table(self):
        """Build the model class for the ``<name>_roi`` table."""
        class ROI(self.base_model):
            __tablename__ = self.name + "_roi"
            id = Column(Integer, primary_key=True)
            h = Column(Float, nullable=False)
            image_height = Column(Integer, nullable=False)
            image_index = Column(Integer, nullable=False)
            image_width = Column(Integer, nullable=False)
            w = Column(Float, nullable=False)
            x = Column(Float, nullable=False)
            y = Column(Float, nullable=False)

            def toJSON(self):
                return {
                    'id': self.id,
                    'image_index': self.image_index,
                    'image_height': self.image_height,
                    'image_width': self.image_width,
                    'w': self.w,
                    'h': self.h,
                    'x': self.x,
                    'y': self.y,
                }
        return ROI

    # ==> identity.csv <==
    # index,fullname,description,gender,images,image_index
    # 0,A. J. Cook,Canadian actress,f,1,0
    def identity_table(self):
        """Build the model class for the ``<name>_identity`` table."""
        class Identity(self.base_model):
            __tablename__ = self.name + "_identity"
            id = Column(Integer, primary_key=True)
            fullname = Column(String(36, convert_unicode=True), nullable=False)
            description = Column(String(36, convert_unicode=True), nullable=False)
            gender = Column(String(1, convert_unicode=True), nullable=False)
            images = Column(Integer, nullable=False)
            image_id = Column(Integer, nullable=False)

            def toJSON(self):
                return {
                    'id': self.id,
                    'image_id': self.image_id,
                    'fullname': self.fullname,
                    'images': self.images,
                    'gender': self.gender,
                    'description': self.description,
                }
        return Identity

    # ==> pose.csv <==
    # index,image_index,pitch,roll,yaw
    # 0,0,11.16264458441435,10.415885631337728,22.99719032415318
    def pose_table(self):
        """Build the model class for the ``<name>_pose`` table."""
        class Pose(self.base_model):
            __tablename__ = self.name + "_pose"
            id = Column(Integer, primary_key=True)
            # NOTE(review): image_id is declared as part of a composite
            # primary key, unlike the other models where it is a plain
            # non-null column - confirm this is intentional.
            image_id = Column(Integer, primary_key=True)
            pitch = Column(Float, nullable=False)
            roll = Column(Float, nullable=False)
            yaw = Column(Float, nullable=False)

            def toJSON(self):
                return {
                    'id': self.id,
                    'image_id': self.image_id,
                    'pitch': self.pitch,
                    'roll': self.roll,
                    'yaw': self.yaw,
                }
        return Pose