Coverage for ivatar/ivataraccount/models.py: 88%

280 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-04 23:12 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Our models for ivatar.ivataraccount 

4""" 

5 

6import base64 

7import hashlib 

8import time 

9from io import BytesIO 

10from os import urandom 

11from urllib.error import HTTPError, URLError 

12from ivatar.utils import urlopen, Bluesky 

13from urllib.parse import urlsplit, urlunsplit, quote 

14 

15from PIL import Image 

16from django.contrib.auth.models import User 

17from django.contrib import messages 

18from django.db import models 

19from django.utils import timezone 

20from django.http import HttpResponseRedirect 

21from django.urls import reverse_lazy, reverse 

22from django.utils.translation import gettext_lazy as _ 

23from django.core.cache import cache 

24from django.core.exceptions import ObjectDoesNotExist 

25from django.core.mail import send_mail 

26from django.template.loader import render_to_string 

27from openid.association import Association as OIDAssociation 

28from openid.store import nonce as oidnonce 

29from openid.store.interface import OpenIDStore 

30 

31from libravatar import libravatar_url 

32 

33from ivatar.settings import MAX_LENGTH_EMAIL, logger 

34from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY 

35from ivatar.settings import MAX_LENGTH_URL 

36from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL 

37from ivatar.utils import openid_variations 

38from .gravatar import get_photo as get_gravatar_photo 

39 

40 

def file_format(image_type):
    """
    Translate a PIL image format name into our short image type

    "JPEG" and "MPO" map to "jpg", "PNG" to "png", "GIF" to "gif" and
    "WEBP" to "webp". Any other value results in None.
    """
    known_formats = (
        (("JPEG", "MPO"), "jpg"),
        (("PNG",), "png"),
        (("GIF",), "gif"),
        (("WEBP",), "webp"),
    )
    for pil_names, short_name in known_formats:
        if image_type in pil_names:
            return short_name
    return None

54 

55 

def pil_format(image_type):
    """
    Translate our short image type into the 'encoder name' PIL expects

    "jpg", "jpeg" and "mpo" map to "JPEG", "png" to "PNG", "gif" to "GIF"
    and "webp" to "WEBP". Any other value is logged and results in None.
    """
    encoders = (
        (("jpg", "jpeg", "mpo"), "JPEG"),
        (("png",), "PNG"),
        (("gif",), "GIF"),
        (("webp",), "WEBP"),
    )
    for short_names, encoder_name in encoders:
        if image_type in short_names:
            return encoder_name

    logger.info("Unsupported file format: %s", image_type)
    return None

71 

72 

class UserPreference(models.Model):
    """
    Holds the user's preferences
    """

    # (stored value, human readable label) pairs offered for the theme
    THEMES = (
        ("default", "Default theme"),
        ("clime", "climes theme"),
        ("green", "green theme"),
        ("red", "red theme"),
    )

    theme = models.CharField(
        max_length=10,
        choices=THEMES,
        default="default",
    )

    # The user doubles as primary key: at most one preference row per user
    user = models.OneToOneField(
        User,
        on_delete=models.deletion.CASCADE,
        primary_key=True,
    )

    def __str__(self):
        # 'pk or 0' keeps %i from raising TypeError while the primary key
        # is still None (same approach as Photo.__str__)
        return "Preference (%i) for %s" % (self.pk or 0, self.user)

99 

100 

class BaseAccountModel(models.Model):
    """
    Abstract base model carrying the fields shared by the account models:
    the owning user, the IP address and the date the entry was added
    """

    user = models.ForeignKey(User, on_delete=models.CASCADE)
    ip_address = models.GenericIPAddressField(null=True, unpack_ipv4=True)
    add_date = models.DateTimeField(default=timezone.now)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Model options
        """

        abstract = True

119 

120 

