Coverage for ivatar/ivataraccount/models.py: 88%

279 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-14 23:06 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Our models for ivatar.ivataraccount 

4""" 

5 

6import base64 

7import hashlib 

8import time 

9from io import BytesIO 

10from os import urandom 

11from urllib.error import HTTPError, URLError 

12from ivatar.utils import urlopen, Bluesky 

13from urllib.parse import urlsplit, urlunsplit, quote 

14 

15from PIL import Image 

16from django.contrib.auth.models import User 

17from django.contrib import messages 

18from django.db import models 

19from django.utils import timezone 

20from django.http import HttpResponseRedirect 

21from django.urls import reverse_lazy, reverse 

22from django.utils.translation import gettext_lazy as _ 

23from django.core.cache import cache 

24from django.core.exceptions import ObjectDoesNotExist 

25from django.core.mail import send_mail 

26from django.template.loader import render_to_string 

27from openid.association import Association as OIDAssociation 

28from openid.store import nonce as oidnonce 

29from openid.store.interface import OpenIDStore 

30 

31from libravatar import libravatar_url 

32 

33from ivatar.settings import MAX_LENGTH_EMAIL, logger 

34from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY 

35from ivatar.settings import MAX_LENGTH_URL 

36from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL 

37from ivatar.utils import openid_variations 

38from .gravatar import get_photo as get_gravatar_photo 

39 

40 

41def file_format(image_type): 

42 """ 

43 Helper method returning a short image type 

44 """ 

45 if image_type in ("JPEG", "MPO"): 

46 return "jpg" 

47 elif image_type == "PNG": 

48 return "png" 

49 elif image_type == "GIF": 

50 return "gif" 

51 elif image_type == "WEBP": 

52 return "webp" 

53 return None 

54 

55 

56def pil_format(image_type): 

57 """ 

58 Helper method returning the 'encoder name' for PIL 

59 """ 

60 if image_type in ("jpg", "jpeg", "mpo"): 

61 return "JPEG" 

62 elif image_type == "png": 

63 return "PNG" 

64 elif image_type == "gif": 

65 return "GIF" 

66 elif image_type == "webp": 

67 return "WEBP" 

68 

69 logger.info("Unsupported file format: %s", image_type) 

70 return None 

71 

72 

73class UserPreference(models.Model): 

74 """ 

75 Holds the user users preferences 

76 """ 

77 

78 THEMES = ( 

79 ("default", "Default theme"), 

80 ("clime", "climes theme"), 

81 ("green", "green theme"), 

82 ("red", "red theme"), 

83 ) 

84 

85 theme = models.CharField( 

86 max_length=10, 

87 choices=THEMES, 

88 default="default", 

89 ) 

90 

91 user = models.OneToOneField( 

92 User, 

93 on_delete=models.deletion.CASCADE, 

94 primary_key=True, 

95 ) 

96 

97 def __str__(self): 

98 return "Preference (%i) for %s" % (self.pk, self.user) 

99 

100 

101class BaseAccountModel(models.Model): 

102 """ 

103 Base, abstract model, holding fields we use in all cases 

104 """ 

105 

106 user = models.ForeignKey( 

107 User, 

108 on_delete=models.deletion.CASCADE, 

109 ) 

110 ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True) 

111 add_date = models.DateTimeField(default=timezone.now) 

112 

113 class Meta: # pylint: disable=too-few-public-methods 

114 """ 

115 Class attributes 

116 """ 

117 

118 abstract = True 

119 

120 

121class Photo(BaseAccountModel): 

122 """ 

123 Model holding the photos and information about them 

124 """ 

125 

126 ip_address = models.GenericIPAddressField(unpack_ipv4=True) 

127 data = models.BinaryField() 

128 format = models.CharField(max_length=4) 

129 access_count = models.BigIntegerField(default=0, editable=False) 

130 

131 class Meta: # pylint: disable=too-few-public-methods 

132 """ 

133 Class attributes 

134 """ 

135 

136 verbose_name = _("photo") 

137 verbose_name_plural = _("photos") 

138 

139 def import_image(self, service_name, email_address): 

140 """ 

141 Allow to import image from other (eg. Gravatar) service 

