Coverage for ivatar/ivataraccount/models.py: 86%
307 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 00:07 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 00:07 +0000
1"""
2Our models for ivatar.ivataraccount
3"""
5import base64
6import hashlib
7import time
8from io import BytesIO
9from os import urandom
10from urllib.error import HTTPError, URLError
11from ivatar.utils import urlopen, Bluesky
12from urllib.parse import urlsplit, urlunsplit, quote
13import logging
15from PIL import Image
16from django.contrib.auth.models import User
17from django.contrib import messages
18from django.db import models
19from django.utils import timezone
20from django.http import HttpResponseRedirect
21from django.urls import reverse_lazy, reverse
22from django.utils.translation import gettext_lazy as _
23from django.core.cache import cache
24from django.core.exceptions import ObjectDoesNotExist
25from django.core.mail import send_mail
26from django.template.loader import render_to_string
27from openid.association import Association as OIDAssociation
28from openid.store import nonce as oidnonce
29from openid.store.interface import OpenIDStore
31from libravatar import libravatar_url
33from ivatar.settings import MAX_LENGTH_EMAIL
34from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY
35from ivatar.settings import MAX_LENGTH_URL
36from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL
37from ivatar.utils import openid_variations
38from .gravatar import get_photo as get_gravatar_photo
# Module-level logger: everything in this file logs through the "ivatar" logger
logger = logging.getLogger("ivatar")
def file_format(image_type):
    """
    Translate a PIL format name into the short image type we store.

    "JPEG" and "MPO" both map to "jpg"; "PNG", "GIF" and "WEBP" map to
    their lower-case names. Any other value yields None.
    """
    short_type_by_pil_names = (
        (("JPEG", "MPO"), "jpg"),
        (("PNG",), "png"),
        (("GIF",), "gif"),
        (("WEBP",), "webp"),
    )
    for pil_names, short_type in short_type_by_pil_names:
        if image_type in pil_names:
            return short_type
    return None
def pil_format(image_type):
    """
    Translate a short image type into the 'encoder name' PIL expects.

    "jpg", "jpeg" and "mpo" map to "JPEG"; "png", "gif" and "webp" map to
    their upper-case names. Any other value is logged at info level and
    None is returned.
    """
    encoder_by_short_types = (
        (("jpg", "jpeg", "mpo"), "JPEG"),
        (("png",), "PNG"),
        (("gif",), "GIF"),
        (("webp",), "WEBP"),
    )
    for short_types, encoder in encoder_by_short_types:
        if image_type in short_types:
            return encoder

    logger.info("Unsupported file format: %s", image_type)
    return None
class UserPreference(models.Model):
    """
    Holds the user's preferences
    """

    # Selectable themes as (stored value, human readable label) pairs
    THEMES = (
        ("default", "Default theme"),
        ("clime", "climes theme"),
        ("green", "green theme"),
        ("red", "red theme"),
    )

    theme = models.CharField(
        max_length=10,
        choices=THEMES,
        default="default",
    )

    # The user doubles as primary key, so there is at most one preference
    # row per user and it is removed together with the user.
    user = models.OneToOneField(
        User,
        on_delete=models.deletion.CASCADE,
        primary_key=True,
    )

    def __str__(self):
        # pk is None until a user has been assigned; "%i" would raise on None
        return "Preference (%i) for %s" % (self.pk or 0, self.user)