class Photo(BaseAccountModel):
    """
    Model holding the photos and information about them
    """

    ip_address = models.GenericIPAddressField(unpack_ipv4=True)
    data = models.BinaryField()
    format = models.CharField(max_length=4)
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("photo")
        verbose_name_plural = _("photos")

    def import_image(self, service_name, email_address):
        """
        Allow to import image from other (eg. Gravatar) service

        service_name is "Gravatar" or "Libravatar"; email_address is the
        address to look up there. On success the downloaded bytes and the
        detected format are stored on this photo, the photo is saved and
        True is returned. False is returned when no image URL is known,
        the download fails or the data is not a supported image.
        """
        image_url = False

        if service_name == "Gravatar":
            if gravatar := get_gravatar_photo(email_address):
                image_url = gravatar["image_url"]

        if service_name == "Libravatar":
            image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE)

        if not image_url:
            return False  # pragma: no cover
        try:
            image = urlopen(image_url)
        except HTTPError as exc:
            logger.warning(
                "%s import failed with an HTTP error: %s", service_name, exc.code
            )
            return False
        except URLError as exc:
            logger.warning("%s import failed: %s", service_name, exc.reason)
            return False
        data = image.read()

        try:
            img = Image.open(BytesIO(data))
        # Pillow reports undecodable data as UnidentifiedImageError, which
        # is an OSError, so ValueError alone does not cover bad downloads.
        except (
            ValueError,
            OSError,
            Image.DecompressionBombError,
        ) as exc:  # pragma: no cover
            logger.warning("%s import returned no usable image: %s", service_name, exc)
            return False  # pragma: no cover

        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Unable to determine format: %s", img)
            return False  # pragma: no cover
        self.data = data
        # The format has just been validated, so skip Photo.save() and its
        # second decoding pass and go straight to the parent implementation.
        super().save()
        return True

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, taking care about the image

        The image data is decoded with PIL to derive the format field.
        Returns False without saving when the data cannot be opened or the
        format is not one we support; otherwise returns whatever the
        parent save() returns.
        """
        # Use PIL to read the file format
        try:
            img = Image.open(BytesIO(self.data))
        except Exception as exc:  # pylint: disable=broad-except
            logger.warning("Exception caught in Photo.save(): %s", exc)
            return False
        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Format not recognized")
            return False
        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django releases.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def perform_crop(self, request, dimensions, email, openid):
        """
        Helper to crop the image

        dimensions is a dict with the crop rectangle in "x", "y", "w" and
        "h"; the keys "a" and "b" are (over)written with the width and
        height of the stored image. email / openid are optional confirmed
        address objects that get this photo assigned explicitly.

        Always returns a redirect to the profile page; problems are
        reported to the user through the messages framework.
        """
        if request.user.photo_set.count() == 1:
            # This is the first photo, assign to all confirmed addresses
            for addr in request.user.confirmedemail_set.all():
                addr.photo = self
                addr.save()

            for addr in request.user.confirmedopenid_set.all():
                addr.photo = self
                addr.save()

        if email:
            # Explicitly asked
            email.photo = self
            email.save()

        if openid:
            # Explicitly asked
            openid.photo = self
            openid.save()

        # Do the real work cropping
        img = Image.open(BytesIO(self.data))

        # This should be anyway checked during save...
        dimensions["a"], dimensions["b"] = img.size  # pylint: disable=invalid-name
        if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS:
            # Interpolate AFTER the translation lookup; formatting inside
            # _() would look up the already formatted text and never match
            # a catalog entry. The msgid is kept verbatim so existing
            # translations keep matching.
            messages.error(
                request,
                _("Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s")
                % {
                    "max_pixels": MAX_PIXELS,
                },
            )
            return HttpResponseRedirect(reverse_lazy("profile"))

        if dimensions["w"] == 0 and dimensions["h"] == 0:
            # No crop size given: use the largest square that fits
            min_from_w_h = min(dimensions["a"], dimensions["b"])
            dimensions["w"], dimensions["h"] = min_from_w_h, min_from_w_h
        elif (
            (dimensions["x"] < 0)
            or (dimensions["y"] < 0)
            # Only one of w/h being zero would produce an empty image
            or (dimensions["w"] <= 0)
            or (dimensions["h"] <= 0)
            or ((dimensions["x"] + dimensions["w"]) > dimensions["a"])
            or ((dimensions["y"] + dimensions["h"]) > dimensions["b"])
        ):
            messages.error(request, _("Crop outside of original image bounding box"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        cropped = img.crop(
            (
                dimensions["x"],
                dimensions["y"],
                dimensions["x"] + dimensions["w"],
                dimensions["y"] + dimensions["h"],
            )
        )
        # Resize the image only if it's larger than the specified max width.
        cropped_w, cropped_h = cropped.size
        max_w = AVATAR_MAX_SIZE
        if cropped_w > max_w or cropped_h > max_w:
            cropped = cropped.resize((max_w, max_w), Image.LANCZOS)

        data = BytesIO()
        cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY)

        # Overwrite the existing image
        self.data = data.getvalue()
        self.save()

        return HttpResponseRedirect(reverse_lazy("profile"))

    def __str__(self):
        return "%s (%i) from %s" % (self.format, self.pk or 0, self.user)

278 

279 

280# pylint: disable=too-few-public-methods 

class ConfirmedEmailManager(models.Manager):
    """
    Manager attached to the confirmed email address model
    """

    @staticmethod
    def create_confirmed_email(user, email_address, is_logged_in):
        """
        Create and save a confirmed email address for the given user

        Returns a tuple of the new entry's primary key and a list of
        external photos; the list holds the Gravatar lookup result when
        the user is logged in and the lookup returned something.
        """
        confirmed = ConfirmedEmail(
            user=user,
            ip_address="0.0.0.0",
            email=email_address,
        )
        confirmed.save()

        # The external lookup is only attempted for logged in users
        gravatar = get_gravatar_photo(confirmed.email) if is_logged_in else None
        external_photos = [gravatar] if gravatar else []

        return (confirmed.pk, external_photos)

303 

304 

class ConfirmedEmail(BaseAccountModel):
    """
    Model holding our confirmed email addresses, as well as the relation
    to the assigned photo
    """

    email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL)
    photo = models.ForeignKey(
        Photo,
        related_name="emails",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
    # Hex MD5 / SHA256 of the stripped, lower-cased address; set in save()
    digest = models.CharField(max_length=32)
    digest_sha256 = models.CharField(max_length=64)
    objects = ConfirmedEmailManager()
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("confirmed email")
        verbose_name_plural = _("confirmed emails")

    def set_photo(self, photo):
        """
        Helper method to set photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        The handle is normalized first. Raises ValueError when the profile
        lookup for the normalized handle returns nothing.
        """
        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, add digest

        Recomputes both digests from the current address and, for entries
        that already have a primary key, drops the cached
        'assign_photo_email' page for this entry.
        """
        normalized = self.email.strip().lower().encode("utf-8")
        self.digest = hashlib.md5(normalized).hexdigest()
        self.digest_sha256 = hashlib.sha256(normalized).hexdigest()

        # We need to manually expire the page caches
        # TODO: Verify this works as expected
        # First check if we already have an ID
        if self.pk:
            cache_url = reverse_lazy(
                "assign_photo_email", kwargs={"email_id": int(self.pk)}
            )

            cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}"
            # Neither outcome is an error, so log at debug level and let
            # the logger do the formatting lazily.
            if cache.has_key(cache_key):
                cache.delete(cache_key)
                logger.debug("Successfully cleaned up cached page: %s", cache_key)
            else:
                logger.debug("Page %s wasn't cached.", cache_key)

        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django releases.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # 'pk or 0' keeps %i from raising TypeError on unsaved instances
        # (same approach as Photo.__str__)
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)

