Coverage for ivatar/ivataraccount/models.py: 89%
272 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-12 23:12 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-12 23:12 +0000
1# -*- coding: utf-8 -*-
2"""
3Our models for ivatar.ivataraccount
4"""
6import base64
7import hashlib
8import time
9from io import BytesIO
10from os import urandom
11from urllib.error import HTTPError, URLError
12from ivatar.utils import urlopen, Bluesky
13from urllib.parse import urlsplit, urlunsplit
15from PIL import Image
16from django.contrib.auth.models import User
17from django.contrib import messages
18from django.db import models
19from django.utils import timezone
20from django.http import HttpResponseRedirect
21from django.urls import reverse_lazy, reverse
22from django.utils.translation import gettext_lazy as _
23from django.core.exceptions import ObjectDoesNotExist
24from django.core.mail import send_mail
25from django.template.loader import render_to_string
26from openid.association import Association as OIDAssociation
27from openid.store import nonce as oidnonce
28from openid.store.interface import OpenIDStore
30from libravatar import libravatar_url
32from ivatar.settings import MAX_LENGTH_EMAIL, logger
33from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY
34from ivatar.settings import MAX_LENGTH_URL
35from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL
36from ivatar.utils import openid_variations
37from .gravatar import get_photo as get_gravatar_photo
def file_format(image_type):
    """
    Translate a PIL image format name into the short type we store.

    Returns "jpg", "png", "gif" or "webp" for the known PIL format names
    ("MPO" is handled like "JPEG") and None for everything else.
    """
    known_formats = (
        (("JPEG", "MPO"), "jpg"),
        (("PNG",), "png"),
        (("GIF",), "gif"),
        (("WEBP",), "webp"),
    )
    for pil_names, short_name in known_formats:
        if image_type in pil_names:
            return short_name
    return None
def pil_format(image_type):
    """
    Translate our short image type into the 'encoder name' PIL expects.

    Returns "JPEG", "PNG", "GIF" or "WEBP" for the known short types; any
    other value is logged at info level and None is returned.
    """
    encoders = (
        (("jpg", "jpeg", "mpo"), "JPEG"),
        (("png",), "PNG"),
        (("gif",), "GIF"),
        (("webp",), "WEBP"),
    )
    for short_names, encoder_name in encoders:
        if image_type in short_names:
            return encoder_name

    logger.info("Unsupported file format: %s", image_type)
    return None
class UserPreference(models.Model):
    """
    Holds the user's preferences
    """

    # (stored value, human readable label) pairs offered for `theme`
    THEMES = (
        ("default", "Default theme"),
        ("clime", "climes theme"),
        ("green", "green theme"),
        ("red", "red theme"),
    )

    theme = models.CharField(
        max_length=10,
        choices=THEMES,
        default="default",
    )

    # One preference row per user; the user relation doubles as primary key
    user = models.OneToOneField(
        User,
        on_delete=models.deletion.CASCADE,
        primary_key=True,
    )

    def __str__(self):
        # "%i" cannot format None, so fall back to 0 while there is no
        # primary key yet (same convention as Photo.__str__).
        return "Preference (%i) for %s" % (self.pk or 0, self.user)
class BaseAccountModel(models.Model):
    """
    Base, abstract model, holding fields we use in all cases
    """

    # Owning user; rows are removed together with the user (CASCADE)
    user = models.ForeignKey(
        User,
        on_delete=models.deletion.CASCADE,
    )
    # Nullable here; IPv4-mapped IPv6 addresses are unpacked to plain IPv4
    ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True)
    # Defaults to the time the instance is created
    add_date = models.DateTimeField(default=timezone.now)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        # No table of its own; subclasses inherit the fields above
        abstract = True
