Coverage for ivatar/ivataraccount/models.py: 88%
255 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-26 00:11 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-26 00:11 +0000
1# -*- coding: utf-8 -*-
2"""
3Our models for ivatar.ivataraccount
4"""
6import base64
7import hashlib
8import time
9from io import BytesIO
10from os import urandom
11from urllib.error import HTTPError, URLError
12from urllib.request import urlopen
13from urllib.parse import urlsplit, urlunsplit
15from PIL import Image
16from django.contrib.auth.models import User
17from django.contrib import messages
18from django.db import models
19from django.utils import timezone
20from django.http import HttpResponseRedirect
21from django.urls import reverse_lazy, reverse
22from django.utils.translation import gettext_lazy as _
23from django.core.exceptions import ObjectDoesNotExist
24from django.core.mail import send_mail
25from django.template.loader import render_to_string
26from openid.association import Association as OIDAssociation
27from openid.store import nonce as oidnonce
28from openid.store.interface import OpenIDStore
30from libravatar import libravatar_url
32from ivatar.settings import MAX_LENGTH_EMAIL, logger
33from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY
34from ivatar.settings import MAX_LENGTH_URL
35from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL
36from ivatar.utils import openid_variations
37from .gravatar import get_photo as get_gravatar_photo
def file_format(image_type):
    """
    Translate a PIL image format name into the short type we store

    Returns "jpg", "png", "gif" or "webp"; None for any other value.
    """
    # (accepted PIL format names, short type) pairs, checked in order
    known_types = (
        (("JPEG", "MPO"), "jpg"),
        (("PNG",), "png"),
        (("GIF",), "gif"),
        (("WEBP",), "webp"),
    )
    for pil_names, short_type in known_types:
        if image_type in pil_names:
            return short_type
    return None
def pil_format(image_type):
    """
    Translate our short image type into the encoder name PIL expects

    Returns "JPEG", "PNG", "GIF" or "WEBP". Any other value is logged
    and None is returned.
    """
    # (accepted short types, PIL encoder name) pairs, checked in order
    encoders = (
        (("jpg", "jpeg", "mpo"), "JPEG"),
        (("png",), "PNG"),
        (("gif",), "GIF"),
        (("webp",), "WEBP"),
    )
    for short_types, encoder_name in encoders:
        if image_type in short_types:
            return encoder_name

    logger.info("Unsupported file format: %s", image_type)
    return None
class UserPreference(models.Model):
    """
    Holds the preferences of a user
    """

    # (stored value, displayed label) pairs offered for ``theme``
    THEMES = (
        ("default", "Default theme"),
        ("clime", "climes theme"),
        ("green", "green theme"),
        ("red", "red theme"),
    )

    theme = models.CharField(max_length=10, choices=THEMES, default="default")

    # The owning user doubles as the primary key of this model
    user = models.OneToOneField(User, on_delete=models.CASCADE, primary_key=True)

    def __str__(self):
        return "Preference (%(pk)i) for %(user)s" % {
            "pk": self.pk,
            "user": self.user,
        }
class BaseAccountModel(models.Model):
    """
    Abstract parent model carrying the fields every account model shares
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True)
    # Defaults to the moment the instance is created
    add_date = models.DateTimeField(default=timezone.now)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Model options
        """

        abstract = True
