Coverage for ivatar/views.py: 59%
499 statements
« prev ^ index » next — coverage.py v7.11.0, created at 2025-11-07 00:07 +0000
1"""
2views under /
3"""
5import contextlib
6from io import BytesIO
7from os import path
8import hashlib
9import logging
10import threading
11from ivatar.utils import urlopen, Bluesky
12from urllib.error import HTTPError, URLError
13from ssl import SSLError
14from django.views.generic.base import TemplateView, View
15from django.http import HttpResponse, HttpResponseRedirect
16from django.http import HttpResponseNotFound, JsonResponse
17from django.core.exceptions import ObjectDoesNotExist
18from django.core.cache import cache, caches
19from django.utils.translation import gettext_lazy as _
20from django.urls import reverse_lazy
21from django.db.models import Q
22from django.contrib.auth.models import User
24from PIL import Image
26from monsterid.id import build_monster as BuildMonster
27import Identicon
28from pydenticon5 import Pydenticon5
29from .robohash import create_robohash
30from .pagan_optimized import create_optimized_pagan
32from ivatar.settings import AVATAR_MAX_SIZE, JPEG_QUALITY, DEFAULT_AVATAR_SIZE
33from ivatar.settings import CACHE_RESPONSE
34from ivatar.settings import TRUSTED_DEFAULT_URLS
35from ivatar.settings import (
36 DEFAULT_GRAVATARPROXY,
37 DEFAULT_GRAVATARREDIRECT,
38 FORCEDEFAULT,
39)
40from .ivataraccount.models import ConfirmedEmail, ConfirmedOpenId
41from .ivataraccount.models import UnconfirmedEmail, UnconfirmedOpenId
42from .ivataraccount.models import Photo
43from .ivataraccount.models import pil_format, file_format
44from .utils import is_trusted_url, mm_ng, resize_animated_gif
46# Import OpenTelemetry with graceful degradation
47from .telemetry_utils import trace_avatar_operation, get_telemetry_metrics
# Shared telemetry metrics handle; degrades to a no-op implementation when
# OpenTelemetry is not installed (see telemetry_utils).
avatar_metrics = get_telemetry_metrics()

# Initialize loggers: general application log and a dedicated security log
logger = logging.getLogger("ivatar")
security_logger = logging.getLogger("ivatar.security")
def get_size(request, size=DEFAULT_AVATAR_SIZE):
    """
    Determine the requested avatar size from the URL arguments.

    Both ``s`` and ``size`` query parameters are honored, with ``size``
    taking precedence when both are present.  Empty, zero, non-numeric or
    non-positive values fall back to the default; the result is capped at
    AVATAR_MAX_SIZE.
    """
    raw = None
    if "s" in request.GET:
        raw = request.GET["s"]
    if "size" in request.GET:
        raw = request.GET["size"]
    if not raw or raw in ("", "0"):
        return size
    try:
        candidate = int(raw)
    except ValueError:
        # Non-numeric value: keep the default size
        return size
    if candidate > 0:
        size = min(candidate, int(AVATAR_MAX_SIZE))
    return size
class CachingHttpResponse(HttpResponse):
    """
    HttpResponse that additionally persists its payload in the
    filesystem cache, keyed by the request URI.
    """

    def __init__(
        self,
        uri,
        content=b"",
        content_type=None,
        status=200,  # pylint: disable=too-many-arguments
        reason=None,
        charset=None,
    ):
        # Store all parts needed to rebuild an identical response later,
        # so subsequent requests for the same URI can skip generation.
        if CACHE_RESPONSE:
            cached_payload = {
                "content": content,
                "content_type": content_type,
                "status": status,
                "reason": reason,
                "charset": charset,
            }
            caches["filesystem"].set(uri, cached_payload)
        super().__init__(content, content_type, status, reason, charset)
