Coverage for ivatar / ivataraccount / models.py: 86%
307 statements
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-30 00:08 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2025-12-30 00:08 +0000
1"""
2Our models for ivatar.ivataraccount
3"""
5import base64
6import hashlib
7import time
8from io import BytesIO
9from os import urandom
10from urllib.error import HTTPError, URLError
11from ivatar.utils import urlopen, Bluesky
12from urllib.parse import urlsplit, urlunsplit, quote
13import logging
15from PIL import Image
16from django.contrib.auth.models import User
17from django.contrib import messages
18from django.db import models
19from django.utils import timezone
20from django.http import HttpResponseRedirect
21from django.urls import reverse_lazy, reverse
22from django.utils.translation import gettext_lazy as _
23from django.core.cache import cache
24from django.core.exceptions import ObjectDoesNotExist
25from django.core.mail import send_mail
26from django.template.loader import render_to_string
27from openid.association import Association as OIDAssociation
28from openid.store import nonce as oidnonce
29from openid.store.interface import OpenIDStore
31from libravatar import libravatar_url
33from ivatar.settings import MAX_LENGTH_EMAIL
34from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY
35from ivatar.settings import MAX_LENGTH_URL
36from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL
37from ivatar.utils import openid_variations
38from .gravatar import get_photo as get_gravatar_photo
40# Initialize logger
41logger = logging.getLogger("ivatar")
44def file_format(image_type):
45 """
46 Helper method returning a short image type
47 """
48 if image_type in ("JPEG", "MPO"):
49 return "jpg"
50 elif image_type == "PNG":
51 return "png"
52 elif image_type == "GIF":
53 return "gif"
54 elif image_type == "WEBP":
55 return "webp"
56 return None
59def pil_format(image_type):
60 """
61 Helper method returning the 'encoder name' for PIL
62 """
63 if image_type in ("jpg", "jpeg", "mpo"):
64 return "JPEG"
65 elif image_type == "png":
66 return "PNG"
67 elif image_type == "gif":
68 return "GIF"
69 elif image_type == "webp":
70 return "WEBP"
72 logger.info("Unsupported file format: %s", image_type)
73 return None
76class UserPreference(models.Model):
77 """
78 Holds the user users preferences
79 """
81 THEMES = (
82 ("default", "Default theme"),
83 ("clime", "climes theme"),
84 ("green", "green theme"),
85 ("red", "red theme"),
86 )
88 theme = models.CharField(
89 max_length=10,
90 choices=THEMES,
91 default="default",
92 )
94 user = models.OneToOneField(
95 User,
96 on_delete=models.deletion.CASCADE,
97 primary_key=True,
98 )
100 def __str__(self):
101 return "Preference (%i) for %s" % (self.pk, self.user)
104class BaseAccountModel(models.Model):
105 """
106 Base, abstract model, holding fields we use in all cases
107 """
109 user = models.ForeignKey(
110 User,
111 on_delete=models.deletion.CASCADE,
112 )
113 ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True)
114 add_date = models.DateTimeField(default=timezone.now)
116 class Meta: # pylint: disable=too-few-public-methods
117 """
118 Class attributes
119 """
121 abstract = True
124class Photo(BaseAccountModel):
125 """
126 Model holding the photos and information about them
127 """
129 ip_address = models.GenericIPAddressField(unpack_ipv4=True)
130 data = models.BinaryField()
131 format = models.CharField(max_length=4)
132 access_count = models.BigIntegerField(default=0, editable=False)
134 class Meta: # pylint: disable=too-few-public-methods
135 """
136 Class attributes
137 """
139 verbose_name = _("photo")
140 verbose_name_plural = _("photos")
141 indexes = [
142 models.Index(fields=["format"], name="idx_photo_format"),
143 models.Index(fields=["access_count"], name="idx_photo_access_count"),
144 models.Index(fields=["user_id", "format"], name="idx_photo_user_format"),
145 ]
147 def import_image(self, service_name, email_address):
148 """
149 Allow to import image from other (eg. Gravatar) service
150 """
151 image_url = False
153 if service_name == "Gravatar":
154 if gravatar := get_gravatar_photo(email_address):
155 image_url = gravatar["image_url"]
157 if service_name == "Libravatar":
158 image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE)
160 if not image_url:
161 return False # pragma: no cover
162 try:
163 image = urlopen(image_url)
164 except HTTPError as exc:
165 logger.warning(
166 f"{service_name} import failed with an HTTP error: {exc.code}"
167 )
168 return False
169 except URLError as exc:
170 logger.warning(f"{service_name} import failed: {exc.reason}")
171 return False
172 data = image.read()
174 try:
175 img = Image.open(BytesIO(data))
176 # How am I supposed to test this?
177 except ValueError: # pragma: no cover
178 return False # pragma: no cover
180 self.format = file_format(img.format)
181 if not self.format:
182 logger.warning(f"Unable to determine format: {img}")
183 return False # pragma: no cover
184 self.data = data
185 super().save()
186 return True
188 def save(
189 self, force_insert=False, force_update=False, using=None, update_fields=None
190 ):
191 """
192 Override save from parent, taking care about the image
193 """
194 # Use PIL to read the file format
195 try:
196 img = Image.open(BytesIO(self.data))
197 except Exception as exc: # pylint: disable=broad-except
198 # For debugging only
199 logger.error(f"Exception caught in Photo.save(): {exc}")
200 return False
201 self.format = file_format(img.format)
202 if not self.format:
203 logger.error("Format not recognized")
204 return False
205 return super().save(
206 force_insert=force_insert,
207 force_update=force_update,
208 using=using,
209 update_fields=update_fields,
210 )
212 def perform_crop(self, request, dimensions, email, openid):
213 """
214 Helper to crop the image
215 """
216 if request.user.photo_set.count() == 1:
217 # This is the first photo, assign to all confirmed addresses
218 for addr in request.user.confirmedemail_set.all():
219 addr.photo = self
220 addr.save()
222 for addr in request.user.confirmedopenid_set.all():
223 addr.photo = self
224 addr.save()
226 if email:
227 # Explicitly asked
228 email.photo = self
229 email.save()
231 if openid:
232 # Explicitly asked
233 openid.photo = self
234 openid.save()
236 # Do the real work cropping
237 img = Image.open(BytesIO(self.data))
239 # This should be anyway checked during save...
240 dimensions["a"], dimensions["b"] = img.size # pylint: disable=invalid-name
241 if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS:
242 messages.error(
243 request,
244 _(
245 "Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s"
246 % {
247 "max_pixels": MAX_PIXELS,
248 }
249 ),
250 )
251 return HttpResponseRedirect(reverse_lazy("profile"))
253 if dimensions["w"] == 0 and dimensions["h"] == 0:
254 dimensions["w"], dimensions["h"] = dimensions["a"], dimensions["b"]
255 min_from_w_h = min(dimensions["w"], dimensions["h"])
256 dimensions["w"], dimensions["h"] = min_from_w_h, min_from_w_h
257 elif (
258 (dimensions["w"] < 0)
259 or ((dimensions["x"] + dimensions["w"]) > dimensions["a"])
260 or (dimensions["h"] < 0)
261 or ((dimensions["y"] + dimensions["h"]) > dimensions["b"])
262 ):
263 messages.error(request, _("Crop outside of original image bounding box"))
264 return HttpResponseRedirect(reverse_lazy("profile"))
266 cropped = img.crop(
267 (
268 dimensions["x"],
269 dimensions["y"],
270 dimensions["x"] + dimensions["w"],
271 dimensions["y"] + dimensions["h"],
272 )
273 )
274 # cropped.load()
275 # Resize the image only if it's larger than the specified max width.
276 cropped_w, cropped_h = cropped.size
277 max_w = AVATAR_MAX_SIZE
278 if cropped_w > max_w or cropped_h > max_w:
279 cropped = cropped.resize((max_w, max_w), Image.LANCZOS)
281 data = BytesIO()
282 cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY)
283 data.seek(0)
285 # Overwrite the existing image
286 self.data = data.read()
287 self.save()
289 return HttpResponseRedirect(reverse_lazy("profile"))
291 def __str__(self):
292 return "%s (%i) from %s" % (self.format, self.pk or 0, self.user)
295# pylint: disable=too-few-public-methods
296class ConfirmedEmailManager(models.Manager):
297 """
298 Manager for our confirmed email addresses model
299 """
301 @staticmethod
302 def create_confirmed_email(user, email_address, is_logged_in):
303 """
304 Helper method to create confirmed email address
305 """
306 confirmed = ConfirmedEmail()
307 confirmed.user = user
308 confirmed.ip_address = "0.0.0.0"
309 confirmed.email = email_address
310 confirmed.save()
312 external_photos = []
313 if is_logged_in:
314 if gravatar := get_gravatar_photo(confirmed.email):
315 external_photos.append(gravatar)
317 return (confirmed.pk, external_photos)
320class ConfirmedEmail(BaseAccountModel):
321 """
322 Model holding our confirmed email addresses, as well as the relation
323 to the assigned photo
324 """
326 email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL)
327 photo = models.ForeignKey(
328 Photo,
329 related_name="emails",
330 blank=True,
331 null=True,
332 on_delete=models.deletion.SET_NULL,
333 )
334 # Alternative assignment - use Bluesky handle
335 bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
336 digest = models.CharField(max_length=32)
337 digest_sha256 = models.CharField(max_length=64)
338 objects = ConfirmedEmailManager()
339 access_count = models.BigIntegerField(default=0, editable=False)
341 class Meta: # pylint: disable=too-few-public-methods
342 """
343 Class attributes
344 """
346 verbose_name = _("confirmed email")
347 verbose_name_plural = _("confirmed emails")
348 indexes = [
349 models.Index(fields=["digest"], name="idx_cemail_digest"),
350 models.Index(fields=["digest_sha256"], name="idx_cemail_digest_sha256"),
351 models.Index(fields=["access_count"], name="idx_cemail_access_count"),
352 models.Index(fields=["bluesky_handle"], name="idx_cemail_bluesky_handle"),
353 models.Index(
354 fields=["user_id", "access_count"],
355 name="idx_cemail_user_access",
356 ),
357 models.Index(
358 fields=["photo_id", "access_count"],
359 name="idx_cemail_photo_access",
360 ),
361 ]
363 def set_photo(self, photo):
364 """
365 Helper method to set photo
366 """
367 self.photo = photo
368 self.save()
370 def set_bluesky_handle(self, handle):
371 """
372 Helper method to set Bluesky handle
373 """
375 bs = Bluesky()
376 handle = bs.normalize_handle(handle)
377 avatar = bs.get_profile(handle)
378 if not avatar:
379 raise ValueError("Invalid Bluesky handle")
380 self.bluesky_handle = handle
381 self.save()
383 def save(
384 self, force_insert=False, force_update=False, using=None, update_fields=None
385 ):
386 """
387 Override save from parent, add digest
388 """
389 self.digest = hashlib.md5(
390 self.email.strip().lower().encode("utf-8")
391 ).hexdigest()
392 self.digest_sha256 = hashlib.sha256(
393 self.email.strip().lower().encode("utf-8")
394 ).hexdigest()
396 # We need to manually expire the page caches
397 # TODO: Verify this works as expected
398 # First check if we already have an ID
399 if self.pk:
400 cache_url = reverse_lazy(
401 "assign_photo_email", kwargs={"email_id": int(self.pk)}
402 )
404 cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}"
405 try:
406 if cache.has_key(cache_key):
407 cache.delete(cache_key)
408 logger.debug("Successfully cleaned up cached page: %s" % cache_key)
409 except Exception as exc:
410 logger.warning(
411 "Failed to clean up cached page {}: {}".format(cache_key, exc)
412 )
414 # Invalidate Bluesky avatar URL cache if bluesky_handle changed
415 if hasattr(self, "bluesky_handle") and self.bluesky_handle:
416 try:
417 cache.delete(self.bluesky_handle)
418 logger.debug(
419 "Successfully cleaned up Bluesky avatar cache for handle: %s"
420 % self.bluesky_handle
421 )
422 except Exception as exc:
423 logger.warning(
424 "Failed to clean up Bluesky avatar cache for handle %s: %s"
425 % (self.bluesky_handle, exc)
426 )
428 return super().save(
429 force_insert=force_insert,
430 force_update=force_update,
431 using=using,
432 update_fields=update_fields,
433 )
435 def __str__(self):
436 return "%s (%i) from %s" % (self.email, self.pk, self.user)
439class UnconfirmedEmail(BaseAccountModel):
440 """
441 Model holding unconfirmed email addresses as well as the verification key
442 """
444 email = models.EmailField(max_length=MAX_LENGTH_EMAIL)
445 verification_key = models.CharField(max_length=64)
446 last_send_date = models.DateTimeField(null=True, blank=True)
447 last_status = models.TextField(max_length=2047, null=True, blank=True)
449 class Meta: # pylint: disable=too-few-public-methods
450 """
451 Class attributes
452 """
454 verbose_name = _("unconfirmed email")
455 verbose_name_plural = _("unconfirmed emails")
457 def save(
458 self, force_insert=False, force_update=False, using=None, update_fields=None
459 ):
460 if not self.verification_key:
461 hash_object = hashlib.new("sha256")
462 hash_object.update(
463 urandom(1024)
464 + self.user.username.encode("utf-8") # pylint: disable=no-member
465 ) # pylint: disable=no-member
466 self.verification_key = hash_object.hexdigest()
467 super().save(
468 force_insert=force_insert,
469 force_update=force_update,
470 using=using,
471 update_fields=update_fields,
472 )
474 def send_confirmation_mail(self, url=SECURE_BASE_URL):
475 """
476 Send confirmation mail to that mail address
477 """
478 link = url + reverse(
479 "confirm_email", kwargs={"verification_key": self.verification_key}
480 )
481 email_subject = _("Confirm your email address on %s") % SITE_NAME
482 email_body = render_to_string(
483 "email_confirmation.txt",
484 {
485 "verification_link": link,
486 "site_name": SITE_NAME,
487 },
488 )
489 self.last_send_date = timezone.now()
490 self.last_status = "OK"
491 # if settings.DEBUG:
492 # print('DEBUG: %s' % link)
493 try:
494 send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email])
495 except Exception as e:
496 self.last_status = f"{e}"
497 self.save()
498 return True
500 def __str__(self):
501 return "%s (%i) from %s" % (self.email, self.pk, self.user)
504class UnconfirmedOpenId(BaseAccountModel):
505 """
506 Model holding unconfirmed OpenIDs
507 """
509 openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL)
511 class Meta: # pylint: disable=too-few-public-methods
512 """
513 Meta class
514 """
516 verbose_name = _("unconfirmed OpenID")
517 verbose_name_plural = "unconfirmed_OpenIDs"
519 def __str__(self):
520 return "%s (%i) from %s" % (self.openid, self.pk, self.user)
523class ConfirmedOpenId(BaseAccountModel):
524 """
525 Model holding confirmed OpenIDs, as well as the relation to
526 the assigned photo
527 """
529 openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL)
530 photo = models.ForeignKey(
531 Photo,
532 related_name="openids",
533 blank=True,
534 null=True,
535 on_delete=models.deletion.SET_NULL,
536 )
537 # http://<id>/ base version - http w/ trailing slash
538 digest = models.CharField(max_length=64)
539 # http://<id> - http w/o trailing slash
540 alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None)
541 # https://<id>/ - https w/ trailing slash
542 alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
543 # https://<id> - https w/o trailing slash
544 alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)
545 # Alternative assignment - use Bluesky handle
546 bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
548 access_count = models.BigIntegerField(default=0, editable=False)
550 class Meta: # pylint: disable=too-few-public-methods
551 """
552 Meta class
553 """
555 verbose_name = _("confirmed OpenID")
556 verbose_name_plural = _("confirmed OpenIDs")
558 def set_photo(self, photo):
559 """
560 Helper method to save photo
561 """
562 self.photo = photo
563 self.save()
565 def set_bluesky_handle(self, handle):
566 """
567 Helper method to set Bluesky handle
568 """
569 bs = Bluesky()
570 handle = bs.normalize_handle(handle)
571 avatar = bs.get_profile(handle)
572 if not avatar:
573 raise ValueError("Invalid Bluesky handle")
574 self.bluesky_handle = handle
575 self.save()
577 def save(
578 self, force_insert=False, force_update=False, using=None, update_fields=None
579 ):
580 url = urlsplit(self.openid)
581 if url.username: # pragma: no cover
582 password = url.password or ""
583 netloc = f"{url.username}:{password}@{url.hostname}"
584 else:
585 netloc = url.hostname
586 lowercase_url = urlunsplit(
587 (url.scheme.lower(), netloc, url.path, url.query, url.fragment)
588 )
589 self.openid = lowercase_url
591 self.digest = hashlib.sha256(
592 openid_variations(lowercase_url)[0].encode("utf-8")
593 ).hexdigest()
594 self.alt_digest1 = hashlib.sha256(
595 openid_variations(lowercase_url)[1].encode("utf-8")
596 ).hexdigest()
597 self.alt_digest2 = hashlib.sha256(
598 openid_variations(lowercase_url)[2].encode("utf-8")
599 ).hexdigest()
600 self.alt_digest3 = hashlib.sha256(
601 openid_variations(lowercase_url)[3].encode("utf-8")
602 ).hexdigest()
604 # Invalidate page caches and Bluesky avatar cache
605 if self.pk:
606 # Invalidate assign_photo_openid page cache
607 cache_url = reverse_lazy(
608 "assign_photo_openid", kwargs={"openid_id": int(self.pk)}
609 )
610 cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}"
611 try:
612 if cache.has_key(cache_key):
613 cache.delete(cache_key)
614 logger.debug("Successfully cleaned up cached page: %s" % cache_key)
615 except Exception as exc:
616 logger.warning(
617 "Failed to clean up cached page {}: {}".format(cache_key, exc)
618 )
620 # Invalidate Bluesky avatar URL cache if bluesky_handle exists
621 if hasattr(self, "bluesky_handle") and self.bluesky_handle:
622 try:
623 cache.delete(self.bluesky_handle)
624 logger.debug(
625 "Successfully cleaned up Bluesky avatar cache for handle: %s"
626 % self.bluesky_handle
627 )
628 except Exception as exc:
629 logger.warning(
630 "Failed to clean up Bluesky avatar cache for handle %s: %s"
631 % (self.bluesky_handle, exc)
632 )
634 return super().save(
635 force_insert=force_insert,
636 force_update=force_update,
637 using=using,
638 update_fields=update_fields,
639 )
641 def __str__(self):
642 return "%s (%i) (%s)" % (self.openid, self.pk, self.user)
645class OpenIDNonce(models.Model):
646 """
647 Model holding OpenID Nonces
648 See also: https://github.com/edx/django-openid-auth/
649 """
651 server_url = models.CharField(max_length=255)
652 timestamp = models.IntegerField()
653 salt = models.CharField(max_length=128)
655 def __str__(self):
656 return "%s (%i) (timestamp: %i)" % (self.server_url, self.pk, self.timestamp)
659class OpenIDAssociation(models.Model):
660 """
661 Model holding the relation/association about OpenIDs
662 """
664 server_url = models.TextField(max_length=2047)
665 handle = models.CharField(max_length=255)
666 secret = models.TextField(max_length=255) # stored base64 encoded
667 issued = models.IntegerField()
668 lifetime = models.IntegerField()
669 assoc_type = models.TextField(max_length=64)
671 def __str__(self):
672 return "%s (%i) (%s, lifetime: %i)" % (
673 self.server_url,
674 self.pk,
675 self.assoc_type,
676 self.lifetime,
677 )
680class DjangoOpenIDStore(OpenIDStore):
681 """
682 The Python openid library needs an OpenIDStore subclass to persist data
683 related to OpenID authentications. This one uses our Django models.
684 """
686 @staticmethod
687 def storeAssociation(server_url, association): # pragma: no cover
688 """
689 Helper method to store associations
690 """
691 assoc = OpenIDAssociation(
692 server_url=server_url,
693 handle=association.handle,
694 secret=base64.encodebytes(association.secret),
695 issued=association.issued,
696 lifetime=association.issued,
697 assoc_type=association.assoc_type,
698 )
699 assoc.save()
701 def getAssociation(self, server_url, handle=None): # pragma: no cover
702 """
703 Helper method to get associations
704 """
705 assocs = []
706 if handle is not None:
707 assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member
708 server_url=server_url, handle=handle
709 )
710 else:
711 assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member
712 server_url=server_url
713 )
714 if not assocs:
715 return None
716 associations = []
717 for assoc in assocs:
718 if isinstance(assoc.secret, str):
719 assoc.secret = assoc.secret.split("b'")[1].split("'")[0]
720 assoc.secret = bytes(assoc.secret, "utf-8")
721 association = OIDAssociation(
722 assoc.handle,
723 base64.decodebytes(assoc.secret),
724 assoc.issued,
725 assoc.lifetime,
726 assoc.assoc_type,
727 )
728 expires = 0
729 try:
730 # pylint: disable=no-member
731 expires = association.getExpiresIn()
732 except AttributeError:
733 expires = association.expiresIn
734 if expires == 0:
735 self.removeAssociation(server_url, assoc.handle)
736 else:
737 associations.append((association.issued, association))
738 return associations[-1][1] if associations else None
740 @staticmethod
741 def removeAssociation(server_url, handle): # pragma: no cover
742 """
743 Helper method to remove associations
744 """
745 assocs = list(
746 OpenIDAssociation.objects.filter( # pylint: disable=no-member
747 server_url=server_url, handle=handle
748 )
749 )
750 assocs_exist = len(assocs) > 0
751 for assoc in assocs:
752 assoc.delete()
753 return assocs_exist
755 @staticmethod
756 def useNonce(server_url, timestamp, salt): # pragma: no cover
757 """
758 Helper method to 'use' nonces
759 """
760 # Has nonce expired?
761 if abs(timestamp - time.time()) > oidnonce.SKEW:
762 return False
763 try:
764 nonce = OpenIDNonce.objects.get( # pylint: disable=no-member
765 server_url__exact=server_url,
766 timestamp__exact=timestamp,
767 salt__exact=salt,
768 )
769 except ObjectDoesNotExist:
770 nonce = OpenIDNonce.objects.create( # pylint: disable=no-member
771 server_url=server_url, timestamp=timestamp, salt=salt
772 )
773 return True
774 nonce.delete()
775 return False
777 @staticmethod
778 def cleanupNonces(): # pragma: no cover
779 """
780 Helper method to cleanup nonces
781 """
782 timestamp = int(time.time()) - oidnonce.SKEW
783 # pylint: disable=no-member
784 OpenIDNonce.objects.filter(timestamp__lt=timestamp).delete()
786 @staticmethod
787 def cleanupAssociations(): # pragma: no cover
788 """
789 Helper method to cleanup associations
790 """
791 OpenIDAssociation.objects.extra(
792 where=[f"issued + lifetimeint < ({time.time()})"]
793 ).delete()