142 """ 

143 image_url = False 

144 

145 if service_name == "Gravatar": 

146 if gravatar := get_gravatar_photo(email_address): 

147 image_url = gravatar["image_url"] 

148 

149 if service_name == "Libravatar": 

150 image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE) 

151 

152 if not image_url: 

153 return False # pragma: no cover 

154 try: 

155 image = urlopen(image_url) 

156 except HTTPError as exc: 

157 print(f"{service_name} import failed with an HTTP error: {exc.code}") 

158 return False 

159 except URLError as exc: 

160 print(f"{service_name} import failed: {exc.reason}") 

161 return False 

162 data = image.read() 

163 

164 try: 

165 img = Image.open(BytesIO(data)) 

166 # How am I supposed to test this? 

167 except ValueError: # pragma: no cover 

168 return False # pragma: no cover 

169 

170 self.format = file_format(img.format) 

171 if not self.format: 

172 print(f"Unable to determine format: {img}") 

173 return False # pragma: no cover 

174 self.data = data 

175 super().save() 

176 return True 

177 

178 def save( 

179 self, force_insert=False, force_update=False, using=None, update_fields=None 

180 ): 

181 """ 

182 Override save from parent, taking care about the image 

183 """ 

184 # Use PIL to read the file format 

185 try: 

186 img = Image.open(BytesIO(self.data)) 

187 except Exception as exc: # pylint: disable=broad-except 

188 # For debugging only 

189 print(f"Exception caught in Photo.save(): {exc}") 

190 return False 

191 self.format = file_format(img.format) 

192 if not self.format: 

193 print("Format not recognized") 

194 return False 

195 return super().save(force_insert, force_update, using, update_fields) 

196 

197 def perform_crop(self, request, dimensions, email, openid): 

198 """ 

199 Helper to crop the image 

200 """ 

201 if request.user.photo_set.count() == 1: 

202 # This is the first photo, assign to all confirmed addresses 

203 for addr in request.user.confirmedemail_set.all(): 

204 addr.photo = self 

205 addr.save() 

206 

207 for addr in request.user.confirmedopenid_set.all(): 

208 addr.photo = self 

209 addr.save() 

210 

211 if email: 

212 # Explicitly asked 

213 email.photo = self 

214 email.save() 

215 

216 if openid: 

217 # Explicitly asked 

218 openid.photo = self 

219 openid.save() 

220 

221 # Do the real work cropping 

222 img = Image.open(BytesIO(self.data)) 

223 

224 # This should be anyway checked during save... 

225 dimensions["a"], dimensions["b"] = img.size # pylint: disable=invalid-name 

226 if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS: 

227 messages.error( 

228 request, 

229 _( 

230 "Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s" 

231 % { 

232 "max_pixels": MAX_PIXELS, 

233 } 

234 ), 

235 ) 

236 return HttpResponseRedirect(reverse_lazy("profile")) 

237 

238 if dimensions["w"] == 0 and dimensions["h"] == 0: 

239 dimensions["w"], dimensions["h"] = dimensions["a"], dimensions["b"] 

240 min_from_w_h = min(dimensions["w"], dimensions["h"]) 

241 dimensions["w"], dimensions["h"] = min_from_w_h, min_from_w_h 

242 elif ( 

243 (dimensions["w"] < 0) 

244 or ((dimensions["x"] + dimensions["w"]) > dimensions["a"]) 

245 or (dimensions["h"] < 0) 

246 or ((dimensions["y"] + dimensions["h"]) > dimensions["b"]) 

247 ): 

248 messages.error(request, _("Crop outside of original image bounding box")) 

249 return HttpResponseRedirect(reverse_lazy("profile")) 

250 

251 cropped = img.crop( 

252 ( 

253 dimensions["x"], 

254 dimensions["y"], 

255 dimensions["x"] + dimensions["w"], 

256 dimensions["y"] + dimensions["h"], 

257 ) 

258 ) 

259 # cropped.load() 

260 # Resize the image only if it's larger than the specified max width. 

261 cropped_w, cropped_h = cropped.size 

262 max_w = AVATAR_MAX_SIZE 

263 if cropped_w > max_w or cropped_h > max_w: 

264 cropped = cropped.resize((max_w, max_w), Image.LANCZOS) 

265 

266 data = BytesIO() 

267 cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY) 

268 data.seek(0) 

269 

270 # Overwrite the existing image 

271 self.data = data.read() 

272 self.save() 

273 

274 return HttpResponseRedirect(reverse_lazy("profile")) 

275 

276 def __str__(self): 

277 return "%s (%i) from %s" % (self.format, self.pk or 0, self.user) 

278 

279 

280# pylint: disable=too-few-public-methods 

281class ConfirmedEmailManager(models.Manager): 

282 """ 

283 Manager for our confirmed email addresses model 

284 """ 

285 

286 @staticmethod 

287 def create_confirmed_email(user, email_address, is_logged_in): 

288 """ 

289 Helper method to create confirmed email address 