386 

387 

class UnconfirmedEmail(BaseAccountModel):
    """
    Model holding unconfirmed email addresses as well as the verification key
    """

    email = models.EmailField(max_length=MAX_LENGTH_EMAIL)
    verification_key = models.CharField(max_length=64)
    # Bookkeeping for the last attempt to send the confirmation mail
    last_send_date = models.DateTimeField(null=True, blank=True)
    last_status = models.TextField(max_length=2047, null=True, blank=True)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("unconfirmed email")
        verbose_name_plural = _("unconfirmed emails")

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, create the verification key if missing

        The key is the hex SHA256 of 1024 random bytes followed by the
        user name.
        """
        if not self.verification_key:
            hash_object = hashlib.new("sha256")
            hash_object.update(
                urandom(1024)
                + self.user.username.encode("utf-8")  # pylint: disable=no-member
            )
            self.verification_key = hash_object.hexdigest()
        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django releases.
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def send_confirmation_mail(self, url=SECURE_BASE_URL):
        """
        Send confirmation mail to that mail address

        url is the base URL the confirmation link is built on. The time of
        the attempt and its outcome ("OK" or the text of the exception
        raised while sending) are stored on this entry. Always returns
        True; inspect last_status for failures.
        """
        link = url + reverse(
            "confirm_email", kwargs={"verification_key": self.verification_key}
        )
        email_subject = _("Confirm your email address on %s") % SITE_NAME
        email_body = render_to_string(
            "email_confirmation.txt",
            {
                "verification_link": link,
                "site_name": SITE_NAME,
            },
        )
        self.last_send_date = timezone.now()
        self.last_status = "OK"
        try:
            send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email])
        except Exception as e:  # pylint: disable=broad-except
            # Best effort: record the failure instead of propagating it
            self.last_status = f"{e}"
        self.save()
        return True

    def __str__(self):
        # 'pk or 0' keeps %i from raising TypeError on unsaved instances
        # (same approach as Photo.__str__)
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)

448 

449 

class UnconfirmedOpenId(BaseAccountModel):
    """
    Model holding unconfirmed OpenIDs
    """

    openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("unconfirmed OpenID")
        # NOTE(review): unlike the other models this plural is neither
        # translated nor free of the underscore; left untouched because
        # changing a model option requires a migration.
        verbose_name_plural = "unconfirmed_OpenIDs"

    def __str__(self):
        # 'pk or 0' keeps %i from raising TypeError on unsaved instances
        # (same approach as Photo.__str__)
        return "%s (%i) from %s" % (self.openid, self.pk or 0, self.user)

467 

468 

class ConfirmedOpenId(BaseAccountModel):
    """
    Model holding confirmed OpenIDs, as well as the relation to
    the assigned photo
    """

    openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL)
    photo = models.ForeignKey(
        Photo,
        related_name="openids",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # http://<id>/ base version - http w/ trailing slash
    digest = models.CharField(max_length=64)
    # http://<id> - http w/o trailing slash
    alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id>/ - https w/ trailing slash
    alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id> - https w/o trailing slash
    alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)

    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("confirmed OpenID")
        verbose_name_plural = _("confirmed OpenIDs")

    def set_photo(self, photo):
        """
        Helper method to save photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        The handle is normalized first. Raises ValueError when the profile
        lookup for the normalized handle returns nothing.
        """
        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, normalize the URL and add the digests

        Scheme and host name are lower-cased, the result is stored back in
        openid, and the SHA256 digests of the first four entries returned
        by openid_variations() for that URL are stored in digest and
        alt_digest1..3.
        """
        url = urlsplit(self.openid)

        # url.hostname is the lower-cased host only: it carries neither the
        # port nor the brackets of an IPv6 literal, so both are put back.
        host = url.hostname or ""
        if ":" in host:
            host = f"[{host}]"
        try:
            port = url.port
        except ValueError:
            # Malformed port: build the URL without one
            port = None
        netloc = host if port is None else f"{host}:{port}"
        if url.username:  # pragma: no cover
            password = url.password or ""
            netloc = f"{url.username}:{password}@{netloc}"

        lowercase_url = urlunsplit(
            (url.scheme.lower(), netloc, url.path, url.query, url.fragment)
        )
        self.openid = lowercase_url

        # Compute the variations once instead of once per digest
        variations = openid_variations(lowercase_url)
        (
            self.digest,
            self.alt_digest1,
            self.alt_digest2,
            self.alt_digest3,
        ) = (
            hashlib.sha256(variations[index].encode("utf-8")).hexdigest()
            for index in range(4)
        )

        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django releases.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # 'pk or 0' keeps %i from raising TypeError on unsaved instances
        # (same approach as Photo.__str__)
        return "%s (%i) (%s)" % (self.openid, self.pk or 0, self.user)

