"""Create screenshots for YouTube.com URLs in the IJB dataset TODO - grey out boxes in sidebar - resize driver screenshot area to include author text Installing webdrivers: Chrome wget https://chromedriver.storage.googleapis.com/73.0.3683.20/chromedriver_linux64.zip Firefox wget https://github.com/mozilla/geckodriver/releases/download/v0.24.0/geckodriver-v0.24.0-linux64.tar.gz PhantomJS npm install -g phantomjs """ import click from app.settings import app_cfg # The search result title must contain one of these words valid_title_words = ['embassy', 'botschaft'] @click.command() @click.option('-i', '--input', 'opt_fp_in', required=True, help='Input CSV with list of embassies') @click.option('-o', '--output', 'opt_fp_out', required=True, help='Output CSV') @click.option('-t', '--threads', 'opt_threads', default=20, help='Number of threads') @click.option('--slice', 'opt_slice', type=(int, int), default=(None, None), help='Slice list of files') @click.pass_context def cli(ctx, opt_fp_in, opt_fp_out, opt_threads, opt_slice, opt_verify): """IJB-C screenshot sources""" import sys import os from glob import glob from os.path import join from pathlib import Path import time from functools import partial from multiprocessing.dummy import Pool as ThreadPool import pandas as pd from PIL import Image import io from tqdm import tqdm from selenium import webdriver from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.common.by import By from app.utils import file_utils, im_utils, logger_utils log = logger_utils.Logger.getLogger() chrome_options = webdriver.ChromeOptions() chrome_options.add_argument('--no-sandbox') chrome_options.add_argument('--headless') chrome_options.add_argument('--disable-dev-shm-usage') def pool_process(route, chrome_options): # Threaded image resize function try: pbar.update(1) driver = webdriver.Chrome(chrome_options=chrome_options) driver.set_window_size(1920,3600) # accommodate vertical videos url = route['url'] fp_out = route['dst'] log.debug(f'url: {url}, dst: {fp_out}') driver.get(url) if 'youtube.com' in url: try: #wait = WebDriverWait(driver, 10).until(EC.visibility_of_element_located((By.CLASS_NAME,'ytd-video-secondary-info-renderer'))) wait = WebDriverWait(driver, 20).until(EC.visibility_of_element_located((By.ID,'player-container-outer'))) except Exception as e: log.debug(f'WebDriver error: {e}') pass else: wait = WebDriverWait(driver,10) time.sleep(1) # wait for element time.sleep(5) # wait for element #el_vid = driver.find_element_by_id('player-container-outer') el_shelf = driver.find_element_by_id('results_links_deep') el_related = driver.find_element_by_id('related') el_primary = driver.find_element_by_id('primary') err = False try: el_error = driver.find_element_by_id('error-screen') if not(el_error.location['x'] == 0 and el_error.location['width'] == 0): err = True except: pass margin_left = 24 margin_bottom = 24 if err else 0 box = (el_primary.location['x'] - margin_left, el_primary.location['y'], el_primary.location['x'] + el_primary.size['width'], el_shelf.location['y'] + margin_bottom) im_bytes = driver.get_screenshot_as_png() im = Image.open(io.BytesIO(im_bytes)) im = im.crop(box) file_utils.mkdirs(fp_out) log.debug(f'save to: {fp_out}') #driver.get_screenshot_as_file(fp_out) im.save(fp_out) driver.quit() return True except: return False # load routes = [] video_ids = [] df_licenses = pd.read_csv(opt_fp_in) log.info(f'{len(df_licenses)} rows') for df_idx, df_license in tqdm(df_licenses.iterrows(), total=len(df_licenses)): filepath = df_license['Media ID'] if not 'video/' in filepath: continue url = str(df_license['Media URL']) try: video_id = url.split('?v=')[1] except Exception as e: log.debug(f'error parsing url: "{url}"') if video_id in video_ids: continue video_ids.append(video_id) if not ('http://' in url or 'https://' in url): url = 'http://' + url #fp_media = filepath.replace(Path(filepath).suffix, '.png') #fp_out = join(opt_fp_out, fp_media) fp_out = join(opt_fp_out, f'{video_id}.png') if Path(fp_out).exists() and (os.stat(fp_out).st_size // 1000) > 13: continue obj = {'url': url, 'dst': fp_out} routes.append(obj) if opt_slice: routes = routes[opt_slice[0]:opt_slice[1]] log.debug(f'processing: {len(routes)}') # setup multithreading results = [] pbar = tqdm(total=len(routes)) pool_process = partial(pool_process, chrome_options=chrome_options) pool = ThreadPool(opt_threads) with tqdm(total=len(routes)) as pbar: results = pool.map(pool_process, routes) pbar.close() #wait = WebDriverWait(driver,3).until(EC.presence_of_element_located((By.CLASS_NAME,'ytd-watch-next-secondary-results-renderer'))) #wait = WebDriverWait(driver,3).until(EC.text_to_be_present_in_element_value((By.CLASS_NAME,'yt-next-continuation'), 'show')) #wait = WebDriverWait(driver,10).until(EC.presence_of_element_located((By.CLASS_NAME,'ytd-video-secondary-info-renderer'))) #driver.execute_script("document.getElementById('related').style.display = 'None';") ''' title_is title_contains presence_of_element_located visibility_of_element_located visibility_of presence_of_all_elements_located text_to_be_present_in_element text_to_be_present_in_element_value frame_to_be_available_and_switch_to_it invisibility_of_element_located element_to_be_clickable - it is Displayed and Enabled. staleness_of element_to_be_selected element_located_to_be_selected element_selection_state_to_be element_located_selection_state_to_be alert_is_present '''