1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
from flask import request, jsonify
from flask_classful import FlaskView, route
from werkzeug.datastructures import MultiDict
from werkzeug.utils import secure_filename
import os
import numpy as np
from PIL import Image
from app.settings import app_cfg
from app.sql.common import db, Session
from app.sql.models.upload import Upload
from app.utils.file_utils import sha256_stream, sha256_tree, VALID_IMAGE_EXTS
from app.server.decorators import APIError
class UploadView(FlaskView):
def index(self):
"""
List all uploaded files.
* Query string params: offset, limit, sort (id, date), order (asc, desc)
"""
session = Session()
uploads = session.query(Upload).all()
response = {
'status': 'ok',
'res': [ upload.toJSON() for upload in uploads ],
}
session.close()
return jsonify(response)
def get(self, id):
"""
Fetch a single upload.
"""
session = Session()
upload = session.query(Upload).get(id)
response = {
'status': 'ok',
'res': upload.toJSON(),
}
session.close()
return jsonify(response)
def post(self):
"""
Upload a new file.
* JSON params: username
"""
try:
username = request.form.get('username')
except:
raise APIError('No username specified')
param_name = 'image'
if param_name not in request.files:
raise APIError('No file uploaded')
file = request.files[param_name]
# get sha256
sha256 = sha256_stream(file)
_, ext = os.path.splitext(file.filename)
if ext == '.jpeg':
ext = '.jpg'
# TODO: here check sha256
# upload = Upload.query.get(id)
if ext[1:] not in VALID_IMAGE_EXTS:
return jsonify({ 'status': 'error', 'error': 'Not a valid image' })
# convert string of image data to uint8
file.seek(0)
nparr = np.fromstring(file.read(), np.uint8)
# decode image
try:
im = Image.fromarray(nparr)
except:
return jsonify({ 'status': 'error', 'error': 'Image parse error' })
session = Session()
upload = session.query(Upload).filter_by(sha256=sha256).first()
if upload is not None:
print("Already uploaded image")
response = {
'status': 'ok',
'notes': 'Image already uploaded',
'res': upload.toJSON(),
}
session.close()
return jsonify(response)
uploaded_im_fn = secure_filename(sha256 + ext)
uploaded_im_abspath = os.path.join(app_cfg.DIR_UPLOADS, sha256_tree(sha256))
uploaded_im_fullpath = os.path.join(uploaded_im_abspath, uploaded_im_fn)
os.makedirs(uploaded_im_abspath, exist_ok=True)
nparr.tofile(uploaded_im_fullpath)
upload = Upload(username=username, sha256=sha256, ext=ext)
session.add(upload)
session.commit()
response = {
'status': 'ok',
'res': upload.toJSON(),
}
session.close()
return jsonify(response)
def delete(self, id):
"""
Delete an uploaded file.
"""
session = Session()
upload = session.query(Upload).get(id)
if not upload:
session.close()
return jsonify({
'status': 'error',
'error': 'not found',
})
sha256 = upload.sha256
uploaded_im_fn = secure_filename(sha256 + upload.ext)
uploaded_im_abspath = os.path.join(app_cfg.DIR_UPLOADS, sha256_tree(sha256))
uploaded_im_fullpath = os.path.join(uploaded_im_abspath, uploaded_im_fn)
if os.path.exists(uploaded_im_fullpath):
print("Removing " + uploaded_im_fullpath)
os.remove(uploaded_im_fullpath)
session.delete(upload)
session.commit()
response = {
'status': 'ok',
'id': id,
}
session.close()
return jsonify(response)
|