Coverage for ivatar/ivataraccount/models.py: 88%
280 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-04 23:12 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-04 23:12 +0000
1# -*- coding: utf-8 -*-
2"""
3Our models for ivatar.ivataraccount
4"""
6import base64
7import hashlib
8import time
9from io import BytesIO
10from os import urandom
11from urllib.error import HTTPError, URLError
12from ivatar.utils import urlopen, Bluesky
13from urllib.parse import urlsplit, urlunsplit, quote
15from PIL import Image
16from django.contrib.auth.models import User
17from django.contrib import messages
18from django.db import models
19from django.utils import timezone
20from django.http import HttpResponseRedirect
21from django.urls import reverse_lazy, reverse
22from django.utils.translation import gettext_lazy as _
23from django.core.cache import cache
24from django.core.exceptions import ObjectDoesNotExist
25from django.core.mail import send_mail
26from django.template.loader import render_to_string
27from openid.association import Association as OIDAssociation
28from openid.store import nonce as oidnonce
29from openid.store.interface import OpenIDStore
31from libravatar import libravatar_url
33from ivatar.settings import MAX_LENGTH_EMAIL, logger
34from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY
35from ivatar.settings import MAX_LENGTH_URL
36from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL
37from ivatar.utils import openid_variations
38from .gravatar import get_photo as get_gravatar_photo
def file_format(image_type):
    """
    Map a PIL image format name to the short file type we store.

    Returns "jpg", "png", "gif" or "webp"; any other value yields None.
    """
    short_names = {
        "JPEG": "jpg",
        "MPO": "jpg",
        "PNG": "png",
        "GIF": "gif",
        "WEBP": "webp",
    }
    return short_names.get(image_type)
def pil_format(image_type):
    """
    Translate our short file type into the encoder name PIL expects.

    Unknown types are logged at info level and None is returned.
    """
    encoders = {
        "jpg": "JPEG",
        "jpeg": "JPEG",
        "mpo": "JPEG",
        "png": "PNG",
        "gif": "GIF",
        "webp": "WEBP",
    }
    encoder = encoders.get(image_type)
    if encoder is None:
        logger.info("Unsupported file format: %s", image_type)
    return encoder
class UserPreference(models.Model):
    """
    Per-user preferences (currently only the selected theme)
    """

    # (stored value, label) pairs offered for the theme field
    THEMES = (
        ("default", "Default theme"),
        ("clime", "climes theme"),
        ("green", "green theme"),
        ("red", "red theme"),
    )

    theme = models.CharField(max_length=10, choices=THEMES, default="default")

    # The user doubles as primary key, so there is one row per user at most
    user = models.OneToOneField(User, on_delete=models.CASCADE, primary_key=True)

    def __str__(self):
        description = "Preference (%i) for %s"
        return description % (self.pk, self.user)
class BaseAccountModel(models.Model):
    """
    Abstract base model carrying the fields every account model shares:
    the owning user, the IP address and the date the entry was added
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True)
    add_date = models.DateTimeField(default=timezone.now)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Model options
        """

        abstract = True