class BaseAccountModel(models.Model):
    """
    Abstract base model providing the fields shared by all account models:
    the owning user, the IP address and the creation date
    """

    # Rows are deleted together with the owning user
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    ip_address = models.GenericIPAddressField(null=True, unpack_ipv4=True)
    add_date = models.DateTimeField(default=timezone.now)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Model options: abstract, so no table is created for this model
        """

        abstract = True
class Photo(BaseAccountModel):
    """
    Model holding the photos and information about them
    """

    # Unlike the base model, the IP address is mandatory for photos
    ip_address = models.GenericIPAddressField(unpack_ipv4=True)
    data = models.BinaryField()
    # Short image type as returned by file_format() ("jpg", "png", ...)
    format = models.CharField(max_length=4)
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("photo")
        verbose_name_plural = _("photos")
        indexes = [
            models.Index(fields=["format"], name="idx_photo_format"),
            models.Index(fields=["access_count"], name="idx_photo_access_count"),
            models.Index(fields=["user_id", "format"], name="idx_photo_user_format"),
        ]

    def import_image(self, service_name, email_address):
        """
        Allow to import image from other (eg. Gravatar) service

        service_name is either "Gravatar" or "Libravatar"; any other value
        results in no image URL and therefore a False return value.

        Returns True when the image was fetched, recognized and stored,
        False otherwise.
        """
        image_url = False

        if service_name == "Gravatar":
            if gravatar := get_gravatar_photo(email_address):
                image_url = gravatar["image_url"]

        if service_name == "Libravatar":
            image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE)

        if not image_url:
            return False  # pragma: no cover
        try:
            image = urlopen(image_url)
        except HTTPError as exc:
            logger.warning(
                "%s import failed with an HTTP error: %s", service_name, exc.code
            )
            return False
        except URLError as exc:
            logger.warning("%s import failed: %s", service_name, exc.reason)
            return False
        data = image.read()

        try:
            img = Image.open(BytesIO(data))
        # Pillow reports undecodable data with UnidentifiedImageError, which
        # is an OSError; catching ValueError alone lets that escape.
        except (OSError, ValueError):  # pragma: no cover
            return False  # pragma: no cover

        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Unable to determine format: %s", img)
            return False  # pragma: no cover
        self.data = data
        # The format has just been determined, so skip our own save()
        # override (which would decode the image a second time).
        super().save()
        return True

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, taking care about the image

        The image format is (re)detected from the binary data. If the data
        cannot be opened or the format is not supported, nothing is saved
        and False is returned.
        """
        # Use PIL to read the file format
        try:
            img = Image.open(BytesIO(self.data))
        except Exception as exc:  # pylint: disable=broad-except
            # For debugging only
            logger.error("Exception caught in Photo.save(): %s", exc)
            return False
        self.format = file_format(img.format)
        if not self.format:
            logger.error("Format not recognized")
            return False
        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django versions.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def perform_crop(self, request, dimensions, email, openid):
        """
        Helper to crop the image

        dimensions is a dict with the keys "x", "y", "w" and "h"; the keys
        "a" and "b" are filled in with the width and height of the stored
        image. A width and height of 0 selects the largest square starting
        at (x, y) candidates allowed by the image size.

        The photo is assigned to all confirmed identities if it is the
        user's only photo, and to the explicitly passed email / openid.
        Always returns a redirect to the profile page; problems are
        reported through the messages framework.
        """
        if request.user.photo_set.count() == 1:
            # This is the first photo, assign to all confirmed addresses
            for addr in request.user.confirmedemail_set.all():
                addr.photo = self
                addr.save()

            for addr in request.user.confirmedopenid_set.all():
                addr.photo = self
                addr.save()

        if email:
            # Explicitly asked
            email.photo = self
            email.save()

        if openid:
            # Explicitly asked
            openid.photo = self
            openid.save()

        # Do the real work cropping
        img = Image.open(BytesIO(self.data))

        # This should be anyway checked during save...
        dimensions["a"], dimensions["b"] = img.size  # pylint: disable=invalid-name
        if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS:
            # Translate first, interpolate afterwards: formatting before the
            # lookup produces a string that is never found in the catalog.
            messages.error(
                request,
                _("Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s")
                % {
                    "max_pixels": MAX_PIXELS,
                },
            )
            return HttpResponseRedirect(reverse_lazy("profile"))

        if dimensions["w"] == 0 and dimensions["h"] == 0:
            dimensions["w"], dimensions["h"] = dimensions["a"], dimensions["b"]
            min_from_w_h = min(dimensions["w"], dimensions["h"])
            dimensions["w"], dimensions["h"] = min_from_w_h, min_from_w_h
        elif (
            # A negative origin lies outside of the image just as much as an
            # overshooting width or height does.
            (dimensions["x"] < 0)
            or (dimensions["y"] < 0)
            or (dimensions["w"] < 0)
            or ((dimensions["x"] + dimensions["w"]) > dimensions["a"])
            or (dimensions["h"] < 0)
            or ((dimensions["y"] + dimensions["h"]) > dimensions["b"])
        ):
            messages.error(request, _("Crop outside of original image bounding box"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        cropped = img.crop(
            (
                dimensions["x"],
                dimensions["y"],
                dimensions["x"] + dimensions["w"],
                dimensions["y"] + dimensions["h"],
            )
        )
        # Resize the image only if it's larger than the specified max width.
        cropped_w, cropped_h = cropped.size
        max_w = AVATAR_MAX_SIZE
        if cropped_w > max_w or cropped_h > max_w:
            cropped = cropped.resize((max_w, max_w), Image.LANCZOS)

        data = BytesIO()
        cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY)
        data.seek(0)

        # Overwrite the existing image
        self.data = data.read()
        self.save()

        return HttpResponseRedirect(reverse_lazy("profile"))

    def __str__(self):
        return "%s (%i) from %s" % (self.format, self.pk or 0, self.user)
# pylint: disable=too-few-public-methods
class ConfirmedEmailManager(models.Manager):
    """
    Manager used by the confirmed email address model
    """

    @staticmethod
    def create_confirmed_email(user, email_address, is_logged_in):
        """
        Create and save a confirmed email address for the given user.

        Returns a tuple of the new record's primary key and a list of
        external photos; the list only ever contains a Gravatar photo, and
        only if the user is logged in and one was found.
        """
        confirmed = ConfirmedEmail(
            user=user,
            ip_address="0.0.0.0",
            email=email_address,
        )
        confirmed.save()

        # Gravatar is only queried for logged in users
        gravatar = get_gravatar_photo(confirmed.email) if is_logged_in else None
        external_photos = [gravatar] if gravatar else []

        return (confirmed.pk, external_photos)
class ConfirmedEmail(BaseAccountModel):
    """
    Model holding our confirmed email addresses, as well as the relation
    to the assigned photo
    """

    email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL)
    photo = models.ForeignKey(
        Photo,
        related_name="emails",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
    # Hex digests of the normalized (stripped, lower-cased) email address;
    # both are recomputed on every save()
    digest = models.CharField(max_length=32)
    digest_sha256 = models.CharField(max_length=64)
    objects = ConfirmedEmailManager()
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("confirmed email")
        verbose_name_plural = _("confirmed emails")
        indexes = [
            models.Index(fields=["digest"], name="idx_cemail_digest"),
            models.Index(fields=["digest_sha256"], name="idx_cemail_digest_sha256"),
            models.Index(fields=["access_count"], name="idx_cemail_access_count"),
            models.Index(fields=["bluesky_handle"], name="idx_cemail_bluesky_handle"),
            models.Index(
                fields=["user_id", "access_count"],
                name="idx_cemail_user_access",
            ),
            models.Index(
                fields=["photo_id", "access_count"],
                name="idx_cemail_photo_access",
            ),
        ]

    def set_photo(self, photo):
        """
        Helper method to set photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        The handle is normalized and looked up via the Bluesky helper;
        ValueError is raised if no profile is returned for it.
        """

        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, add digest

        Besides computing the MD5 and SHA256 digests this expires the cached
        'assign_photo_email' page of an already existing record and the
        cache entry stored under the Bluesky handle. Cache failures are
        logged and never prevent saving.
        """
        # Normalize once, both digests are built from the same bytes
        normalized_email = self.email.strip().lower().encode("utf-8")
        self.digest = hashlib.md5(normalized_email).hexdigest()
        self.digest_sha256 = hashlib.sha256(normalized_email).hexdigest()

        # We need to manually expire the page caches
        # TODO: Verify this works as expected
        # First check if we already have an ID
        if self.pk:
            cache_url = reverse_lazy(
                "assign_photo_email", kwargs={"email_id": int(self.pk)}
            )

            cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}"
            try:
                if cache.has_key(cache_key):
                    cache.delete(cache_key)
                    logger.debug("Successfully cleaned up cached page: %s", cache_key)
            except Exception as exc:  # pylint: disable=broad-except
                logger.warning(
                    "Failed to clean up cached page %s: %s", cache_key, exc
                )

        # Invalidate the Bluesky avatar URL cache whenever a handle is set
        # (we do not track whether the handle actually changed)
        if self.bluesky_handle:
            try:
                cache.delete(self.bluesky_handle)
                logger.debug(
                    "Successfully cleaned up Bluesky avatar cache for handle: %s",
                    self.bluesky_handle,
                )
            except Exception as exc:  # pylint: disable=broad-except
                logger.warning(
                    "Failed to clean up Bluesky avatar cache for handle %s: %s",
                    self.bluesky_handle,
                    exc,
                )

        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django versions.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # pk is None for unsaved instances; "%i" would raise on None
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)
class UnconfirmedEmail(BaseAccountModel):
    """
    Model holding unconfirmed email addresses as well as the verification key
    """

    email = models.EmailField(max_length=MAX_LENGTH_EMAIL)
    verification_key = models.CharField(max_length=64)
    # Time and outcome ("OK" or the exception text) of the last attempt to
    # send the confirmation mail
    last_send_date = models.DateTimeField(null=True, blank=True)
    last_status = models.TextField(max_length=2047, null=True, blank=True)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("unconfirmed email")
        verbose_name_plural = _("unconfirmed emails")

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, create the verification key if missing

        The key is the SHA256 hex digest of 1024 random bytes followed by
        the user name.
        """
        if not self.verification_key:
            hash_object = hashlib.new("sha256")
            hash_object.update(
                urandom(1024)
                + self.user.username.encode("utf-8")  # pylint: disable=no-member
            )  # pylint: disable=no-member
            self.verification_key = hash_object.hexdigest()
        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django versions.
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def send_confirmation_mail(self, url=SECURE_BASE_URL):
        """
        Send confirmation mail to that mail address

        url is the base URL the confirmation link is built on. The time of
        the attempt and its outcome are stored in last_send_date and
        last_status. Sending is best effort: a failure is recorded in
        last_status and True is returned regardless.
        """
        link = url + reverse(
            "confirm_email", kwargs={"verification_key": self.verification_key}
        )
        email_subject = _("Confirm your email address on %s") % SITE_NAME
        email_body = render_to_string(
            "email_confirmation.txt",
            {
                "verification_link": link,
                "site_name": SITE_NAME,
            },
        )
        self.last_send_date = timezone.now()
        self.last_status = "OK"
        try:
            send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email])
        except Exception as e:  # pylint: disable=broad-except
            # Keep the error for later inspection instead of failing
            self.last_status = f"{e}"
        self.save()
        return True

    def __str__(self):
        # pk is None for unsaved instances; "%i" would raise on None
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)
class UnconfirmedOpenId(BaseAccountModel):
    """
    Model holding unconfirmed OpenIDs
    """

    # Not unique: the same OpenID may be pending more than once
    openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("unconfirmed OpenID")
        verbose_name_plural = "unconfirmed_OpenIDs"

    def __str__(self):
        # pk is None for unsaved instances; "%i" would raise on None
        return "%s (%i) from %s" % (self.openid, self.pk or 0, self.user)
