Coverage for ivatar/views.py: 50%

327 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-15 23:12 +0000

1# -*- coding: utf-8 -*- 

2""" 

3views under / 

4""" 

5 

6import contextlib 

7from io import BytesIO 

8from os import path 

9import hashlib 

10from ivatar.utils import urlopen, Bluesky 

11from urllib.error import HTTPError, URLError 

12from ssl import SSLError 

13from django.views.generic.base import TemplateView, View 

14from django.http import HttpResponse, HttpResponseRedirect 

15from django.http import HttpResponseNotFound, JsonResponse 

16from django.core.exceptions import ObjectDoesNotExist 

17from django.core.cache import cache, caches 

18from django.utils.translation import gettext_lazy as _ 

19from django.urls import reverse_lazy 

20from django.db.models import Q 

21from django.contrib.auth.models import User 

22 

23from PIL import Image 

24 

25from monsterid.id import build_monster as BuildMonster 

26import Identicon 

27from pydenticon5 import Pydenticon5 

28import pagan 

29from robohash import Robohash 

30 

31from ivatar.settings import AVATAR_MAX_SIZE, JPEG_QUALITY, DEFAULT_AVATAR_SIZE 

32from ivatar.settings import CACHE_RESPONSE 

33from ivatar.settings import CACHE_IMAGES_MAX_AGE 

34from ivatar.settings import TRUSTED_DEFAULT_URLS 

35from .ivataraccount.models import ConfirmedEmail, ConfirmedOpenId 

36from .ivataraccount.models import UnconfirmedEmail, UnconfirmedOpenId 

37from .ivataraccount.models import Photo 

38from .ivataraccount.models import pil_format, file_format 

39from .utils import is_trusted_url, mm_ng, resize_animated_gif 

40 

41 

def get_size(request, size=DEFAULT_AVATAR_SIZE):
    """
    Work out the avatar size requested through the query string.

    The ``size`` parameter wins over ``s`` when both are present. A missing,
    empty, "0" or non-numeric value leaves the passed-in ``size`` untouched.
    Any other value is clamped to AVATAR_MAX_SIZE; non-positive numbers keep
    the passed-in ``size`` (which is still clamped).
    """
    requested = None
    for param in ("s", "size"):
        if param in request.GET:
            requested = request.GET[param]

    # Nothing usable was asked for: hand back the caller's value unchanged
    if not requested or requested in ("", "0"):
        return size

    with contextlib.suppress(ValueError):
        parsed = int(requested)
        if parsed > 0:
            size = parsed
        size = min(size, int(AVATAR_MAX_SIZE))
    return size

58 

59 

class CachingHttpResponse(HttpResponse):
    """
    HttpResponse that additionally records its constructor arguments in the
    "filesystem" cache, keyed by the given URI, when CACHE_RESPONSE is set
    """

    def __init__(
        self,
        uri,
        content=b"",
        content_type=None,
        status=200,  # pylint: disable=too-many-arguments
        reason=None,
        charset=None,
    ):
        if CACHE_RESPONSE:
            # Store the raw arguments (before the parent class consumes
            # ``content``) so the response can be rebuilt from the cache later
            entry = dict(
                content=content,
                content_type=content_type,
                status=status,
                reason=reason,
                charset=charset,
            )
            caches["filesystem"].set(uri, entry)
        super().__init__(content, content_type, status, reason, charset)

86 

87 