class Photo(BaseAccountModel):
    """
    Model holding the photos and information about them
    """

    # Unlike the base model, the IP address is mandatory for photos
    ip_address = models.GenericIPAddressField(unpack_ipv4=True)
    # Raw image bytes
    data = models.BinaryField()
    # Short file type as returned by file_format() ("jpg", "png", ...)
    format = models.CharField(max_length=4)
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("photo")
        verbose_name_plural = _("photos")

    def import_image(self, service_name, email_address):
        """
        Allow to import image from other (eg. Gravatar) service

        :param service_name: "Gravatar" or "Libravatar"; anything else
            results in no image URL and therefore a failed import
        :param email_address: address used to look up the avatar
        :return: True if the image was fetched, recognized and saved,
            False otherwise
        """
        image_url = None

        if service_name == "Gravatar":
            if gravatar := get_gravatar_photo(email_address):
                image_url = gravatar["image_url"]
        elif service_name == "Libravatar":
            image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE)

        if not image_url:
            return False  # pragma: no cover
        try:
            image = urlopen(image_url)
        except HTTPError as exc:
            logger.warning(
                "%s import failed with an HTTP error: %s", service_name, exc.code
            )
            return False
        except URLError as exc:
            logger.warning("%s import failed: %s", service_name, exc.reason)
            return False
        data = image.read()

        try:
            img = Image.open(BytesIO(data))
        # Pillow signals unreadable image data with UnidentifiedImageError,
        # which is an OSError - catching ValueError alone let it escape
        except (ValueError, OSError) as exc:  # pragma: no cover
            logger.warning("%s import returned no usable image: %s", service_name, exc)
            return False  # pragma: no cover

        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Unable to determine format: %s", img)
            return False  # pragma: no cover
        self.data = data
        # The format is already determined, so skip our own save() override
        super().save()
        return True

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, taking care about the image

        The image format is derived from the binary data before saving.

        :return: False (without saving) if the data cannot be opened as an
            image or its format is not supported, otherwise whatever the
            parent's save() returns
        """
        # Use PIL to read the file format
        try:
            img = Image.open(BytesIO(self.data))
        except Exception as exc:  # pylint: disable=broad-except
            # Deliberately best effort: any failure means "not saved"
            logger.warning("Exception caught in Photo.save(): %s", exc)
            return False
        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Format not recognized")
            return False
        # Keyword arguments: Django deprecated positional arguments to save()
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def perform_crop(self, request, dimensions, email, openid):
        """
        Helper to crop the image

        Assigns this photo to the requested (or, for the very first photo,
        to all confirmed) addresses, crops the image, scales it down to
        AVATAR_MAX_SIZE if needed and stores the result.

        :param request: current request; used for the user and for messages
        :param dimensions: dict with the crop rectangle in keys "x", "y",
            "w" and "h"; the image width/height are written to "a" and "b"
        :param email: confirmed email to assign the photo to, or a false value
        :param openid: confirmed OpenID to assign the photo to, or a false value
        :return: redirect to the profile page, in both success and error case
        """
        if request.user.photo_set.count() == 1:
            # This is the first photo, assign to all confirmed addresses
            for addr in request.user.confirmedemail_set.all():
                addr.photo = self
                addr.save()

            for addr in request.user.confirmedopenid_set.all():
                addr.photo = self
                addr.save()

        if email:
            # Explicitly asked
            email.photo = self
            email.save()

        if openid:
            # Explicitly asked
            openid.photo = self
            openid.save()

        # Do the real work cropping
        img = Image.open(BytesIO(self.data))

        # This should be anyway checked during save...
        dimensions["a"], dimensions["b"] = img.size  # pylint: disable=invalid-name
        if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS:
            # Interpolate after the translation lookup; interpolating first
            # produces a msgid no translation catalog can contain
            messages.error(
                request,
                _("Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s")
                % {"max_pixels": MAX_PIXELS},
            )
            return HttpResponseRedirect(reverse_lazy("profile"))

        if dimensions["w"] == 0 and dimensions["h"] == 0:
            # No crop rectangle given: use the largest square that fits
            min_from_w_h = min(dimensions["a"], dimensions["b"])
            dimensions["w"], dimensions["h"] = min_from_w_h, min_from_w_h
        elif (
            (dimensions["w"] < 0)
            or ((dimensions["x"] + dimensions["w"]) > dimensions["a"])
            or (dimensions["h"] < 0)
            or ((dimensions["y"] + dimensions["h"]) > dimensions["b"])
        ):
            messages.error(request, _("Crop outside of original image bounding box"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        cropped = img.crop(
            (
                dimensions["x"],
                dimensions["y"],
                dimensions["x"] + dimensions["w"],
                dimensions["y"] + dimensions["h"],
            )
        )
        # Resize the image only if it's larger than the specified max width.
        cropped_w, cropped_h = cropped.size
        max_w = AVATAR_MAX_SIZE
        if cropped_w > max_w or cropped_h > max_w:
            cropped = cropped.resize((max_w, max_w), Image.LANCZOS)

        data = BytesIO()
        cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY)

        # Overwrite the existing image
        self.data = data.getvalue()
        self.save()

        return HttpResponseRedirect(reverse_lazy("profile"))

    def __str__(self):
        # pk is None until the photo is saved, hence the fallback to 0
        return "%s (%i) from %s" % (self.format, self.pk or 0, self.user)
# pylint: disable=too-few-public-methods
class ConfirmedEmailManager(models.Manager):
    """
    Manager used by the confirmed email model
    """

    @staticmethod
    def create_confirmed_email(user, email_address, is_logged_in):
        """
        Create and save a confirmed email address for the given user.

        Returns a tuple of the new entry's primary key and a list of
        photos found on external services; the external lookup is only
        done when is_logged_in is true.
        """
        entry = ConfirmedEmail(
            user=user,
            ip_address="0.0.0.0",
            email=email_address,
        )
        entry.save()

        found_photos = []
        if is_logged_in and (gravatar := get_gravatar_photo(entry.email)):
            found_photos.append(gravatar)

        return entry.pk, found_photos
class ConfirmedEmail(BaseAccountModel):
    """
    Model holding our confirmed email addresses, as well as the relation
    to the assigned photo
    """

    email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL)
    photo = models.ForeignKey(
        Photo,
        related_name="emails",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
    # MD5 and SHA256 hex digests of the normalized address, set in save()
    digest = models.CharField(max_length=32)
    digest_sha256 = models.CharField(max_length=64)
    objects = ConfirmedEmailManager()
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("confirmed email")
        verbose_name_plural = _("confirmed emails")

    def set_photo(self, photo):
        """
        Helper method to set photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        :raises ValueError: if no profile can be fetched for the handle
        """
        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, add digest

        Both digests are computed from the stripped, lower-cased address.
        For already existing entries the cached photo assignment page is
        expired as well.
        """
        normalized = self.email.strip().lower().encode("utf-8")
        self.digest = hashlib.md5(normalized).hexdigest()
        self.digest_sha256 = hashlib.sha256(normalized).hexdigest()

        # We need to manually expire the page caches
        # TODO: Verify this works as expected
        # First check if we already have an ID
        if self.pk:
            cache_url = reverse_lazy(
                "assign_photo_email", kwargs={"email_id": int(self.pk)}
            )

            cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}"
            # Routine housekeeping, not an error condition: log at debug level
            if cache.has_key(cache_key):
                cache.delete(cache_key)
                logger.debug("Successfully cleaned up cached page: %s", cache_key)
            else:
                logger.debug("Page %s wasn't cached.", cache_key)

        # Keyword arguments: Django deprecated positional arguments to save()
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # pk is None until the entry is saved, hence the fallback to 0
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)
class UnconfirmedEmail(BaseAccountModel):
    """
    Model holding unconfirmed email addresses as well as the verification key
    """

    email = models.EmailField(max_length=MAX_LENGTH_EMAIL)
    verification_key = models.CharField(max_length=64)
    # When the confirmation mail was last sent and how that went
    last_send_date = models.DateTimeField(null=True, blank=True)
    last_status = models.TextField(max_length=2047, null=True, blank=True)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("unconfirmed email")
        verbose_name_plural = _("unconfirmed emails")

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, generating the verification key if the
        entry does not have one yet
        """
        if not self.verification_key:
            # SHA256 over 1024 random bytes plus the username: 64 hex chars
            hash_object = hashlib.new("sha256")
            hash_object.update(
                urandom(1024)
                + self.user.username.encode("utf-8")  # pylint: disable=no-member
            )
            self.verification_key = hash_object.hexdigest()
        # Keyword arguments: Django deprecated positional arguments to save()
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def send_confirmation_mail(self, url=SECURE_BASE_URL):
        """
        Send confirmation mail to that mail address

        :param url: base URL the confirmation link is appended to
        :return: always True; a delivery failure is not raised but recorded
            in last_status ("OK" on success, the exception text otherwise)
        """
        link = url + reverse(
            "confirm_email", kwargs={"verification_key": self.verification_key}
        )
        email_subject = _("Confirm your email address on %s") % SITE_NAME
        email_body = render_to_string(
            "email_confirmation.txt",
            {
                "verification_link": link,
                "site_name": SITE_NAME,
            },
        )
        self.last_send_date = timezone.now()
        self.last_status = "OK"
        try:
            send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email])
        except Exception as exc:  # pylint: disable=broad-except
            # Best effort: keep the error text for later inspection
            self.last_status = f"{exc}"
        self.save()
        return True

    def __str__(self):
        # pk is None until the entry is saved, hence the fallback to 0
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)
class UnconfirmedOpenId(BaseAccountModel):
    """
    Model holding unconfirmed OpenIDs
    """

    # Not unique: the same OpenID may be pending more than once
    openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("unconfirmed OpenID")
        # NOTE(review): unlike the sibling models this is an untranslated
        # string containing an underscore; left unchanged here since editing
        # model options may require a new migration - confirm and fix there
        verbose_name_plural = "unconfirmed_OpenIDs"

    def __str__(self):
        # pk is None until the entry is saved, hence the fallback to 0
        return "%s (%i) from %s" % (self.openid, self.pk or 0, self.user)
class ConfirmedOpenId(BaseAccountModel):
    """
    Model holding confirmed OpenIDs, as well as the relation to
    the assigned photo
    """

    openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL)
    photo = models.ForeignKey(
        Photo,
        related_name="openids",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # http://<id>/ base version - http w/ trailing slash
    digest = models.CharField(max_length=64)
    # http://<id> - http w/o trailing slash
    alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id>/ - https w/ trailing slash
    alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id> - https w/o trailing slash
    alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)

    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("confirmed OpenID")
        verbose_name_plural = _("confirmed OpenIDs")

    def set_photo(self, photo):
        """
        Helper method to save photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        :raises ValueError: if no profile can be fetched for the handle
        """
        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    @staticmethod
    def _normalized_netloc(url):
        """
        Rebuild the network location of a urlsplit() result with a
        lower-cased host name, keeping credentials and a non-default port
        """
        # SplitResult.hostname is already lower-cased, but has lost the
        # brackets of an IPv6 literal and does not include the port
        host = url.hostname or ""
        if ":" in host:
            host = f"[{host}]"
        try:
            port = url.port
        except ValueError:
            # Not a usable port number - drop it rather than fail the save
            port = None
        # Default ports carry no information and are left out, so such URLs
        # keep producing the same digests as before
        default_ports = {"http": 80, "https": 443}
        if port is not None and default_ports.get(url.scheme.lower()) != port:
            host = f"{host}:{port}"
        if url.username:  # pragma: no cover
            password = url.password or ""
            return f"{url.username}:{password}@{host}"
        return host

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, normalizing the OpenID (lower-case scheme
        and host) and computing the digests of its four variations
        """
        url = urlsplit(self.openid)
        lowercase_url = urlunsplit(
            (
                url.scheme.lower(),
                self._normalized_netloc(url),
                url.path,
                url.query,
                url.fragment,
            )
        )
        self.openid = lowercase_url

        # Compute the variations once; order matches the digest fields
        variations = openid_variations(lowercase_url)
        self.digest = hashlib.sha256(variations[0].encode("utf-8")).hexdigest()
        self.alt_digest1 = hashlib.sha256(variations[1].encode("utf-8")).hexdigest()
        self.alt_digest2 = hashlib.sha256(variations[2].encode("utf-8")).hexdigest()
        self.alt_digest3 = hashlib.sha256(variations[3].encode("utf-8")).hexdigest()

        # Keyword arguments: Django deprecated positional arguments to save()
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # pk is None until the entry is saved, hence the fallback to 0
        return "%s (%i) (%s)" % (self.openid, self.pk or 0, self.user)