class ConfirmedOpenId(BaseAccountModel):
    """
    Model holding confirmed OpenIDs, as well as the relation to
    the assigned photo
    """

    openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL)
    photo = models.ForeignKey(
        Photo,
        related_name="openids",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # http://<id>/ base version - http w/ trailing slash
    digest = models.CharField(max_length=64)
    # http://<id> - http w/o trailing slash
    alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id>/ - https w/ trailing slash
    alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id> - https w/o trailing slash
    alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)

    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("confirmed OpenID")
        verbose_name_plural = _("confirmed OpenIDs")

    def set_photo(self, photo):
        """
        Helper method to save photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        The handle is normalized and looked up via the Bluesky helper;
        ValueError is raised if no profile is returned for it.
        """
        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, normalize the URL and add the digests

        Scheme and host name are lower-cased, then the SHA256 digests of the
        first four URL variations are stored. The cached
        'assign_photo_openid' page of an already existing record and the
        cache entry stored under the Bluesky handle are expired. Cache
        failures are logged and never prevent saving.
        """
        url = urlsplit(self.openid)
        # url.hostname is already lower-cased, but it lacks the port and the
        # brackets of IPv6 literals; put both back so that the stored URL
        # (and therefore the digests) still identify the same resource.
        netloc = url.hostname or ""
        if ":" in netloc:
            netloc = f"[{netloc}]"
        try:
            port = url.port
        except ValueError:
            # Malformed port: leave it out, as before
            port = None
        if port is not None:
            netloc = f"{netloc}:{port}"
        if url.username:  # pragma: no cover
            password = url.password or ""
            netloc = f"{url.username}:{password}@{netloc}"
        lowercase_url = urlunsplit(
            (url.scheme.lower(), netloc, url.path, url.query, url.fragment)
        )
        self.openid = lowercase_url

        # Compute the variations once and hash the first four of them
        variations = openid_variations(lowercase_url)
        (
            self.digest,
            self.alt_digest1,
            self.alt_digest2,
            self.alt_digest3,
        ) = (
            hashlib.sha256(variation.encode("utf-8")).hexdigest()
            for variation in variations[:4]
        )

        # Invalidate page caches and Bluesky avatar cache
        if self.pk:
            # Invalidate assign_photo_openid page cache
            cache_url = reverse_lazy(
                "assign_photo_openid", kwargs={"openid_id": int(self.pk)}
            )
            cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}"
            try:
                if cache.has_key(cache_key):
                    cache.delete(cache_key)
                    logger.debug("Successfully cleaned up cached page: %s", cache_key)
            except Exception as exc:  # pylint: disable=broad-except
                logger.warning(
                    "Failed to clean up cached page %s: %s", cache_key, exc
                )

        # Invalidate Bluesky avatar URL cache if bluesky_handle exists
        if self.bluesky_handle:
            try:
                cache.delete(self.bluesky_handle)
                logger.debug(
                    "Successfully cleaned up Bluesky avatar cache for handle: %s",
                    self.bluesky_handle,
                )
            except Exception as exc:  # pylint: disable=broad-except
                logger.warning(
                    "Failed to clean up Bluesky avatar cache for handle %s: %s",
                    self.bluesky_handle,
                    exc,
                )

        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django versions.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # pk is None for unsaved instances; "%i" would raise on None
        return "%s (%i) (%s)" % (self.openid, self.pk or 0, self.user)
class OpenIDNonce(models.Model):
    """
    Model holding OpenID Nonces
    See also: https://github.com/edx/django-openid-auth/
    """

    server_url = models.CharField(max_length=255)
    # Integer timestamp of the nonce; compared against time.time() by the store
    timestamp = models.IntegerField()
    salt = models.CharField(max_length=128)

    def __str__(self):
        # pk is None for unsaved instances; "%i" would raise on None
        return "%s (%i) (timestamp: %i)" % (
            self.server_url,
            self.pk or 0,
            self.timestamp,
        )
class OpenIDAssociation(models.Model):
    """
    Model holding the relation/association about OpenIDs
    """

    server_url = models.TextField(max_length=2047)
    handle = models.CharField(max_length=255)
    secret = models.TextField(max_length=255)  # stored base64 encoded
    # Integer values taken from the association object when it is stored
    issued = models.IntegerField()
    lifetime = models.IntegerField()
    assoc_type = models.TextField(max_length=64)

    def __str__(self):
        # pk is None for unsaved instances; "%i" would raise on None
        return "%s (%i) (%s, lifetime: %i)" % (
            self.server_url,
            self.pk or 0,
            self.assoc_type,
            self.lifetime,
        )
class DjangoOpenIDStore(OpenIDStore):
    """
    The Python openid library needs an OpenIDStore subclass to persist data
    related to OpenID authentications. This one uses our Django models.
    """

    @staticmethod
    def _decode_secret(raw_secret):  # pragma: no cover
        """
        Helper method turning a stored secret back into the raw bytes

        Handles plain base64 text as well as rows written by earlier
        versions, which contain the repr() of a bytes object
        (b'...' with a literal backslash-n at the end).
        """
        if isinstance(raw_secret, (bytes, bytearray, memoryview)):
            return base64.decodebytes(bytes(raw_secret))
        text = str(raw_secret)
        if text.startswith(("b'", 'b"')):
            # Strip the b'...' wrapper and the escaped newline; left in
            # place, the "n" would be taken for base64 data.
            text = text[2:-1].replace("\\n", "")
        return base64.b64decode(text)

    @staticmethod
    def storeAssociation(server_url, association):  # pragma: no cover
        """
        Helper method to store associations
        """
        assoc = OpenIDAssociation(
            server_url=server_url,
            handle=association.handle,
            # Store real text; handing bytes to a TextField persists the
            # repr() of the bytes object instead of the base64 data.
            secret=base64.b64encode(association.secret).decode("ascii"),
            issued=association.issued,
            lifetime=association.lifetime,
            assoc_type=association.assoc_type,
        )
        assoc.save()

    def getAssociation(self, server_url, handle=None):  # pragma: no cover
        """
        Helper method to get associations

        Looks up the associations for server_url (and handle, if given),
        removes the ones that have expired and returns the most recently
        issued remaining one, or None if there is none.
        """
        lookup = {"server_url": server_url}
        if handle is not None:
            lookup["handle"] = handle
        assocs = OpenIDAssociation.objects.filter(  # pylint: disable=no-member
            **lookup
        )
        if not assocs:
            return None
        valid_associations = []
        for assoc in assocs:
            association = OIDAssociation(
                assoc.handle,
                self._decode_secret(assoc.secret),
                assoc.issued,
                assoc.lifetime,
                assoc.assoc_type,
            )
            try:
                # pylint: disable=no-member
                expires = association.getExpiresIn()
            except AttributeError:
                expires = association.expiresIn
            if expires == 0:
                self.removeAssociation(server_url, assoc.handle)
            else:
                valid_associations.append(association)
        if not valid_associations:
            return None
        # Pick by issue time rather than by the order the rows came back in
        return max(valid_associations, key=lambda item: item.issued)

    @staticmethod
    def removeAssociation(server_url, handle):  # pragma: no cover
        """
        Helper method to remove associations

        Returns True if at least one association was removed.
        """
        removed, _details = OpenIDAssociation.objects.filter(  # pylint: disable=no-member
            server_url=server_url, handle=handle
        ).delete()
        return removed > 0

    @staticmethod
    def useNonce(server_url, timestamp, salt):  # pragma: no cover
        """
        Helper method to 'use' nonces

        Returns True if the nonce is within the allowed clock skew and has
        not been seen before, False otherwise.
        """
        # Has nonce expired?
        if abs(timestamp - time.time()) > oidnonce.SKEW:
            return False
        # A known nonce must stay recorded: deleting it on a replay would
        # make the next replay of the very same nonce look fresh again.
        _nonce, created = OpenIDNonce.objects.get_or_create(  # pylint: disable=no-member
            server_url=server_url, timestamp=timestamp, salt=salt
        )
        return created

    @staticmethod
    def cleanupNonces():  # pragma: no cover
        """
        Helper method to cleanup nonces

        Removes all nonces older than the allowed clock skew and returns
        the number of removed nonces.
        """
        timestamp = int(time.time()) - oidnonce.SKEW
        # pylint: disable=no-member
        removed, _details = OpenIDNonce.objects.filter(
            timestamp__lt=timestamp
        ).delete()
        return removed

    @staticmethod
    def cleanupAssociations():  # pragma: no cover
        """
        Helper method to cleanup associations

        Removes all associations whose issue time plus lifetime lies in the
        past and returns the number of removed associations.
        """
        # issued + lifetime < now, written as issued < now - lifetime so it
        # can be expressed through the ORM instead of hand-written SQL
        # pylint: disable=no-member
        removed, _details = OpenIDAssociation.objects.filter(
            issued__lt=int(time.time()) - models.F("lifetime")
        ).delete()
        return removed