class Photo(BaseAccountModel):
    """
    Model holding the photos and information about them
    """

    # Redeclared without null=True: a photo always needs an IP address
    ip_address = models.GenericIPAddressField(unpack_ipv4=True)
    # Raw image bytes
    data = models.BinaryField()
    # Short image type as returned by file_format() ("jpg", "png", ...)
    format = models.CharField(max_length=4)
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("photo")
        verbose_name_plural = _("photos")

    def import_image(self, service_name, email_address):
        """
        Allow to import image from other (eg. Gravatar) service

        :param service_name: "Gravatar" or "Libravatar"; any other value
            yields no image URL and therefore False
        :param email_address: address used to look up the remote avatar
        :return: True if the image was fetched, recognized and saved,
            False otherwise
        """
        image_url = False

        if service_name == "Gravatar":
            if gravatar := get_gravatar_photo(email_address):
                image_url = gravatar["image_url"]

        if service_name == "Libravatar":
            image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE)

        if not image_url:
            return False  # pragma: no cover
        try:
            image = urlopen(image_url)
        except HTTPError as exc:
            logger.warning(
                "%s import failed with an HTTP error: %s", service_name, exc.code
            )
            return False
        except URLError as exc:
            logger.warning("%s import failed: %s", service_name, exc.reason)
            return False
        data = image.read()

        try:
            img = Image.open(BytesIO(data))
        # Pillow reports undecodable data with UnidentifiedImageError, which
        # is an OSError subclass, so ValueError alone does not cover it.
        except (ValueError, OSError):  # pragma: no cover
            return False  # pragma: no cover

        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Unable to determine format: %s", img)
            return False  # pragma: no cover
        self.data = data
        # The format has just been determined, so go straight to the parent
        # save() instead of re-parsing the image in Photo.save().
        super().save()
        return True

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, taking care about the image

        The image format is (re-)detected from ``self.data`` before saving.

        :return: False if the data cannot be opened as an image or its
            format is not supported, otherwise the parent's return value
        """
        # Use PIL to read the file format
        try:
            img = Image.open(BytesIO(self.data))
        except Exception as exc:  # pylint: disable=broad-except
            logger.warning("Exception caught in Photo.save(): %s", exc)
            return False
        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Format not recognized")
            return False
        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django versions.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def perform_crop(self, request, dimensions, email, openid):
        """
        Helper to crop the image

        :param request: current request; used for the user's related
            objects and for user-facing messages
        :param dimensions: dict with the crop box in "x", "y", "w", "h";
            "a" and "b" are overwritten with the image width and height
        :param email: confirmed email to assign this photo to, or a falsy
            value
        :param openid: confirmed OpenID to assign this photo to, or a falsy
            value
        :return: redirect to the "profile" page in every case
        """
        if request.user.photo_set.count() == 1:
            # This is the first photo, assign to all confirmed addresses
            for addr in request.user.confirmedemail_set.all():
                addr.photo = self
                addr.save()

            for addr in request.user.confirmedopenid_set.all():
                addr.photo = self
                addr.save()

        if email:
            # Explicitly asked
            email.photo = self
            email.save()

        if openid:
            # Explicitly asked
            openid.photo = self
            openid.save()

        # Do the real work cropping
        img = Image.open(BytesIO(self.data))

        # This should be anyway checked during save...
        dimensions["a"], dimensions["b"] = img.size  # pylint: disable=invalid-name
        if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS:
            # Interpolate after the translation lookup; formatting first
            # would look up a msgid that is never in the catalog.
            messages.error(
                request,
                _("Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s")
                % {
                    "max_pixels": MAX_PIXELS,
                },
            )
            return HttpResponseRedirect(reverse_lazy("profile"))

        if dimensions["w"] == 0 and dimensions["h"] == 0:
            # No crop box given: use a square with the shorter image side
            shorter_side = min(dimensions["a"], dimensions["b"])
            dimensions["w"], dimensions["h"] = shorter_side, shorter_side
        elif (
            (dimensions["w"] < 0)
            or ((dimensions["x"] + dimensions["w"]) > dimensions["a"])
            or (dimensions["h"] < 0)
            or ((dimensions["y"] + dimensions["h"]) > dimensions["b"])
        ):
            messages.error(request, _("Crop outside of original image bounding box"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        cropped = img.crop(
            (
                dimensions["x"],
                dimensions["y"],
                dimensions["x"] + dimensions["w"],
                dimensions["y"] + dimensions["h"],
            )
        )
        # Resize the image only if it's larger than the specified max width.
        cropped_w, cropped_h = cropped.size
        max_w = AVATAR_MAX_SIZE
        if cropped_w > max_w or cropped_h > max_w:
            cropped = cropped.resize((max_w, max_w), Image.LANCZOS)

        data = BytesIO()
        cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY)
        data.seek(0)

        # Overwrite the existing image
        self.data = data.read()
        self.save()

        return HttpResponseRedirect(reverse_lazy("profile"))

    def __str__(self):
        return "%s (%i) from %s" % (self.format, self.pk or 0, self.user)
# pylint: disable=too-few-public-methods
class ConfirmedEmailManager(models.Manager):
    """
    Manager for our confirmed email addresses model
    """

    @staticmethod
    def create_confirmed_email(user, email_address, is_logged_in):
        """
        Create and save a confirmed email address for the given user.

        Returns a tuple of the new object's primary key and a list of
        external photos; the list only ever holds the Gravatar lookup
        result, and only when the user is logged in and the lookup
        returned something.
        """
        new_address = ConfirmedEmail()
        new_address.user = user
        new_address.ip_address = "0.0.0.0"
        new_address.email = email_address
        new_address.save()

        if not is_logged_in:
            return (new_address.pk, [])

        gravatar = get_gravatar_photo(new_address.email)
        found_photos = [gravatar] if gravatar else []
        return (new_address.pk, found_photos)
class ConfirmedEmail(BaseAccountModel):
    """
    Model holding our confirmed email addresses, as well as the relation
    to the assigned photo
    """

    email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL)
    # Deleting the photo keeps the address and just clears the relation
    photo = models.ForeignKey(
        Photo,
        related_name="emails",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
    # Hex MD5 / SHA256 of the normalized address, maintained by save()
    digest = models.CharField(max_length=32)
    digest_sha256 = models.CharField(max_length=64)
    objects = ConfirmedEmailManager()
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("confirmed email")
        verbose_name_plural = _("confirmed emails")

    def set_photo(self, photo):
        """
        Helper method to set photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        The handle is normalized and only stored if a profile lookup for
        it returns something.

        :raises ValueError: if the profile lookup returns nothing
        """
        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, add digest
        """
        # Both digests are taken over the same normalized form: whitespace
        # stripped, lower-cased, UTF-8 encoded.
        normalized = self.email.strip().lower().encode("utf-8")
        self.digest = hashlib.md5(normalized).hexdigest()
        self.digest_sha256 = hashlib.sha256(normalized).hexdigest()
        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django versions.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # "%i" cannot format None, so fall back to 0 for unsaved instances
        # (same convention as Photo.__str__).
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)
class UnconfirmedEmail(BaseAccountModel):
    """
    Model holding unconfirmed email addresses as well as the verification key
    """

    email = models.EmailField(max_length=MAX_LENGTH_EMAIL)
    # Hex SHA256 digest, generated on first save()
    verification_key = models.CharField(max_length=64)
    # Time and outcome of the most recent confirmation mail
    last_send_date = models.DateTimeField(null=True, blank=True)
    last_status = models.TextField(max_length=2047, null=True, blank=True)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("unconfirmed email")
        verbose_name_plural = _("unconfirmed emails")

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, create the verification key if missing
        """
        if not self.verification_key:
            # 1024 random bytes plus the user name, hashed with SHA256
            hash_object = hashlib.new("sha256")
            hash_object.update(
                urandom(1024)
                + self.user.username.encode("utf-8")  # pylint: disable=no-member
            )
            self.verification_key = hash_object.hexdigest()
        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django versions.
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def send_confirmation_mail(self, url=SECURE_BASE_URL):
        """
        Send confirmation mail to that mail address

        :param url: base URL the confirmation path is appended to
        :return: always True; a delivery failure is not raised but recorded
            in ``last_status``
        """
        link = url + reverse(
            "confirm_email", kwargs={"verification_key": self.verification_key}
        )
        email_subject = _("Confirm your email address on %s") % SITE_NAME
        email_body = render_to_string(
            "email_confirmation.txt",
            {
                "verification_link": link,
                "site_name": SITE_NAME,
            },
        )
        self.last_send_date = timezone.now()
        self.last_status = "OK"
        try:
            send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email])
        # Deliberately broad: any failure is stored instead of propagated
        except Exception as exc:  # pylint: disable=broad-except
            self.last_status = f"{exc}"
        self.save()
        return True

    def __str__(self):
        # "%i" cannot format None, so fall back to 0 for unsaved instances
        # (same convention as Photo.__str__).
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)
class UnconfirmedOpenId(BaseAccountModel):
    """
    Model holding unconfirmed OpenIDs
    """

    # Not unique: the same OpenID may be pending more than once
    openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("unconfirmed OpenID")
        # NOTE(review): unlike the other models this plural name is not
        # wrapped in _() and contains an underscore - confirm whether that
        # is intended before changing it (doing so affects migrations).
        verbose_name_plural = "unconfirmed_OpenIDs"

    def __str__(self):
        # "%i" cannot format None, so fall back to 0 for unsaved instances
        # (same convention as Photo.__str__).
        return "%s (%i) from %s" % (self.openid, self.pk or 0, self.user)
class ConfirmedOpenId(BaseAccountModel):
    """
    Model holding confirmed OpenIDs, as well as the relation to
    the assigned photo
    """

    openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL)
    # Deleting the photo keeps the OpenID and just clears the relation
    photo = models.ForeignKey(
        Photo,
        related_name="openids",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # http://<id>/ base version - http w/ trailing slash
    digest = models.CharField(max_length=64)
    # http://<id> - http w/o trailing slash
    alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id>/ - https w/ trailing slash
    alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id> - https w/o trailing slash
    alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)

    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("confirmed OpenID")
        verbose_name_plural = _("confirmed OpenIDs")

    def set_photo(self, photo):
        """
        Helper method to save photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        The handle is normalized and only stored if a profile lookup for
        it returns something.

        :raises ValueError: if the profile lookup returns nothing
        """
        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, normalize the URL and add the digests

        Scheme and host name are lower-cased before the URL is stored; the
        four digest fields are the SHA256 hex digests of the first four
        entries returned by ``openid_variations()`` for that URL.
        """
        url = urlsplit(self.openid)
        # NOTE(review): url.hostname carries no port, so an explicit port
        # in the OpenID is dropped here - confirm this is intended.
        if url.username:  # pragma: no cover
            password = url.password or ""
            netloc = f"{url.username}:{password}@{url.hostname}"
        else:
            netloc = url.hostname
        lowercase_url = urlunsplit(
            (url.scheme.lower(), netloc, url.path, url.query, url.fragment)
        )
        self.openid = lowercase_url

        # Compute the variations once instead of once per digest field
        variations = openid_variations(lowercase_url)
        (
            self.digest,
            self.alt_digest1,
            self.alt_digest2,
            self.alt_digest3,
        ) = (
            hashlib.sha256(variations[index].encode("utf-8")).hexdigest()
            for index in range(4)
        )

        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django versions.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # "%i" cannot format None, so fall back to 0 for unsaved instances
        # (same convention as Photo.__str__).
        return "%s (%i) (%s)" % (self.openid, self.pk or 0, self.user)