class AvatarImageView(TemplateView):
    """
    View to return (binary) image, based on OpenID/Email (both by digest)
    """

    # TODO: Do cache resize images!! Memcached?

    def options(self, request, *args, **kwargs):
        """
        Answer OPTIONS requests with the supported ``default`` keywords in
        the ``Allow`` header
        """
        response = HttpResponse("", content_type="text/plain")
        response["Allow"] = "404 mm mp retro pagan wavatar monsterid robohash identicon"
        return response

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,too-many-return-statements
        """
        Override get from parent class

        Order of evaluation:
        1. a cached response for the absolute request URI, if caching is on
        2. redirect to "home" when no digest was given
        3. lookup of the digest in ConfirmedEmail, then ConfirmedOpenId
        4. redirect to the Bluesky proxy when the identity has a handle
        5. without a photo (or with forcedefault): Gravatar redirect,
           Gravatar proxy, a generated/static default, or the "nobody" image
        6. otherwise the stored photo, resized to the requested size
        """
        # Function-scope import keeps this block self-contained
        from urllib.parse import quote  # pylint: disable=import-outside-toplevel

        model = ConfirmedEmail
        size = get_size(request)
        obj = None
        default = None
        uri = request.build_absolute_uri()

        # Check the cache first
        if CACHE_RESPONSE:
            if centry := caches["filesystem"].get(uri):
                # For DEBUG purpose only
                # print('Cached entry for %s' % uri)
                response = HttpResponse(
                    centry["content"],
                    content_type=centry["content_type"],
                    status=centry["status"],
                    reason=centry["reason"],
                    charset=centry["charset"],
                )
                # Cache hits must carry the same caching headers as the
                # response that originally populated the cache
                return self._set_image_headers(response)

        # In case no digest at all is provided, return to home page
        if "digest" not in kwargs:
            return HttpResponseRedirect(reverse_lazy("home"))

        # "default" takes precedence over its short form "d"
        if "d" in request.GET:
            default = request.GET["d"]
        if "default" in request.GET:
            default = request.GET["default"]

        if default is not None:
            if TRUSTED_DEFAULT_URLS is None:
                print("Query parameter `default` is disabled.")
                default = None
            elif default.find("://") > 0:
                # Check if it's trusted, if not, reset to None
                trusted_url = is_trusted_url(default, TRUSTED_DEFAULT_URLS)

                if not trusted_url:
                    print(
                        f"Default URL is not in trusted URLs: '{default}'; Kicking it!"
                    )
                    default = None

        forcedefault = "y" in (
            request.GET.get("f"),
            request.GET.get("forcedefault"),
        )
        gravatarredirect = request.GET.get("gravatarredirect") == "y"
        # Proxying Gravatar is on unless explicitly switched off
        gravatarproxy = request.GET.get("gravatarproxy") != "n"

        try:
            obj = model.objects.get(digest=kwargs["digest"])
        except ObjectDoesNotExist:
            try:
                obj = model.objects.get(digest_sha256=kwargs["digest"])
            except ObjectDoesNotExist:
                model = ConfirmedOpenId
                with contextlib.suppress(Exception):
                    d = kwargs["digest"]  # pylint: disable=invalid-name
                    # OpenID is tricky. http vs. https, versus trailing slash or not
                    # However, some users eventually have added their variations already
                    # and therefore we need to use filter() and first()
                    obj = model.objects.filter(
                        Q(digest=d)
                        | Q(alt_digest1=d)
                        | Q(alt_digest2=d)
                        | Q(alt_digest3=d)
                    ).first()
        # Handle the special case of Bluesky
        # NOTE(review): the requested size/default are not forwarded to the
        # Bluesky proxy here - confirm whether that is intended
        if obj:
            if obj.bluesky_handle:
                return HttpResponseRedirect(
                    reverse_lazy("blueskyproxy", args=[kwargs["digest"]])
                )
        # If that mail/openid doesn't exist, or has no photo linked to it
        if not obj or not obj.photo or forcedefault:
            gravatar_url = (
                "https://secure.gravatar.com/avatar/"
                + kwargs["digest"]
                + "?s=%i" % size
            )

            # If we have redirection to Gravatar enabled, this overrides all
            # default= settings, except forcedefault!
            if gravatarredirect and not forcedefault:
                return HttpResponseRedirect(gravatar_url)

            # Request to proxy Gravatar image - only if not forcedefault
            if gravatarproxy and not forcedefault:
                url = (
                    reverse_lazy("gravatarproxy", args=[kwargs["digest"]])
                    + "?s=%i" % size
                )
                # Ensure we do not convert None to string 'None'
                if default:
                    # Escape query-string metacharacters ("?", "&", "#", ...)
                    # so a default URL survives the redirect in one piece;
                    # ":" and "/" stay literal to keep plain values unchanged
                    url += f"&default={quote(default, safe='/:')}"
                return HttpResponseRedirect(url)

            # Return the default URL, as specified, or 404 Not Found, if default=404
            if default:
                # Proxy to gravatar to generate wavatar - lazy me
                if str(default) == "wavatar":
                    url = (
                        reverse_lazy("gravatarproxy", args=[kwargs["digest"]])
                        + "?s=%i" % size
                        + f"&default={default}&f=y"
                    )
                    return HttpResponseRedirect(url)

                if str(default) == str(404):
                    return HttpResponseNotFound(_("<h1>Image not found</h1>"))

                if str(default) == "monsterid":
                    monsterdata = BuildMonster(seed=kwargs["digest"], size=(size, size))
                    data = BytesIO()
                    return self._return_cached_png(monsterdata, data, uri)
                if str(default) == "robohash":
                    roboset = request.GET.get("robohash") or "any"
                    robohash = Robohash(kwargs["digest"])
                    robohash.assemble(roboset=roboset, sizex=size, sizey=size)
                    data = BytesIO()
                    robohash.img.save(data, format="png")
                    return self._return_cached_response(data, uri)
                if str(default) == "retro":
                    identicon = Identicon.render(kwargs["digest"])
                    data = BytesIO()
                    img = Image.open(BytesIO(identicon))
                    img = img.resize((size, size), Image.LANCZOS)
                    return self._return_cached_png(img, data, uri)
                if str(default) == "pagan":
                    paganobj = pagan.Avatar(kwargs["digest"])
                    data = BytesIO()
                    img = paganobj.img.resize((size, size), Image.LANCZOS)
                    return self._return_cached_png(img, data, uri)
                if str(default) == "identicon":
                    p = Pydenticon5()  # pylint: disable=invalid-name
                    # In order to make use of the whole 32 bytes digest, we need to redigest them.
                    newdigest = hashlib.md5(
                        bytes(kwargs["digest"], "utf-8")
                    ).hexdigest()
                    img = p.draw(newdigest, size, 0)
                    data = BytesIO()
                    return self._return_cached_png(img, data, uri)
                if str(default) == "mmng":
                    mmngimg = mm_ng(idhash=kwargs["digest"], size=size)
                    data = BytesIO()
                    return self._return_cached_png(mmngimg, data, uri)
                if str(default) in {"mm", "mp"}:
                    return self._redirect_static_w_size("mm", size)
                # Anything else is a URL that passed the trust check above
                return HttpResponseRedirect(default)

            return self._redirect_static_w_size("nobody", size)

        imgformat = obj.photo.format
        photodata = Image.open(BytesIO(obj.photo.data))

        data = BytesIO()

        # Animated GIFs need additional handling
        if imgformat == "gif" and photodata.is_animated:
            # Debug only
            # print("Object is animated and has %i frames" % photodata.n_frames)
            data = resize_animated_gif(photodata, (size, size))
        else:
            # If the image is smaller than what was requested, we need
            # to use the function resize
            if photodata.size[0] < size or photodata.size[1] < size:
                photodata = photodata.resize((size, size), Image.LANCZOS)
            else:
                photodata.thumbnail((size, size), Image.LANCZOS)
            photodata.save(data, pil_format(imgformat), quality=JPEG_QUALITY)

        data.seek(0)
        obj.photo.access_count += 1
        obj.photo.save()
        obj.access_count += 1
        obj.save()
        if imgformat == "jpg":
            imgformat = "jpeg"
        response = CachingHttpResponse(uri, data, content_type=f"image/{imgformat}")
        return self._set_image_headers(response)

    @staticmethod
    def _set_image_headers(response):
        """
        Apply the caching headers shared by all image responses and return
        the same response object
        """
        response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
        # Remove Vary header for images since language doesn't matter
        response["Vary"] = ""
        return response

    def _redirect_static_w_size(self, arg0, size):
        """
        Redirect to the static image ``static/img/<arg0>/<size>.png``,
        falling back to the 512 pixel variant if that file does not exist
        """
        # If mm is explicitly given, we need to catch that
        static_img = path.join("static", "img", arg0, f"{str(size)}.png")
        if not path.isfile(static_img):
            # We trust this exists!!!
            static_img = path.join("static", "img", arg0, "512.png")
        # We trust static/ is mapped to /static/
        return HttpResponseRedirect(f"/{static_img}")

    def _return_cached_response(self, data, uri):
        """
        Rewind ``data`` and return it as a PNG response that is also stored
        in the response cache under ``uri``
        """
        data.seek(0)
        response = CachingHttpResponse(uri, data, content_type="image/png")
        return self._set_image_headers(response)

    def _return_cached_png(self, arg0, data, uri):
        """
        Serialise the image ``arg0`` as PNG into ``data`` and return the
        cached response for it
        """
        arg0.save(data, "PNG", quality=JPEG_QUALITY)
        return self._return_cached_response(data, uri)