class AvatarImageView(TemplateView):
    """
    View to return (binary) image, based on OpenID/Email (both by digest)
    """

    # TODO: Do cache resize images!! Memcached?

    def options(self, request, *args, **kwargs):
        # Advertise the supported default-image generators to OPTIONS callers
        response = HttpResponse("", content_type="text/plain")
        response["Allow"] = "404 mm mp retro pagan wavatar monsterid robohash identicon"
        return response

    @trace_avatar_operation("avatar_request")
    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,too-many-return-statements
        """
        Override get from parent class

        Resolve the digest to a confirmed email or OpenID, then serve the
        linked photo resized to the requested size.  When no photo exists
        (or forcedefault is set), fall through to Gravatar redirect/proxy
        or one of the locally generated default images.
        """
        model = ConfirmedEmail
        size = get_size(request)
        imgformat = "png"
        obj = None
        default = None
        forcedefault = FORCEDEFAULT
        gravatarredirect = DEFAULT_GRAVATARREDIRECT
        gravatarproxy = DEFAULT_GRAVATARPROXY
        uri = request.build_absolute_uri()

        # Check the cache first
        if CACHE_RESPONSE:
            if centry := caches["filesystem"].get(uri):
                # Record cache hit
                avatar_metrics.record_cache_hit(size=str(size), format_type=imgformat)
                # For DEBUG purpose only
                # print('Cached entry for %s' % uri)
                return HttpResponse(
                    centry["content"],
                    content_type=centry["content_type"],
                    status=centry["status"],
                    reason=centry["reason"],
                    charset=centry["charset"],
                )
            else:
                # Record cache miss
                avatar_metrics.record_cache_miss(size=str(size), format_type=imgformat)

        # In case no digest at all is provided, return to home page
        if "digest" not in kwargs:
            return HttpResponseRedirect(reverse_lazy("home"))

        # Both "d" and "default" select a default image; "default" wins
        if "d" in request.GET:
            default = request.GET["d"]
        if "default" in request.GET:
            default = request.GET["default"]

        if default is not None:
            if TRUSTED_DEFAULT_URLS is None:
                logger.warning("Query parameter `default` is disabled.")
                default = None
            elif default.find("://") > 0:
                # Check if it's trusted, if not, reset to None
                trusted_url = is_trusted_url(default, TRUSTED_DEFAULT_URLS)

                if not trusted_url:
                    security_logger.warning(
                        f"Default URL is not in trusted URLs: '{default}'; Kicking it!"
                    )
                    default = None

        # "f"/"forcedefault" force the default image even for known users
        if "f" in request.GET:
            if request.GET["f"] == "y":
                forcedefault = True
        if "forcedefault" in request.GET:
            if request.GET["forcedefault"] == "y":
                forcedefault = True

        if "gravatarredirect" in request.GET:
            if request.GET["gravatarredirect"] == "y":
                gravatarredirect = True

        if "gravatarproxy" in request.GET:
            if request.GET["gravatarproxy"] == "n":
                gravatarproxy = False

        # Look up by email digest first (MD5, then SHA-256), then fall back
        # to the OpenID table with all digest variants
        try:
            obj = model.objects.get(digest=kwargs["digest"])
        except ObjectDoesNotExist:
            try:
                obj = model.objects.get(digest_sha256=kwargs["digest"])
            except ObjectDoesNotExist:
                model = ConfirmedOpenId
                with contextlib.suppress(Exception):
                    d = kwargs["digest"]  # pylint: disable=invalid-name
                    # OpenID is tricky. http vs. https, versus trailing slash or not
                    # However, some users eventually have added their variations already
                    # and therefore we need to use filter() and first()
                    obj = model.objects.filter(
                        Q(digest=d)
                        | Q(alt_digest1=d)
                        | Q(alt_digest2=d)
                        | Q(alt_digest3=d)
                    ).first()
        # Handle the special case of Bluesky
        if obj:
            if obj.bluesky_handle:
                return HttpResponseRedirect(
                    reverse_lazy("blueskyproxy", args=[kwargs["digest"]])
                )
        # If that mail/openid doesn't exist, or has no photo linked to it
        if not obj or not obj.photo or forcedefault:
            gravatar_url = (
                "https://secure.gravatar.com/avatar/"
                + kwargs["digest"]
                + "?s=%i" % size
            )

            # If we have redirection to Gravatar enabled, this overrides all
            # default= settings, except forcedefault!
            if gravatarredirect and not forcedefault:
                return HttpResponseRedirect(gravatar_url)

            # Request to proxy Gravatar image - only if not forcedefault
            if gravatarproxy and not forcedefault:
                url = (
                    reverse_lazy("gravatarproxy", args=[kwargs["digest"]])
                    + "?s=%i" % size
                )
                # Ensure we do not convert None to string 'None'
                if default:
                    url += f"&default={default}"
                return HttpResponseRedirect(url)

            # Return the default URL, as specified, or 404 Not Found, if default=404
            if default:
                # Proxy to gravatar to generate wavatar - lazy me
                if str(default) == "wavatar":
                    url = (
                        reverse_lazy("gravatarproxy", args=[kwargs["digest"]])
                        + "?s=%i" % size
                        + f"&default={default}&f=y"
                    )
                    return HttpResponseRedirect(url)

                if str(default) == str(404):
                    return HttpResponseNotFound(_("<h1>Image not found</h1>"))

                # Locally generated default images, one branch per generator
                if str(default) == "monsterid":
                    monsterdata = BuildMonster(seed=kwargs["digest"], size=(size, size))
                    data = BytesIO()
                    avatar_metrics.record_avatar_generated(
                        size=str(size), format_type="png", source="monsterid"
                    )
                    return self._return_cached_png(monsterdata, data, uri)
                if str(default) == "robohash":
                    roboset = request.GET.get("robohash") or "any"
                    data = create_robohash(kwargs["digest"], size, roboset)
                    avatar_metrics.record_avatar_generated(
                        size=str(size), format_type="png", source="robohash"
                    )
                    return self._return_cached_response(data, uri)
                if str(default) == "retro":
                    identicon = Identicon.render(kwargs["digest"])
                    data = BytesIO()
                    img = Image.open(BytesIO(identicon))
                    img = img.resize((size, size), Image.LANCZOS)
                    avatar_metrics.record_avatar_generated(
                        size=str(size), format_type="png", source="retro"
                    )
                    return self._return_cached_png(img, data, uri)
                if str(default) == "pagan":
                    data = create_optimized_pagan(kwargs["digest"], size)
                    avatar_metrics.record_avatar_generated(
                        size=str(size), format_type="png", source="pagan"
                    )
                    return self._return_cached_response(data, uri)
                if str(default) == "identicon":
                    p = Pydenticon5()  # pylint: disable=invalid-name
                    # In order to make use of the whole 32 bytes digest, we need to redigest them.
                    newdigest = hashlib.md5(
                        bytes(kwargs["digest"], "utf-8")
                    ).hexdigest()
                    img = p.draw(newdigest, size, 0)
                    data = BytesIO()
                    avatar_metrics.record_avatar_generated(
                        size=str(size), format_type="png", source="identicon"
                    )
                    return self._return_cached_png(img, data, uri)
                if str(default) == "mmng":
                    mmngimg = mm_ng(idhash=kwargs["digest"], size=size)
                    data = BytesIO()
                    avatar_metrics.record_avatar_generated(
                        size=str(size), format_type="png", source="mmng"
                    )
                    return self._return_cached_png(mmngimg, data, uri)
                if str(default) in {"mm", "mp"}:
                    return self._redirect_static_w_size("mm", size)
                # Any other (trusted) value is treated as a redirect URL
                return HttpResponseRedirect(default)

            return self._redirect_static_w_size("nobody", size)
        # A confirmed photo exists: serve it, resized to the requested size
        imgformat = obj.photo.format
        photodata = Image.open(BytesIO(obj.photo.data))

        data = BytesIO()

        # Animated GIFs need additional handling
        if imgformat == "gif" and photodata.is_animated:
            # Debug only
            # print("Object is animated and has %i frames" % photodata.n_frames)
            data = resize_animated_gif(photodata, (size, size))
        else:
            # If the image is smaller than what was requested, we need
            # to use the function resize
            if photodata.size[0] < size or photodata.size[1] < size:
                photodata = photodata.resize((size, size), Image.LANCZOS)
            else:
                photodata.thumbnail((size, size), Image.LANCZOS)
            photodata.save(data, pil_format(imgformat), quality=JPEG_QUALITY)

        data.seek(0)
        # Bump access counters on both the photo and the identity
        obj.photo.access_count += 1
        obj.photo.save()
        obj.access_count += 1
        obj.save()
        if imgformat == "jpg":
            imgformat = "jpeg"

        # Record avatar generation metrics
        avatar_metrics.record_avatar_generated(
            size=str(size),
            format_type=imgformat,
            source="uploaded" if obj else "generated",
        )

        response = CachingHttpResponse(uri, data, content_type=f"image/{imgformat}")
        # Remove Vary header for images since language doesn't matter
        response["Vary"] = ""
        return response

    def _redirect_static_w_size(self, arg0, size):
        """
        Helper method to redirect to static image with size i/a
        """
        # If mm is explicitly given, we need to catch that
        static_img = path.join("static", "img", arg0, f"{str(size)}.png")
        if not path.isfile(static_img):
            # We trust this exists!!!
            static_img = path.join("static", "img", arg0, "512.png")
        # We trust static/ is mapped to /static/
        return HttpResponseRedirect(f"/{static_img}")

    def _return_cached_response(self, data, uri):
        # Wrap a ready BytesIO payload in a caching PNG response
        data.seek(0)
        response = CachingHttpResponse(uri, data, content_type="image/png")
        # Remove Vary header for images since language doesn't matter
        response["Vary"] = ""
        return response

    @trace_avatar_operation("generate_png")
    def _return_cached_png(self, arg0, data, uri):
        # Serialize a PIL image (or compatible object) to PNG, then cache+return
        arg0.save(data, "PNG", quality=JPEG_QUALITY)
        return self._return_cached_response(data, uri)