class OpenIDNonce(models.Model):
    """
    Model holding OpenID Nonces
    See also: https://github.com/edx/django-openid-auth/
    """

    server_url = models.CharField(max_length=255)
    # Integer timestamp of the nonce; compared against time.time() by the
    # OpenID store
    timestamp = models.IntegerField()
    salt = models.CharField(max_length=128)

    def __str__(self):
        # "%i" cannot format None, so fall back to 0 for unsaved instances
        # (same convention as Photo.__str__).
        return "%s (%i) (timestamp: %i)" % (
            self.server_url,
            self.pk or 0,
            self.timestamp,
        )
class OpenIDAssociation(models.Model):
    """
    Model holding the relation/association about OpenIDs
    """

    server_url = models.TextField(max_length=2047)
    handle = models.CharField(max_length=255)
    secret = models.TextField(max_length=255)  # stored base64 encoded
    # Integers; the store treats issued + lifetime as the expiry time
    issued = models.IntegerField()
    lifetime = models.IntegerField()
    assoc_type = models.TextField(max_length=64)

    def __str__(self):
        # "%i" cannot format None, so fall back to 0 for unsaved instances
        # (same convention as Photo.__str__).
        return "%s (%i) (%s, lifetime: %i)" % (
            self.server_url,
            self.pk or 0,
            self.assoc_type,
            self.lifetime,
        )
