diff options
Diffstat (limited to 'cli')
| -rw-r--r-- | cli/app/settings/app_cfg.py | 10 | ||||
| -rw-r--r-- | cli/app/thesaurus/api.py | 71 | ||||
| -rw-r--r-- | cli/app/utils/click_factory.py | 145 | ||||
| -rw-r--r-- | cli/app/utils/logger_utils.py | 68 | ||||
| -rw-r--r-- | cli/app/utils/util.py | 15 | ||||
| -rwxr-xr-x | cli/cli.py | 49 | ||||
| -rw-r--r-- | cli/commands/api/category.py | 19 | ||||
| -rw-r--r-- | cli/commands/api/search.py | 19 |
8 files changed, 396 insertions, 0 deletions
diff --git a/cli/app/settings/app_cfg.py b/cli/app/settings/app_cfg.py new file mode 100644 index 0000000..952e76b --- /dev/null +++ b/cli/app/settings/app_cfg.py @@ -0,0 +1,10 @@ +import os + +CLICK_GROUPS = { + 'api': 'commands/api', +} + +DATA_STORE = 'data_store' + +SEARCH_PATH = os.path.join(DATA_STORE, "search") +CATEGORIES_PATH = os.path.join(DATA_STORE, "categories") diff --git a/cli/app/thesaurus/api.py b/cli/app/thesaurus/api.py new file mode 100644 index 0000000..ad0dd92 --- /dev/null +++ b/cli/app/thesaurus/api.py @@ -0,0 +1,71 @@ +import os +import requests + +from app.utils.util import * +from app.settings import app_cfg + +class Thesaurus: + def __init__(self): + self.api = ThesaurusAPI() + + def load(self, base_path, word, api_fn): + sha = sha256(word) + hash_path = os.path.join(base_path, sha[0:2]) + os.makedirs(hash_path, exist_ok=True) + path = os.path.join(hash_path, word + '.json') + if os.path.exists(path): + return read_json(path) + data = api_fn(word) + write_json(path, data) + return data + + def search(self, word): + return self.load(app_cfg.SEARCH_PATH, word, self.api.search) + + def category(self, id): + return self.load(app_cfg.CATEGORY_PATH, str(id), self.api.category) + +class ThesaurusAPI: + SEARCH_ENDPOINT = "https://ht.ac.uk/category-selection/" + CATEGORY_ENDPOINT = "https://ht.ac.uk/api/v1/loadCategory-v2.php" + HEADERS = { + 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36', + } + + def search(self, word): + query = { + 'qsearch': word, + } + resp = requests.get(self.SEARCH_ENDPOINT, params=query, headers=self.HEADERS) + if resp.status_code != 200: + return [] + data = resp.text + data = data.split('<div id="resultsTimelineData">') + data = data[0].split('</div>') + return json.loads(data) + + def category(self, id): + query = { + 'id': id, + } + resp = requests.get(self.CATEGORY_ENDPOINT, params=query, headers=self.HEADERS) + if resp.status_code != 200: + return "" + raw = resp.text + classification = raw.split("<span style='font-size: 0.6em'>")[1].split('</span>')[0] + category = raw.split("<br />")[1].split('</h2>')[0] + raw_words = raw.split('<b>')[1:] + words = [] + for word in raw_words: + word, rest = word.split('</b>') + years = word.split(' <span')[0].trim() + words.append({ + 'word': word, + 'years': years, + }) + return { + 'id': id, + 'category': category, + 'classification': classification, + 'words': words, + }
\ No newline at end of file diff --git a/cli/app/utils/click_factory.py b/cli/app/utils/click_factory.py new file mode 100644 index 0000000..61a3b5e --- /dev/null +++ b/cli/app/utils/click_factory.py @@ -0,0 +1,145 @@ +""" +Click processor factory +- Inspired by and used code from @wiretapped's HTSLAM codebase +- In particular the very useful +""" + +import os +import sys +from os.path import join +from pathlib import Path +import os +from os.path import join +import sys +from functools import update_wrapper, wraps +import itertools +from pathlib import Path +from glob import glob +import importlib +import logging + +import click +from app.settings import app_cfg as cfg + + +# -------------------------------------------------------- +# Click Group Class +# -------------------------------------------------------- + +# set global variable during parent class create +dir_plugins = None # set in create + +class ClickComplex: + """Wrapper generator for custom Click CLI's based on LR's coroutine""" + + def __init__(self): + pass + + + class CustomGroup(click.Group): + #global dir_plugins # from CliGenerator init + + # lists commands in plugin directory + def list_commands(self, ctx): + global dir_plugins # from CliGenerator init + rv = list(self.commands.keys()) + fp_cmds = [Path(x) for x in Path(dir_plugins).iterdir() \ + if str(x).endswith('.py') \ + and '__init__' not in str(x)] + for fp_cmd in fp_cmds: + try: + assert fp_cmd.name not in rv, "[-] Error: {} can't exist in cli.py and {}".format(fp_cmd.name) + except Exception as ex: + logging.getLogger('app').error('{}'.format(ex)) + rv.append(fp_cmd.stem) + rv.sort() + return rv + + # Complex version: gets commands in directory and in this file + # Based on code from @wiretapped + HTSLAM + def get_command(self, ctx, cmd_name): + global dir_plugins + if cmd_name in self.commands: + return self.commands[cmd_name] + ns = {} + fpp_cmd = Path(dir_plugins, cmd_name + '.py') + fp_cmd = fpp_cmd.as_posix() + if not fpp_cmd.exists(): + sys.exit('[-] {} file does not exist'.format(fpp_cmd)) + code = compile(fpp_cmd.read_bytes(), fp_cmd, 'exec') + try: + eval(code, ns, ns) + except Exception as ex: + logging.getLogger('vframe').error('exception: {}'.format(ex)) + @click.command() + def _fail(): + raise Exception('while loading {}'.format(fpp_cmd.name)) + _fail.short_help = repr(ex) + _fail.help = repr(ex) + return _fail + if 'cli' not in ns: + sys.exit('[-] Error: {} does not contain a cli function'.format(fp_cmd)) + return ns['cli'] + + @classmethod + def create(self, dir_plugins_local): + global dir_plugins + dir_plugins = dir_plugins_local + return self.CustomGroup + + + +class ClickSimple: + """Wrapper generator for custom Click CLI's""" + + def __init__(self): + pass + + + class CustomGroup(click.Group): + #global dir_plugins # from CliGenerator init + + # lists commands in plugin directory + def list_commands(self, ctx): + global dir_plugins # from CliGenerator init + rv = list(self.commands.keys()) + fp_cmds = [Path(x) for x in Path(dir_plugins).iterdir() \ + if str(x).endswith('.py') \ + and '__init__' not in str(x)] + for fp_cmd in fp_cmds: + assert fp_cmd.name not in rv, "[-] Error: {} can't exist in cli.py and {}".format(fp_cmd.name) + rv.append(fp_cmd.stem) + rv.sort() + return rv + + # Complex version: gets commands in directory and in this file + # from HTSLAM + def get_command(self, ctx, cmd_name): + global dir_plugins # from CliGenerator init + if cmd_name in self.commands: + return self.commands[cmd_name] + ns = {} + fpp_cmd = Path(dir_plugins, cmd_name + '.py') + fp_cmd = fpp_cmd.as_posix() + if not fpp_cmd.exists(): + sys.exit('[-] {} file does not exist'.format(fpp_cmd)) + code = compile(fpp_cmd.read_bytes(), fp_cmd, 'exec') + try: + eval(code, ns, ns) + except Exception as ex: + logging.getLogger('vframe').error('exception: {}'.format(ex)) + @click.command() + def _fail(): + raise Exception('while loading {}'.format(fpp_cmd.name)) + _fail.short_help = repr(ex) + _fail.help = repr(ex) + return _fail + if 'cli' not in ns: + sys.exit('[-] Error: {} does not contain a cli function'.format(fp_cmd)) + return ns['cli'] + + @classmethod + def create(self, dir_plugins_local): + global dir_plugins + dir_plugins = dir_plugins_local + return self.CustomGroup diff --git a/cli/app/utils/logger_utils.py b/cli/app/utils/logger_utils.py new file mode 100644 index 0000000..f7c9eec --- /dev/null +++ b/cli/app/utils/logger_utils.py @@ -0,0 +1,68 @@ +""" +Logger instantiator for use with Click utlity scripts +""" +import sys +import os +import logging + +import colorlog + +from app.settings import app_cfg as cfg + + +class Logger: + + logger_name = 'MEGAPIXELS' + + def __init__(self): + pass + + @staticmethod + def create(verbosity=4, logfile=None): + """Configures a logger from click params + :param verbosity: (int) between 0 and 5 + :param logfile: (str) path to logfile + :returns: logging root object + """ + + loglevel = (5 - (max(0, min(verbosity, 5)))) * 10 # where logging.DEBUG = 10 + date_format = '%Y-%m-%d %H:%M:%S' + if 'colorlog' in sys.modules and os.isatty(2): + cformat = '%(log_color)s' + cfg.LOGFILE_FORMAT + f = colorlog.ColoredFormatter(cformat, date_format, + log_colors = { 'DEBUG' : 'yellow', 'INFO' : 'white', + 'WARNING' : 'bold_yellow', 'ERROR': 'bold_red', + 'CRITICAL': 'bold_red' }) + else: + f = logging.Formatter(cfg.LOGFILE_FORMAT, date_format) + + # logger = logging.getLogger(Logger.logger_name) + logger = logging.getLogger(cfg.LOGGER_NAME) + logger.setLevel(loglevel) + + if logfile: + # create file handler which logs even debug messages + fh = logging.FileHandler(logfile) + fh.setLevel(loglevel) + logger.addHandler(fh) + + # add colored handler + ch = logging.StreamHandler() + ch.setFormatter(f) + logger.addHandler(ch) + + if verbosity == 0: + logger.disabled = True + + # test + # logger.debug('Hello Debug') + # logger.info('Hello Info') + # logger.warn('Hello Warn') + # logger.error('Hello Error') + # logger.critical('Hello Critical') + + return logger + + @staticmethod + def getLogger(): + return logging.getLogger(cfg.LOGGER_NAME)
\ No newline at end of file diff --git a/cli/app/utils/util.py b/cli/app/utils/util.py new file mode 100644 index 0000000..5f72088 --- /dev/null +++ b/cli/app/utils/util.py @@ -0,0 +1,15 @@ +import simplejson as json +from hashlib import sha256 + +def sha256(s): + sha256 = hashlib.sha256() + sha256.update(s) + return sha256.hexdigest() + +def read_json(fn): + with open(fn, 'r') as json_file: + return json.load(json_file) + +def write_json(fn, data): + with open(fn, 'w') as outfile: + json.dump(data, outfile) diff --git a/cli/cli.py b/cli/cli.py new file mode 100755 index 0000000..516fe49 --- /dev/null +++ b/cli/cli.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python + +# -------------------------------------------------------- +# VFRAME Synthetic Data cli +# -------------------------------------------------------- + +import click + +from app.settings import app_cfg +from app.utils import logger_utils +from app.utils.click_factory import ClickSimple + +# -------------------------------------------------------- +# Entrypoint +# -------------------------------------------------------- + +if __name__ == '__main__': + + import os + import sys + import argparse + + # argparse: intercept group + argv_tmp = sys.argv + sys.argv = sys.argv[:2] + ap = argparse.ArgumentParser('\033[1m\033[94mHistorical Thesaurus\033[0m') + ap.add_argument('group', choices=app_cfg.CLICK_GROUPS.keys()) + args = ap.parse_args() + sys.argv = argv_tmp + sys.argv.pop(1) # remove group + + # click: parse rest of argv + cc = ClickSimple.create(app_cfg.CLICK_GROUPS[args.group]) + @click.group(cls=cc, chain=False, no_args_is_help=True) + @click.option('-v', '--verbose', 'opt_verbosity', count=True, default=4, + show_default=True, + help='Verbosity: -v DEBUG, -vv INFO, -vvv WARN, -vvvv ERROR, -vvvvv CRITICAL') + @click.pass_context + def cli(ctx, opt_verbosity): + """\033[1m\033[94mTHESAURUS\033[0m + """ + ctx.opts = {} + logger_utils.Logger.create(verbosity=opt_verbosity) # init logger + + # ------------------------------------------------------------ + # entrypoint + # ------------------------------------------------------------ + + cli() diff --git a/cli/commands/api/category.py b/cli/commands/api/category.py new file mode 100644 index 0000000..5b688f0 --- /dev/null +++ b/cli/commands/api/category.py @@ -0,0 +1,19 @@ +""" +Browse a category +""" + +import click +import simplejson as json + +from app.thesaurus.api import Thesaurus + +@click.command() +@click.option('-c', '--id', 'opt_cat_id', required=True, + help='Category ID') +@click.pass_context +def cli(ctx, opt_cat_id): + """Browse a category + """ + thesaurus = Thesaurus() + results = thesaurus.category(opt_cat_id) + print(json.dumps(results, indent=2)) diff --git a/cli/commands/api/search.py b/cli/commands/api/search.py new file mode 100644 index 0000000..3c0dc0e --- /dev/null +++ b/cli/commands/api/search.py @@ -0,0 +1,19 @@ +""" +Search for a word +""" + +import click +import simplejson as json + +from app.thesaurus.api import Thesaurus + +@click.command() +@click.option('-w', '--word', 'opt_word', required=True, + help='Word to search') +@click.pass_context +def cli(ctx, opt_word): + """Search for a word + """ + thesaurus = Thesaurus() + results = thesaurus.search(opt_word) + print(json.dumps(results, indent=2)) |