class GravatarProxyView(View):
    """
    Proxy request to Gravatar and return the image from there
    """

    # TODO: Do cache images!! Memcached?

    @trace_avatar_operation("gravatar_proxy")
    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Override get from parent class

        Fetch the avatar for the digest from Gravatar and stream it back.
        On any failure (or when Gravatar only has its default image) we
        redirect back to our own avatar view with forcedefault enabled.
        """

        def redir_default(default=None):
            # Fall back to our avatar view, forcing the default image
            url = (
                reverse_lazy("avatar_view", args=[kwargs["digest"]])
                + "?s=%i" % size
                + "&forcedefault=y"
            )
            if default is not None:
                url += f"&default={default}"
            return HttpResponseRedirect(url)

        size = get_size(request)
        gravatarimagedata = None
        default = None

        # Optional "default" query parameter; swallow any access errors
        with contextlib.suppress(Exception):
            if str(request.GET["default"]) != "None":
                default = request.GET["default"]
        if str(default) != "wavatar":
            # This part is special/hackish
            # Check if the image returned by Gravatar is their default image, if so,
            # redirect to our default instead.
            gravatar_test_url = (
                "https://secure.gravatar.com/avatar/"
                + kwargs["digest"]
                + "?s=%i&d=%i" % (50, 404)
            )
            if cache.get(gravatar_test_url) == "default":
                # DEBUG only
                # print("Cached Gravatar response: Default.")
                return redir_default(default)
            try:
                urlopen(gravatar_test_url)
            except HTTPError as exc:
                if exc.code == 404:
                    # 404 with d=404 means Gravatar has no custom image
                    cache.set(gravatar_test_url, "default", 60)
                else:
                    logger.warning(f"Gravatar test url fetch failed: {exc}")
                return redir_default(default)

        gravatar_url = (
            "https://secure.gravatar.com/avatar/" + kwargs["digest"] + "?s=%i" % size
        )
        if default:
            gravatar_url += f"&d={default}"

        try:
            # Negative cache: skip the fetch if it recently failed
            if cache.get(gravatar_url) == "err":
                logger.warning(
                    f"Cached Gravatar fetch failed with URL error: {gravatar_url}"
                )
                avatar_metrics.record_external_request("gravatar", 0)  # Cached error
                return redir_default(default)

            gravatarimagedata = urlopen(gravatar_url)
            avatar_metrics.record_external_request("gravatar", 200)
        except HTTPError as exc:
            if exc.code not in [404, 503]:
                logger.warning(
                    f"Gravatar fetch failed with an unexpected {exc.code} HTTP error: {gravatar_url}"
                )
            avatar_metrics.record_external_request("gravatar", exc.code)
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except URLError as exc:
            logger.warning(f"Gravatar fetch failed with URL error: {exc.reason}")
            avatar_metrics.record_external_request("gravatar", 0)  # Network error
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except SSLError as exc:
            logger.warning(f"Gravatar fetch failed with SSL error: {exc.reason}")
            avatar_metrics.record_external_request("gravatar", 0)  # SSL error
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        try:
            data = BytesIO(gravatarimagedata.read())
            # Open with PIL only to detect the actual image format
            img = Image.open(data)
            data.seek(0)
            response = HttpResponse(
                data.read(), content_type=f"image/{file_format(img.format)}"
            )
            # Remove Vary header for images since language doesn't matter
            response["Vary"] = ""
            return response

        except ValueError as exc:
            logger.error(f"Value error: {exc}")
            return redir_default(default)

        # We shouldn't reach this point... But make sure we do something
        return redir_default(default)
class BlueskyProxyView(View):
    """
    Proxy request to Bluesky and return the image from there
    """

    @trace_avatar_operation("bluesky_proxy")
    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Override get from parent class

        Resolve the digest to a confirmed email/OpenID, look up the
        account's Bluesky avatar URL (cached), fetch the image, downscale
        it to the requested size and return it.  Any failure falls back to
        our own avatar view with forcedefault enabled.
        """

        def redir_default(default=None):
            # Fall back to our avatar view, forcing the default image
            url = (
                reverse_lazy("avatar_view", args=[kwargs["digest"]])
                + "?s=%i" % size
                + "&forcedefault=y"
            )
            if default is not None:
                url += f"&default={default}"
            return HttpResponseRedirect(url)

        size = get_size(request)
        logger.debug(f"Bluesky avatar size requested: {size}")
        blueskyimagedata = None
        default = None

        # Optional "default" query parameter; swallow any access errors
        with contextlib.suppress(Exception):
            if str(request.GET["default"]) != "None":
                default = request.GET["default"]
        identity = None

        # First check for email, as this is the most common
        try:
            identity = ConfirmedEmail.objects.filter(
                Q(digest=kwargs["digest"]) | Q(digest_sha256=kwargs["digest"])
            ).first()
        except Exception as exc:
            logger.warning(f"Exception: {exc}")

        # If no identity is found in the email table, try the openid table
        if not identity:
            try:
                identity = ConfirmedOpenId.objects.filter(
                    Q(digest=kwargs["digest"])
                    | Q(alt_digest1=kwargs["digest"])
                    | Q(alt_digest2=kwargs["digest"])
                    | Q(alt_digest3=kwargs["digest"])
                ).first()
            except Exception as exc:
                logger.warning(f"Exception: {exc}")

        # If still no identity is found, redirect to the default
        if not identity:
            return redir_default(default)

        bs = Bluesky()
        bluesky_url = None
        # Try with the cache first; fall back to an API lookup on miss
        with contextlib.suppress(Exception):
            if cache.get(identity.bluesky_handle):
                bluesky_url = cache.get(identity.bluesky_handle)
        if not bluesky_url:
            try:
                bluesky_url = bs.get_avatar(identity.bluesky_handle)
                cache.set(identity.bluesky_handle, bluesky_url)
            except Exception:  # pylint: disable=bare-except
                return redir_default(default)

        try:
            # Negative cache: skip the fetch if it recently failed
            if cache.get(bluesky_url) == "err":
                logger.warning(
                    f"Cached Bluesky fetch failed with URL error: {bluesky_url}"
                )
                avatar_metrics.record_external_request("bluesky", 0)  # Cached error
                return redir_default(default)

            blueskyimagedata = urlopen(bluesky_url)
            avatar_metrics.record_external_request("bluesky", 200)
        except HTTPError as exc:
            if exc.code not in [404, 503]:
                # Use the logger (not print) like every other error path here
                logger.warning(
                    f"Bluesky fetch failed with an unexpected {exc.code} HTTP error: {bluesky_url}"
                )
            avatar_metrics.record_external_request("bluesky", exc.code)
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)
        except URLError as exc:
            logger.warning(f"Bluesky fetch failed with URL error: {exc.reason}")
            avatar_metrics.record_external_request("bluesky", 0)  # Network error
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)
        except SSLError as exc:
            logger.warning(f"Bluesky fetch failed with SSL error: {exc.reason}")
            avatar_metrics.record_external_request("bluesky", 0)  # SSL error
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)
        try:
            data = BytesIO(blueskyimagedata.read())
            img = Image.open(data)
            img_format = img.format
            # Downscale (preserving aspect ratio) only if larger than requested
            if max(img.size) > size:
                aspect = img.size[0] / float(img.size[1])
                if aspect > 1:
                    new_size = (size, int(size / aspect))
                else:
                    new_size = (int(size * aspect), size)
                img = img.resize(new_size)
                data = BytesIO()
                img.save(data, format=img_format)

            data.seek(0)
            # Bug fix: use the image's actual format (img_format) for the
            # Content-Type header; previously the builtin format() function
            # was passed to file_format() by mistake.
            response = HttpResponse(
                data.read(), content_type=f"image/{file_format(img_format)}"
            )
            # Remove Vary header for images since language doesn't matter
            response["Vary"] = ""
            return response
        except ValueError as exc:
            logger.error(f"Value error: {exc}")
            return redir_default(default)

        # We shouldn't reach this point... But make sure we do something
        return redir_default(default)
