Coverage for ivatar/views.py: 59%

273 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-17 23:10 +0000

1# -*- coding: utf-8 -*- 

2""" 

3views under / 

4""" 

5from io import BytesIO 

6from os import path 

7import hashlib 

8from urllib.request import urlopen 

9from urllib.error import HTTPError, URLError 

10from ssl import SSLError 

11from django.views.generic.base import TemplateView, View 

12from django.http import HttpResponse, HttpResponseRedirect 

13from django.http import HttpResponseNotFound, JsonResponse 

14from django.core.exceptions import ObjectDoesNotExist 

15from django.core.cache import cache, caches 

16from django.utils.translation import gettext_lazy as _ 

17from django.urls import reverse_lazy 

18from django.db.models import Q 

19from django.contrib.auth.models import User 

20 

21from PIL import Image 

22 

23from monsterid.id import build_monster as BuildMonster 

24import Identicon 

25from pydenticon5 import Pydenticon5 

26import pagan 

27from robohash import Robohash 

28 

29from ivatar.settings import AVATAR_MAX_SIZE, JPEG_QUALITY, DEFAULT_AVATAR_SIZE 

30from ivatar.settings import CACHE_RESPONSE 

31from ivatar.settings import CACHE_IMAGES_MAX_AGE 

32from ivatar.settings import TRUSTED_DEFAULT_URLS 

33from .ivataraccount.models import ConfirmedEmail, ConfirmedOpenId 

34from .ivataraccount.models import UnconfirmedEmail, UnconfirmedOpenId 

35from .ivataraccount.models import Photo 

36from .ivataraccount.models import pil_format, file_format 

37from .utils import is_trusted_url, mm_ng, resize_animated_gif 

38 

# Timeout, in seconds, handed to urlopen() for the requests sent to Gravatar
URL_TIMEOUT = 5

40 

41 

def get_size(request, size=DEFAULT_AVATAR_SIZE):
    """
    Work out the avatar size requested through the query string.

    The ``size`` parameter wins over its short form ``s`` whenever both are
    present. A value that is empty, "0", not positive or not parseable as an
    integer leaves the ``size`` argument untouched. The result is capped at
    ``AVATAR_MAX_SIZE``.
    """
    query = request.GET
    if "size" in query:
        requested = query["size"]
    elif "s" in query:
        requested = query["s"]
    else:
        requested = None

    if requested and requested != "0":
        try:
            parsed = int(requested)
        except ValueError:
            # Not a number: stay with the size we were called with
            parsed = 0
        if parsed > 0:
            size = parsed

    return min(size, int(AVATAR_MAX_SIZE))

64 

65 

