path: root/animism-align/cli/app/utils/s3_utils.py
blob: d8cff79a1ca9f77091fa8b86c0bfde8952825f4f (plain)
import os
import logging
from os.path import join
from glob import glob
from pathlib import Path
from dataclasses import dataclass

import dacite  # dataclass helper util
import boto3


@dataclass
class S3Config:
  S3_BUCKET: str
  S3_KEY: str
  S3_SECRET: str
  S3_ENDPOINT: str
  S3_REGION: str


class RemoteStorageS3:

  def __init__(self):

    self.log = logging.getLogger('swimmer')

    # Build S3Config from the S3_* environment variables (extra keys are ignored)
    self.s3_cfg = dacite.from_dict(data_class=S3Config, data=dict(os.environ))

    self.session = boto3.session.Session()

    self.s3_client = self.session.client(
      service_name='s3',
      aws_access_key_id=self.s3_cfg.S3_KEY,
      aws_secret_access_key=self.s3_cfg.S3_SECRET,
      endpoint_url=self.s3_cfg.S3_ENDPOINT,
      region_name=self.s3_cfg.S3_REGION,
    )


  def list_dir(self, fp_dir_remote):
    '''List remote objects under a directory prefix
    '''

    # Note: list_objects returns at most 1000 keys per request
    obj_list_remote = self.s3_client.list_objects(
      Bucket=self.s3_cfg.S3_BUCKET,
      Prefix=fp_dir_remote)


    for obj in obj_list_remote.get('Contents', []):
      s3_fn = obj['Key']
      self.log.debug(s3_fn)


  def sync_dir(self, fp_dir_local, fp_dir_remote):
    '''Sync local directory to remote directory
    '''
    
    # get list of local files
    fps_local = glob(join(fp_dir_local, '*'))
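    # map filename -> full local path; anything left in this lookup after the
    # remote comparison pass is a new file that still needs to be uploaded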
    fp_local_lkup = {}
    for fp in fps_local:
      fp_local_lkup[Path(fp).name] = fp

    # get list of remote files
    obj_list_remote = self.s3_client.list_objects(Bucket=self.s3_cfg.S3_BUCKET, Prefix=fp_dir_remote)
    # check if remote files exist locally
    if 'Contents' in obj_list_remote:
      for obj in obj_list_remote['Contents']:
        s3_fn = obj['Key']
        fn_remote = Path(s3_fn).name
        if fn_remote in fp_local_lkup.keys():
          # file exists both locally and remotely: drop it from the
          # pending-upload queue and compare timestamps
          fp_local = fp_local_lkup[fn_remote]
          del fp_local_lkup[fn_remote]
          if obj['LastModified'].timestamp() < os.path.getmtime(fp_local):
            self.log.debug("Update s3 with newer local file: {}".format(s3_fn))
            self.s3_client.upload_file(
              fp_local,
              self.s3_cfg.S3_BUCKET,
              s3_fn,
              ExtraArgs={'ACL': 'public-read' })
          else:
            self.log.debug(f'Skipping same file: {s3_fn}')
        else:
          self.log.debug(f'Orphan remote file, deleting from s3: {s3_fn}')
          self.s3_client.delete_object(
            Bucket=self.s3_cfg.S3_BUCKET,
            Key=s3_fn,
          )
    else:
      self.log.debug(f'No remote objects found under prefix: {fp_dir_remote}')

    # put the remaining files to S3
    for fn_local, fp_local in fp_local_lkup.items():
      s3_fn = join(fp_dir_remote, fn_local)
      self.log.debug("s3 create {}".format(s3_fn))
      self.s3_client.upload_file(
        fp_local,
        os.getenv('S3_BUCKET'),
        s3_fn,
        ExtraArgs={ 'ACL': 'public-read' })


  def sync_file(self, fp_local, fp_remote):
    '''Sync local file to remote file
    '''
    self.log.warning('Not yet implemented')


  #def make_s3_path(s3_dir, metadata_path):
  #  return "{}/{}/{}{}".format(os.getenv('S3_ENDPOINT'), os.getenv('S3_BUCKET'), s3_dir, metadata_path)
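

# Minimal usage sketch (illustrative, not part of the CLI wiring): assumes the
# S3_BUCKET, S3_KEY, S3_SECRET, S3_ENDPOINT and S3_REGION environment variables
# are set, and uses example local/remote paths.
if __name__ == '__main__':
  logging.basicConfig(level=logging.DEBUG)
  storage = RemoteStorageS3()
  storage.sync_dir('output', 'media/output')
  storage.list_dir('media/output')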