class StatsView(TemplateView, JsonResponse):
    """
    Return stats
    """

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Build and return a JSON document of usage statistics:
        headline counts, top-viewed avatars/emails/openids, photo format
        and size distributions, user activity and potential duplicates.
        """
        # Headline counts
        retval = {
            "users": User.objects.count(),
            "mails": ConfirmedEmail.objects.count(),
            "openids": ConfirmedOpenId.objects.count(),  # pylint: disable=no-member
            "unconfirmed_mails": UnconfirmedEmail.objects.count(),  # pylint: disable=no-member
            "unconfirmed_openids": UnconfirmedOpenId.objects.count(),  # pylint: disable=no-member
            "avatars": Photo.objects.count(),  # pylint: disable=no-member
        }

        # Top 10 viewed avatars
        top_photos = Photo.objects.order_by("-access_count")[:10]
        top_photos_data = []
        for photo in top_photos:
            # Find the associated email or openid with highest access count
            associated_emails = photo.emails.all().order_by("-access_count")
            associated_openids = photo.openids.all().order_by("-access_count")

            # Get the one with highest access count
            top_associated = None
            if associated_emails and associated_openids:
                if (
                    associated_emails[0].access_count
                    >= associated_openids[0].access_count
                ):
                    top_associated = associated_emails[0]
                else:
                    top_associated = associated_openids[0]
            elif associated_emails:
                top_associated = associated_emails[0]
            elif associated_openids:
                top_associated = associated_openids[0]

            if top_associated:
                if hasattr(top_associated, "email"):
                    # It's a ConfirmedEmail
                    top_photos_data.append(
                        {
                            "access_count": top_associated.access_count,
                            "avatar_url": f"https://libravatar.org/avatar/{top_associated.digest_sha256}",
                        }
                    )
                else:
                    # It's a ConfirmedOpenId
                    top_photos_data.append(
                        {
                            "access_count": top_associated.access_count,
                            "avatar_url": f"https://libravatar.org/avatar/{top_associated.digest}",
                        }
                    )

        retval["top_viewed_avatars"] = top_photos_data

        # Top 10 queried email addresses
        top_emails = ConfirmedEmail.objects.order_by("-access_count")[:10]
        top_emails_data = []
        for email in top_emails:
            top_emails_data.append(
                {
                    "access_count": email.access_count,
                    "avatar_url": f"https://libravatar.org/avatar/{email.digest_sha256}",
                }
            )

        retval["top_queried_emails"] = top_emails_data

        # Top 10 queried OpenIDs
        top_openids = ConfirmedOpenId.objects.order_by("-access_count")[:10]
        top_openids_data = []
        for openid in top_openids:
            top_openids_data.append(
                {
                    "access_count": openid.access_count,
                    "avatar_url": f"https://libravatar.org/avatar/{openid.digest}",
                }
            )

        retval["top_queried_openids"] = top_openids_data

        # Photo format distribution
        from django.db.models import Count

        format_distribution = (
            Photo.objects.values("format")
            .annotate(count=Count("format"))
            .order_by("-count")
        )
        retval["photo_format_distribution"] = list(format_distribution)

        # User activity statistics
        users_with_multiple_photos = (
            User.objects.annotate(photo_count=Count("photo"))
            .filter(photo_count__gt=1)
            .count()
        )
        users_with_both_email_and_openid = (
            User.objects.filter(
                confirmedemail__isnull=False, confirmedopenid__isnull=False
            )
            .distinct()
            .count()
        )

        # Calculate average photos per user
        total_photos = Photo.objects.count()
        total_users = User.objects.count()
        avg_photos_per_user = total_photos / total_users if total_users > 0 else 0

        retval["user_activity"] = {
            "users_with_multiple_photos": users_with_multiple_photos,
            "users_with_both_email_and_openid": users_with_both_email_and_openid,
            "average_photos_per_user": round(avg_photos_per_user, 2),
        }

        # Bluesky handles statistics
        bluesky_emails = ConfirmedEmail.objects.filter(
            bluesky_handle__isnull=False
        ).count()
        bluesky_openids = ConfirmedOpenId.objects.filter(
            bluesky_handle__isnull=False
        ).count()
        total_bluesky_handles = bluesky_emails + bluesky_openids

        # Top Bluesky handles by access count
        retval["bluesky_handles"] = {
            "total_bluesky_handles": total_bluesky_handles,
            "bluesky_emails": bluesky_emails,
            "bluesky_openids": bluesky_openids,
        }

        # Average photo size statistics using raw SQL
        from django.db import connection

        with connection.cursor() as cursor:
            # SQL to calculate average photo size
            cursor.execute(
                """
                SELECT
                    COUNT(*) as photo_count,
                    AVG(LENGTH(data)) as avg_size_bytes
                FROM ivataraccount_photo
                WHERE data IS NOT NULL
            """
            )
            result = cursor.fetchone()

            if result and result[0] > 0:
                photo_count, avg_size_bytes = result
                # Convert to float in case database returns string
                avg_size_bytes = float(avg_size_bytes) if avg_size_bytes else 0
                avg_size_kb = round(avg_size_bytes / 1024, 2) if avg_size_bytes else 0
                avg_size_mb = (
                    round(avg_size_bytes / (1024 * 1024), 2) if avg_size_bytes else 0
                )

                retval["photo_size_stats"] = {
                    "average_size_bytes": (
                        round(avg_size_bytes, 2) if avg_size_bytes else 0
                    ),
                    "average_size_kb": avg_size_kb,
                    "average_size_mb": avg_size_mb,
                    "total_photos_analyzed": photo_count,
                }
            else:
                retval["photo_size_stats"] = {
                    "average_size_bytes": 0,
                    "average_size_kb": 0,
                    "average_size_mb": 0,
                    "total_photos_analyzed": 0,
                }

        # For potential duplicate photos, we'll check for photos with the same format and size
        # Note: This is not definitive - different images can have the same format and size
        # but it's a good indicator of potential duplicates that might warrant investigation
        with connection.cursor() as cursor:
            cursor.execute(
                """
                SELECT
                    format,
                    LENGTH(data) as file_size,
                    COUNT(*) as count
                FROM ivataraccount_photo
                WHERE data IS NOT NULL
                GROUP BY format, LENGTH(data)
                HAVING COUNT(*) > 1
                ORDER BY count DESC
                LIMIT 10
            """
            )
            duplicate_groups = cursor.fetchall()

            total_potential_duplicate_photos = sum(
                group[2] for group in duplicate_groups
            )

            # Convert to list of dictionaries for JSON serialization
            duplicate_groups_detail = [
                {"format": group[0], "file_size": group[1], "count": group[2]}
                for group in duplicate_groups
            ]

            retval["potential_duplicate_photos"] = {
                "potential_duplicate_groups": len(duplicate_groups),
                "total_potential_duplicate_photos": total_potential_duplicate_photos,
                "potential_duplicate_groups_detail": duplicate_groups_detail,
                "note": "Potential duplicates are identified by matching file format and size - not definitive duplicates",
            }

        return JsonResponse(retval)
# Thread-safe version cache - cached indefinitely since container restarts on changes
# _version_cache holds the dict produced by _get_git_info_from_files() (or the
# error fallback); the lock guards its one-time initialization.
_version_cache = None
_version_cache_lock = threading.Lock()
def _get_git_info_from_files():
    """
    Safely extract git information from .git files without subprocess calls

    Returns a dict with commit hash, branch, commit date, deployment date
    and a combined version string, or None when the information cannot be
    determined (e.g. no .git directory present).
    """
    # Single import instead of the previously duplicated function-local ones
    import datetime

    try:
        # Get the project root directory
        project_root = path.dirname(path.dirname(path.abspath(__file__)))
        git_dir = path.join(project_root, ".git")

        if not path.exists(git_dir):
            return None

        # Read HEAD to get current branch/commit
        head_file = path.join(git_dir, "HEAD")
        if not path.exists(head_file):
            return None

        with open(head_file) as f:
            head_content = f.read().strip()

        # Parse HEAD content
        if head_content.startswith("ref: "):
            # We're on a branch
            branch_ref = head_content[5:]  # Remove 'ref: '
            branch_name = path.basename(branch_ref)

            # Read the commit hash from the ref
            ref_file = path.join(git_dir, branch_ref)
            if not path.exists(ref_file):
                return None
            with open(ref_file) as f:
                commit_hash = f.read().strip()
        else:
            # Detached HEAD state
            commit_hash = head_content
            branch_name = "detached"

        # Try to get commit date from git log file (if available)
        # Optimize: read only the last line instead of entire file
        commit_date = None
        log_file = path.join(git_dir, "logs", "HEAD")
        if path.exists(log_file):
            try:
                with open(log_file, "rb") as f:
                    # Seek to end and read backwards to find last line
                    f.seek(0, 2)  # Seek to end
                    file_size = f.tell()

                    # Read backwards in chunks to find the last line
                    chunk_size = min(1024, file_size)
                    f.seek(max(0, file_size - chunk_size))
                    chunk = f.read().decode("utf-8", errors="ignore")

                # Find the last non-empty line
                last_line = None
                for line in reversed(chunk.split("\n")):
                    if line.strip():
                        last_line = line.strip()
                        break

                if last_line:
                    # Git log format: <old_hash> <new_hash> <author> <timestamp> <timezone> <message>
                    # The fields are space-separated; the timestamp is a
                    # 10-digit Unix epoch value after the author email.
                    parts = last_line.split()
                    if len(parts) >= 6:
                        for part in parts:
                            if part.isdigit() and len(part) == 10:  # Unix timestamp
                                commit_date = datetime.datetime.fromtimestamp(
                                    int(part)
                                ).strftime("%Y-%m-%d %H:%M:%S %z")
                                break
            except (ValueError, IndexError, UnicodeDecodeError):
                pass

        # Fallback: try to get date from commit object if available
        if not commit_date and len(commit_hash) == 40:
            try:
                commit_dir = path.join(git_dir, "objects", commit_hash[:2])
                commit_file = path.join(commit_dir, commit_hash[2:])
                if path.exists(commit_file):
                    # This would require decompressing the git object, which is complex
                    # For now, we'll use a placeholder
                    commit_date = "unknown"
            except Exception:
                commit_date = "unknown"

        # Get deployment date from file modification time
        # Use manage.py as it's always updated during deployment
        deployment_date = None
        manage_py_path = path.join(project_root, "manage.py")
        if path.exists(manage_py_path):
            try:
                mtime = path.getmtime(manage_py_path)
                deployment_date = datetime.datetime.fromtimestamp(mtime).strftime(
                    "%Y-%m-%d %H:%M:%S %z"
                )
            except Exception:
                deployment_date = "unknown"

        # Compute the abbreviated hash once (was duplicated before)
        short_hash = commit_hash[:7] if len(commit_hash) >= 7 else commit_hash
        return {
            "commit_hash": commit_hash,
            "short_hash": short_hash,
            "branch": branch_name,
            "commit_date": commit_date or "unknown",
            "deployment_date": deployment_date or "unknown",
            "deployment_status": "active",
            "version": f"{branch_name}-{short_hash}",
        }

    except Exception as exc:
        logger.warning(f"Failed to read git info from files: {exc}")
        return None
def _get_cached_version_info():
    """
    Return the cached version information, computing it on first use.

    Containers are restarted whenever the code changes, so the value can
    safely be cached for the lifetime of the process.
    """
    global _version_cache

    with _version_cache_lock:
        if _version_cache is None:
            # First call: read the git metadata from disk
            info = _get_git_info_from_files()
            if info is None:
                # Reading failed: cache an explicit error payload instead
                info = {
                    "error": "Unable to determine version - .git directory not found",
                    "deployment_status": "unknown",
                }
            _version_cache = info

        return _version_cache
class DeploymentVersionView(View):
    """
    View to return deployment version information for CI/CD verification
    Uses cached version info to prevent DDoS attacks and improve performance
    """

    def get(self, request, *args, **kwargs):
        """
        Return cached deployment version information including application version
        """
        from django.conf import settings

        # Copy the cached dict so we never mutate the module-level cache
        # in place (the original code added keys directly to the shared
        # cached object).
        version_info = dict(_get_cached_version_info())

        # Attach the application version; a settings problem must never
        # break this endpoint, so fall back to "unknown".
        try:
            version_info["application_version"] = getattr(
                settings, "IVATAR_VERSION", "unknown"
            )
        except Exception:
            version_info["application_version"] = "unknown"

        # An "error" key means version detection failed: report HTTP 500
        if "error" in version_info:
            return JsonResponse(version_info, status=500)

        return JsonResponse(version_info)