290 """ 

291 confirmed = ConfirmedEmail() 

292 confirmed.user = user 

293 confirmed.ip_address = "0.0.0.0" 

294 confirmed.email = email_address 

295 confirmed.save() 

296 

297 external_photos = [] 

298 if is_logged_in: 

299 if gravatar := get_gravatar_photo(confirmed.email): 

300 external_photos.append(gravatar) 

301 

302 return (confirmed.pk, external_photos) 

303 

304 

305class ConfirmedEmail(BaseAccountModel): 

306 """ 

307 Model holding our confirmed email addresses, as well as the relation 

308 to the assigned photo 

309 """ 

310 

311 email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL) 

312 photo = models.ForeignKey( 

313 Photo, 

314 related_name="emails", 

315 blank=True, 

316 null=True, 

317 on_delete=models.deletion.SET_NULL, 

318 ) 

319 # Alternative assignment - use Bluesky handle 

320 bluesky_handle = models.CharField(max_length=256, null=True, blank=True) 

321 digest = models.CharField(max_length=32) 

322 digest_sha256 = models.CharField(max_length=64) 

323 objects = ConfirmedEmailManager() 

324 access_count = models.BigIntegerField(default=0, editable=False) 

325 

326 class Meta: # pylint: disable=too-few-public-methods 

327 """ 

328 Class attributes 

329 """ 

330 

331 verbose_name = _("confirmed email") 

332 verbose_name_plural = _("confirmed emails") 

333 

334 def set_photo(self, photo): 

335 """ 

336 Helper method to set photo 

337 """ 

338 self.photo = photo 

339 self.save() 

340 

341 def set_bluesky_handle(self, handle): 

342 """ 

343 Helper method to set Bluesky handle 

344 """ 

345 

346 bs = Bluesky() 

347 handle = bs.normalize_handle(handle) 

348 avatar = bs.get_profile(handle) 

349 if not avatar: 

350 raise ValueError("Invalid Bluesky handle") 

351 self.bluesky_handle = handle 

352 self.save() 

353 

354 def save( 

355 self, force_insert=False, force_update=False, using=None, update_fields=None 

356 ): 

357 """ 

358 Override save from parent, add digest 

359 """ 

360 self.digest = hashlib.md5( 

361 self.email.strip().lower().encode("utf-8") 

362 ).hexdigest() 

363 self.digest_sha256 = hashlib.sha256( 

364 self.email.strip().lower().encode("utf-8") 

365 ).hexdigest() 

366 

367 # We need to manually expire the page caches 

368 # TODO: Verify this works as expected 

369 # First check if we already have an ID 

370 if self.pk: 

371 cache_url = reverse_lazy( 

372 "assign_photo_email", kwargs={"email_id": int(self.pk)} 

373 ) 

374 

375 cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}" 

376 if cache.has_key(cache_key): 

377 cache.delete(cache_key) 

378 logger.debug("Successfully cleaned up cached page: %s" % cache_key) 

379 

380 return super().save(force_insert, force_update, using, update_fields) 

381 

382 def __str__(self): 

383 return "%s (%i) from %s" % (self.email, self.pk, self.user) 

384 

385 

386class UnconfirmedEmail(BaseAccountModel): 

387 """ 

388 Model holding unconfirmed email addresses as well as the verification key 

389 """ 

390 

391 email = models.EmailField(max_length=MAX_LENGTH_EMAIL) 

392 verification_key = models.CharField(max_length=64) 

393 last_send_date = models.DateTimeField(null=True, blank=True) 

394 last_status = models.TextField(max_length=2047, null=True, blank=True) 

395 

396 class Meta: # pylint: disable=too-few-public-methods 

397 """ 

398 Class attributes 

399 """ 

400 

401 verbose_name = _("unconfirmed email") 

402 verbose_name_plural = _("unconfirmed emails") 

403 

404 def save( 

405 self, force_insert=False, force_update=False, using=None, update_fields=None 

406 ): 

407 if not self.verification_key: 

408 hash_object = hashlib.new("sha256") 

409 hash_object.update( 

410 urandom(1024) 

411 + self.user.username.encode("utf-8") # pylint: disable=no-member 

412 ) # pylint: disable=no-member 

413 self.verification_key = hash_object.hexdigest() 

414 super(UnconfirmedEmail, self).save( 

415 force_insert, force_update, using, update_fields 

416 ) 

417 

418 def send_confirmation_mail(self, url=SECURE_BASE_URL): 

419 """ 

420 Send confirmation mail to that mail address 