554 

555 

class OpenIDNonce(models.Model):
    """
    Model holding OpenID Nonces
    See also: https://github.com/edx/django-openid-auth/
    """

    server_url = models.CharField(max_length=255)
    timestamp = models.IntegerField()
    salt = models.CharField(max_length=128)

    def __str__(self):
        # 'pk or 0' keeps %i from raising TypeError on unsaved instances
        # (same approach as Photo.__str__)
        return "%s (%i) (timestamp: %i)" % (
            self.server_url,
            self.pk or 0,
            self.timestamp,
        )

568 

569 

class OpenIDAssociation(models.Model):
    """
    Model holding the relation/association about OpenIDs
    """

    server_url = models.TextField(max_length=2047)
    handle = models.CharField(max_length=255)
    secret = models.TextField(max_length=255)  # stored base64 encoded
    issued = models.IntegerField()
    lifetime = models.IntegerField()
    assoc_type = models.TextField(max_length=64)

    def __str__(self):
        # 'pk or 0' keeps %i from raising TypeError on unsaved instances
        # (same approach as Photo.__str__)
        return "%s (%i) (%s, lifetime: %i)" % (
            self.server_url,
            self.pk or 0,
            self.assoc_type,
            self.lifetime,
        )

589 

590 

class DjangoOpenIDStore(OpenIDStore):
    """
    The Python openid library needs an OpenIDStore subclass to persist data
    related to OpenID authentications. This one uses our Django models.
    """

    @staticmethod
    def storeAssociation(server_url, association):  # pragma: no cover
        """
        Helper method to store associations

        The secret is stored as base64 text.
        """
        assoc = OpenIDAssociation(
            server_url=server_url,
            handle=association.handle,
            # Decode to text: handing bytes to the text column stores the
            # "b'...'" repr of the bytes object instead of the base64 data
            secret=base64.encodebytes(association.secret).decode("ascii"),
            issued=association.issued,
            lifetime=association.lifetime,
            assoc_type=association.assoc_type,
        )
        assoc.save()

    @staticmethod
    def _decode_secret(stored):  # pragma: no cover
        """
        Turn a stored secret back into the raw secret bytes

        Accepts plain base64 text, base64 bytes, and the legacy "b'...'"
        text form written by earlier versions of storeAssociation.
        """
        if isinstance(stored, str):
            if stored.startswith("b'") and stored.endswith("'"):
                # Legacy row: strip the repr wrapper and the escaped
                # newlines ('n' is a valid base64 character, so the
                # escape must not reach the decoder)
                stored = stored[2:-1].replace("\\n", "")
            stored = stored.encode("utf-8")
        return base64.decodebytes(stored)

    def getAssociation(self, server_url, handle=None):  # pragma: no cover
        """
        Helper method to get associations

        Looks up the associations stored for server_url (restricted to
        handle when given), removes those that have expired and returns
        the most recently issued remaining one, or None.
        """
        lookup = {"server_url": server_url}
        if handle is not None:
            lookup["handle"] = handle
        # pylint: disable=no-member
        assocs = OpenIDAssociation.objects.filter(**lookup)

        valid = []
        for assoc in assocs:
            association = OIDAssociation(
                assoc.handle,
                self._decode_secret(assoc.secret),
                assoc.issued,
                assoc.lifetime,
                assoc.assoc_type,
            )
            try:
                # pylint: disable=no-member
                expires = association.getExpiresIn()
            except AttributeError:
                expires = association.expiresIn
            if expires == 0:
                self.removeAssociation(server_url, assoc.handle)
            else:
                valid.append(association)

        if not valid:
            return None
        # Pick by issue time explicitly; the query order is unspecified
        return max(valid, key=lambda association: association.issued)

    @staticmethod
    def removeAssociation(server_url, handle):  # pragma: no cover
        """
        Helper method to remove associations

        Returns True when at least one matching association existed.
        """
        # pylint: disable=no-member
        deleted, _details = OpenIDAssociation.objects.filter(
            server_url=server_url, handle=handle
        ).delete()
        return deleted > 0

    @staticmethod
    def useNonce(server_url, timestamp, salt):  # pragma: no cover
        """
        Helper method to 'use' nonces

        Returns True when the nonce is within the allowed clock skew and
        has not been seen before (it is stored then). A nonce that is
        already stored is deleted and False is returned.
        """
        # Has nonce expired?
        if abs(timestamp - time.time()) > oidnonce.SKEW:
            return False
        # pylint: disable=no-member
        nonce, created = OpenIDNonce.objects.get_or_create(
            server_url=server_url, timestamp=timestamp, salt=salt
        )
        if created:
            return True
        nonce.delete()
        return False

    @staticmethod
    def cleanupNonces():  # pragma: no cover
        """
        Helper method to cleanup nonces

        Deletes nonces older than the allowed clock skew and returns the
        number of deleted rows.
        """
        timestamp = int(time.time()) - oidnonce.SKEW
        # pylint: disable=no-member
        deleted, _details = OpenIDNonce.objects.filter(timestamp__lt=timestamp).delete()
        return deleted

    @staticmethod
    def cleanupAssociations():  # pragma: no cover
        """
        Helper method to cleanup associations

        Deletes associations whose issue time plus lifetime lies in the
        past and returns the number of deleted rows.
        """
        now = int(time.time())
        # issued + lifetime < now, expressed through the ORM instead of raw
        # SQL (the raw clause referenced a non-existent column)
        # pylint: disable=no-member
        deleted, _details = OpenIDAssociation.objects.filter(
            issued__lt=now - models.F("lifetime")
        ).delete()
        return deleted

703 where=[f"issued + lifetimeint < ({time.time()})"] 

704 ).delete()