class Photo(BaseAccountModel):
    """
    Model holding the photos and information about them
    """

    # Unlike the base model, the IP address is mandatory for photos
    ip_address = models.GenericIPAddressField(unpack_ipv4=True)
    data = models.BinaryField()
    # Short image type as returned by file_format()
    format = models.CharField(max_length=4)
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("photo")
        verbose_name_plural = _("photos")

    def import_image(self, service_name, email_address):
        """
        Import the image for an email address from another service

        service_name selects the source; "Gravatar" and "Libravatar" are
        handled, any other value yields no image URL.

        Returns True once the downloaded image has been stored, False if
        no URL was found, the download failed or the data is not a
        supported image.
        """
        image_url = None

        if service_name == "Gravatar":
            gravatar = get_gravatar_photo(email_address)
            if gravatar:
                image_url = gravatar["image_url"]
        elif service_name == "Libravatar":
            image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE)

        if not image_url:
            return False  # pragma: no cover

        try:
            # The context manager closes the HTTP response in every case
            with urlopen(image_url) as response:
                data = response.read()
        except HTTPError as exc:  # pragma: no cover
            logger.warning(
                "%s import failed with an HTTP error: %s", service_name, exc.code
            )
            return False
        except URLError as exc:  # pragma: no cover
            logger.warning("%s import failed: %s", service_name, exc.reason)
            return False

        try:
            img = Image.open(BytesIO(data))
        # Pillow reports undecodable data with UnidentifiedImageError,
        # which is an OSError and not a ValueError
        except (OSError, ValueError):  # pragma: no cover
            return False

        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Unable to determine format: %s", img)  # pragma: no cover
            return False  # pragma: no cover
        self.data = data
        # The format is already known, so skip the checks in our own save()
        super().save()
        return True

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, taking care about the image

        The format is derived from the image data. Returns False, without
        saving, if the data cannot be opened or has an unsupported format;
        otherwise returns whatever the parent save() returns.
        """
        # Use PIL to read the file format
        try:
            img = Image.open(BytesIO(self.data))
        except Exception as exc:  # pylint: disable=broad-except
            logger.warning("Exception caught in Photo.save(): %s", exc)
            return False
        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Format not recognized")
            return False
        # Keyword arguments: positional use of Model.save() is deprecated
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def perform_crop(self, request, dimensions, email, openid):
        """
        Crop the stored image and assign the photo to identities

        dimensions is a dict providing "x", "y", "w" and "h"; the keys
        "a" and "b" are set to the width and height of the stored image.
        A width and height of 0 selects the largest square starting at
        the given offset that the image size allows.

        If this is the only photo of request.user it gets assigned to all
        confirmed email addresses and OpenIDs; email and openid, when
        given, get the photo assigned in any case.

        Always returns a redirect to the profile page; problems are
        reported through the messages framework.
        """
        if request.user.photo_set.count() == 1:
            # This is the first photo, assign to all confirmed addresses
            for addr in request.user.confirmedemail_set.all():
                addr.photo = self
                addr.save()

            for addr in request.user.confirmedopenid_set.all():
                addr.photo = self
                addr.save()

        if email:
            # Explicitly asked
            email.photo = self
            email.save()

        if openid:
            # Explicitly asked
            openid.photo = self
            openid.save()

        # Do the real work cropping
        img = Image.open(BytesIO(self.data))

        # This should be anyway checked during save...
        dimensions["a"], dimensions["b"] = img.size  # pylint: disable=invalid-name
        if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS:
            # Interpolate after the translation lookup, otherwise the
            # message id never matches the catalogue entry
            messages.error(
                request,
                _("Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s")
                % {"max_pixels": MAX_PIXELS},
            )
            return HttpResponseRedirect(reverse_lazy("profile"))

        if dimensions["w"] == 0 and dimensions["h"] == 0:
            side = min(dimensions["a"], dimensions["b"])
            dimensions["w"], dimensions["h"] = side, side
        elif (
            (dimensions["w"] < 0)
            or ((dimensions["x"] + dimensions["w"]) > dimensions["a"])
            or (dimensions["h"] < 0)
            or ((dimensions["y"] + dimensions["h"]) > dimensions["b"])
        ):
            # NOTE(review): negative x/y offsets are not rejected here
            messages.error(request, _("Crop outside of original image bounding box"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        cropped = img.crop(
            (
                dimensions["x"],
                dimensions["y"],
                dimensions["x"] + dimensions["w"],
                dimensions["y"] + dimensions["h"],
            )
        )
        # Resize the image only if it's larger than the specified max width.
        cropped_w, cropped_h = cropped.size
        max_w = AVATAR_MAX_SIZE
        if cropped_w > max_w or cropped_h > max_w:
            cropped = cropped.resize((max_w, max_w), Image.LANCZOS)

        data = BytesIO()
        cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY)

        # Overwrite the existing image
        self.data = data.getvalue()
        self.save()

        return HttpResponseRedirect(reverse_lazy("profile"))

    def __str__(self):
        return "%s (%i) from %s" % (self.format, self.pk or 0, self.user)
# pylint: disable=too-few-public-methods
class ConfirmedEmailManager(models.Manager):
    """
    Manager of the confirmed email model, adding a creation helper
    """

    @staticmethod
    def create_confirmed_email(user, email_address, is_logged_in):
        """
        Store email_address as a confirmed address of user

        Returns a tuple of the new primary key and a list of external
        photos; Gravatar is only asked when is_logged_in is true.
        """
        confirmed = ConfirmedEmail(
            user=user,
            ip_address="0.0.0.0",
            email=email_address,
        )
        confirmed.save()

        gravatar = get_gravatar_photo(confirmed.email) if is_logged_in else None
        external_photos = [gravatar] if gravatar else []

        return (confirmed.pk, external_photos)
class ConfirmedEmail(BaseAccountModel):
    """
    Model holding our confirmed email addresses, as well as the relation
    to the assigned photo
    """

    email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL)
    photo = models.ForeignKey(
        Photo,
        related_name="emails",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # Hex MD5 / SHA256 digests of the normalised address, set in save()
    digest = models.CharField(max_length=32)
    digest_sha256 = models.CharField(max_length=64)
    objects = ConfirmedEmailManager()
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("confirmed email")
        verbose_name_plural = _("confirmed emails")

    def set_photo(self, photo):
        """
        Helper method to set photo
        """
        self.photo = photo
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, add digest

        Both digests are computed from the stripped, lowercased address.
        """
        normalized = self.email.strip().lower().encode("utf-8")
        self.digest = hashlib.md5(normalized).hexdigest()
        self.digest_sha256 = hashlib.sha256(normalized).hexdigest()
        # Keyword arguments: positional use of Model.save() is deprecated
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # pk is None until the first save, which "%i" cannot format
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)
class UnconfirmedEmail(BaseAccountModel):
    """
    Model holding unconfirmed email addresses as well as the verification key
    """

    email = models.EmailField(max_length=MAX_LENGTH_EMAIL)
    verification_key = models.CharField(max_length=64)
    # Time and outcome ("OK" or the error text) of the last mail attempt
    last_send_date = models.DateTimeField(null=True, blank=True)
    last_status = models.TextField(max_length=2047, null=True, blank=True)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("unconfirmed email")
        verbose_name_plural = _("unconfirmed emails")

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, creating the verification key if missing

        The key is the hex SHA256 digest of random bytes plus the username.
        """
        if not self.verification_key:
            hash_object = hashlib.new("sha256")
            hash_object.update(
                urandom(1024)
                + self.user.username.encode("utf-8")  # pylint: disable=no-member
            )
            self.verification_key = hash_object.hexdigest()
        # Keyword arguments: positional use of Model.save() is deprecated
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def send_confirmation_mail(self, url=SECURE_BASE_URL):
        """
        Send confirmation mail to that mail address

        url is the base URL the confirmation link is appended to.

        Sending is best effort: a failure does not raise, it is recorded
        in last_status (which is "OK" otherwise). The instance is saved
        afterwards and True is returned in both cases.
        """
        link = url + reverse(
            "confirm_email", kwargs={"verification_key": self.verification_key}
        )
        email_subject = _("Confirm your email address on %s") % SITE_NAME
        email_body = render_to_string(
            "email_confirmation.txt",
            {
                "verification_link": link,
                "site_name": SITE_NAME,
            },
        )
        self.last_send_date = timezone.now()
        self.last_status = "OK"
        try:
            send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email])
        except Exception as exc:  # pylint: disable=broad-except
            # Keep the reason for whoever inspects the record later
            logger.warning("Sending confirmation mail failed: %s", exc)
            self.last_status = "%s" % exc
        self.save()
        return True

    def __str__(self):
        # pk is None until the first save, which "%i" cannot format
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)
class UnconfirmedOpenId(BaseAccountModel):
    """
    Model holding unconfirmed OpenIDs
    """

    openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("unconfirmed OpenID")
        # NOTE(review): unlike the other models this label is neither
        # translated nor free of underscores; changing it affects the
        # model options tracked by migrations, so it is left as is
        verbose_name_plural = "unconfirmed_OpenIDs"

    def __str__(self):
        # pk is None until the first save, which "%i" cannot format
        return "%s (%i) from %s" % (self.openid, self.pk or 0, self.user)
class ConfirmedOpenId(BaseAccountModel):
    """
    Model holding confirmed OpenIDs, as well as the relation to
    the assigned photo
    """

    openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL)
    photo = models.ForeignKey(
        Photo,
        related_name="openids",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # http://<id>/ base version - http w/ trailing slash
    digest = models.CharField(max_length=64)
    # http://<id> - http w/o trailing slash
    alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id>/ - https w/ trailing slash
    alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id> - https w/o trailing slash
    alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)

    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("confirmed OpenID")
        verbose_name_plural = _("confirmed OpenIDs")

    def set_photo(self, photo):
        """
        Helper method to save photo
        """
        self.photo = photo
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, normalising the URL and adding digests

        Scheme and host of the OpenID are lowercased; the four digests are
        the hex SHA256 of the variations openid_variations() returns.
        """
        url = urlsplit(self.openid)
        # Lowercase the host part only. Working on netloc (instead of
        # rebuilding it from url.hostname) keeps user info, an explicit
        # port and IPv6 brackets intact.
        userinfo, at_sign, hostport = url.netloc.rpartition("@")
        netloc = userinfo + at_sign + hostport.lower()
        lowercase_url = urlunsplit(
            (url.scheme.lower(), netloc, url.path, url.query, url.fragment)
        )
        self.openid = lowercase_url

        def sha256_hex(text):
            return hashlib.sha256(text.encode("utf-8")).hexdigest()

        variations = openid_variations(lowercase_url)
        self.digest = sha256_hex(variations[0])
        self.alt_digest1 = sha256_hex(variations[1])
        self.alt_digest2 = sha256_hex(variations[2])
        self.alt_digest3 = sha256_hex(variations[3])

        # Keyword arguments: positional use of Model.save() is deprecated
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # pk is None until the first save, which "%i" cannot format
        return "%s (%i) (%s)" % (self.openid, self.pk or 0, self.user)