class DjangoOpenIDStore(OpenIDStore):
    """
    The Python openid library needs an OpenIDStore subclass to persist data
    related to OpenID authentications. This one uses our Django models.
    """

    @staticmethod
    def storeAssociation(server_url, association):  # pragma: no cover
        """
        Helper method to store associations

        :param server_url: URL of the OpenID server the association is for
        :param association: association object providing ``handle``,
            ``secret``, ``issued``, ``lifetime`` and ``assoc_type``
        """
        assoc = OpenIDAssociation(
            server_url=server_url,
            handle=association.handle,
            # Store plain base64 text rather than the str() of a bytes
            # object; getAssociation() accepts both forms.
            secret=base64.encodebytes(association.secret).decode("ascii"),
            issued=association.issued,
            # Was association.issued, which stored the issue time as the
            # lifetime.
            lifetime=association.lifetime,
            assoc_type=association.assoc_type,
        )
        assoc.save()

    def getAssociation(self, server_url, handle=None):  # pragma: no cover
        """
        Helper method to get associations

        Expired associations found on the way are removed.

        :param server_url: URL of the OpenID server
        :param handle: restrict the lookup to this handle if not None
        :return: the most recently issued, unexpired association or None
        """
        lookup = {"server_url": server_url}
        if handle is not None:
            lookup["handle"] = handle
        assocs = OpenIDAssociation.objects.filter(  # pylint: disable=no-member
            **lookup
        )

        associations = []
        for assoc in assocs:
            secret = assoc.secret
            if isinstance(secret, str):
                if secret.startswith("b'"):
                    # Legacy rows hold the str() of a bytes object
                    # ("b'...\\n'"): unwrap it and drop the escaped newline.
                    secret = secret[2:].split("'")[0].replace("\\n", "")
                secret = secret.encode("utf-8")
            association = OIDAssociation(
                assoc.handle,
                base64.decodebytes(secret),
                assoc.issued,
                assoc.lifetime,
                assoc.assoc_type,
            )
            try:
                # pylint: disable=no-member
                expires = association.getExpiresIn()
            except AttributeError:
                expires = association.expiresIn
            if expires == 0:
                self.removeAssociation(server_url, assoc.handle)
            else:
                associations.append((association.issued, association))

        if not associations:
            return None
        # Order by issue time only, so association objects are never
        # compared with each other; the last entry is the newest.
        return sorted(associations, key=lambda pair: pair[0])[-1][1]

    @staticmethod
    def removeAssociation(server_url, handle):  # pragma: no cover
        """
        Helper method to remove associations

        :return: True if at least one association was removed
        """
        deleted_count, _details = OpenIDAssociation.objects.filter(  # pylint: disable=no-member
            server_url=server_url, handle=handle
        ).delete()
        return deleted_count > 0

    @staticmethod
    def useNonce(server_url, timestamp, salt):  # pragma: no cover
        """
        Helper method to 'use' nonces

        :return: True if the nonce is current and has not been seen
            before, False otherwise
        """
        # Has nonce expired?
        if abs(timestamp - time.time()) > oidnonce.SKEW:
            return False
        # A nonce that is already stored has been used; it must stay stored
        # (not be deleted) so that further replays are rejected as well.
        _nonce, created = OpenIDNonce.objects.get_or_create(  # pylint: disable=no-member
            server_url=server_url, timestamp=timestamp, salt=salt
        )
        return created

    @staticmethod
    def cleanupNonces():  # pragma: no cover
        """
        Helper method to cleanup nonces

        :return: number of nonces removed
        """
        timestamp = int(time.time()) - oidnonce.SKEW
        # pylint: disable=no-member
        deleted_count, _details = OpenIDNonce.objects.filter(
            timestamp__lt=timestamp
        ).delete()
        return deleted_count

    @staticmethod
    def cleanupAssociations():  # pragma: no cover
        """
        Helper method to cleanup associations

        :return: number of expired associations removed
        """
        # Expressed through the ORM; the former raw WHERE clause referenced
        # a column named "lifetimeint", while the model field is "lifetime".
        # pylint: disable=no-member
        deleted_count, _details = (
            OpenIDAssociation.objects.annotate(
                expires_at=models.F("issued") + models.F("lifetime")
            )
            .filter(expires_at__lt=time.time())
            .delete()
        )
        return deleted_count