324 

325 

class GravatarProxyView(View):
    """
    Proxy request to Gravatar and return the image from there
    """

    # TODO: Do cache images!! Memcached?

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Override get from parent class

        Fetches the avatar for ``kwargs["digest"]`` from Gravatar and relays
        it. Whenever Gravatar only has its own default image, or fetching
        fails, the client is redirected back to "avatar_view" with
        ``forcedefault=y`` so that our own default handling takes over.
        """
        # Function-scope import keeps this block self-contained
        from urllib.parse import quote  # pylint: disable=import-outside-toplevel

        def redir_default(default=None):
            """Redirect to our own avatar view, forcing the default image"""
            url = (
                reverse_lazy("avatar_view", args=[kwargs["digest"]])
                + "?s=%i" % size
                + "&forcedefault=y"
            )
            if default is not None:
                # Escape query-string metacharacters so a default URL is not
                # cut off at its first "&" or "#"
                url += f"&default={quote(default, safe='/:')}"
            return HttpResponseRedirect(url)

        size = get_size(request)
        gravatarimagedata = None
        default = None

        # A missing parameter simply leaves default at None
        with contextlib.suppress(Exception):
            if str(request.GET["default"]) != "None":
                default = request.GET["default"]

        if str(default) != "wavatar":
            # This part is special/hackish
            # Check if the image returned by Gravatar is their default image, if so,
            # redirect to our default instead.
            gravatar_test_url = (
                "https://secure.gravatar.com/avatar/"
                + kwargs["digest"]
                + "?s=%i&d=%i" % (50, 404)
            )
            if cache.get(gravatar_test_url) == "default":
                # DEBUG only
                # print("Cached Gravatar response: Default.")
                return redir_default(default)
            try:
                # Only the status matters here; release the connection at once
                urlopen(gravatar_test_url).close()
            except HTTPError as exc:
                if exc.code == 404:
                    cache.set(gravatar_test_url, "default", 60)
                else:
                    print(f"Gravatar test url fetch failed: {exc}")
                return redir_default(default)
            except (URLError, SSLError) as exc:
                # Same fallback the real fetch below uses for network errors
                print(f"Gravatar test url fetch failed: {exc}")
                return redir_default(default)

        gravatar_url = (
            "https://secure.gravatar.com/avatar/" + kwargs["digest"] + "?s=%i" % size
        )
        if default:
            gravatar_url += f"&d={quote(default, safe='/:')}"

        try:
            if cache.get(gravatar_url) == "err":
                print(f"Cached Gravatar fetch failed with URL error: {gravatar_url}")
                return redir_default(default)

            gravatarimagedata = urlopen(gravatar_url)
        except HTTPError as exc:
            if exc.code not in [404, 503]:
                print(
                    f"Gravatar fetch failed with an unexpected {exc.code} HTTP error: {gravatar_url}"
                )
            # Remember the failure briefly to avoid hammering Gravatar
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except URLError as exc:
            print(f"Gravatar fetch failed with URL error: {exc.reason}")
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except SSLError as exc:
            print(f"Gravatar fetch failed with SSL error: {exc.reason}")
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        try:
            # Close the upstream handle as soon as the body has been read
            with contextlib.closing(gravatarimagedata):
                data = BytesIO(gravatarimagedata.read())
            img = Image.open(data)
            data.seek(0)
            imgtype = file_format(img.format)
            # Same normalisation AvatarImageView applies: the MIME subtype
            # is "jpeg", not "jpg"
            if imgtype == "jpg":
                imgtype = "jpeg"
            response = HttpResponse(data.read(), content_type=f"image/{imgtype}")
            response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
            # Remove Vary header for images since language doesn't matter
            response["Vary"] = ""
            return response

        except ValueError as exc:
            print(f"Value error: {exc}")
            return redir_default(default)

        # We shouldn't reach this point... But make sure we do something
        return redir_default(default)

424 

425 

class BlueskyProxyView(View):
    """
    Proxy request to Bluesky and return the image from there
    """

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Override get from parent class

        Looks up the identity for ``kwargs["digest"]`` (email first, then
        OpenID), resolves the avatar URL of its Bluesky handle, fetches the
        image and relays it, scaled down to the requested size if needed.
        Any failure redirects to "avatar_view" with ``forcedefault=y``.
        """
        # Function-scope import keeps this block self-contained
        from urllib.parse import quote  # pylint: disable=import-outside-toplevel

        def redir_default(default=None):
            """Redirect to our own avatar view, forcing the default image"""
            url = (
                reverse_lazy("avatar_view", args=[kwargs["digest"]])
                + "?s=%i" % size
                + "&forcedefault=y"
            )
            if default is not None:
                # Escape query-string metacharacters so a default URL is not
                # cut off at its first "&" or "#"
                url += f"&default={quote(default, safe='/:')}"
            return HttpResponseRedirect(url)

        size = get_size(request)
        blueskyimagedata = None
        default = None

        # A missing parameter simply leaves default at None
        with contextlib.suppress(Exception):
            if str(request.GET["default"]) != "None":
                default = request.GET["default"]
        identity = None

        # First check for email, as this is the most common
        try:
            identity = ConfirmedEmail.objects.filter(
                Q(digest=kwargs["digest"]) | Q(digest_sha256=kwargs["digest"])
            ).first()
        except Exception as exc:
            print(exc)

        # If no identity is found in the email table, try the openid table
        if not identity:
            try:
                identity = ConfirmedOpenId.objects.filter(
                    Q(digest=kwargs["digest"])
                    | Q(alt_digest1=kwargs["digest"])
                    | Q(alt_digest2=kwargs["digest"])
                    | Q(alt_digest3=kwargs["digest"])
                ).first()
            except Exception as exc:
                print(exc)

        # If still no identity is found, redirect to the default
        if not identity:
            return redir_default(default)

        bs = Bluesky()
        bluesky_url = None
        # Try with the cache first (single lookup; a cache failure is not fatal)
        with contextlib.suppress(Exception):
            cached_url = cache.get(identity.bluesky_handle)
            if cached_url:
                bluesky_url = cached_url
        if not bluesky_url:
            try:
                bluesky_url = bs.get_avatar(identity.bluesky_handle)
                cache.set(identity.bluesky_handle, bluesky_url)
            except Exception:  # pylint: disable=bare-except
                return redir_default(default)

        try:
            if cache.get(bluesky_url) == "err":
                print(f"Cached Bluesky fetch failed with URL error: {bluesky_url}")
                return redir_default(default)

            blueskyimagedata = urlopen(bluesky_url)
        except HTTPError as exc:
            if exc.code not in [404, 503]:
                print(
                    f"Bluesky fetch failed with an unexpected {exc.code} HTTP error: {bluesky_url}"
                )
            # Remember the failure briefly to avoid hammering the upstream
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)
        except URLError as exc:
            print(f"Bluesky fetch failed with URL error: {exc.reason}")
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)
        except SSLError as exc:
            print(f"Bluesky fetch failed with SSL error: {exc.reason}")
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)
        try:
            # Close the upstream handle as soon as the body has been read
            with contextlib.closing(blueskyimagedata):
                data = BytesIO(blueskyimagedata.read())
            img = Image.open(data)
            # Keep the format now: the image returned by resize() below is a
            # new object, so the format is captured before it is replaced
            img_format = img.format
            if max(img.size) > size:
                # Scale the longer edge down to ``size``, keeping the aspect ratio
                aspect = img.size[0] / float(img.size[1])
                if aspect > 1:
                    new_size = (size, int(size / aspect))
                else:
                    new_size = (int(size * aspect), size)
                img = img.resize(new_size)
                data = BytesIO()
                img.save(data, format=img_format)

            data.seek(0)
            # Use the detected image format (not the builtin ``format``)
            imgtype = file_format(img_format)
            # Same normalisation AvatarImageView applies: the MIME subtype
            # is "jpeg", not "jpg"
            if imgtype == "jpg":
                imgtype = "jpeg"
            response = HttpResponse(data.read(), content_type=f"image/{imgtype}")
            response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
            # Remove Vary header for images since language doesn't matter
            response["Vary"] = ""
            return response
        except ValueError as exc:
            print(f"Value error: {exc}")
            return redir_default(default)

        # We shouldn't reach this point... But make sure we do something
        return redir_default(default)

544 

545 

class StatsView(TemplateView, JsonResponse):
    """
    Expose simple usage counters as JSON
    """

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Count the rows of each tracked model and return the totals as JSON
        """
        # Key in the JSON document -> model whose rows are counted
        counted_models = (
            ("users", User),
            ("mails", ConfirmedEmail),
            ("openids", ConfirmedOpenId),
            ("unconfirmed_mails", UnconfirmedEmail),
            ("unconfirmed_openids", UnconfirmedOpenId),
            ("avatars", Photo),
        )
        stats = {
            label: model.objects.count()  # pylint: disable=no-member
            for label, model in counted_models
        }
        return JsonResponse(stats)