Coverage for ivatar/views.py: 48%
499 statements
coverage.py v7.11.0, created at 2025-10-20 23:06 +0000

# -*- coding: utf-8 -*-
"""
views under /
"""

import contextlib
from io import BytesIO
from os import path
import hashlib
import logging
import threading
from ivatar.utils import urlopen, Bluesky
from urllib.error import HTTPError, URLError
from ssl import SSLError
from django.views.generic.base import TemplateView, View
from django.http import HttpResponse, HttpResponseRedirect
from django.http import HttpResponseNotFound, JsonResponse
from django.core.exceptions import ObjectDoesNotExist
from django.core.cache import cache, caches
from django.utils.translation import gettext_lazy as _
from django.urls import reverse_lazy
from django.db.models import Q
from django.contrib.auth.models import User

from PIL import Image

from monsterid.id import build_monster as BuildMonster
import Identicon
from pydenticon5 import Pydenticon5
import pagan
from robohash import Robohash

from ivatar.settings import AVATAR_MAX_SIZE, JPEG_QUALITY, DEFAULT_AVATAR_SIZE
from ivatar.settings import CACHE_RESPONSE
from ivatar.settings import CACHE_IMAGES_MAX_AGE
from ivatar.settings import TRUSTED_DEFAULT_URLS
from .ivataraccount.models import ConfirmedEmail, ConfirmedOpenId
from .ivataraccount.models import UnconfirmedEmail, UnconfirmedOpenId
from .ivataraccount.models import Photo
from .ivataraccount.models import pil_format, file_format
from .utils import is_trusted_url, mm_ng, resize_animated_gif

# Import OpenTelemetry (always enabled, export controlled by OTEL_EXPORT_ENABLED)
try:
    from .opentelemetry_middleware import trace_avatar_operation, get_avatar_metrics

    avatar_metrics = get_avatar_metrics()
except ImportError:
    # OpenTelemetry packages not installed
    def trace_avatar_operation(operation_name):
        def decorator(func):
            return func

        return decorator

    class NoOpMetrics:
        def record_avatar_generated(self, *args, **kwargs):
            pass

        def record_cache_hit(self, *args, **kwargs):
            pass

        def record_cache_miss(self, *args, **kwargs):
            pass

        def record_external_request(self, *args, **kwargs):
            pass

        def record_file_upload(self, *args, **kwargs):
            pass

    avatar_metrics = NoOpMetrics()
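
# Illustrative usage sketch (hedged): `resize_avatar` below is a hypothetical
# function, not part of this module. Thanks to the fallback above, view code
# can instrument itself unconditionally, whether or not OpenTelemetry is
# installed:
#
#   @trace_avatar_operation("resize")  # pass-through decorator if OTEL is absent
#   def resize_avatar(img, size):
#       avatar_metrics.record_avatar_generated(size=str(size), format_type="png")
#       return img.resize((size, size))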

# Initialize loggers
logger = logging.getLogger("ivatar")
security_logger = logging.getLogger("ivatar.security")


def get_size(request, size=DEFAULT_AVATAR_SIZE):
    """
    Get size from the URL arguments
    """
    sizetemp = None
    if "s" in request.GET:
        sizetemp = request.GET["s"]
    if "size" in request.GET:
        sizetemp = request.GET["size"]
    if sizetemp:
        if sizetemp not in ["", "0"]:
            with contextlib.suppress(ValueError):
                if int(sizetemp) > 0:
                    size = int(sizetemp)

    size = min(size, int(AVATAR_MAX_SIZE))
    return size
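
# Behavior sketch for get_size() (illustrative, assuming AVATAR_MAX_SIZE = 512
# and DEFAULT_AVATAR_SIZE = 80 in the settings):
#
#   ?s=100          -> 100
#   ?s=60&size=100  -> 100 ("size" is checked last, so it wins)
#   ?s=99999        -> 512 (clamped to AVATAR_MAX_SIZE)
#   ?s=abc / ?s=0   -> 80  (invalid or zero values fall back to the default)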


class CachingHttpResponse(HttpResponse):
    """
    Handle caching of response
    """

    def __init__(
        self,
        uri,
        content=b"",
        content_type=None,
        status=200,  # pylint: disable=too-many-arguments
        reason=None,
        charset=None,
    ):
        if CACHE_RESPONSE:
            caches["filesystem"].set(
                uri,
                {
                    "content": content,
                    "content_type": content_type,
                    "status": status,
                    "reason": reason,
                    "charset": charset,
                },
            )
        super().__init__(content, content_type, status, reason, charset)
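
# Write-through sketch (hedged; `png_bytes` is a placeholder): constructing a
# CachingHttpResponse stores the payload in the "filesystem" cache, keyed by
# the full request URI, and returns it to the client. A later request for the
# same URI can then be served from the cache before any image work happens:
#
#   response = CachingHttpResponse(uri, png_bytes, content_type="image/png")
#   # ... on the next request for the same URI:
#   centry = caches["filesystem"].get(uri)  # dict with content, status, etc.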


class AvatarImageView(TemplateView):
    """
    View to return (binary) image, based on OpenID/Email (both by digest)
    """

    # TODO: Cache resized images as well! Memcached?

    def options(self, request, *args, **kwargs):
        response = HttpResponse("", content_type="text/plain")
        response["Allow"] = "404 mm mp retro pagan wavatar monsterid robohash identicon"
        return response

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,too-many-return-statements
        """
        Override get from parent class
        """
        model = ConfirmedEmail
        size = get_size(request)
        imgformat = "png"
        obj = None
        default = None
        forcedefault = False
        gravatarredirect = False
        gravatarproxy = True
        uri = request.build_absolute_uri()

        # Check the cache first
        if CACHE_RESPONSE:
            if centry := caches["filesystem"].get(uri):
                # Record cache hit
                avatar_metrics.record_cache_hit(size=str(size), format_type=imgformat)
                # For DEBUG purposes only
                # print('Cached entry for %s' % uri)
                return HttpResponse(
                    centry["content"],
                    content_type=centry["content_type"],
                    status=centry["status"],
                    reason=centry["reason"],
                    charset=centry["charset"],
                )
            else:
                # Record cache miss
                avatar_metrics.record_cache_miss(size=str(size), format_type=imgformat)

        # In case no digest at all is provided, return to the home page
        if "digest" not in kwargs:
            return HttpResponseRedirect(reverse_lazy("home"))

        if "d" in request.GET:
            default = request.GET["d"]
        if "default" in request.GET:
            default = request.GET["default"]

        if default is not None:
            if TRUSTED_DEFAULT_URLS is None:
                logger.warning("Query parameter `default` is disabled.")
                default = None
            elif default.find("://") > 0:
                # Check if it's trusted; if not, reset to None
                trusted_url = is_trusted_url(default, TRUSTED_DEFAULT_URLS)

                if not trusted_url:
                    security_logger.warning(
                        f"Default URL is not in trusted URLs: '{default}'; Kicking it!"
                    )
                    default = None

        if "f" in request.GET:
            if request.GET["f"] == "y":
                forcedefault = True
        if "forcedefault" in request.GET:
            if request.GET["forcedefault"] == "y":
                forcedefault = True

        if "gravatarredirect" in request.GET:
            if request.GET["gravatarredirect"] == "y":
                gravatarredirect = True

        if "gravatarproxy" in request.GET:
            if request.GET["gravatarproxy"] == "n":
                gravatarproxy = False

        try:
            obj = model.objects.get(digest=kwargs["digest"])
        except ObjectDoesNotExist:
            try:
                obj = model.objects.get(digest_sha256=kwargs["digest"])
            except ObjectDoesNotExist:
                model = ConfirmedOpenId
                with contextlib.suppress(Exception):
                    d = kwargs["digest"]  # pylint: disable=invalid-name
                    # OpenID is tricky: http vs. https, and trailing slash or not.
                    # However, some users have eventually added their variations already,
                    # and therefore we need to use filter() and first()
                    obj = model.objects.filter(
                        Q(digest=d)
                        | Q(alt_digest1=d)
                        | Q(alt_digest2=d)
                        | Q(alt_digest3=d)
                    ).first()

        # Handle the special case of Bluesky
        if obj:
            if obj.bluesky_handle:
                return HttpResponseRedirect(
                    reverse_lazy("blueskyproxy", args=[kwargs["digest"]])
                )

        # If that mail/openid doesn't exist, or has no photo linked to it
        if not obj or not obj.photo or forcedefault:
            gravatar_url = (
                "https://secure.gravatar.com/avatar/"
                + kwargs["digest"]
                + "?s=%i" % size
            )

            # If we have redirection to Gravatar enabled, this overrides all
            # default= settings, except forcedefault!
            if gravatarredirect and not forcedefault:
                return HttpResponseRedirect(gravatar_url)

            # Request to proxy the Gravatar image - only if not forcedefault
            if gravatarproxy and not forcedefault:
                url = (
                    reverse_lazy("gravatarproxy", args=[kwargs["digest"]])
                    + "?s=%i" % size
                )
                # Ensure we do not convert None to the string 'None'
                if default:
                    url += f"&default={default}"
                return HttpResponseRedirect(url)

            # Return the default URL, as specified, or 404 Not Found, if default=404
            if default:
                # Proxy to Gravatar to generate the wavatar - lazy me
                if str(default) == "wavatar":
                    url = (
                        reverse_lazy("gravatarproxy", args=[kwargs["digest"]])
                        + "?s=%i" % size
                        + f"&default={default}&f=y"
                    )
                    return HttpResponseRedirect(url)

                if str(default) == str(404):
                    return HttpResponseNotFound(_("<h1>Image not found</h1>"))

                if str(default) == "monsterid":
                    monsterdata = BuildMonster(seed=kwargs["digest"], size=(size, size))
                    data = BytesIO()
                    return self._return_cached_png(monsterdata, data, uri)
                if str(default) == "robohash":
                    roboset = request.GET.get("robohash") or "any"
                    robohash = Robohash(kwargs["digest"])
                    robohash.assemble(roboset=roboset, sizex=size, sizey=size)
                    data = BytesIO()
                    robohash.img.save(data, format="png")
                    return self._return_cached_response(data, uri)
                if str(default) == "retro":
                    identicon = Identicon.render(kwargs["digest"])
                    data = BytesIO()
                    img = Image.open(BytesIO(identicon))
                    img = img.resize((size, size), Image.LANCZOS)
                    return self._return_cached_png(img, data, uri)
                if str(default) == "pagan":
                    paganobj = pagan.Avatar(kwargs["digest"])
                    data = BytesIO()
                    img = paganobj.img.resize((size, size), Image.LANCZOS)
                    return self._return_cached_png(img, data, uri)
                if str(default) == "identicon":
                    p = Pydenticon5()  # pylint: disable=invalid-name
                    # In order to make use of the whole 32-byte digest, we need to re-digest it.
                    newdigest = hashlib.md5(
                        bytes(kwargs["digest"], "utf-8")
                    ).hexdigest()
                    img = p.draw(newdigest, size, 0)
                    data = BytesIO()
                    return self._return_cached_png(img, data, uri)
                if str(default) == "mmng":
                    mmngimg = mm_ng(idhash=kwargs["digest"], size=size)
                    data = BytesIO()
                    return self._return_cached_png(mmngimg, data, uri)
                if str(default) in {"mm", "mp"}:
                    return self._redirect_static_w_size("mm", size)
                return HttpResponseRedirect(default)

            return self._redirect_static_w_size("nobody", size)

        imgformat = obj.photo.format
        photodata = Image.open(BytesIO(obj.photo.data))

        data = BytesIO()

        # Animated GIFs need additional handling
        if imgformat == "gif" and photodata.is_animated:
            # Debug only
            # print("Object is animated and has %i frames" % photodata.n_frames)
            data = resize_animated_gif(photodata, (size, size))
        else:
            # If the image is smaller than what was requested, we need
            # to use the resize function
            if photodata.size[0] < size or photodata.size[1] < size:
                photodata = photodata.resize((size, size), Image.LANCZOS)
            else:
                photodata.thumbnail((size, size), Image.LANCZOS)
            photodata.save(data, pil_format(imgformat), quality=JPEG_QUALITY)

        data.seek(0)
        obj.photo.access_count += 1
        obj.photo.save()
        obj.access_count += 1
        obj.save()
        if imgformat == "jpg":
            imgformat = "jpeg"

        # Record avatar generation metrics
        avatar_metrics.record_avatar_generated(
            size=str(size),
            format_type=imgformat,
            source="uploaded" if obj else "generated",
        )

        response = CachingHttpResponse(uri, data, content_type=f"image/{imgformat}")
        response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
        # Remove Vary header for images since language doesn't matter
        response["Vary"] = ""
        return response

    def _redirect_static_w_size(self, arg0, size):
        """
        Helper method to redirect to a static image of the requested size, if available
        """
        # If mm is explicitly given, we need to catch that
        static_img = path.join("static", "img", arg0, f"{str(size)}.png")
        if not path.isfile(static_img):
            # We trust this exists!!!
            static_img = path.join("static", "img", arg0, "512.png")
        # We trust static/ is mapped to /static/
        return HttpResponseRedirect(f"/{static_img}")

    def _return_cached_response(self, data, uri):
        data.seek(0)
        response = CachingHttpResponse(uri, data, content_type="image/png")
        response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
        # Remove Vary header for images since language doesn't matter
        response["Vary"] = ""
        return response

    @trace_avatar_operation("generate_png")
    def _return_cached_png(self, arg0, data, uri):
        arg0.save(data, "PNG", quality=JPEG_QUALITY)
        return self._return_cached_response(data, uri)
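
# Flow sketch (hedged; `digest`, `size` and `uri` stand in for the values
# computed in get()): every generated default funnels through the two helpers
# above, e.g. for default=monsterid:
#
#   monsterdata = BuildMonster(seed=digest, size=(size, size))  # PIL image
#   data = BytesIO()
#   return self._return_cached_png(monsterdata, data, uri)
#   # -> saves as PNG, then _return_cached_response() wraps it in a
#   #    CachingHttpResponse with Cache-Control set and the Vary header cleared.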


class GravatarProxyView(View):
    """
    Proxy request to Gravatar and return the image from there
    """

    # TODO: Cache images as well! Memcached?

    @trace_avatar_operation("gravatar_proxy")
    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Override get from parent class
        """

        def redir_default(default=None):
            url = (
                reverse_lazy("avatar_view", args=[kwargs["digest"]])
                + "?s=%i" % size
                + "&forcedefault=y"
            )
            if default is not None:
                url += f"&default={default}"
            return HttpResponseRedirect(url)

        size = get_size(request)
        gravatarimagedata = None
        default = None

        with contextlib.suppress(Exception):
            if str(request.GET["default"]) != "None":
                default = request.GET["default"]

        if str(default) != "wavatar":
            # This part is special/hackish
            # Check if the image returned by Gravatar is their default image; if so,
            # redirect to our default instead.
            gravatar_test_url = (
                "https://secure.gravatar.com/avatar/"
                + kwargs["digest"]
                + "?s=%i&d=%i" % (50, 404)
            )
            if cache.get(gravatar_test_url) == "default":
                # DEBUG only
                # print("Cached Gravatar response: Default.")
                return redir_default(default)
            try:
                urlopen(gravatar_test_url)
            except HTTPError as exc:
                if exc.code == 404:
                    cache.set(gravatar_test_url, "default", 60)
                else:
                    logger.warning(f"Gravatar test URL fetch failed: {exc}")
                return redir_default(default)

        gravatar_url = (
            "https://secure.gravatar.com/avatar/" + kwargs["digest"] + "?s=%i" % size
        )
        if default:
            gravatar_url += f"&d={default}"

        try:
            if cache.get(gravatar_url) == "err":
                logger.warning(
                    f"Cached Gravatar fetch failed with URL error: {gravatar_url}"
                )
                return redir_default(default)

            gravatarimagedata = urlopen(gravatar_url)
        except HTTPError as exc:
            if exc.code not in [404, 503]:
                logger.warning(
                    f"Gravatar fetch failed with an unexpected {exc.code} HTTP error: {gravatar_url}"
                )
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except URLError as exc:
            logger.warning(f"Gravatar fetch failed with URL error: {exc.reason}")
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except SSLError as exc:
            logger.warning(f"Gravatar fetch failed with SSL error: {exc.reason}")
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)

        try:
            data = BytesIO(gravatarimagedata.read())
            img = Image.open(data)
            data.seek(0)
            response = HttpResponse(
                data.read(), content_type=f"image/{file_format(img.format)}"
            )
            response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
            # Remove Vary header for images since language doesn't matter
            response["Vary"] = ""
            return response
        except ValueError as exc:
            logger.error(f"Value error: {exc}")
            return redir_default(default)

        # We shouldn't reach this point... But make sure we do something
        return redir_default(default)
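
# Negative-caching sketch (restating the logic above): failed Gravatar fetches
# are remembered for 30 seconds under the exact URL, so a burst of requests
# for the same missing avatar triggers only one upstream request:
#
#   cache.set(gravatar_url, "err", 30)    # on HTTPError/URLError/SSLError
#   ...
#   if cache.get(gravatar_url) == "err":  # later requests short-circuit
#       return redir_default(default)
#
# The separate 50px d=404 test URL is cached as "default" for 60 seconds to
# detect accounts that only have Gravatar's default image.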


class BlueskyProxyView(View):
    """
    Proxy request to Bluesky and return the image from there
    """

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Override get from parent class
        """

        def redir_default(default=None):
            url = (
                reverse_lazy("avatar_view", args=[kwargs["digest"]])
                + "?s=%i" % size
                + "&forcedefault=y"
            )
            if default is not None:
                url += f"&default={default}"
            return HttpResponseRedirect(url)

        size = get_size(request)
        logger.debug(f"Bluesky avatar size requested: {size}")
        blueskyimagedata = None
        default = None

        with contextlib.suppress(Exception):
            if str(request.GET["default"]) != "None":
                default = request.GET["default"]

        identity = None

        # First check for email, as this is the most common case
        try:
            identity = ConfirmedEmail.objects.filter(
                Q(digest=kwargs["digest"]) | Q(digest_sha256=kwargs["digest"])
            ).first()
        except Exception as exc:
            logger.warning(f"Exception: {exc}")

        # If no identity is found in the email table, try the openid table
        if not identity:
            try:
                identity = ConfirmedOpenId.objects.filter(
                    Q(digest=kwargs["digest"])
                    | Q(alt_digest1=kwargs["digest"])
                    | Q(alt_digest2=kwargs["digest"])
                    | Q(alt_digest3=kwargs["digest"])
                ).first()
            except Exception as exc:
                logger.warning(f"Exception: {exc}")

        # If still no identity is found, redirect to the default
        if not identity:
            return redir_default(default)

        bs = Bluesky()
        bluesky_url = None
        # Try with the cache first
        with contextlib.suppress(Exception):
            if cache.get(identity.bluesky_handle):
                bluesky_url = cache.get(identity.bluesky_handle)
        if not bluesky_url:
            try:
                bluesky_url = bs.get_avatar(identity.bluesky_handle)
                cache.set(identity.bluesky_handle, bluesky_url)
            except Exception:
                return redir_default(default)

        try:
            if cache.get(bluesky_url) == "err":
                logger.warning(
                    f"Cached Bluesky fetch failed with URL error: {bluesky_url}"
                )
                return redir_default(default)

            blueskyimagedata = urlopen(bluesky_url)
        except HTTPError as exc:
            if exc.code not in [404, 503]:
                logger.warning(
                    f"Bluesky fetch failed with an unexpected {exc.code} HTTP error: {bluesky_url}"
                )
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)
        except URLError as exc:
            logger.warning(f"Bluesky fetch failed with URL error: {exc.reason}")
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)
        except SSLError as exc:
            logger.warning(f"Bluesky fetch failed with SSL error: {exc.reason}")
            cache.set(bluesky_url, "err", 30)
            return redir_default(default)

        try:
            data = BytesIO(blueskyimagedata.read())
            img = Image.open(data)
            img_format = img.format
            if max(img.size) > size:
                aspect = img.size[0] / float(img.size[1])
                if aspect > 1:
                    new_size = (size, int(size / aspect))
                else:
                    new_size = (int(size * aspect), size)
                img = img.resize(new_size)
                data = BytesIO()
                img.save(data, format=img_format)

            data.seek(0)
            response = HttpResponse(
                data.read(), content_type=f"image/{file_format(img_format)}"
            )
            response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
            # Remove Vary header for images since language doesn't matter
            response["Vary"] = ""
            return response
        except ValueError as exc:
            logger.error(f"Value error: {exc}")
            return redir_default(default)

        # We shouldn't reach this point... But make sure we do something
        return redir_default(default)
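
# Resize math sketch (illustrative numbers): the Bluesky image is only ever
# downscaled, preserving its aspect ratio. For a 1000x500 source and size=80:
#
#   aspect = 1000 / 500.0            # 2.0, wider than tall
#   new_size = (80, int(80 / 2.0))   # (80, 40)
#
# A 300x600 source with size=80 gives (int(80 * 0.5), 80) == (40, 80).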


class StatsView(TemplateView, JsonResponse):
    """
    Return stats
    """

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        retval = {
            "users": User.objects.count(),
            "mails": ConfirmedEmail.objects.count(),
            "openids": ConfirmedOpenId.objects.count(),  # pylint: disable=no-member
            "unconfirmed_mails": UnconfirmedEmail.objects.count(),  # pylint: disable=no-member
            "unconfirmed_openids": UnconfirmedOpenId.objects.count(),  # pylint: disable=no-member
            "avatars": Photo.objects.count(),  # pylint: disable=no-member
        }

        # Top 10 viewed avatars
        top_photos = Photo.objects.order_by("-access_count")[:10]
        top_photos_data = []
        for photo in top_photos:
            # Find the associated email or openid with the highest access count
            associated_emails = photo.emails.all().order_by("-access_count")
            associated_openids = photo.openids.all().order_by("-access_count")

            # Get the one with the highest access count
            top_associated = None
            if associated_emails and associated_openids:
                if (
                    associated_emails[0].access_count
                    >= associated_openids[0].access_count
                ):
                    top_associated = associated_emails[0]
                else:
                    top_associated = associated_openids[0]
            elif associated_emails:
                top_associated = associated_emails[0]
            elif associated_openids:
                top_associated = associated_openids[0]

            if top_associated:
                if hasattr(top_associated, "email"):
                    # It's a ConfirmedEmail
                    top_photos_data.append(
                        {
                            "access_count": top_associated.access_count,
                            "avatar_url": f"https://libravatar.org/avatar/{top_associated.digest_sha256}",
                        }
                    )
                else:
                    # It's a ConfirmedOpenId
                    top_photos_data.append(
                        {
                            "access_count": top_associated.access_count,
                            "avatar_url": f"https://libravatar.org/avatar/{top_associated.digest}",
                        }
                    )

        retval["top_viewed_avatars"] = top_photos_data

        # Top 10 queried email addresses
        top_emails = ConfirmedEmail.objects.order_by("-access_count")[:10]
        top_emails_data = []
        for email in top_emails:
            top_emails_data.append(
                {
                    "access_count": email.access_count,
                    "avatar_url": f"https://libravatar.org/avatar/{email.digest_sha256}",
                }
            )

        retval["top_queried_emails"] = top_emails_data

        # Top 10 queried OpenIDs
        top_openids = ConfirmedOpenId.objects.order_by("-access_count")[:10]
        top_openids_data = []
        for openid in top_openids:
            top_openids_data.append(
                {
                    "access_count": openid.access_count,
                    "avatar_url": f"https://libravatar.org/avatar/{openid.digest}",
                }
            )

        retval["top_queried_openids"] = top_openids_data

        # Photo format distribution
        from django.db.models import Count

        format_distribution = (
            Photo.objects.values("format")
            .annotate(count=Count("format"))
            .order_by("-count")
        )
        retval["photo_format_distribution"] = list(format_distribution)

        # User activity statistics
        users_with_multiple_photos = (
            User.objects.annotate(photo_count=Count("photo"))
            .filter(photo_count__gt=1)
            .count()
        )
        users_with_both_email_and_openid = (
            User.objects.filter(
                confirmedemail__isnull=False, confirmedopenid__isnull=False
            )
            .distinct()
            .count()
        )

        # Calculate average photos per user
        total_photos = Photo.objects.count()
        total_users = User.objects.count()
        avg_photos_per_user = total_photos / total_users if total_users > 0 else 0

        retval["user_activity"] = {
            "users_with_multiple_photos": users_with_multiple_photos,
            "users_with_both_email_and_openid": users_with_both_email_and_openid,
            "average_photos_per_user": round(avg_photos_per_user, 2),
        }

        # Bluesky handles statistics
        bluesky_emails = ConfirmedEmail.objects.filter(
            bluesky_handle__isnull=False
        ).count()
        bluesky_openids = ConfirmedOpenId.objects.filter(
            bluesky_handle__isnull=False
        ).count()
        total_bluesky_handles = bluesky_emails + bluesky_openids

        # Top Bluesky handles by access count
        retval["bluesky_handles"] = {
            "total_bluesky_handles": total_bluesky_handles,
            "bluesky_emails": bluesky_emails,
            "bluesky_openids": bluesky_openids,
        }

        # Average photo size statistics using raw SQL
        from django.db import connection

        with connection.cursor() as cursor:
            # SQL to calculate average photo size
            cursor.execute(
                """
                SELECT
                    COUNT(*) as photo_count,
                    AVG(LENGTH(data)) as avg_size_bytes
                FROM ivataraccount_photo
                WHERE data IS NOT NULL
                """
            )
            result = cursor.fetchone()

            if result and result[0] > 0:
                photo_count, avg_size_bytes = result
                # Convert to float in case the database returns a string
                avg_size_bytes = float(avg_size_bytes) if avg_size_bytes else 0
                avg_size_kb = round(avg_size_bytes / 1024, 2) if avg_size_bytes else 0
                avg_size_mb = (
                    round(avg_size_bytes / (1024 * 1024), 2) if avg_size_bytes else 0
                )

                retval["photo_size_stats"] = {
                    "average_size_bytes": round(avg_size_bytes, 2)
                    if avg_size_bytes
                    else 0,
                    "average_size_kb": avg_size_kb,
                    "average_size_mb": avg_size_mb,
                    "total_photos_analyzed": photo_count,
                }
            else:
                retval["photo_size_stats"] = {
                    "average_size_bytes": 0,
                    "average_size_kb": 0,
                    "average_size_mb": 0,
                    "total_photos_analyzed": 0,
                }

        # For potential duplicate photos, we check for photos with the same format and size.
        # Note: This is not definitive - different images can have the same format and size -
        # but it's a good indicator of potential duplicates that might warrant investigation.
        with connection.cursor() as cursor:
            cursor.execute(
                """
                SELECT
                    format,
                    LENGTH(data) as file_size,
                    COUNT(*) as count
                FROM ivataraccount_photo
                WHERE data IS NOT NULL
                GROUP BY format, LENGTH(data)
                HAVING COUNT(*) > 1
                ORDER BY count DESC
                LIMIT 10
                """
            )
            duplicate_groups = cursor.fetchall()

            total_potential_duplicate_photos = sum(
                group[2] for group in duplicate_groups
            )

            # Convert to a list of dictionaries for JSON serialization
            duplicate_groups_detail = [
                {"format": group[0], "file_size": group[1], "count": group[2]}
                for group in duplicate_groups
            ]

            retval["potential_duplicate_photos"] = {
                "potential_duplicate_groups": len(duplicate_groups),
                "total_potential_duplicate_photos": total_potential_duplicate_photos,
                "potential_duplicate_groups_detail": duplicate_groups_detail,
                "note": "Potential duplicates are identified by matching file format and size - not definitive duplicates",
            }

        return JsonResponse(retval)
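
# Response shape sketch (hedged and abridged; keys match the code above,
# values are illustrative):
#
#   {
#       "users": 12345,
#       "mails": 23456,
#       "top_viewed_avatars": [{"access_count": 99, "avatar_url": "..."}],
#       "photo_format_distribution": [{"format": "png", "count": 42}],
#       "user_activity": {"average_photos_per_user": 1.07, ...},
#       "photo_size_stats": {"average_size_kb": 24.5, ...},
#       "potential_duplicate_photos": {"potential_duplicate_groups": 3, ...},
#   }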


# Thread-safe version cache - cached indefinitely, since the container restarts on changes
_version_cache = None
_version_cache_lock = threading.Lock()


def _get_git_info_from_files():
    """
    Safely extract git information from .git files without subprocess calls
    """
    try:
        # Get the project root directory
        project_root = path.dirname(path.dirname(path.abspath(__file__)))
        git_dir = path.join(project_root, ".git")

        if not path.exists(git_dir):
            return None

        # Read HEAD to get the current branch/commit
        head_file = path.join(git_dir, "HEAD")
        if not path.exists(head_file):
            return None

        with open(head_file, "r") as f:
            head_content = f.read().strip()

        # Parse the HEAD content
        if head_content.startswith("ref: "):
            # We're on a branch
            branch_ref = head_content[5:]  # Remove 'ref: '
            branch_name = path.basename(branch_ref)

            # Read the commit hash from the ref
            ref_file = path.join(git_dir, branch_ref)
            if path.exists(ref_file):
                with open(ref_file, "r") as f:
                    commit_hash = f.read().strip()
            else:
                return None
        else:
            # Detached HEAD state
            commit_hash = head_content
            branch_name = "detached"

        # Try to get the commit date from the git log file (if available)
        # Optimization: read only the last line instead of the entire file
        commit_date = None
        log_file = path.join(git_dir, "logs", "HEAD")
        if path.exists(log_file):
            try:
                with open(log_file, "rb") as f:
                    # Seek to the end and read backwards to find the last line
                    f.seek(0, 2)  # Seek to end
                    file_size = f.tell()

                    # Read backwards in chunks to find the last line
                    chunk_size = min(1024, file_size)
                    f.seek(max(0, file_size - chunk_size))
                    chunk = f.read().decode("utf-8", errors="ignore")

                # Find the last non-empty line
                lines = chunk.split("\n")
                last_line = None
                for line in reversed(lines):
                    if line.strip():
                        last_line = line.strip()
                        break

                if last_line:
                    # Git reflog line format (space-separated):
                    # <old_hash> <new_hash> <author name> <author email> <timestamp> <timezone> <message>
                    parts = last_line.split()
                    if len(parts) >= 6:
                        # The timestamp comes after the author email; find it and
                        # convert it to a readable date
                        for part in parts:
                            if part.isdigit() and len(part) == 10:  # Unix timestamp
                                import datetime

                                timestamp = int(part)
                                commit_date = datetime.datetime.fromtimestamp(
                                    timestamp
                                ).strftime("%Y-%m-%d %H:%M:%S %z")
                                break
            except (ValueError, IndexError, UnicodeDecodeError):
                pass

        # Fallback: try to get the date from the commit object, if available
        if not commit_date and len(commit_hash) == 40:
            try:
                commit_dir = path.join(git_dir, "objects", commit_hash[:2])
                commit_file = path.join(commit_dir, commit_hash[2:])
                if path.exists(commit_file):
                    # This would require decompressing the git object, which is complex.
                    # For now, we use a placeholder.
                    commit_date = "unknown"
            except Exception:
                commit_date = "unknown"

        # Get the deployment date from the file modification time.
        # Use manage.py, as it's always updated during deployment.
        deployment_date = None
        manage_py_path = path.join(project_root, "manage.py")
        if path.exists(manage_py_path):
            try:
                import datetime

                mtime = path.getmtime(manage_py_path)
                deployment_date = datetime.datetime.fromtimestamp(mtime).strftime(
                    "%Y-%m-%d %H:%M:%S %z"
                )
            except Exception:
                deployment_date = "unknown"

        return {
            "commit_hash": commit_hash,
            "short_hash": commit_hash[:7] if len(commit_hash) >= 7 else commit_hash,
            "branch": branch_name,
            "commit_date": commit_date or "unknown",
            "deployment_date": deployment_date or "unknown",
            "deployment_status": "active",
            "version": f"{branch_name}-{commit_hash[:7] if len(commit_hash) >= 7 else commit_hash}",
        }

    except Exception as exc:
        logger.warning(f"Failed to read git info from files: {exc}")
        return None
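
# Parsing sketch (hedged, example file contents): .git/HEAD holds either a
# symbolic ref or a bare commit hash:
#
#   "ref: refs/heads/master"  -> branch "master", hash read from
#                                .git/refs/heads/master
#   "<40 hex chars>"          -> detached HEAD, hash used directly
#
# The reflog line parsed above looks roughly like:
#   <old> <new> Jane Doe <jane@example.org> 1729461979 +0000 commit: message
# where 1729461979 is the 10-digit Unix timestamp extracted by the loop.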


def _get_cached_version_info():
    """
    Get cached version information, loading it if not available
    Since containers restart on content changes, cache indefinitely
    """
    global _version_cache

    with _version_cache_lock:
        if _version_cache is None:
            # Get version info from git files
            _version_cache = _get_git_info_from_files()

            # If that fails, return an error
            if _version_cache is None:
                _version_cache = {
                    "error": "Unable to determine version - .git directory not found",
                    "deployment_status": "unknown",
                }

        return _version_cache
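
# Usage sketch: callers never touch the module-level globals directly. The
# lock makes the first load safe under concurrent requests; every later call
# returns the same dict without re-reading the .git files:
#
#   info = _get_cached_version_info()
#   info.get("version")  # e.g. "master-1a2b3c4" (hypothetical value), or
#                        # absent when the error fallback was cached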


class DeploymentVersionView(View):
    """
    View to return deployment version information for CI/CD verification
    Uses cached version info to prevent DDoS attacks and improve performance
    """

    def get(self, request, *args, **kwargs):
        """
        Return cached deployment version information
        """
        version_info = _get_cached_version_info()

        if "error" in version_info:
            return JsonResponse(version_info, status=500)

        return JsonResponse(version_info)
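
# Example exchange (hedged; the exact URL path depends on urls.py, which is
# not part of this file):
#
#   GET <deployment-version-endpoint>
#     -> 200 {"commit_hash": "...", "branch": "master",
#             "version": "master-1a2b3c4", "deployment_status": "active", ...}
#   or, when the .git directory is missing:
#     -> 500 {"error": "Unable to determine version - .git directory not found",
#             "deployment_status": "unknown"}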