class OpenIDNonce(models.Model):
    """
    Model holding OpenID Nonces
    See also: https://github.com/edx/django-openid-auth/
    """

    server_url = models.CharField(max_length=255)
    timestamp = models.IntegerField()
    salt = models.CharField(max_length=128)

    def __str__(self):
        # pk is None until the first save, which "%i" cannot format
        return "%s (%i) (timestamp: %i)" % (
            self.server_url,
            self.pk or 0,
            self.timestamp,
        )
class OpenIDAssociation(models.Model):
    """
    Model holding the relation/association about OpenIDs
    """

    server_url = models.TextField(max_length=2047)
    handle = models.CharField(max_length=255)
    secret = models.TextField(max_length=255)  # stored base64 encoded
    issued = models.IntegerField()
    lifetime = models.IntegerField()
    assoc_type = models.TextField(max_length=64)

    def __str__(self):
        # pk is None until the first save, which "%i" cannot format
        return "%s (%i) (%s, lifetime: %i)" % (
            self.server_url,
            self.pk or 0,
            self.assoc_type,
            self.lifetime,
        )
class DjangoOpenIDStore(OpenIDStore):
    """
    The Python openid library needs an OpenIDStore subclass to persist data
    related to OpenID authentications. This one uses our Django models.
    """

    @staticmethod
    def _decode_secret(stored):  # pragma: no cover
        """
        Turn a stored, base64 encoded secret back into the raw bytes

        Accepts the plain base64 text as well as the legacy form, where
        the repr() of a bytes object ("b'...\\n'") ended up in the column.
        """
        if isinstance(stored, str):
            if stored.startswith("b'"):
                # Legacy row: unwrap the repr and drop the escaped
                # newlines, which are not part of the base64 data
                stored = stored[2:].split("'")[0].replace("\\n", "")
            stored = stored.encode("utf-8")
        return base64.decodebytes(stored)

    @staticmethod
    def storeAssociation(server_url, association):  # pragma: no cover
        """
        Helper method to store associations
        """
        assoc = OpenIDAssociation(
            server_url=server_url,
            handle=association.handle,
            # Store text, not the repr() of a bytes object
            secret=base64.encodebytes(association.secret).decode("ascii"),
            issued=association.issued,
            lifetime=association.lifetime,
            assoc_type=association.assoc_type,
        )
        assoc.save()

    def getAssociation(self, server_url, handle=None):  # pragma: no cover
        """
        Helper method to get associations

        Looks up the associations stored for server_url, restricted to
        handle if one is given. Expired ones are removed on the way.
        Returns the most recently issued remaining association, or None.
        """
        lookup = {"server_url": server_url}
        if handle is not None:
            lookup["handle"] = handle

        associations = []
        # pylint: disable=no-member
        for assoc in OpenIDAssociation.objects.filter(**lookup):
            association = OIDAssociation(
                assoc.handle,
                self._decode_secret(assoc.secret),
                assoc.issued,
                assoc.lifetime,
                assoc.assoc_type,
            )
            # The accessor differs between versions of the openid library
            try:
                expires = association.getExpiresIn()
            except AttributeError:
                expires = association.expiresIn
            if expires == 0:
                self.removeAssociation(server_url, assoc.handle)
            else:
                associations.append((association.issued, association))
        if not associations:
            return None
        # The sort is stable, so among equal issue times the last row wins
        return sorted(associations, key=lambda pair: pair[0])[-1][1]

    @staticmethod
    def removeAssociation(server_url, handle):  # pragma: no cover
        """
        Helper method to remove associations

        Returns True if at least one matching association existed.
        """
        assocs = list(
            OpenIDAssociation.objects.filter(  # pylint: disable=no-member
                server_url=server_url, handle=handle
            )
        )
        for assoc in assocs:
            assoc.delete()
        return bool(assocs)

    @staticmethod
    def useNonce(server_url, timestamp, salt):  # pragma: no cover
        """
        Helper method to 'use' nonces

        Returns True if the nonce is within the allowed clock skew and
        has not been seen before; it is recorded in that case. Returns
        False for an expired or already used nonce.
        """
        # Has nonce expired?
        if abs(timestamp - time.time()) > oidnonce.SKEW:
            return False
        lookup = {"server_url": server_url, "timestamp": timestamp, "salt": salt}
        # pylint: disable=no-member
        if OpenIDNonce.objects.filter(**lookup).exists():
            # Keep the record: deleting it here would make the next
            # replay of the same nonce look fresh again
            return False
        OpenIDNonce.objects.create(**lookup)
        return True

    @staticmethod
    def cleanupNonces():  # pragma: no cover
        """
        Helper method to cleanup nonces

        Deletes the nonces older than the allowed clock skew and returns
        the number of deleted objects as reported by the ORM.
        """
        timestamp = int(time.time()) - oidnonce.SKEW
        # pylint: disable=no-member
        return OpenIDNonce.objects.filter(timestamp__lt=timestamp).delete()[0]

    @staticmethod
    def cleanupAssociations():  # pragma: no cover
        """
        Helper method to cleanup associations

        Deletes the associations whose issue time plus lifetime lies in
        the past and returns the number of deleted objects as reported
        by the ORM.
        """
        now = int(time.time())
        # issued + lifetime < now, expressed without raw SQL
        # pylint: disable=no-member
        return OpenIDAssociation.objects.filter(
            issued__lt=now - models.F("lifetime")
        ).delete()[0]