Coverage for ivatar/ivataraccount/models.py: 88%
279 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-14 23:06 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-14 23:06 +0000
1# -*- coding: utf-8 -*-
2"""
3Our models for ivatar.ivataraccount
4"""
6import base64
7import hashlib
8import time
9from io import BytesIO
10from os import urandom
11from urllib.error import HTTPError, URLError
12from ivatar.utils import urlopen, Bluesky
13from urllib.parse import urlsplit, urlunsplit, quote
15from PIL import Image
16from django.contrib.auth.models import User
17from django.contrib import messages
18from django.db import models
19from django.utils import timezone
20from django.http import HttpResponseRedirect
21from django.urls import reverse_lazy, reverse
22from django.utils.translation import gettext_lazy as _
23from django.core.cache import cache
24from django.core.exceptions import ObjectDoesNotExist
25from django.core.mail import send_mail
26from django.template.loader import render_to_string
27from openid.association import Association as OIDAssociation
28from openid.store import nonce as oidnonce
29from openid.store.interface import OpenIDStore
31from libravatar import libravatar_url
33from ivatar.settings import MAX_LENGTH_EMAIL, logger
34from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY
35from ivatar.settings import MAX_LENGTH_URL
36from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL
37from ivatar.utils import openid_variations
38from .gravatar import get_photo as get_gravatar_photo
41def file_format(image_type):
42 """
43 Helper method returning a short image type
44 """
45 if image_type in ("JPEG", "MPO"):
46 return "jpg"
47 elif image_type == "PNG":
48 return "png"
49 elif image_type == "GIF":
50 return "gif"
51 elif image_type == "WEBP":
52 return "webp"
53 return None
56def pil_format(image_type):
57 """
58 Helper method returning the 'encoder name' for PIL
59 """
60 if image_type in ("jpg", "jpeg", "mpo"):
61 return "JPEG"
62 elif image_type == "png":
63 return "PNG"
64 elif image_type == "gif":
65 return "GIF"
66 elif image_type == "webp":
67 return "WEBP"
69 logger.info("Unsupported file format: %s", image_type)
70 return None
73class UserPreference(models.Model):
74 """
75 Holds the user users preferences
76 """
78 THEMES = (
79 ("default", "Default theme"),
80 ("clime", "climes theme"),
81 ("green", "green theme"),
82 ("red", "red theme"),
83 )
85 theme = models.CharField(
86 max_length=10,
87 choices=THEMES,
88 default="default",
89 )
91 user = models.OneToOneField(
92 User,
93 on_delete=models.deletion.CASCADE,
94 primary_key=True,
95 )
97 def __str__(self):
98 return "Preference (%i) for %s" % (self.pk, self.user)
101class BaseAccountModel(models.Model):
102 """
103 Base, abstract model, holding fields we use in all cases
104 """
106 user = models.ForeignKey(
107 User,
108 on_delete=models.deletion.CASCADE,
109 )
110 ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True)
111 add_date = models.DateTimeField(default=timezone.now)
113 class Meta: # pylint: disable=too-few-public-methods
114 """
115 Class attributes
116 """
118 abstract = True
121class Photo(BaseAccountModel):
122 """
123 Model holding the photos and information about them
124 """
126 ip_address = models.GenericIPAddressField(unpack_ipv4=True)
127 data = models.BinaryField()
128 format = models.CharField(max_length=4)
129 access_count = models.BigIntegerField(default=0, editable=False)
131 class Meta: # pylint: disable=too-few-public-methods
132 """
133 Class attributes
134 """
136 verbose_name = _("photo")
137 verbose_name_plural = _("photos")
139 def import_image(self, service_name, email_address):
140 """
141 Allow to import image from other (eg. Gravatar) service
142 """
143 image_url = False
145 if service_name == "Gravatar":
146 if gravatar := get_gravatar_photo(email_address):
147 image_url = gravatar["image_url"]
149 if service_name == "Libravatar":
150 image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE)
152 if not image_url:
153 return False # pragma: no cover
154 try:
155 image = urlopen(image_url)
156 except HTTPError as exc:
157 print(f"{service_name} import failed with an HTTP error: {exc.code}")
158 return False
159 except URLError as exc:
160 print(f"{service_name} import failed: {exc.reason}")
161 return False
162 data = image.read()
164 try:
165 img = Image.open(BytesIO(data))
166 # How am I supposed to test this?
167 except ValueError: # pragma: no cover
168 return False # pragma: no cover
170 self.format = file_format(img.format)
171 if not self.format:
172 print(f"Unable to determine format: {img}")
173 return False # pragma: no cover
174 self.data = data
175 super().save()
176 return True
178 def save(
179 self, force_insert=False, force_update=False, using=None, update_fields=None
180 ):
181 """
182 Override save from parent, taking care about the image
183 """
184 # Use PIL to read the file format
185 try:
186 img = Image.open(BytesIO(self.data))
187 except Exception as exc: # pylint: disable=broad-except
188 # For debugging only
189 print(f"Exception caught in Photo.save(): {exc}")
190 return False
191 self.format = file_format(img.format)
192 if not self.format:
193 print("Format not recognized")
194 return False
195 return super().save(force_insert, force_update, using, update_fields)
197 def perform_crop(self, request, dimensions, email, openid):
198 """
199 Helper to crop the image
200 """
201 if request.user.photo_set.count() == 1:
202 # This is the first photo, assign to all confirmed addresses
203 for addr in request.user.confirmedemail_set.all():
204 addr.photo = self
205 addr.save()
207 for addr in request.user.confirmedopenid_set.all():
208 addr.photo = self
209 addr.save()
211 if email:
212 # Explicitly asked
213 email.photo = self
214 email.save()
216 if openid:
217 # Explicitly asked
218 openid.photo = self
219 openid.save()
221 # Do the real work cropping
222 img = Image.open(BytesIO(self.data))
224 # This should be anyway checked during save...
225 dimensions["a"], dimensions["b"] = img.size # pylint: disable=invalid-name
226 if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS:
227 messages.error(
228 request,
229 _(
230 "Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s"
231 % {
232 "max_pixels": MAX_PIXELS,
233 }
234 ),
235 )
236 return HttpResponseRedirect(reverse_lazy("profile"))
238 if dimensions["w"] == 0 and dimensions["h"] == 0:
239 dimensions["w"], dimensions["h"] = dimensions["a"], dimensions["b"]
240 min_from_w_h = min(dimensions["w"], dimensions["h"])
241 dimensions["w"], dimensions["h"] = min_from_w_h, min_from_w_h
242 elif (
243 (dimensions["w"] < 0)
244 or ((dimensions["x"] + dimensions["w"]) > dimensions["a"])
245 or (dimensions["h"] < 0)
246 or ((dimensions["y"] + dimensions["h"]) > dimensions["b"])
247 ):
248 messages.error(request, _("Crop outside of original image bounding box"))
249 return HttpResponseRedirect(reverse_lazy("profile"))
251 cropped = img.crop(
252 (
253 dimensions["x"],
254 dimensions["y"],
255 dimensions["x"] + dimensions["w"],
256 dimensions["y"] + dimensions["h"],
257 )
258 )
259 # cropped.load()
260 # Resize the image only if it's larger than the specified max width.
261 cropped_w, cropped_h = cropped.size
262 max_w = AVATAR_MAX_SIZE
263 if cropped_w > max_w or cropped_h > max_w:
264 cropped = cropped.resize((max_w, max_w), Image.LANCZOS)
266 data = BytesIO()
267 cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY)
268 data.seek(0)
270 # Overwrite the existing image
271 self.data = data.read()
272 self.save()
274 return HttpResponseRedirect(reverse_lazy("profile"))
276 def __str__(self):
277 return "%s (%i) from %s" % (self.format, self.pk or 0, self.user)
280# pylint: disable=too-few-public-methods
281class ConfirmedEmailManager(models.Manager):
282 """
283 Manager for our confirmed email addresses model
284 """
286 @staticmethod
287 def create_confirmed_email(user, email_address, is_logged_in):
288 """
289 Helper method to create confirmed email address
290 """
291 confirmed = ConfirmedEmail()
292 confirmed.user = user
293 confirmed.ip_address = "0.0.0.0"
294 confirmed.email = email_address
295 confirmed.save()
297 external_photos = []
298 if is_logged_in:
299 if gravatar := get_gravatar_photo(confirmed.email):
300 external_photos.append(gravatar)
302 return (confirmed.pk, external_photos)
305class ConfirmedEmail(BaseAccountModel):
306 """
307 Model holding our confirmed email addresses, as well as the relation
308 to the assigned photo
309 """
311 email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL)
312 photo = models.ForeignKey(
313 Photo,
314 related_name="emails",
315 blank=True,
316 null=True,
317 on_delete=models.deletion.SET_NULL,
318 )
319 # Alternative assignment - use Bluesky handle
320 bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
321 digest = models.CharField(max_length=32)
322 digest_sha256 = models.CharField(max_length=64)
323 objects = ConfirmedEmailManager()
324 access_count = models.BigIntegerField(default=0, editable=False)
326 class Meta: # pylint: disable=too-few-public-methods
327 """
328 Class attributes
329 """
331 verbose_name = _("confirmed email")
332 verbose_name_plural = _("confirmed emails")
334 def set_photo(self, photo):
335 """
336 Helper method to set photo
337 """
338 self.photo = photo
339 self.save()
341 def set_bluesky_handle(self, handle):
342 """
343 Helper method to set Bluesky handle
344 """
346 bs = Bluesky()
347 handle = bs.normalize_handle(handle)
348 avatar = bs.get_profile(handle)
349 if not avatar:
350 raise ValueError("Invalid Bluesky handle")
351 self.bluesky_handle = handle
352 self.save()
354 def save(
355 self, force_insert=False, force_update=False, using=None, update_fields=None
356 ):
357 """
358 Override save from parent, add digest
359 """
360 self.digest = hashlib.md5(
361 self.email.strip().lower().encode("utf-8")
362 ).hexdigest()
363 self.digest_sha256 = hashlib.sha256(
364 self.email.strip().lower().encode("utf-8")
365 ).hexdigest()
367 # We need to manually expire the page caches
368 # TODO: Verify this works as expected
369 # First check if we already have an ID
370 if self.pk:
371 cache_url = reverse_lazy(
372 "assign_photo_email", kwargs={"email_id": int(self.pk)}
373 )
375 cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}"
376 if cache.has_key(cache_key):
377 cache.delete(cache_key)
378 logger.debug("Successfully cleaned up cached page: %s" % cache_key)
380 return super().save(force_insert, force_update, using, update_fields)
382 def __str__(self):
383 return "%s (%i) from %s" % (self.email, self.pk, self.user)
386class UnconfirmedEmail(BaseAccountModel):
387 """
388 Model holding unconfirmed email addresses as well as the verification key
389 """
391 email = models.EmailField(max_length=MAX_LENGTH_EMAIL)
392 verification_key = models.CharField(max_length=64)
393 last_send_date = models.DateTimeField(null=True, blank=True)
394 last_status = models.TextField(max_length=2047, null=True, blank=True)
396 class Meta: # pylint: disable=too-few-public-methods
397 """
398 Class attributes
399 """
401 verbose_name = _("unconfirmed email")
402 verbose_name_plural = _("unconfirmed emails")
404 def save(
405 self, force_insert=False, force_update=False, using=None, update_fields=None
406 ):
407 if not self.verification_key:
408 hash_object = hashlib.new("sha256")
409 hash_object.update(
410 urandom(1024)
411 + self.user.username.encode("utf-8") # pylint: disable=no-member
412 ) # pylint: disable=no-member
413 self.verification_key = hash_object.hexdigest()
414 super(UnconfirmedEmail, self).save(
415 force_insert, force_update, using, update_fields
416 )
418 def send_confirmation_mail(self, url=SECURE_BASE_URL):
419 """
420 Send confirmation mail to that mail address
421 """
422 link = url + reverse(
423 "confirm_email", kwargs={"verification_key": self.verification_key}
424 )
425 email_subject = _("Confirm your email address on %s") % SITE_NAME
426 email_body = render_to_string(
427 "email_confirmation.txt",
428 {
429 "verification_link": link,
430 "site_name": SITE_NAME,
431 },
432 )
433 self.last_send_date = timezone.now()
434 self.last_status = "OK"
435 # if settings.DEBUG:
436 # print('DEBUG: %s' % link)
437 try:
438 send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email])
439 except Exception as e:
440 self.last_status = f"{e}"
441 self.save()
442 return True
444 def __str__(self):
445 return "%s (%i) from %s" % (self.email, self.pk, self.user)
448class UnconfirmedOpenId(BaseAccountModel):
449 """
450 Model holding unconfirmed OpenIDs
451 """
453 openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL)
455 class Meta: # pylint: disable=too-few-public-methods
456 """
457 Meta class
458 """
460 verbose_name = _("unconfirmed OpenID")
461 verbose_name_plural = "unconfirmed_OpenIDs"
463 def __str__(self):
464 return "%s (%i) from %s" % (self.openid, self.pk, self.user)
467class ConfirmedOpenId(BaseAccountModel):
468 """
469 Model holding confirmed OpenIDs, as well as the relation to
470 the assigned photo
471 """
473 openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL)
474 photo = models.ForeignKey(
475 Photo,
476 related_name="openids",
477 blank=True,
478 null=True,
479 on_delete=models.deletion.SET_NULL,
480 )
481 # http://<id>/ base version - http w/ trailing slash
482 digest = models.CharField(max_length=64)
483 # http://<id> - http w/o trailing slash
484 alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None)
485 # https://<id>/ - https w/ trailing slash
486 alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
487 # https://<id> - https w/o trailing slash
488 alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)
489 # Alternative assignment - use Bluesky handle
490 bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
492 access_count = models.BigIntegerField(default=0, editable=False)
494 class Meta: # pylint: disable=too-few-public-methods
495 """
496 Meta class
497 """
499 verbose_name = _("confirmed OpenID")
500 verbose_name_plural = _("confirmed OpenIDs")
502 def set_photo(self, photo):
503 """
504 Helper method to save photo
505 """
506 self.photo = photo
507 self.save()
509 def set_bluesky_handle(self, handle):
510 """
511 Helper method to set Bluesky handle
512 """
513 bs = Bluesky()
514 handle = bs.normalize_handle(handle)
515 avatar = bs.get_profile(handle)
516 if not avatar:
517 raise ValueError("Invalid Bluesky handle")
518 self.bluesky_handle = handle
519 self.save()
521 def save(
522 self, force_insert=False, force_update=False, using=None, update_fields=None
523 ):
524 url = urlsplit(self.openid)
525 if url.username: # pragma: no cover
526 password = url.password or ""
527 netloc = f"{url.username}:{password}@{url.hostname}"
528 else:
529 netloc = url.hostname
530 lowercase_url = urlunsplit(
531 (url.scheme.lower(), netloc, url.path, url.query, url.fragment)
532 )
533 self.openid = lowercase_url
535 self.digest = hashlib.sha256(
536 openid_variations(lowercase_url)[0].encode("utf-8")
537 ).hexdigest()
538 self.alt_digest1 = hashlib.sha256(
539 openid_variations(lowercase_url)[1].encode("utf-8")
540 ).hexdigest()
541 self.alt_digest2 = hashlib.sha256(
542 openid_variations(lowercase_url)[2].encode("utf-8")
543 ).hexdigest()
544 self.alt_digest3 = hashlib.sha256(
545 openid_variations(lowercase_url)[3].encode("utf-8")
546 ).hexdigest()
548 return super().save(force_insert, force_update, using, update_fields)
550 def __str__(self):
551 return "%s (%i) (%s)" % (self.openid, self.pk, self.user)
554class OpenIDNonce(models.Model):
555 """
556 Model holding OpenID Nonces
557 See also: https://github.com/edx/django-openid-auth/
558 """
560 server_url = models.CharField(max_length=255)
561 timestamp = models.IntegerField()
562 salt = models.CharField(max_length=128)
564 def __str__(self):
565 return "%s (%i) (timestamp: %i)" % (self.server_url, self.pk, self.timestamp)
568class OpenIDAssociation(models.Model):
569 """
570 Model holding the relation/association about OpenIDs
571 """
573 server_url = models.TextField(max_length=2047)
574 handle = models.CharField(max_length=255)
575 secret = models.TextField(max_length=255) # stored base64 encoded
576 issued = models.IntegerField()
577 lifetime = models.IntegerField()
578 assoc_type = models.TextField(max_length=64)
580 def __str__(self):
581 return "%s (%i) (%s, lifetime: %i)" % (
582 self.server_url,
583 self.pk,
584 self.assoc_type,
585 self.lifetime,
586 )
589class DjangoOpenIDStore(OpenIDStore):
590 """
591 The Python openid library needs an OpenIDStore subclass to persist data
592 related to OpenID authentications. This one uses our Django models.
593 """
595 @staticmethod
596 def storeAssociation(server_url, association): # pragma: no cover
597 """
598 Helper method to store associations
599 """
600 assoc = OpenIDAssociation(
601 server_url=server_url,
602 handle=association.handle,
603 secret=base64.encodebytes(association.secret),
604 issued=association.issued,
605 lifetime=association.issued,
606 assoc_type=association.assoc_type,
607 )
608 assoc.save()
610 def getAssociation(self, server_url, handle=None): # pragma: no cover
611 """
612 Helper method to get associations
613 """
614 assocs = []
615 if handle is not None:
616 assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member
617 server_url=server_url, handle=handle
618 )
619 else:
620 assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member
621 server_url=server_url
622 )
623 if not assocs:
624 return None
625 associations = []
626 for assoc in assocs:
627 if isinstance(assoc.secret, str):
628 assoc.secret = assoc.secret.split("b'")[1].split("'")[0]
629 assoc.secret = bytes(assoc.secret, "utf-8")
630 association = OIDAssociation(
631 assoc.handle,
632 base64.decodebytes(assoc.secret),
633 assoc.issued,
634 assoc.lifetime,
635 assoc.assoc_type,
636 )
637 expires = 0
638 try:
639 # pylint: disable=no-member
640 expires = association.getExpiresIn()
641 except AttributeError:
642 expires = association.expiresIn
643 if expires == 0:
644 self.removeAssociation(server_url, assoc.handle)
645 else:
646 associations.append((association.issued, association))
647 return associations[-1][1] if associations else None
649 @staticmethod
650 def removeAssociation(server_url, handle): # pragma: no cover
651 """
652 Helper method to remove associations
653 """
654 assocs = list(
655 OpenIDAssociation.objects.filter( # pylint: disable=no-member
656 server_url=server_url, handle=handle
657 )
658 )
659 assocs_exist = len(assocs) > 0
660 for assoc in assocs:
661 assoc.delete()
662 return assocs_exist
664 @staticmethod
665 def useNonce(server_url, timestamp, salt): # pragma: no cover
666 """
667 Helper method to 'use' nonces
668 """
669 # Has nonce expired?
670 if abs(timestamp - time.time()) > oidnonce.SKEW:
671 return False
672 try:
673 nonce = OpenIDNonce.objects.get( # pylint: disable=no-member
674 server_url__exact=server_url,
675 timestamp__exact=timestamp,
676 salt__exact=salt,
677 )
678 except ObjectDoesNotExist:
679 nonce = OpenIDNonce.objects.create( # pylint: disable=no-member
680 server_url=server_url, timestamp=timestamp, salt=salt
681 )
682 return True
683 nonce.delete()
684 return False
686 @staticmethod
687 def cleanupNonces(): # pragma: no cover
688 """
689 Helper method to cleanup nonces
690 """
691 timestamp = int(time.time()) - oidnonce.SKEW
692 # pylint: disable=no-member
693 OpenIDNonce.objects.filter(timestamp__lt=timestamp).delete()
695 @staticmethod
696 def cleanupAssociations(): # pragma: no cover
697 """
698 Helper method to cleanup associations
699 """
700 OpenIDAssociation.objects.extra(
701 where=[f"issued + lifetimeint < ({time.time()})"]
702 ).delete()