class OpenIDNonce(models.Model):
    """
    Model holding OpenID Nonces
    See also: https://github.com/edx/django-openid-auth/
    """

    server_url = models.CharField(max_length=255)
    timestamp = models.IntegerField()
    salt = models.CharField(max_length=128)

    def __str__(self):
        # pk is None until the nonce is saved, hence the fallback to 0
        return "%s (%i) (timestamp: %i)" % (
            self.server_url,
            self.pk or 0,
            self.timestamp,
        )
class OpenIDAssociation(models.Model):
    """
    Model holding the relation/association about OpenIDs
    """

    server_url = models.TextField(max_length=2047)
    handle = models.CharField(max_length=255)
    secret = models.TextField(max_length=255)  # stored base64 encoded
    issued = models.IntegerField()
    lifetime = models.IntegerField()
    assoc_type = models.TextField(max_length=64)

    def __str__(self):
        # pk is None until the association is saved, hence the fallback to 0
        return "%s (%i) (%s, lifetime: %i)" % (
            self.server_url,
            self.pk or 0,
            self.assoc_type,
            self.lifetime,
        )
class DjangoOpenIDStore(OpenIDStore):
    """
    The Python openid library needs an OpenIDStore subclass to persist data
    related to OpenID authentications. This one uses our Django models.
    """

    @staticmethod
    def _decode_secret(stored):  # pragma: no cover
        """
        Return the raw secret bytes for a stored, base64 encoded secret
        """
        if isinstance(stored, str):
            # Rows written by earlier versions of this store contain the
            # repr() of a bytes object ("b'...\\n'"): unwrap it and turn
            # the escaped line breaks back into real ones
            if stored[:2] in ("b'", 'b"'):
                stored = stored[2:-1].replace("\\n", "\n")
            stored = stored.encode("utf-8")
        return base64.decodebytes(stored)

    @staticmethod
    def storeAssociation(server_url, association):  # pragma: no cover
        """
        Helper method to store associations
        """
        assoc = OpenIDAssociation(
            server_url=server_url,
            handle=association.handle,
            # Store plain base64 text rather than the repr() of bytes
            secret=base64.encodebytes(association.secret).decode("ascii"),
            issued=association.issued,
            # Was association.issued, which made the expiry meaningless
            lifetime=association.lifetime,
            assoc_type=association.assoc_type,
        )
        assoc.save()

    def getAssociation(self, server_url, handle=None):  # pragma: no cover
        """
        Helper method to get associations

        Expired associations found on the way are removed.

        :param server_url: URL of the OpenID server
        :param handle: restrict the lookup to this handle, if not None
        :return: the most recently issued, still valid association or None
        """
        assocs = OpenIDAssociation.objects.filter(  # pylint: disable=no-member
            server_url=server_url
        )
        if handle is not None:
            assocs = assocs.filter(handle=handle)

        newest = None
        for assoc in assocs:
            association = OIDAssociation(
                assoc.handle,
                self._decode_secret(assoc.secret),
                assoc.issued,
                assoc.lifetime,
                assoc.assoc_type,
            )
            try:
                # pylint: disable=no-member
                expires = association.getExpiresIn()
            except AttributeError:
                expires = association.expiresIn
            if expires == 0:
                self.removeAssociation(server_url, assoc.handle)
            elif newest is None or association.issued >= newest.issued:
                newest = association
        return newest

    @staticmethod
    def removeAssociation(server_url, handle):  # pragma: no cover
        """
        Helper method to remove associations

        :return: True if at least one association was removed
        """
        removed, _details = OpenIDAssociation.objects.filter(  # pylint: disable=no-member
            server_url=server_url, handle=handle
        ).delete()
        return removed > 0

    @staticmethod
    def useNonce(server_url, timestamp, salt):  # pragma: no cover
        """
        Helper method to 'use' nonces

        :return: True if the nonce is fresh and was not seen before,
            False if it is outside the allowed clock skew or already used
        """
        # Has nonce expired?
        if abs(timestamp - time.time()) > oidnonce.SKEW:
            return False
        # A nonce that was seen before has to stay stored: removing it, as
        # was done previously, would let the next replay of it succeed
        _nonce, created = OpenIDNonce.objects.get_or_create(  # pylint: disable=no-member
            server_url=server_url, timestamp=timestamp, salt=salt
        )
        return created

    @staticmethod
    def cleanupNonces():  # pragma: no cover
        """
        Helper method to cleanup nonces

        :return: number of nonces removed
        """
        timestamp = int(time.time()) - oidnonce.SKEW
        # pylint: disable=no-member
        removed, _details = OpenIDNonce.objects.filter(timestamp__lt=timestamp).delete()
        return removed

    @staticmethod
    def cleanupAssociations():  # pragma: no cover
        """
        Helper method to cleanup associations

        :return: number of expired associations removed
        """
        # Expired means issued + lifetime < now; the previous raw SQL
        # referred to a column named "lifetimeint", which the model lacks
        # pylint: disable=no-member
        removed, _details = OpenIDAssociation.objects.filter(
            issued__lt=int(time.time()) - models.F("lifetime")
        ).delete()
        return removed