421 """ 

422 link = url + reverse( 

423 "confirm_email", kwargs={"verification_key": self.verification_key} 

424 ) 

425 email_subject = _("Confirm your email address on %s") % SITE_NAME 

426 email_body = render_to_string( 

427 "email_confirmation.txt", 

428 { 

429 "verification_link": link, 

430 "site_name": SITE_NAME, 

431 }, 

432 ) 

433 self.last_send_date = timezone.now() 

434 self.last_status = "OK" 

435 # if settings.DEBUG: 

436 # print('DEBUG: %s' % link) 

437 try: 

438 send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email]) 

439 except Exception as e: 

440 self.last_status = f"{e}" 

441 self.save() 

442 return True 

443 

444 def __str__(self): 

445 return "%s (%i) from %s" % (self.email, self.pk, self.user) 

446 

447 

448class UnconfirmedOpenId(BaseAccountModel): 

449 """ 

450 Model holding unconfirmed OpenIDs 

451 """ 

452 

453 openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL) 

454 

455 class Meta: # pylint: disable=too-few-public-methods 

456 """ 

457 Meta class 

458 """ 

459 

460 verbose_name = _("unconfirmed OpenID") 

461 verbose_name_plural = "unconfirmed_OpenIDs" 

462 

463 def __str__(self): 

464 return "%s (%i) from %s" % (self.openid, self.pk, self.user) 

465 

466 

467class ConfirmedOpenId(BaseAccountModel): 

468 """ 

469 Model holding confirmed OpenIDs, as well as the relation to 

470 the assigned photo 

471 """ 

472 

473 openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL) 

474 photo = models.ForeignKey( 

475 Photo, 

476 related_name="openids", 

477 blank=True, 

478 null=True, 

479 on_delete=models.deletion.SET_NULL, 

480 ) 

481 # http://<id>/ base version - http w/ trailing slash 

482 digest = models.CharField(max_length=64) 

483 # http://<id> - http w/o trailing slash 

484 alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None) 

485 # https://<id>/ - https w/ trailing slash 

486 alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None) 

487 # https://<id> - https w/o trailing slash 

488 alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None) 

489 # Alternative assignment - use Bluesky handle 

490 bluesky_handle = models.CharField(max_length=256, null=True, blank=True) 

491 

492 access_count = models.BigIntegerField(default=0, editable=False) 

493 

494 class Meta: # pylint: disable=too-few-public-methods 

495 """ 

496 Meta class 

497 """ 

498 

499 verbose_name = _("confirmed OpenID") 

500 verbose_name_plural = _("confirmed OpenIDs") 

501 

502 def set_photo(self, photo): 

503 """ 

504 Helper method to save photo 

505 """ 

506 self.photo = photo 

507 self.save() 

508 

509 def set_bluesky_handle(self, handle): 

510 """ 

511 Helper method to set Bluesky handle 

512 """ 

513 bs = Bluesky() 

514 handle = bs.normalize_handle(handle) 

515 avatar = bs.get_profile(handle) 

516 if not avatar: 

517 raise ValueError("Invalid Bluesky handle") 

518 self.bluesky_handle = handle 

519 self.save() 

520 

521 def save( 

522 self, force_insert=False, force_update=False, using=None, update_fields=None 

523 ): 

524 url = urlsplit(self.openid) 

525 if url.username: # pragma: no cover 

526 password = url.password or "" 

527 netloc = f"{url.username}:{password}@{url.hostname}" 

528 else: 

529 netloc = url.hostname 

530 lowercase_url = urlunsplit( 

531 (url.scheme.lower(), netloc, url.path, url.query, url.fragment) 

532 ) 

533 self.openid = lowercase_url 

534 

535 self.digest = hashlib.sha256( 

536 openid_variations(lowercase_url)[0].encode("utf-8") 

537 ).hexdigest() 

538 self.alt_digest1 = hashlib.sha256( 

539 openid_variations(lowercase_url)[1].encode("utf-8") 

540 ).hexdigest() 

541 self.alt_digest2 = hashlib.sha256( 

542 openid_variations(lowercase_url)[2].encode("utf-8") 

543 ).hexdigest() 

544 self.alt_digest3 = hashlib.sha256( 

545 openid_variations(lowercase_url)[3].encode("utf-8") 

546 ).hexdigest() 

547 

548 return super().save(force_insert, force_update, using, update_fields) 

549 

550 def __str__(self): 

551 return "%s (%i) (%s)" % (self.openid, self.pk, self.user) 

552 

553 

554class OpenIDNonce(models.Model): 

555 """ 

556 Model holding OpenID Nonces 

557 See also: https://github.com/edx/django-openid-auth/ 

558 """ 

559 

560 server_url = models.CharField(max_length=255) 

561 timestamp = models.IntegerField() 

562 salt = models.CharField(max_length=128) 

563 

564 def __str__(self): 

565 return "%s (%i) (timestamp: %i)" % (self.server_url, self.pk, self.timestamp) 

566 

567 

568class OpenIDAssociation(models.Model): 

569 """ 