class CachingHttpResponse(HttpResponse):
    """
    HttpResponse that, when CACHE_RESPONSE is set, also records its
    constructor arguments in the "filesystem" cache under the given URI
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        uri,
        content=b"",
        content_type=None,
        status=200,
        reason=None,
        charset=None,
    ):
        if CACHE_RESPONSE:
            # The entry is stored before the parent constructor receives
            # ``content``; AvatarImageView.get rebuilds a response from it.
            cache_entry = dict(
                content=content,
                content_type=content_type,
                status=status,
                reason=reason,
                charset=charset,
            )
            caches["filesystem"].set(uri, cache_entry)

        super().__init__(
            content,
            content_type=content_type,
            status=status,
            reason=reason,
            charset=charset,
        )

92 

93 

class AvatarImageView(TemplateView):
    """
    View to return (binary) image, based on OpenID/Email (both by digest)

    Query parameters read by ``get``:

    * ``s`` / ``size``: requested edge length (see ``get_size``)
    * ``d`` / ``default``: fallback when no photo is served; one of the
      keywords 404, mm, mp, mmng, retro, pagan, wavatar, monsterid, robohash,
      identicon, or anything else, which is used as a redirect target
    * ``f`` / ``forcedefault``: "y" serves the fallback even if a photo exists
    * ``gravatarredirect``: "y" redirects to Gravatar when no photo is served
    * ``gravatarproxy``: "n" turns off the redirect to the gravatarproxy view
    * ``robohash``: robohash set name, only used with ``default=robohash``
    """

    # TODO: Do cache resize images!! Memcached?

    def options(self, request, *args, **kwargs):
        """
        Answer OPTIONS with an empty text/plain body and an ``Allow`` header
        """
        response = HttpResponse("", content_type="text/plain")
        response["Allow"] = "404 mm mp retro pagan wavatar monsterid robohash identicon"
        return response

    @staticmethod
    def _image_response(uri, data, content_type="image/png"):
        """
        Build the CachingHttpResponse for image data and add Cache-Control
        """
        response = CachingHttpResponse(uri, data, content_type=content_type)
        response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
        return response

    @staticmethod
    def _png_buffer(img):
        """
        Save a PIL image as PNG into a BytesIO buffer rewound to the start
        """
        data = BytesIO()
        img.save(data, "PNG", quality=JPEG_QUALITY)
        data.seek(0)
        return data

    @staticmethod
    def _static_redirect(folder, size):
        """
        Redirect to static/img/<folder>/<size>.png, or 512.png if that is missing
        """
        static_img = path.join("static", "img", folder, "%s%s" % (str(size), ".png"))
        if not path.isfile(static_img):
            # We trust this exists!!!
            static_img = path.join("static", "img", folder, "512.png")
        # We trust static/ is mapped to /static/
        return HttpResponseRedirect("/" + static_img)

    @staticmethod
    def _find_confirmed(digest):
        """
        Look the digest up as confirmed email first, then as confirmed OpenID.

        Returns the matching object or None.
        """
        try:
            return ConfirmedEmail.objects.get(digest=digest)
        except ObjectDoesNotExist:
            pass
        try:
            return ConfirmedEmail.objects.get(digest_sha256=digest)
        except ObjectDoesNotExist:
            pass
        try:
            # OpenID is tricky. http vs. https, versus trailing slash or not
            # However, some users eventually have added their variations already
            # and therefore we need to use filter() and first()
            return ConfirmedOpenId.objects.filter(
                Q(digest=digest)
                | Q(alt_digest1=digest)
                | Q(alt_digest2=digest)
                | Q(alt_digest3=digest)
            ).first()
        except Exception as exc:  # pylint: disable=broad-except
            # Best effort: a failing OpenID lookup is treated like "not found",
            # but is no longer swallowed without a trace.
            print("OpenID lookup failed for digest '%s': %s" % (digest, exc))
            return None

    def _default_response(self, request, uri, digest, size, default):
        """
        Build the response for a non-empty ``default`` value (given as str).

        Generated images are returned as (cached) PNG responses; "wavatar",
        "mm"/"mp" and any unrecognised value result in a redirect; "404"
        results in a Not Found response.
        """
        # Proxy to gravatar to generate wavatar - lazy me
        if default == "wavatar":
            url = (
                reverse_lazy("gravatarproxy", args=[digest])
                + "?s=%i" % size
                + "&default=%s&f=y" % default
            )
            return HttpResponseRedirect(url)

        if default == str(404):
            return HttpResponseNotFound(_("<h1>Image not found</h1>"))

        if default == "monsterid":
            monster = BuildMonster(seed=digest, size=(size, size))
            return self._image_response(uri, self._png_buffer(monster))

        if default == "robohash":
            roboset = request.GET.get("robohash") or "any"
            robohash = Robohash(digest)
            robohash.assemble(roboset=roboset, sizex=size, sizey=size)
            # Kept separate from _png_buffer: this path saves without quality
            data = BytesIO()
            robohash.img.save(data, format="png")
            data.seek(0)
            return self._image_response(uri, data)

        if default == "retro":
            img = Image.open(BytesIO(Identicon.render(digest)))
            img = img.resize((size, size), Image.LANCZOS)
            return self._image_response(uri, self._png_buffer(img))

        if default == "pagan":
            img = pagan.Avatar(digest).img.resize((size, size), Image.LANCZOS)
            return self._image_response(uri, self._png_buffer(img))

        if default == "identicon":
            # In order to make use of the whole 32 bytes digest, we need to redigest them.
            newdigest = hashlib.md5(bytes(digest, "utf-8")).hexdigest()
            img = Pydenticon5().draw(newdigest, size, 0)
            return self._image_response(uri, self._png_buffer(img))

        if default == "mmng":
            img = mm_ng(idhash=digest, size=size)
            return self._image_response(uri, self._png_buffer(img))

        if default in ("mm", "mp"):
            # If mm is explicitly given, we need to catch that
            return self._static_redirect("mm", size)

        # Anything else is used as the redirect target as-is
        return HttpResponseRedirect(default)

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,too-many-return-statements
        """
        Override get from parent class

        Serves, in this order: a cached response for the absolute URI, a
        redirect to "home" if no digest was given, the fallback (Gravatar
        redirect/proxy, requested default, or the static "nobody" image) if
        there is no photo or the default is forced, and otherwise the stored
        photo scaled to the requested size.
        """
        size = get_size(request)
        uri = request.build_absolute_uri()

        # Check the cache first
        if CACHE_RESPONSE:
            centry = caches["filesystem"].get(uri)
            if centry:
                # For DEBUG purpose only print('Cached entry for %s' % uri)
                response = HttpResponse(
                    centry["content"],
                    content_type=centry["content_type"],
                    status=centry["status"],
                    reason=centry["reason"],
                    charset=centry["charset"],
                )
                # Every response this view stores in the cache was delivered
                # with this header; send it on cache hits as well.
                response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
                return response

        # In case no digest at all is provided, return to home page
        if "digest" not in kwargs:
            return HttpResponseRedirect(reverse_lazy("home"))
        digest = kwargs["digest"]

        # The long parameter name wins over the short one
        default = request.GET.get("default", request.GET.get("d"))
        if default is not None:
            if TRUSTED_DEFAULT_URLS is None:
                print("Query parameter `default` is disabled.")
                default = None
            elif default.find("://") > 0 and not is_trusted_url(
                default, TRUSTED_DEFAULT_URLS
            ):
                # Not trusted: reset to None
                print(
                    "Default URL is not in trusted URLs: '%s' ; Kicking it!"
                    % default
                )
                default = None

        forcedefault = "y" in (
            request.GET.get("f"),
            request.GET.get("forcedefault"),
        )
        gravatarredirect = request.GET.get("gravatarredirect") == "y"
        gravatarproxy = request.GET.get("gravatarproxy") != "n"

        obj = self._find_confirmed(digest)

        # If that mail/openid doesn't exist, or has no photo linked to it
        if not obj or not obj.photo or forcedefault:
            if not forcedefault:
                # If we have redirection to Gravatar enabled, this overrides all
                # default= settings, except forcedefault!
                if gravatarredirect:
                    return HttpResponseRedirect(
                        "https://secure.gravatar.com/avatar/"
                        + digest
                        + "?s=%i" % size
                    )

                # Request to proxy Gravatar image - only if not forcedefault
                if gravatarproxy:
                    url = reverse_lazy("gravatarproxy", args=[digest]) + "?s=%i" % size
                    # Ensure we do not convert None to string 'None'
                    if default:
                        url += "&default=%s" % default
                    return HttpResponseRedirect(url)

            # Return the default URL, as specified, or 404 Not Found, if default=404
            if default:
                return self._default_response(request, uri, digest, size, str(default))

            return self._static_redirect("nobody", size)

        imgformat = obj.photo.format
        photodata = Image.open(BytesIO(obj.photo.data))

        # Animated GIFs need additional handling.
        # getattr: Pillow only defines is_animated on plugins that support
        # animation, so a photo whose stored format says "gif" but whose data
        # is something else must not raise AttributeError here.
        if imgformat == "gif" and getattr(photodata, "is_animated", False):
            # Debug only
            # print("Object is animated and has %i frames" % photodata.n_frames)
            data = resize_animated_gif(photodata, (size, size))
        else:
            data = BytesIO()
            # If the image is smaller than what was requested, we need
            # to use the function resize
            if photodata.size[0] < size or photodata.size[1] < size:
                photodata = photodata.resize((size, size), Image.LANCZOS)
            else:
                photodata.thumbnail((size, size), Image.LANCZOS)
            photodata.save(data, pil_format(imgformat), quality=JPEG_QUALITY)

        data.seek(0)
        obj.photo.access_count += 1
        obj.photo.save()
        obj.access_count += 1
        obj.save()
        if imgformat == "jpg":
            imgformat = "jpeg"
        return self._image_response(uri, data, content_type="image/%s" % imgformat)

349 

350 

class GravatarProxyView(View):
    """
    Proxy request to Gravatar and return the image from there
    """

    # TODO: Do cache images!! Memcached?

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Override get from parent class

        Fetches the avatar for ``kwargs["digest"]`` from Gravatar and returns
        it. Whenever Gravatar only has its default image, or the fetch or the
        image decoding fails, the client is redirected to "avatar_view" with
        ``forcedefault=y`` (and the requested ``default``, if any).
        """
        size = get_size(request)
        digest = kwargs["digest"]

        def redir_default(default=None):
            url = (
                reverse_lazy("avatar_view", args=[digest])
                + "?s=%i" % size
                + "&forcedefault=y"
            )
            if default is not None:
                url += "&default=%s" % default
            return HttpResponseRedirect(url)

        # A missing parameter and the literal string "None" both mean: no default
        default = request.GET.get("default")
        if str(default) == "None":
            default = None

        if str(default) != "wavatar":
            # This part is special/hackish
            # Check if the image returned by Gravatar is their default image, if so,
            # redirect to our default instead.
            gravatar_test_url = (
                "https://secure.gravatar.com/avatar/"
                + digest
                + "?s=%i&d=%i" % (50, 404)
            )
            if cache.get(gravatar_test_url) == "default":
                # DEBUG only
                # print("Cached Gravatar response: Default.")
                return redir_default(default)
            try:
                # Only the status matters here; close the connection right away
                with urlopen(gravatar_test_url, timeout=URL_TIMEOUT):
                    pass
            except HTTPError as exc:
                if exc.code == 404:
                    cache.set(gravatar_test_url, "default", 60)
                else:
                    print("Gravatar test url fetch failed: %s" % exc)
                return redir_default(default)
            except OSError as exc:
                # URLError, SSLError and socket timeouts are all OSErrors;
                # fall back to our default instead of failing the request
                print("Gravatar test url fetch failed: %s" % exc)
                return redir_default(default)

        gravatar_url = (
            "https://secure.gravatar.com/avatar/" + digest + "?s=%i" % size
        )
        if default:
            gravatar_url += "&d=%s" % default

        if cache.get(gravatar_url) == "err":
            print("Cached Gravatar fetch failed with URL error: %s" % gravatar_url)
            return redir_default(default)

        try:
            # The context manager closes the connection once the body is read
            with urlopen(gravatar_url, timeout=URL_TIMEOUT) as gravatarimagedata:
                rawimage = gravatarimagedata.read()
        except HTTPError as exc:
            if exc.code not in (404, 503):
                print(
                    "Gravatar fetch failed with an unexpected %s HTTP error: %s"
                    % (exc.code, gravatar_url)
                )
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except URLError as exc:
            print("Gravatar fetch failed with URL error: %s" % exc.reason)
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except SSLError as exc:
            print("Gravatar fetch failed with SSL error: %s" % exc.reason)
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except OSError as exc:
            # e.g. a timeout while reading the response body
            print("Gravatar fetch failed: %s" % exc)
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)

        try:
            # Opened only to learn the image format for the content type
            img = Image.open(BytesIO(rawimage))
            response = HttpResponse(
                rawimage, content_type="image/%s" % file_format(img.format)
            )
        except ValueError as exc:
            print("Value error: %s" % exc)
            return redir_default(default)
        except OSError as exc:
            # Pillow reports data it cannot identify as an image this way
            print("Gravatar returned data that is not a readable image: %s" % exc)
            return redir_default(default)

        response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
        return response

451 

452 

class StatsView(TemplateView, JsonResponse):
    """
    Report how many records of each kind are stored
    """

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Answer with a JSON object mapping each statistic name to a row count
        """
        counted_models = (
            ("users", User),
            ("mails", ConfirmedEmail),
            ("openids", ConfirmedOpenId),
            ("unconfirmed_mails", UnconfirmedEmail),
            ("unconfirmed_openids", UnconfirmedOpenId),
            ("avatars", Photo),
        )

        stats = {}
        for label, model in counted_models:
            stats[label] = model.objects.count()  # pylint: disable=no-member

        return JsonResponse(stats)