"""Base class ``Pb``: filename helpers, an external-command runner and
image-inspection helpers built on the binaries named in ``Config``."""
import re
# NOTE: the star import is expected to supply WORKING_DIR, BIN_IDENTIFY,
# BIN_CONVERT, DEFAULT_WIDTH and DEFAULT_HEIGHT, all of which are used below.
from Config import *
import time
import urllib
import urllib2
import sys
import os
import random
from subprocess import Popen, PIPE, call
from Params import Params
import time

# Module-level shortcuts to the urllib/urllib2 helpers.
Request = urllib2.Request
urlencode = urllib.urlencode
urlopen = urllib2.urlopen

# Maximum number of characters kept from a URL-derived name part.
_max_filename_length = 20


class Pb(object):
    """Common state and helpers for a processing module.

    Each instance records a creation timestamp (``_now``), a ``Params``
    object, the list of files it created (``_files_created``) and the
    external commands it ran (``commands``).
    """

    def __init__(self):
        # Timestamp (whole seconds, as a string) embedded in generated names.
        self._now = str(int(time.time()))
        self.params = Params(classname=self.__class__.__name__, now=self._now)
        self._files_created = []
        self.commands = []
        self._working_dir = WORKING_DIR

    def _filename_create(self, url=None, namepart="", extension=""):
        """Build a file name of the form
        ``[namepart-]ClassName_timestamp[_username][.extension]``.

        When ``url`` is given, ``namepart`` is derived from the URL's
        basename (text before the first dot), stripped of non-word
        characters and truncated to ``_max_filename_length``.
        """
        if url:
            _basename = os.path.basename(url)
            namepart = re.split(r'\.', _basename)[0]
            namepart = self._url_sanitize(namepart)[0:_max_filename_length]
        name = ""
        if namepart:
            name += "%s-" % namepart
        name += "%s_%s" % (self.__class__.__name__, self._now)
        if self.params.username:
            name += "_%s" % self.params.username
        if extension:
            name += ".%s" % extension
        return name

    def _filepath_create(self, filename, directory=WORKING_DIR):
        """Return ``filename`` joined onto ``directory``."""
        return os.path.join(directory, filename)

    def _filename_filepath_create(self, url=None, namepart="", directory=WORKING_DIR, extension=""):
        """Return a ``(filename, filepath)`` pair for a newly generated name."""
        _filename = self._filename_create(url=url, namepart=namepart, extension=extension)
        _filepath = self._filepath_create(_filename, directory=directory)
        return _filename, _filepath

    def _tempfilepath_create(self, namepart="temp", directory=WORKING_DIR, extension=""):
        """Return a generated file path using ``namepart`` (default "temp")."""
        _filename = self._filename_create(namepart=namepart, extension=extension)
        return self._filepath_create(_filename, directory=directory)

    def _url_sanitize (self, s):
        """Return ``s`` with every run of non-word characters removed."""
        return re.sub(r'\W+', '', s)

    def _call_cmd(self, cmd):
        """Run ``cmd`` (a sequence of arguments) and record it in ``commands``.

        Every argument is converted with ``str`` first. The command's exit
        status is not inspected.

        Raises:
            Exception: if converting, launching or recording the command fails.
        """
        try:
            # A real list (not a lazy map) so it can be both executed and joined.
            cmd = [str(i) for i in cmd]
            call(cmd)
            self.commands.append(" ".join(cmd))
        except Exception:
            raise Exception("Unable to call cmd {}".format(str(cmd)))

    @staticmethod
    def dimensions (filepath):
        """Return ``[width, height]`` (as strings) reported by BIN_IDENTIFY.

        Assumes the third space-separated field of the tool's output is
        ``WIDTHxHEIGHT`` -- TODO confirm against the configured binary.
        """
        #works in lieu of a mimetype check (it reads the header as well)
        ident = (Popen([BIN_IDENTIFY, filepath], stdout=PIPE).communicate()[0]).split(" ")
        return ident[2].split("x")

    def _width_and_height_set(self, filepath=None, width=DEFAULT_WIDTH, height=DEFAULT_HEIGHT):
        """Set ``self.width``/``self.height`` from ``filepath`` if given,
        otherwise from the ``width``/``height`` arguments."""
        if filepath:
            self.width, self.height = Pb.dimensions(filepath)
            return
        self.width = width
        self.height = height

    @staticmethod
    def file_size (filepath):
        """Return the size of ``filepath`` in bytes.

        On failure the error is written to stderr and re-raised.
        """
        try:
            return os.stat(filepath).st_size
        except Exception as e:
            sys.stderr.write("Couldn't determine filesize\n")
            sys.stderr.write(str(e)+"\n")
            raise

    def _file_read(self, filepath):
        """Return the full contents of ``filepath`` (opened in mode 'r')."""
        # Context manager guarantees the handle is closed even if read() fails.
        with open(filepath, 'r') as f:
            return f.read()

    def err_warn(self, s):
        """Write a non-fatal error message to stderr."""
        sys.stderr.write("ERROR:{} - {}\n".format(self.__class__.__name__, s))

    def _cleanup(self):
        """Remove every file listed in ``_files_created`` via ``rm``."""
        if not self._files_created:
            return
        cmd = ["rm"]+self._files_created
        self._call_cmd(cmd)

    def err_fatal(self, s):
        """Write a fatal error message to stderr and exit with status 1."""
        sys.stderr.write("ERROR[FATAL]:{} - {}\n".format(self.__class__.__name__, s))
        sys.exit(1)

    @classmethod
    def example_run(cls, params=None, verbose=True):
        """Instantiate ``cls`` with example parameters and call ``create()``.

        Uses ``params`` when truthy, otherwise the class attribute
        ``example_params``.

        Raises:
            AttributeError: if no example parameters are available.
        """
        # getattr so a class without `example_params` reaches the explicit
        # error below instead of failing on the attribute lookup itself.
        example_params = params or getattr(cls, "example_params", None)
        if not example_params:
            raise AttributeError ("Must supply test params to test %s" % cls.__name__)
        b = cls(**example_params)
        b.create()
        if verbose:
            sys.stderr.write("generated %s\n" % b.filepath)
            sys.stderr.write("files created %s\n" % b._files_created)
            sys.stderr.write("commands %s" % " ".join(b.commands))

    @staticmethod
    def gif_frames(filepath):
        """Return the first space-separated field of each non-empty output
        line of BIN_IDENTIFY for ``filepath``.

        Presumably one entry per frame of an animated image -- verify against
        the configured binary. On failure a warning is written to stderr and
        the original exception is re-raised.
        """
        try:
            info = Popen([BIN_IDENTIFY,filepath], stdout=PIPE).communicate()[0]
            first_fields = [line.split(" ")[0] for line in info.split('\n')]
            # Drop empty entries (e.g. from the trailing newline).
            return [field for field in first_fields if field]
        except Exception:
            # This is a staticmethod, so there is no `self` to call err_warn
            # on; emit the same message format directly.
            sys.stderr.write("ERROR:{} - {}\n".format(Pb.__name__, "couldn't get gif frames"))
            raise

    def _choose_gif_frame(self, filepath):
        """Pick a random entry from ``gif_frames(filepath)`` and run
        BIN_CONVERT with it as input and ``filepath`` as output."""
        _gif_frames = Pb.gif_frames(filepath)
        frame = random.choice(_gif_frames)
        self._call_cmd([BIN_CONVERT, frame, filepath])