Coverage for ivatar/views.py: 51%
323 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-12 23:12 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-12 23:12 +0000
1# -*- coding: utf-8 -*-
2"""
3views under /
4"""
6import contextlib
7from io import BytesIO
8from os import path
9import hashlib
10from ivatar.utils import urlopen, Bluesky
11from urllib.error import HTTPError, URLError
12from ssl import SSLError
13from django.views.generic.base import TemplateView, View
14from django.http import HttpResponse, HttpResponseRedirect
15from django.http import HttpResponseNotFound, JsonResponse
16from django.core.exceptions import ObjectDoesNotExist
17from django.core.cache import cache, caches
18from django.utils.translation import gettext_lazy as _
19from django.urls import reverse_lazy
20from django.db.models import Q
21from django.contrib.auth.models import User
23from PIL import Image
25from monsterid.id import build_monster as BuildMonster
26import Identicon
27from pydenticon5 import Pydenticon5
28import pagan
29from robohash import Robohash
31from ivatar.settings import AVATAR_MAX_SIZE, JPEG_QUALITY, DEFAULT_AVATAR_SIZE
32from ivatar.settings import CACHE_RESPONSE
33from ivatar.settings import CACHE_IMAGES_MAX_AGE
34from ivatar.settings import TRUSTED_DEFAULT_URLS
35from .ivataraccount.models import ConfirmedEmail, ConfirmedOpenId
36from .ivataraccount.models import UnconfirmedEmail, UnconfirmedOpenId
37from .ivataraccount.models import Photo
38from .ivataraccount.models import pil_format, file_format
39from .utils import is_trusted_url, mm_ng, resize_animated_gif
42def get_size(request, size=DEFAULT_AVATAR_SIZE):
43 """
44 Get size from the URL arguments
45 """
46 sizetemp = None
47 if "s" in request.GET:
48 sizetemp = request.GET["s"]
49 if "size" in request.GET:
50 sizetemp = request.GET["size"]
51 if sizetemp:
52 if sizetemp not in ["", "0"]:
53 with contextlib.suppress(ValueError):
54 if int(sizetemp) > 0:
55 size = int(sizetemp)
56 size = min(size, int(AVATAR_MAX_SIZE))
57 return size
60class CachingHttpResponse(HttpResponse):
61 """
62 Handle caching of response
63 """
65 def __init__(
66 self,
67 uri,
68 content=b"",
69 content_type=None,
70 status=200, # pylint: disable=too-many-arguments
71 reason=None,
72 charset=None,
73 ):
74 if CACHE_RESPONSE:
75 caches["filesystem"].set(
76 uri,
77 {
78 "content": content,
79 "content_type": content_type,
80 "status": status,
81 "reason": reason,
82 "charset": charset,
83 },
84 )
85 super().__init__(content, content_type, status, reason, charset)
88class AvatarImageView(TemplateView):
89 """
90 View to return (binary) image, based on OpenID/Email (both by digest)
91 """
93 # TODO: Do cache resize images!! Memcached?
95 def options(self, request, *args, **kwargs):
96 response = HttpResponse("", content_type="text/plain")
97 response["Allow"] = "404 mm mp retro pagan wavatar monsterid robohash identicon"
98 return response
100 def get(
101 self, request, *args, **kwargs
102 ): # pylint: disable=too-many-branches,too-many-statements,too-many-locals,too-many-return-statements
103 """
104 Override get from parent class
105 """
106 model = ConfirmedEmail
107 size = get_size(request)
108 imgformat = "png"
109 obj = None
110 default = None
111 forcedefault = False
112 gravatarredirect = False
113 gravatarproxy = True
114 uri = request.build_absolute_uri()
116 # Check the cache first
117 if CACHE_RESPONSE:
118 if centry := caches["filesystem"].get(uri):
119 # For DEBUG purpose only
120 # print('Cached entry for %s' % uri)
121 return HttpResponse(
122 centry["content"],
123 content_type=centry["content_type"],
124 status=centry["status"],
125 reason=centry["reason"],
126 charset=centry["charset"],
127 )
129 # In case no digest at all is provided, return to home page
130 if "digest" not in kwargs:
131 return HttpResponseRedirect(reverse_lazy("home"))
133 if "d" in request.GET:
134 default = request.GET["d"]
135 if "default" in request.GET:
136 default = request.GET["default"]
138 if default is not None:
139 if TRUSTED_DEFAULT_URLS is None:
140 print("Query parameter `default` is disabled.")
141 default = None
142 elif default.find("://") > 0:
143 # Check if it's trusted, if not, reset to None
144 trusted_url = is_trusted_url(default, TRUSTED_DEFAULT_URLS)
146 if not trusted_url:
147 print(
148 f"Default URL is not in trusted URLs: '{default}'; Kicking it!"
149 )
150 default = None
152 if "f" in request.GET:
153 if request.GET["f"] == "y":
154 forcedefault = True
155 if "forcedefault" in request.GET:
156 if request.GET["forcedefault"] == "y":
157 forcedefault = True
159 if "gravatarredirect" in request.GET:
160 if request.GET["gravatarredirect"] == "y":
161 gravatarredirect = True
163 if "gravatarproxy" in request.GET:
164 if request.GET["gravatarproxy"] == "n":
165 gravatarproxy = False
167 try:
168 obj = model.objects.get(digest=kwargs["digest"])
169 except ObjectDoesNotExist:
170 try:
171 obj = model.objects.get(digest_sha256=kwargs["digest"])
172 except ObjectDoesNotExist:
173 model = ConfirmedOpenId
174 with contextlib.suppress(Exception):
175 d = kwargs["digest"] # pylint: disable=invalid-name
176 # OpenID is tricky. http vs. https, versus trailing slash or not
177 # However, some users eventually have added their variations already
178 # and therefore we need to use filter() and first()
179 obj = model.objects.filter(
180 Q(digest=d)
181 | Q(alt_digest1=d)
182 | Q(alt_digest2=d)
183 | Q(alt_digest3=d)
184 ).first()
185 # Handle the special case of Bluesky
186 if obj:
187 if obj.bluesky_handle:
188 return HttpResponseRedirect(
189 reverse_lazy("blueskyproxy", args=[kwargs["digest"]])
190 )
191 # If that mail/openid doesn't exist, or has no photo linked to it
192 if not obj or not obj.photo or forcedefault:
193 gravatar_url = (
194 "https://secure.gravatar.com/avatar/"
195 + kwargs["digest"]
196 + "?s=%i" % size
197 )
199 # If we have redirection to Gravatar enabled, this overrides all
200 # default= settings, except forcedefault!
201 if gravatarredirect and not forcedefault:
202 return HttpResponseRedirect(gravatar_url)
204 # Request to proxy Gravatar image - only if not forcedefault
205 if gravatarproxy and not forcedefault:
206 url = (
207 reverse_lazy("gravatarproxy", args=[kwargs["digest"]])
208 + "?s=%i" % size
209 )
210 # Ensure we do not convert None to string 'None'
211 if default:
212 url += f"&default={default}"
213 return HttpResponseRedirect(url)
215 # Return the default URL, as specified, or 404 Not Found, if default=404
216 if default:
217 # Proxy to gravatar to generate wavatar - lazy me
218 if str(default) == "wavatar":
219 url = (
220 reverse_lazy("gravatarproxy", args=[kwargs["digest"]])
221 + "?s=%i" % size
222 + f"&default={default}&f=y"
223 )
224 return HttpResponseRedirect(url)
226 if str(default) == str(404):
227 return HttpResponseNotFound(_("<h1>Image not found</h1>"))
229 if str(default) == "monsterid":
230 monsterdata = BuildMonster(seed=kwargs["digest"], size=(size, size))
231 data = BytesIO()
232 return self._return_cached_png(monsterdata, data, uri)
233 if str(default) == "robohash":
234 roboset = request.GET.get("robohash") or "any"
235 robohash = Robohash(kwargs["digest"])
236 robohash.assemble(roboset=roboset, sizex=size, sizey=size)
237 data = BytesIO()
238 robohash.img.save(data, format="png")
239 return self._return_cached_response(data, uri)
240 if str(default) == "retro":
241 identicon = Identicon.render(kwargs["digest"])
242 data = BytesIO()
243 img = Image.open(BytesIO(identicon))
244 img = img.resize((size, size), Image.LANCZOS)
245 return self._return_cached_png(img, data, uri)
246 if str(default) == "pagan":
247 paganobj = pagan.Avatar(kwargs["digest"])
248 data = BytesIO()
249 img = paganobj.img.resize((size, size), Image.LANCZOS)
250 return self._return_cached_png(img, data, uri)
251 if str(default) == "identicon":
252 p = Pydenticon5() # pylint: disable=invalid-name
253 # In order to make use of the whole 32 bytes digest, we need to redigest them.
254 newdigest = hashlib.md5(
255 bytes(kwargs["digest"], "utf-8")
256 ).hexdigest()
257 img = p.draw(newdigest, size, 0)
258 data = BytesIO()
259 return self._return_cached_png(img, data, uri)
260 if str(default) == "mmng":
261 mmngimg = mm_ng(idhash=kwargs["digest"], size=size)
262 data = BytesIO()
263 return self._return_cached_png(mmngimg, data, uri)
264 if str(default) in {"mm", "mp"}:
265 return self._redirect_static_w_size("mm", size)
266 return HttpResponseRedirect(default)
268 return self._redirect_static_w_size("nobody", size)
269 imgformat = obj.photo.format
270 photodata = Image.open(BytesIO(obj.photo.data))
272 data = BytesIO()
274 # Animated GIFs need additional handling
275 if imgformat == "gif" and photodata.is_animated:
276 # Debug only
277 # print("Object is animated and has %i frames" % photodata.n_frames)
278 data = resize_animated_gif(photodata, (size, size))
279 else:
280 # If the image is smaller than what was requested, we need
281 # to use the function resize
282 if photodata.size[0] < size or photodata.size[1] < size:
283 photodata = photodata.resize((size, size), Image.LANCZOS)
284 else:
285 photodata.thumbnail((size, size), Image.LANCZOS)
286 photodata.save(data, pil_format(imgformat), quality=JPEG_QUALITY)
288 data.seek(0)
289 obj.photo.access_count += 1
290 obj.photo.save()
291 obj.access_count += 1
292 obj.save()
293 if imgformat == "jpg":
294 imgformat = "jpeg"
295 response = CachingHttpResponse(uri, data, content_type=f"image/{imgformat}")
296 response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
297 return response
299 def _redirect_static_w_size(self, arg0, size):
300 """
301 Helper method to redirect to static image with size i/a
302 """
303 # If mm is explicitly given, we need to catch that
304 static_img = path.join("static", "img", arg0, f"{str(size)}.png")
305 if not path.isfile(static_img):
306 # We trust this exists!!!
307 static_img = path.join("static", "img", arg0, "512.png")
308 # We trust static/ is mapped to /static/
309 return HttpResponseRedirect(f"/{static_img}")
311 def _return_cached_response(self, data, uri):
312 data.seek(0)
313 response = CachingHttpResponse(uri, data, content_type="image/png")
314 response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
315 return response
317 def _return_cached_png(self, arg0, data, uri):
318 arg0.save(data, "PNG", quality=JPEG_QUALITY)
319 return self._return_cached_response(data, uri)
322class GravatarProxyView(View):
323 """
324 Proxy request to Gravatar and return the image from there
325 """
327 # TODO: Do cache images!! Memcached?
329 def get(
330 self, request, *args, **kwargs
331 ): # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
332 """
333 Override get from parent class
334 """
336 def redir_default(default=None):
337 url = (
338 reverse_lazy("avatar_view", args=[kwargs["digest"]])
339 + "?s=%i" % size
340 + "&forcedefault=y"
341 )
342 if default is not None:
343 url += f"&default={default}"
344 return HttpResponseRedirect(url)
346 size = get_size(request)
347 gravatarimagedata = None
348 default = None
350 with contextlib.suppress(Exception):
351 if str(request.GET["default"]) != "None":
352 default = request.GET["default"]
353 if str(default) != "wavatar":
354 # This part is special/hackish
355 # Check if the image returned by Gravatar is their default image, if so,
356 # redirect to our default instead.
357 gravatar_test_url = (
358 "https://secure.gravatar.com/avatar/"
359 + kwargs["digest"]
360 + "?s=%i&d=%i" % (50, 404)
361 )
362 if cache.get(gravatar_test_url) == "default":
363 # DEBUG only
364 # print("Cached Gravatar response: Default.")
365 return redir_default(default)
366 try:
367 urlopen(gravatar_test_url)
368 except HTTPError as exc:
369 if exc.code == 404:
370 cache.set(gravatar_test_url, "default", 60)
371 else:
372 print(f"Gravatar test url fetch failed: {exc}")
373 return redir_default(default)
375 gravatar_url = (
376 "https://secure.gravatar.com/avatar/" + kwargs["digest"] + "?s=%i" % size
377 )
378 if default:
379 gravatar_url += f"&d={default}"
381 try:
382 if cache.get(gravatar_url) == "err":
383 print(f"Cached Gravatar fetch failed with URL error: {gravatar_url}")
384 return redir_default(default)
386 gravatarimagedata = urlopen(gravatar_url)
387 except HTTPError as exc:
388 if exc.code not in [404, 503]:
389 print(
390 f"Gravatar fetch failed with an unexpected {exc.code} HTTP error: {gravatar_url}"
391 )
392 cache.set(gravatar_url, "err", 30)
393 return redir_default(default)
394 except URLError as exc:
395 print(f"Gravatar fetch failed with URL error: {exc.reason}")
396 cache.set(gravatar_url, "err", 30)
397 return redir_default(default)
398 except SSLError as exc:
399 print(f"Gravatar fetch failed with SSL error: {exc.reason}")
400 cache.set(gravatar_url, "err", 30)
401 return redir_default(default)
402 try:
403 data = BytesIO(gravatarimagedata.read())
404 img = Image.open(data)
405 data.seek(0)
406 response = HttpResponse(
407 data.read(), content_type=f"image/{file_format(img.format)}"
408 )
409 response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
410 return response
412 except ValueError as exc:
413 print(f"Value error: {exc}")
414 return redir_default(default)
416 # We shouldn't reach this point... But make sure we do something
417 return redir_default(default)
420class BlueskyProxyView(View):
421 """
422 Proxy request to Bluesky and return the image from there
423 """
425 def get(
426 self, request, *args, **kwargs
427 ): # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
428 """
429 Override get from parent class
430 """
432 def redir_default(default=None):
433 url = (
434 reverse_lazy("avatar_view", args=[kwargs["digest"]])
435 + "?s=%i" % size
436 + "&forcedefault=y"
437 )
438 if default is not None:
439 url += f"&default={default}"
440 return HttpResponseRedirect(url)
442 size = get_size(request)
443 print(size)
444 blueskyimagedata = None
445 default = None
447 with contextlib.suppress(Exception):
448 if str(request.GET["default"]) != "None":
449 default = request.GET["default"]
450 identity = None
452 # First check for email, as this is the most common
453 try:
454 identity = ConfirmedEmail.objects.filter(
455 Q(digest=kwargs["digest"]) | Q(digest_sha256=kwargs["digest"])
456 ).first()
457 except Exception as exc:
458 print(exc)
460 # If no identity is found in the email table, try the openid table
461 if not identity:
462 try:
463 identity = ConfirmedOpenId.objects.filter(
464 Q(digest=kwargs["digest"])
465 | Q(alt_digest1=kwargs["digest"])
466 | Q(alt_digest2=kwargs["digest"])
467 | Q(alt_digest3=kwargs["digest"])
468 ).first()
469 except Exception as exc:
470 print(exc)
472 # If still no identity is found, redirect to the default
473 if not identity:
474 return redir_default(default)
476 bs = Bluesky()
477 bluesky_url = None
478 # Try with the cache first
479 with contextlib.suppress(Exception):
480 if cache.get(identity.bluesky_handle):
481 bluesky_url = cache.get(identity.bluesky_handle)
482 if not bluesky_url:
483 try:
484 bluesky_url = bs.get_avatar(identity.bluesky_handle)
485 cache.set(identity.bluesky_handle, bluesky_url)
486 except Exception: # pylint: disable=bare-except
487 return redir_default(default)
489 try:
490 if cache.get(bluesky_url) == "err":
491 print(f"Cached Bluesky fetch failed with URL error: {bluesky_url}")
492 return redir_default(default)
494 blueskyimagedata = urlopen(bluesky_url)
495 except HTTPError as exc:
496 if exc.code not in [404, 503]:
497 print(
498 f"Bluesky fetch failed with an unexpected {exc.code} HTTP error: {bluesky_url}"
499 )
500 cache.set(bluesky_url, "err", 30)
501 return redir_default(default)
502 except URLError as exc:
503 print(f"Bluesky fetch failed with URL error: {exc.reason}")
504 cache.set(bluesky_url, "err", 30)
505 return redir_default(default)
506 except SSLError as exc:
507 print(f"Bluesky fetch failed with SSL error: {exc.reason}")
508 cache.set(bluesky_url, "err", 30)
509 return redir_default(default)
510 try:
511 data = BytesIO(blueskyimagedata.read())
512 img = Image.open(data)
513 img_format = img.format
514 if max(img.size) > size:
515 aspect = img.size[0] / float(img.size[1])
516 if aspect > 1:
517 new_size = (size, int(size / aspect))
518 else:
519 new_size = (int(size * aspect), size)
520 img = img.resize(new_size)
521 data = BytesIO()
522 img.save(data, format=img_format)
524 data.seek(0)
525 response = HttpResponse(
526 data.read(), content_type=f"image/{file_format(format)}"
527 )
528 response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
529 return response
530 except ValueError as exc:
531 print(f"Value error: {exc}")
532 return redir_default(default)
534 # We shouldn't reach this point... But make sure we do something
535 return redir_default(default)
538class StatsView(TemplateView, JsonResponse):
539 """
540 Return stats
541 """
543 def get(
544 self, request, *args, **kwargs
545 ): # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
546 retval = {
547 "users": User.objects.count(),
548 "mails": ConfirmedEmail.objects.count(),
549 "openids": ConfirmedOpenId.objects.count(), # pylint: disable=no-member
550 "unconfirmed_mails": UnconfirmedEmail.objects.count(), # pylint: disable=no-member
551 "unconfirmed_openids": UnconfirmedOpenId.objects.count(), # pylint: disable=no-member
552 "avatars": Photo.objects.count(), # pylint: disable=no-member
553 }
555 return JsonResponse(retval)