summary refs log tree commit diff
path: root/check/app/models
diff options
context:
space:
mode:
Diffstat (limited to 'check/app/models')
-rw-r--r-- check/app/models/click_factory.py 145
-rw-r--r-- check/app/models/sql_factory.py 50
2 files changed, 188 insertions, 7 deletions
diff --git a/check/app/models/click_factory.py b/check/app/models/click_factory.py
new file mode 100644
index 0000000..61a3b5e
--- /dev/null
+++ b/check/app/models/click_factory.py
@@ -0,0 +1,145 @@
+"""
+Click processor factory
+- Inspired by and used code from @wiretapped's HTSLAM codebase
+- In particular the very useful
+"""
+
+import os
+import sys
+from os.path import join
+from pathlib import Path
+import os
+from os.path import join
+import sys
+from functools import update_wrapper, wraps
+import itertools
+from pathlib import Path
+from glob import glob
+import importlib
+import logging
+
+import click
+from app.settings import app_cfg as cfg
+
+
+# --------------------------------------------------------
+# Click Group Class
+# --------------------------------------------------------
+
+# Plugin directory read by the CustomGroup classes below. It stays None until
+# ClickComplex.create() or ClickSimple.create() assigns it; both write to this
+# same module-level name.
+dir_plugins = None # set in create
+
+class ClickComplex:
+    """Wrapper generator for custom Click CLI's based on LR's coroutine
+
+    Use ``ClickComplex.create(dir_plugins)`` to record the plugin directory in
+    the module-level ``dir_plugins`` global and obtain the ``CustomGroup``
+    class to pass to Click.
+    """
+
+    def __init__(self):
+        pass
+
+    class CustomGroup(click.Group):
+        """Click group that serves registered commands plus ``*.py`` plugin files."""
+
+        def list_commands(self, ctx):
+            """Return the sorted names of registered and plugin commands.
+
+            Plugin files are the ``*.py`` entries of ``dir_plugins`` whose path
+            does not contain ``__init__``. A plugin whose stem is already
+            listed is reported through the 'app' logger and skipped, so every
+            name appears once.
+            """
+            rv = list(self.commands.keys())
+            fp_cmds = [Path(x) for x in Path(dir_plugins).iterdir()
+                       if str(x).endswith('.py') and '__init__' not in str(x)]
+            for fp_cmd in fp_cmds:
+                # Commands are keyed by stem, so the clash check must compare
+                # stems; comparing the file name ("x.py") could never match.
+                if fp_cmd.stem in rv:
+                    msg = "[-] Error: {} can't exist in cli.py and {}".format(
+                        fp_cmd.name, dir_plugins)
+                    logging.getLogger('app').error(msg)
+                    continue
+                rv.append(fp_cmd.stem)
+            rv.sort()
+            return rv
+
+        # Complex version: gets commands in directory and in this file
+        # Based on code from @wiretapped + HTSLAM
+        def get_command(self, ctx, cmd_name):
+            """Return the command object for ``cmd_name``.
+
+            Registered commands take precedence. Otherwise the file
+            ``<dir_plugins>/<cmd_name>.py`` is compiled and executed and its
+            ``cli`` object is returned. The process exits through ``sys.exit``
+            when the file is missing or defines no ``cli``. If executing the
+            file raises, the error is logged and a placeholder command whose
+            help text is the error's repr is returned instead.
+            """
+            if cmd_name in self.commands:
+                return self.commands[cmd_name]
+            ns = {}
+            fpp_cmd = Path(dir_plugins, cmd_name + '.py')
+            fp_cmd = fpp_cmd.as_posix()
+            if not fpp_cmd.exists():
+                sys.exit('[-] {} file does not exist'.format(fpp_cmd))
+            code = compile(fpp_cmd.read_bytes(), fp_cmd, 'exec')
+            try:
+                # NOTE(security): this runs the plugin file with full
+                # privileges; dir_plugins must only contain trusted code.
+                exec(code, ns, ns)
+            except Exception as ex:
+                logging.getLogger('vframe').error('exception: {}'.format(ex))
+
+                @click.command()
+                def _fail():
+                    raise Exception('while loading {}'.format(fpp_cmd.name))
+                _fail.short_help = repr(ex)
+                _fail.help = repr(ex)
+                return _fail
+            if 'cli' not in ns:
+                sys.exit('[-] Error: {} does not contain a cli function'.format(fp_cmd))
+            return ns['cli']
+
+    @classmethod
+    def create(cls, dir_plugins_local):
+        """Store ``dir_plugins_local`` in the module global and return ``CustomGroup``."""
+        global dir_plugins
+        dir_plugins = dir_plugins_local
+        return cls.CustomGroup
+
+
+
+class ClickSimple:
+    """Wrapper generator for custom Click CLI's
+
+    Use ``ClickSimple.create(dir_plugins)`` to record the plugin directory in
+    the module-level ``dir_plugins`` global and obtain the ``CustomGroup``
+    class to pass to Click.
+    """
+
+    def __init__(self):
+        pass
+
+    class CustomGroup(click.Group):
+        """Click group that serves registered commands plus ``*.py`` plugin files."""
+
+        def list_commands(self, ctx):
+            """Return the sorted names of registered and plugin commands.
+
+            Plugin files are the ``*.py`` entries of ``dir_plugins`` whose path
+            does not contain ``__init__``.
+
+            Raises:
+                AssertionError: a plugin's stem matches an already listed
+                    command name.
+            """
+            rv = list(self.commands.keys())
+            fp_cmds = [Path(x) for x in Path(dir_plugins).iterdir()
+                       if str(x).endswith('.py') and '__init__' not in str(x)]
+            for fp_cmd in fp_cmds:
+                # Commands are keyed by stem, so compare stems. Raised
+                # explicitly so the check also runs under ``python -O``.
+                if fp_cmd.stem in rv:
+                    raise AssertionError(
+                        "[-] Error: {} can't exist in cli.py and {}".format(
+                            fp_cmd.name, dir_plugins))
+                rv.append(fp_cmd.stem)
+            rv.sort()
+            return rv
+
+        # Complex version: gets commands in directory and in this file
+        # from HTSLAM
+        def get_command(self, ctx, cmd_name):
+            """Return the command object for ``cmd_name``.
+
+            Registered commands take precedence. Otherwise the file
+            ``<dir_plugins>/<cmd_name>.py`` is compiled and executed and its
+            ``cli`` object is returned. The process exits through ``sys.exit``
+            when the file is missing or defines no ``cli``. If executing the
+            file raises, the error is logged and a placeholder command whose
+            help text is the error's repr is returned instead.
+            """
+            if cmd_name in self.commands:
+                return self.commands[cmd_name]
+            ns = {}
+            fpp_cmd = Path(dir_plugins, cmd_name + '.py')
+            fp_cmd = fpp_cmd.as_posix()
+            if not fpp_cmd.exists():
+                sys.exit('[-] {} file does not exist'.format(fpp_cmd))
+            code = compile(fpp_cmd.read_bytes(), fp_cmd, 'exec')
+            try:
+                # NOTE(security): this runs the plugin file with full
+                # privileges; dir_plugins must only contain trusted code.
+                exec(code, ns, ns)
+            except Exception as ex:
+                logging.getLogger('vframe').error('exception: {}'.format(ex))
+
+                @click.command()
+                def _fail():
+                    raise Exception('while loading {}'.format(fpp_cmd.name))
+                _fail.short_help = repr(ex)
+                _fail.help = repr(ex)
+                return _fail
+            if 'cli' not in ns:
+                sys.exit('[-] Error: {} does not contain a cli function'.format(fp_cmd))
+            return ns['cli']
+
+    @classmethod
+    def create(cls, dir_plugins_local):
+        """Store ``dir_plugins_local`` in the module global and return ``CustomGroup``."""
+        global dir_plugins
+        dir_plugins = dir_plugins_local
+        return cls.CustomGroup
diff --git a/check/app/models/sql_factory.py b/check/app/models/sql_factory.py
index d4a371e..1d32a68 100644
--- a/check/app/models/sql_factory.py
+++ b/check/app/models/sql_factory.py
@@ -3,11 +3,17 @@ import glob
import time
import pandas as pd
-from sqlalchemy import create_engine, Table, Column, String, Integer, BigInteger
+from PIL import Image
+
+from sqlalchemy import create_engine, Table, Column, String, Integer, BigInteger, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from app.settings import app_cfg as cfg
+from app.settings.types import VALID_IMAGE_EXTENSIONS
+
+from app.utils.im_utils import compute_phash_int
+from app.utils.file_utils import sha256
connection_url = "mysql+mysqlconnector://{}:{}@{}/{}?charset=utf8mb4".format(
os.getenv("DB_USER"),
@@ -19,16 +25,15 @@ connection_url = "mysql+mysqlconnector://{}:{}@{}/{}?charset=utf8mb4".format(
loaded = False
engine = create_engine(connection_url, encoding="utf-8", pool_recycle=3600)
Session = sessionmaker(bind=engine)
-
Base = declarative_base()
class FileTable(Base):
"""Table for storing various hashes of images"""
__tablename__ = 'files'
id = Column(Integer, primary_key=True)
- sha256 = Column(String(36, convert_unicode=True), nullable=False)
+ sha256 = Column(String(64, convert_unicode=True), nullable=False)
phash = Column(BigInteger, nullable=False)
- ext = Column(String(3, convert_unicode=True), nullable=False)
+ ext = Column(String(4, convert_unicode=True), nullable=False)
def toJSON(self):
return {
'id': self.id,
@@ -37,12 +42,21 @@ class FileTable(Base):
'ext': self.ext,
}
+# Runs at import time: issues CREATE TABLE for the tables declared on Base
+# (SQLAlchemy's create_all checks for existing tables first by default).
+Base.metadata.create_all(engine)
+
def search_by_phash(phash, threshold=6):
"""Search files for a particular phash"""
connection = engine.connect()
- cmd = 'SELECT files.*, BIT_COUNT(phash ^ :phash) as hamming_distance FROM images_image HAVING hamming_distance < :threshold ORDER BY hamming_distance ASC LIMIT 1'
- matches = connection.execute(text(cmd), phash=phash, threshold=threshold)
- return matches
+    # BIT_COUNT(phash ^ :phash) counts the differing bits between the stored
+    # and the queried hash. Only the closest row below `threshold` is
+    # selected (LIMIT 1), so the returned list holds at most one dict.
+    cmd = 'SELECT files.*, BIT_COUNT(phash ^ :phash) as hamming_distance FROM files HAVING hamming_distance < :threshold ORDER BY hamming_distance ASC LIMIT 1'
+    try:
+        matches = connection.execute(text(cmd), phash=phash, threshold=threshold).fetchall()
+    finally:
+        # Release the connection even when the query raises.
+        connection.close()
+    # Column order is files.* (id, sha256, phash, ext) followed by
+    # hamming_distance, which is exposed to callers under the key 'score'.
+    keys = ('id', 'sha256', 'phash', 'ext', 'score')
+    results = [dict(zip(keys, values)) for values in matches]
+    return results
+
+def search_by_hash(hash):
+    """Return the first FileTable row whose sha256 equals `hash`.
+
+    `Query.first()` yields None when no row matches.
+    """
+    session = Session()
+    try:
+        return session.query(FileTable).filter(FileTable.sha256 == hash).first()
+    finally:
+        # Close the session so its connection is released after every lookup.
+        session.close()
def add_phash(sha256, phash, ext):
"""Add a file to the table"""
@@ -51,4 +65,26 @@ def add_phash(sha256, phash, ext):
)
session = Session()
session.add(rec)
+ session.commit()
session.flush()
+
+def add_phash_by_filename(path):
+    """Add a file by filename, getting all the necessary attributes
+
+    Prints `path`, then prints a message and returns without adding anything
+    when the file does not exist or its extension (leading dot removed) is
+    not in VALID_IMAGE_EXTENSIONS. Otherwise computes the perceptual hash of
+    the RGB-converted image and the sha256 of the file, and stores them
+    through add_phash().
+    """
+    print(path)
+    if not os.path.exists(path):
+        print("File does not exist")
+        return
+
+    ext = os.path.splitext(os.path.basename(path))[1].strip('.')
+    if ext not in VALID_IMAGE_EXTENSIONS:
+        print("Not an image file")
+        return
+
+    # The context manager closes the underlying file handle; convert()
+    # returns a separate image object, so it is computed inside the block.
+    with Image.open(path) as im:
+        phash = compute_phash_int(im.convert('RGB'))
+
+    file_hash = sha256(path)
+
+    add_phash(sha256=file_hash, phash=phash, ext=ext)