570 Model holding the relation/association about OpenIDs 

571 """ 

572 

573 server_url = models.TextField(max_length=2047) 

574 handle = models.CharField(max_length=255) 

575 secret = models.TextField(max_length=255) # stored base64 encoded 

576 issued = models.IntegerField() 

577 lifetime = models.IntegerField() 

578 assoc_type = models.TextField(max_length=64) 

579 

580 def __str__(self): 

581 return "%s (%i) (%s, lifetime: %i)" % ( 

582 self.server_url, 

583 self.pk, 

584 self.assoc_type, 

585 self.lifetime, 

586 ) 

587 

588 

589class DjangoOpenIDStore(OpenIDStore): 

590 """ 

591 The Python openid library needs an OpenIDStore subclass to persist data 

592 related to OpenID authentications. This one uses our Django models. 

593 """ 

594 

595 @staticmethod 

596 def storeAssociation(server_url, association): # pragma: no cover 

597 """ 

598 Helper method to store associations 

599 """ 

600 assoc = OpenIDAssociation( 

601 server_url=server_url, 

602 handle=association.handle, 

603 secret=base64.encodebytes(association.secret), 

604 issued=association.issued, 

605 lifetime=association.issued, 

606 assoc_type=association.assoc_type, 

607 ) 

608 assoc.save() 

609 

610 def getAssociation(self, server_url, handle=None): # pragma: no cover 

611 """ 

612 Helper method to get associations 

613 """ 

614 assocs = [] 

615 if handle is not None: 

616 assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member 

617 server_url=server_url, handle=handle 

618 ) 

619 else: 

620 assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member 

621 server_url=server_url 

622 ) 

623 if not assocs: 

624 return None 

625 associations = [] 

626 for assoc in assocs: 

627 if isinstance(assoc.secret, str): 

628 assoc.secret = assoc.secret.split("b'")[1].split("'")[0] 

629 assoc.secret = bytes(assoc.secret, "utf-8") 

630 association = OIDAssociation( 

631 assoc.handle, 

632 base64.decodebytes(assoc.secret), 

633 assoc.issued, 

634 assoc.lifetime, 

635 assoc.assoc_type, 

636 ) 

637 expires = 0 

638 try: 

639 # pylint: disable=no-member 

640 expires = association.getExpiresIn() 

641 except AttributeError: 

642 expires = association.expiresIn 

643 if expires == 0: 

644 self.removeAssociation(server_url, assoc.handle) 

645 else: 

646 associations.append((association.issued, association)) 

647 return associations[-1][1] if associations else None 

648 

649 @staticmethod 

650 def removeAssociation(server_url, handle): # pragma: no cover 

651 """ 

652 Helper method to remove associations 

653 """ 

654 assocs = list( 

655 OpenIDAssociation.objects.filter( # pylint: disable=no-member 

656 server_url=server_url, handle=handle 

657 ) 

658 ) 

659 assocs_exist = len(assocs) > 0 

660 for assoc in assocs: 

661 assoc.delete() 

662 return assocs_exist 

663 

664 @staticmethod 

665 def useNonce(server_url, timestamp, salt): # pragma: no cover 

666 """ 

667 Helper method to 'use' nonces 

668 """ 

669 # Has nonce expired? 

670 if abs(timestamp - time.time()) > oidnonce.SKEW: 

671 return False 

672 try: 

673 nonce = OpenIDNonce.objects.get( # pylint: disable=no-member 

674 server_url__exact=server_url, 

675 timestamp__exact=timestamp, 

676 salt__exact=salt, 

677 ) 

678 except ObjectDoesNotExist: 

679 nonce = OpenIDNonce.objects.create( # pylint: disable=no-member 

680 server_url=server_url, timestamp=timestamp, salt=salt 

681 ) 

682 return True 

683 nonce.delete() 

684 return False 

685 

686 @staticmethod 

687 def cleanupNonces(): # pragma: no cover 

688 """ 

689 Helper method to cleanup nonces 

690 """ 

691 timestamp = int(time.time()) - oidnonce.SKEW 

692 # pylint: disable=no-member 

693 OpenIDNonce.objects.filter(timestamp__lt=timestamp).delete() 

694 

695 @staticmethod 

696 def cleanupAssociations(): # pragma: no cover 

697 """ 

698 Helper method to cleanup associations 

699 """ 

700 OpenIDAssociation.objects.extra( 

701 where=[f"issued + lifetimeint < ({time.time()})"] 

702 ).delete()