diff options
Diffstat (limited to 'impattern/pbutils.py')
| -rw-r--r-- | impattern/pbutils.py | 235 |
1 files changed, 0 insertions, 235 deletions
diff --git a/impattern/pbutils.py b/impattern/pbutils.py deleted file mode 100644 index 26e0378..0000000 --- a/impattern/pbutils.py +++ /dev/null @@ -1,235 +0,0 @@ -#!/usr/bin/python
-import time
-import re
-from urllib2 import Request, urlopen
-from subprocess import check_output, call, Popen, PIPE
-from os import stat, path
-from random import randint
-from hashlib import sha1
-import mimetypes
-import s3
-import MySQLdb
-import logging
-from config import *
-
-ACCEPTABLE_FILE_TYPES = [".png", ".jpg", ".gif", ".jpeg"]
-MAX_DOWNLOAD_SIZE = 1024 * 1024 * 1.2
-
-BUCKET_NAME = "i.asdf.us"
-
-MYSQLUSER = "secretuser"
-MYSQLDB = "secretdb"
-MYSQLPW = "secretpw"
-
-BASE_DIR = "/var/www/asdf.us/im/"
-IDENTIFY = "/usr/bin/identify"
-CONVERT = "/usr/bin/convert"
-
-def now():
- return str(int(time.time()))
-
-class pb_log:
- " creates a log for each script "
- def __init__(self, logname):
- self.logger = logging.getLogger(logname)
- hdlr = logging.FileHandler("/var/tmp/photoblaster/"+logname+".log")
- formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
- hdlr.setFormatter(formatter)
- self.logger.addHandler(hdlr)
- self.logger.setLevel(logging.WARNING)
- def log(self, message):
- self.logger.warning(message)
-
-
-def error(s):
- " returns an error and exits the program "
- print("ERROR: "+s)
- exit(1)
-
-def hexdir(filename):
- " creates a two-letter directory name "
- return sha1(filename.encode()).hexdigest()[:2]
-
-def image_dimensions_and_test(filename):
- ident = Popen([IDENTIFY, filename], stdout=PIPE).communicate()[0]
- partz = ident.split(" ")
- filetype = "."+partz[1]
- size = partz[6]
- if filetype.lower() not in ACCEPTABLE_FILE_TYPES:
- error("file was not an image")
- return partz[2].split("x")
-
-#ok is this a little better? yes, add a dot to filetype since ACCEPTABLE_FILE_TYPES have one
-
-def image_dimensions(filename):
- ident = Popen([IDENTIFY, filename], stdout=PIPE).communicate()[0]
- partz = ident.split(" ")
- return partz[2].split("x")
-
-
-def process_form(form, param_list):
- """ converts form returned from form submission into an object with values
- takes the form and splitted param param_list as args """
- return dict([(key, form[key].value() or "" ) for key in param_list])
-
-
-def sanitize(s):
- " sanitizes potential shell commands out of form entries "
- return re.sub("[^a-zA-Z0-9]", '', s)
-
-
-def is_number(s):
- " makes sure that the number entries are numbers, not malicious strings "
- try:
- float(s)
- if s.lower() == "nan": raise ValueError
- return True
- except (ValueError, TypeError):
- error("One of the number values entered is not a number.")
- return False
-
-
-def check_color(colorparam, index, defaultcolors):
- " makes sure that there aren't malicious strings in the colorparam "
- if not colorparam:
- return defaultcolors[index]
- elif colorparam[0] != "#" and "rgb" not in colorparam:
- return sanitize(colorparam)
- else:
- return colorparam
-
-
-class gifCheck:
- " checks to see if file is a gif "
-
- def __init__(self, f):
- self.f = f
- frames = check_output([IDENTIFY, f]).decode().split('\n')
- self.frames = frames.remove('')
-
- def check_anim(self):
- return len(self.frames) > 1
-
- def pick_frame(self):
- if self.check_anim() is True:
- i = randint(0, (len(self.frames) - 1))
- choice = self.f + "[" + str(i) + "]"
- return choice
- else:
- return self.f
-
- def collapse(self):
- choice = self.pick_frame()
- call([CONVERT, choice, self.f])
-
-
-def pb_s3(hexdir,f):
- " sends a file to s3 and returns the new url "
- s3object = '/'.join(('im',hexdir,path.basename(f)))
- conn = s3.AWSAuthConnection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
- with open(f, 'rb') as opened:
- filedata = opened.read()
- content_type = mimetypes.guess_type(f)[0]
- if not content_type:
- content_type = 'text/plain'
- try:
- conn.put(BUCKET_NAME, s3object, s3.S3Object(filedata),
- {'x-amz-acl': 'public-read', 'Content-Type': content_type,
- 'x-amz-storage-class': 'REDUCED_REDUNDANCY'})
- call(['rm',f])
- return "http://i.asdf.us/"+s3object
- except Exception as e:
- return e
-
-
-#so now I need to test the image? yes
-
-def test_image(basename, ext):
- """ checks to make sure the image is an image """
-
-
-def download_image(url, filename=None,final_path=""):
- " downloads an image and stores it in a directory "
- headers = {
- 'User-Agent': 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)',
- 'Accept': '*/*',
- }
- if not filename:
- filename = url.split("/")[-1]
- try:
- req = Request(url, None, headers)
- response = urlopen(req).read()
- if not response: error("File did not exist or was zero-length")
- if len(response) > MAX_DOWNLOAD_SIZE:
- error(
- "file too big: max size %sKB\n %s is %s KB" %
- ( MAX_DOWNLOAD_SIZE/1024, filename, len(response)/1024 )
- )
- f = open(final_path+filename, "w")
- f.write(response)
- f.close()
- return final_path
-
- except Exception as E:
- error('There is a problem with the url or an I/O error: \n %s' % (E))
-
-
-
-def new_filename_from_url(url, username="", nametag=""):
- " creates a safe filename from a url "
- parts = url.rsplit("/", -1)
- try:
- name, filetype = parts.rsplit(".", -1)
- except:
- error("Invalid url") # I could pass in the logging function as an argument if you wanted to store the invalid urls
- if "?" in filetype: filetype = filetype.split("?")[1]
- name = sanitize(name)
- filetype = sanitize(filetype)
- if not ('.' + filetype.lower()) in ACCEPTABLE_FILE_TYPES:
- error("Invalid url")
-
-
- if len(name) > 32: name = "__abridged"
- name = now()+"_"+name
-
- if username: name += "_" + username
- if nametag: name += "_" + nametag
- return name +"."+filetype
-
-def new_filename(filetype, username="", nametag=""):
- " creates a new image filename "
- parts = now()
- if username: parts += "_"+username
- if nametag: parts += "_"+nametag
- return parts+"."+filetype
-
-
-def file_size (file):
- " returns the file size in bytes "
- return stat(file)[6]
-
-
-class db:
- " initializes the database connection "
- def __init__ (self):
- self.conn = None
- self.cursor = None
- self.connect()
-
- def connect (self):
- self.conn = MySQLdb.connect (host = "localhost",
- user = MYSQLUSER,
- passwd = MYSQLPW,
- db = MYSQLDB)
- self.cursor = self.conn.cursor ()
-
- def execute (self,sql,args=()):
- try:
- self.cursor.execute(sql,args)
- except MySQLdb.Error, e:
- error(" %d: %s" % (e.args[0], e.args[1]))
-
- def lastinsertid (self):
- return self.conn.insert_id()
-
-#pb_db = db()
|
