diff options
Diffstat (limited to 'check/app/models')
| -rw-r--r-- | check/app/models/click_factory.py | 145 | ||||
| -rw-r--r-- | check/app/models/sql_factory.py | 50 |
2 files changed, 188 insertions, 7 deletions
diff --git a/check/app/models/click_factory.py b/check/app/models/click_factory.py new file mode 100644 index 0000000..61a3b5e --- /dev/null +++ b/check/app/models/click_factory.py @@ -0,0 +1,145 @@ +""" +Click processor factory +- Inspired by and used code from @wiretapped's HTSLAM codebase +- In particular the very useful +""" + +import os +import sys +from os.path import join +from pathlib import Path +import os +from os.path import join +import sys +from functools import update_wrapper, wraps +import itertools +from pathlib import Path +from glob import glob +import importlib +import logging + +import click +from app.settings import app_cfg as cfg + + +# -------------------------------------------------------- +# Click Group Class +# -------------------------------------------------------- + +# set global variable during parent class create +dir_plugins = None # set in create + +class ClickComplex: + """Wrapper generator for custom Click CLI's based on LR's coroutine""" + + def __init__(self): + pass + + + class CustomGroup(click.Group): + #global dir_plugins # from CliGenerator init + + # lists commands in plugin directory + def list_commands(self, ctx): + global dir_plugins # from CliGenerator init + rv = list(self.commands.keys()) + fp_cmds = [Path(x) for x in Path(dir_plugins).iterdir() \ + if str(x).endswith('.py') \ + and '__init__' not in str(x)] + for fp_cmd in fp_cmds: + try: + assert fp_cmd.name not in rv, "[-] Error: {} can't exist in cli.py and {}".format(fp_cmd.name) + except Exception as ex: + logging.getLogger('app').error('{}'.format(ex)) + rv.append(fp_cmd.stem) + rv.sort() + return rv + + # Complex version: gets commands in directory and in this file + # Based on code from @wiretapped + HTSLAM + def get_command(self, ctx, cmd_name): + global dir_plugins + if cmd_name in self.commands: + return self.commands[cmd_name] + ns = {} + fpp_cmd = Path(dir_plugins, cmd_name + '.py') + fp_cmd = fpp_cmd.as_posix() + if not fpp_cmd.exists(): + sys.exit('[-] {} file does not exist'.format(fpp_cmd)) + code = compile(fpp_cmd.read_bytes(), fp_cmd, 'exec') + try: + eval(code, ns, ns) + except Exception as ex: + logging.getLogger('vframe').error('exception: {}'.format(ex)) + @click.command() + def _fail(): + raise Exception('while loading {}'.format(fpp_cmd.name)) + _fail.short_help = repr(ex) + _fail.help = repr(ex) + return _fail + if 'cli' not in ns: + sys.exit('[-] Error: {} does not contain a cli function'.format(fp_cmd)) + return ns['cli'] + + @classmethod + def create(self, dir_plugins_local): + global dir_plugins + dir_plugins = dir_plugins_local + return self.CustomGroup + + + +class ClickSimple: + """Wrapper generator for custom Click CLI's""" + + def __init__(self): + pass + + + class CustomGroup(click.Group): + #global dir_plugins # from CliGenerator init + + # lists commands in plugin directory + def list_commands(self, ctx): + global dir_plugins # from CliGenerator init + rv = list(self.commands.keys()) + fp_cmds = [Path(x) for x in Path(dir_plugins).iterdir() \ + if str(x).endswith('.py') \ + and '__init__' not in str(x)] + for fp_cmd in fp_cmds: + assert fp_cmd.name not in rv, "[-] Error: {} can't exist in cli.py and {}".format(fp_cmd.name) + rv.append(fp_cmd.stem) + rv.sort() + return rv + + # Complex version: gets commands in directory and in this file + # from HTSLAM + def get_command(self, ctx, cmd_name): + global dir_plugins # from CliGenerator init + if cmd_name in self.commands: + return self.commands[cmd_name] + ns = {} + fpp_cmd = Path(dir_plugins, cmd_name + '.py') + fp_cmd = fpp_cmd.as_posix() + if not fpp_cmd.exists(): + sys.exit('[-] {} file does not exist'.format(fpp_cmd)) + code = compile(fpp_cmd.read_bytes(), fp_cmd, 'exec') + try: + eval(code, ns, ns) + except Exception as ex: + logging.getLogger('vframe').error('exception: {}'.format(ex)) + @click.command() + def _fail(): + raise Exception('while loading {}'.format(fpp_cmd.name)) + _fail.short_help = repr(ex) + _fail.help = repr(ex) + return _fail + if 'cli' not in ns: + sys.exit('[-] Error: {} does not contain a cli function'.format(fp_cmd)) + return ns['cli'] + + @classmethod + def create(self, dir_plugins_local): + global dir_plugins + dir_plugins = dir_plugins_local + return self.CustomGroup diff --git a/check/app/models/sql_factory.py b/check/app/models/sql_factory.py index d4a371e..1d32a68 100644 --- a/check/app/models/sql_factory.py +++ b/check/app/models/sql_factory.py @@ -3,11 +3,17 @@ import glob import time import pandas as pd -from sqlalchemy import create_engine, Table, Column, String, Integer, BigInteger +from PIL import Image + +from sqlalchemy import create_engine, Table, Column, String, Integer, BigInteger, text from sqlalchemy.orm import sessionmaker from sqlalchemy.ext.declarative import declarative_base from app.settings import app_cfg as cfg +from app.settings.types import VALID_IMAGE_EXTENSIONS + +from app.utils.im_utils import compute_phash_int +from app.utils.file_utils import sha256 connection_url = "mysql+mysqlconnector://{}:{}@{}/{}?charset=utf8mb4".format( os.getenv("DB_USER"), @@ -19,16 +25,15 @@ connection_url = "mysql+mysqlconnector://{}:{}@{}/{}?charset=utf8mb4".format( loaded = False engine = create_engine(connection_url, encoding="utf-8", pool_recycle=3600) Session = sessionmaker(bind=engine) - Base = declarative_base() class FileTable(Base): """Table for storing various hashes of images""" __tablename__ = 'files' id = Column(Integer, primary_key=True) - sha256 = Column(String(36, convert_unicode=True), nullable=False) + sha256 = Column(String(64, convert_unicode=True), nullable=False) phash = Column(BigInteger, nullable=False) - ext = Column(String(3, convert_unicode=True), nullable=False) + ext = Column(String(4, convert_unicode=True), nullable=False) def toJSON(self): return { 'id': self.id, @@ -37,12 +42,21 @@ class FileTable(Base): 'ext': self.ext, } +Base.metadata.create_all(engine) + def search_by_phash(phash, threshold=6): """Search files for a particular phash""" connection = engine.connect() - cmd = 'SELECT files.*, BIT_COUNT(phash ^ :phash) as hamming_distance FROM images_image HAVING hamming_distance < :threshold ORDER BY hamming_distance ASC LIMIT 1' - matches = connection.execute(text(cmd), phash=phash, threshold=threshold) - return matches + cmd = 'SELECT files.*, BIT_COUNT(phash ^ :phash) as hamming_distance FROM files HAVING hamming_distance < :threshold ORDER BY hamming_distance ASC LIMIT 1' + matches = connection.execute(text(cmd), phash=phash, threshold=threshold).fetchall() + keys = ('id', 'sha256', 'phash', 'ext', 'score') + results = [ dict(zip(keys, values)) for values in matches ] + return results + +def search_by_hash(hash): + session = Session() + match = session.query(FileTable).filter(FileTable.sha256 == hash) + return match.first() def add_phash(sha256, phash, ext): """Add a file to the table""" @@ -51,4 +65,26 @@ def add_phash(sha256, phash, ext): ) session = Session() session.add(rec) + session.commit() session.flush() + +def add_phash_by_filename(path): + """Add a file by filename, getting all the necessary attributes""" + print(path) + if not os.path.exists(path): + print("File does not exist") + return + + dir, fn = os.path.split(path) + root, ext = os.path.splitext(fn) + ext = ext.strip('.') + if ext not in VALID_IMAGE_EXTENSIONS: + print("Not an image file") + return + + im = Image.open(path).convert('RGB') + phash = compute_phash_int(im) + + hash = sha256(path) + + add_phash(sha256=hash, phash=phash, ext=ext) |
