Coverage for ivatar/ivataraccount/models.py: 89%

272 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-12 23:12 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Our models for ivatar.ivataraccount 

4""" 

5 

6import base64 

7import hashlib 

8import time 

9from io import BytesIO 

10from os import urandom 

11from urllib.error import HTTPError, URLError 

12from ivatar.utils import urlopen, Bluesky 

13from urllib.parse import urlsplit, urlunsplit 

14 

15from PIL import Image 

16from django.contrib.auth.models import User 

17from django.contrib import messages 

18from django.db import models 

19from django.utils import timezone 

20from django.http import HttpResponseRedirect 

21from django.urls import reverse_lazy, reverse 

22from django.utils.translation import gettext_lazy as _ 

23from django.core.exceptions import ObjectDoesNotExist 

24from django.core.mail import send_mail 

25from django.template.loader import render_to_string 

26from openid.association import Association as OIDAssociation 

27from openid.store import nonce as oidnonce 

28from openid.store.interface import OpenIDStore 

29 

30from libravatar import libravatar_url 

31 

32from ivatar.settings import MAX_LENGTH_EMAIL, logger 

33from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY 

34from ivatar.settings import MAX_LENGTH_URL 

35from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL 

36from ivatar.utils import openid_variations 

37from .gravatar import get_photo as get_gravatar_photo 

38 

39 

def file_format(image_type):
    """
    Translate a PIL image format name into the short type we store

    "JPEG" and "MPO" both map to "jpg"; "PNG", "GIF" and "WEBP" map to
    their lower-case names. Anything else yields None.
    """
    known_formats = (
        (("JPEG", "MPO"), "jpg"),
        (("PNG",), "png"),
        (("GIF",), "gif"),
        (("WEBP",), "webp"),
    )
    for pil_names, short_name in known_formats:
        if image_type in pil_names:
            return short_name
    return None

53 

54 

def pil_format(image_type):
    """
    Translate a short image type into the 'encoder name' PIL expects

    "jpg", "jpeg" and "mpo" map to "JPEG"; "png", "gif" and "webp" map to
    their upper-case names. Unknown types are logged and yield None.
    """
    encoders = (
        (("jpg", "jpeg", "mpo"), "JPEG"),
        (("png",), "PNG"),
        (("gif",), "GIF"),
        (("webp",), "WEBP"),
    )
    for short_names, encoder_name in encoders:
        if image_type in short_names:
            return encoder_name

    logger.info("Unsupported file format: %s", image_type)
    return None

70 

71 

class UserPreference(models.Model):
    """
    Holds a user's preferences

    There is exactly one row per user: the one-to-one relation to the
    user is also the primary key of this model.
    """

    # Selectable themes as (stored value, display label) pairs
    THEMES = (
        ("default", "Default theme"),
        ("clime", "climes theme"),
        ("green", "green theme"),
        ("red", "red theme"),
    )

    # Stored theme key; must be one of the values in THEMES
    theme = models.CharField(
        max_length=10,
        choices=THEMES,
        default="default",
    )

    # Owner of the preference; deleting the user deletes the preference
    user = models.OneToOneField(
        User,
        on_delete=models.deletion.CASCADE,
        primary_key=True,
    )

    def __str__(self):
        return "Preference (%i) for %s" % (self.pk, self.user)

98 

99 

class BaseAccountModel(models.Model):
    """
    Abstract base model holding the fields shared by the account models

    Subclasses inherit the owning user, an optional IP address and the
    date the record was added.
    """

    # Owner of the record; deleting the user deletes the record
    user = models.ForeignKey(
        User,
        on_delete=models.deletion.CASCADE,
    )
    # Optional IP address associated with the record
    ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True)
    # Defaults to the current time when the instance is created
    add_date = models.DateTimeField(default=timezone.now)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        # No table of its own; only provides fields to subclasses
        abstract = True

118 

119 

class Photo(BaseAccountModel):
    """
    Model holding the photos and information about them
    """

    # Unlike the base model, a photo always carries an IP address
    ip_address = models.GenericIPAddressField(unpack_ipv4=True)
    # Raw image bytes
    data = models.BinaryField()
    # Short image type as returned by file_format() ("jpg", "png", ...)
    format = models.CharField(max_length=4)
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("photo")
        verbose_name_plural = _("photos")

    def import_image(self, service_name, email_address):
        """
        Allow to import image from other (eg. Gravatar) service

        service_name is "Gravatar" or "Libravatar"; any other value leaves
        no URL to fetch. Returns True when the image was downloaded,
        recognised and stored, False otherwise.
        """
        image_url = False

        if service_name == "Gravatar":
            if gravatar := get_gravatar_photo(email_address):
                image_url = gravatar["image_url"]

        if service_name == "Libravatar":
            image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE)

        if not image_url:
            return False  # pragma: no cover
        try:
            image = urlopen(image_url)
        except HTTPError as exc:
            print(f"{service_name} import failed with an HTTP error: {exc.code}")
            return False
        except URLError as exc:
            print(f"{service_name} import failed: {exc.reason}")
            return False
        data = image.read()

        try:
            img = Image.open(BytesIO(data))
        # Pillow reports undecodable data with UnidentifiedImageError, which
        # is an OSError, so ValueError alone does not cover that case.
        except (ValueError, OSError):  # pragma: no cover
            return False  # pragma: no cover

        self.format = file_format(img.format)
        if not self.format:
            print(f"Unable to determine format: {img}")
            return False  # pragma: no cover
        self.data = data
        # The format was validated above, so go straight to the parent save
        # instead of re-parsing the image in Photo.save().
        super().save()
        return True

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, taking care about the image

        The image data is parsed with PIL to derive the stored format.
        Returns False without saving when the data cannot be opened or the
        format is not one we support.
        """
        # Use PIL to read the file format
        try:
            img = Image.open(BytesIO(self.data))
        except Exception as exc:  # pylint: disable=broad-except
            # For debugging only
            print(f"Exception caught in Photo.save(): {exc}")
            return False
        self.format = file_format(img.format)
        if not self.format:
            print("Format not recognized")
            return False
        # Forward the options by keyword: newer Django versions deprecate
        # passing them to Model.save() positionally.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def perform_crop(self, request, dimensions, email, openid):
        """
        Helper to crop the image

        dimensions is a dict with the crop origin ("x", "y") and size
        ("w", "h"); the image size is written back into it as "a" and "b".
        email / openid, when given, get this photo assigned. Always returns
        a redirect to the profile page; problems are reported through the
        messages framework.
        """
        if request.user.photo_set.count() == 1:
            # This is the first photo, assign to all confirmed addresses
            for addr in request.user.confirmedemail_set.all():
                addr.photo = self
                addr.save()

            for addr in request.user.confirmedopenid_set.all():
                addr.photo = self
                addr.save()

        if email:
            # Explicitly asked
            email.photo = self
            email.save()

        if openid:
            # Explicitly asked
            openid.photo = self
            openid.save()

        # Do the real work cropping
        img = Image.open(BytesIO(self.data))

        # This should be anyway checked during save...
        dimensions["a"], dimensions["b"] = img.size  # pylint: disable=invalid-name
        if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS:
            # Interpolate AFTER translating: formatting inside _() would look
            # up the already formatted text, which is never in the catalog.
            messages.error(
                request,
                _("Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s")
                % {
                    "max_pixels": MAX_PIXELS,
                },
            )
            return HttpResponseRedirect(reverse_lazy("profile"))

        if dimensions["w"] == 0 and dimensions["h"] == 0:
            # No crop size given: use the largest square that fits the image
            dimensions["w"], dimensions["h"] = dimensions["a"], dimensions["b"]
            min_from_w_h = min(dimensions["w"], dimensions["h"])
            dimensions["w"], dimensions["h"] = min_from_w_h, min_from_w_h
        elif (
            (dimensions["w"] < 0)
            or ((dimensions["x"] + dimensions["w"]) > dimensions["a"])
            or (dimensions["h"] < 0)
            or ((dimensions["y"] + dimensions["h"]) > dimensions["b"])
        ):
            # NOTE(review): negative "x"/"y" are not rejected here - confirm
            # whether callers already guarantee a non-negative origin.
            messages.error(request, _("Crop outside of original image bounding box"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        cropped = img.crop(
            (
                dimensions["x"],
                dimensions["y"],
                dimensions["x"] + dimensions["w"],
                dimensions["y"] + dimensions["h"],
            )
        )
        # cropped.load()
        # Resize the image only if it's larger than the specified max width.
        cropped_w, cropped_h = cropped.size
        max_w = AVATAR_MAX_SIZE
        if cropped_w > max_w or cropped_h > max_w:
            cropped = cropped.resize((max_w, max_w), Image.LANCZOS)

        data = BytesIO()
        cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY)
        data.seek(0)

        # Overwrite the existing image
        self.data = data.read()
        self.save()

        return HttpResponseRedirect(reverse_lazy("profile"))

    def __str__(self):
        return "%s (%i) from %s" % (self.format, self.pk or 0, self.user)

277 

278 

279# pylint: disable=too-few-public-methods 

class ConfirmedEmailManager(models.Manager):
    """
    Manager for the confirmed email address model
    """

    @staticmethod
    def create_confirmed_email(user, email_address, is_logged_in):
        """
        Create and save a confirmed email address for the given user

        Returns a tuple of the new record's primary key and a list of
        external photos; the list holds the Gravatar lookup result when
        the user is logged in and the lookup returned something.
        """
        confirmed = ConfirmedEmail(
            user=user,
            ip_address="0.0.0.0",
            email=email_address,
        )
        confirmed.save()

        # Only look for an external photo on behalf of logged-in users
        gravatar = get_gravatar_photo(confirmed.email) if is_logged_in else None
        external_photos = [gravatar] if gravatar else []

        return (confirmed.pk, external_photos)

302 

303 

class ConfirmedEmail(BaseAccountModel):
    """
    Model holding our confirmed email addresses, as well as the relation
    to the assigned photo
    """

    email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL)
    photo = models.ForeignKey(
        Photo,
        related_name="emails",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
    # MD5 and SHA256 hex digests of the normalised address, set in save()
    digest = models.CharField(max_length=32)
    digest_sha256 = models.CharField(max_length=64)
    objects = ConfirmedEmailManager()
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("confirmed email")
        verbose_name_plural = _("confirmed emails")

    def set_photo(self, photo):
        """
        Helper method to set photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        The handle is normalised and its profile looked up first; raises
        ValueError when no profile is returned for it.
        """

        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, add digest

        Both digests are computed over the stripped, lower-cased address.
        """
        # Normalise once and reuse the bytes for both hashes
        normalized = self.email.strip().lower().encode("utf-8")
        self.digest = hashlib.md5(normalized).hexdigest()
        self.digest_sha256 = hashlib.sha256(normalized).hexdigest()
        # Forward the options by keyword: newer Django versions deprecate
        # passing them to Model.save() positionally.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # pk is None until the first save; %i would raise on it
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)

369 

370 

class UnconfirmedEmail(BaseAccountModel):
    """
    Model holding unconfirmed email addresses as well as the verification key
    """

    email = models.EmailField(max_length=MAX_LENGTH_EMAIL)
    # Hex SHA256 digest generated on first save
    verification_key = models.CharField(max_length=64)
    # When the confirmation mail was last sent, and the outcome ("OK" or
    # the text of the exception raised while sending)
    last_send_date = models.DateTimeField(null=True, blank=True)
    last_status = models.TextField(max_length=2047, null=True, blank=True)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("unconfirmed email")
        verbose_name_plural = _("unconfirmed emails")

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, generating the verification key if missing

        The key is the SHA256 hex digest of 1024 random bytes followed by
        the owner's username.
        """
        if not self.verification_key:
            hash_object = hashlib.new("sha256")
            hash_object.update(
                urandom(1024)
                + self.user.username.encode("utf-8")  # pylint: disable=no-member
            )  # pylint: disable=no-member
            self.verification_key = hash_object.hexdigest()
        # Forward the options by keyword: newer Django versions deprecate
        # passing them to Model.save() positionally.
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def send_confirmation_mail(self, url=SECURE_BASE_URL):
        """
        Send confirmation mail to that mail address

        url is the base URL the confirmation link is built on. The send
        time and outcome are recorded on the instance and saved. Always
        returns True; a failure to send is only visible in last_status.
        """
        link = url + reverse(
            "confirm_email", kwargs={"verification_key": self.verification_key}
        )
        email_subject = _("Confirm your email address on %s") % SITE_NAME
        email_body = render_to_string(
            "email_confirmation.txt",
            {
                "verification_link": link,
                "site_name": SITE_NAME,
            },
        )
        self.last_send_date = timezone.now()
        self.last_status = "OK"
        # if settings.DEBUG:
        #    print('DEBUG: %s' % link)
        try:
            send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email])
        except Exception as e:  # pylint: disable=broad-except
            # Best effort: keep the error text instead of failing the request
            self.last_status = f"{e}"
        self.save()
        return True

    def __str__(self):
        # pk is None until the first save; %i would raise on it
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)

431 

432 

class UnconfirmedOpenId(BaseAccountModel):
    """
    Model holding unconfirmed OpenIDs
    """

    # Not unique: the same OpenID may be pending more than once
    openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("unconfirmed OpenID")
        # NOTE(review): unlike the other models this plural is neither
        # translated nor spelled with a space - confirm whether intended.
        verbose_name_plural = "unconfirmed_OpenIDs"

    def __str__(self):
        # pk is None until the first save; %i would raise on it
        return "%s (%i) from %s" % (self.openid, self.pk or 0, self.user)

450 

451 

class ConfirmedOpenId(BaseAccountModel):
    """
    Model holding confirmed OpenIDs, as well as the relation to
    the assigned photo
    """

    openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL)
    photo = models.ForeignKey(
        Photo,
        related_name="openids",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # http://<id>/ base version - http w/ trailing slash
    digest = models.CharField(max_length=64)
    # http://<id> - http w/o trailing slash
    alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id>/ - https w/ trailing slash
    alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id> - https w/o trailing slash
    alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)

    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("confirmed OpenID")
        verbose_name_plural = _("confirmed OpenIDs")

    def set_photo(self, photo):
        """
        Helper method to save photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        The handle is normalised and its profile looked up first; raises
        ValueError when no profile is returned for it.
        """
        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, normalising the URL and adding the digests

        The scheme and host of the OpenID are lower-cased, then the SHA256
        digests of the four variations returned by openid_variations() are
        stored in digest and alt_digest1..3.
        """
        url = urlsplit(self.openid)
        # NOTE(review): url.hostname carries no port, so an explicit port in
        # the OpenID is dropped here. Left as is because changing it would
        # alter stored digests - confirm whether that is intended.
        if url.username:  # pragma: no cover
            password = url.password or ""
            netloc = f"{url.username}:{password}@{url.hostname}"
        else:
            netloc = url.hostname
        lowercase_url = urlunsplit(
            (url.scheme.lower(), netloc, url.path, url.query, url.fragment)
        )
        self.openid = lowercase_url

        # Compute the variations once instead of once per digest
        variations = openid_variations(lowercase_url)
        self.digest = hashlib.sha256(variations[0].encode("utf-8")).hexdigest()
        self.alt_digest1 = hashlib.sha256(variations[1].encode("utf-8")).hexdigest()
        self.alt_digest2 = hashlib.sha256(variations[2].encode("utf-8")).hexdigest()
        self.alt_digest3 = hashlib.sha256(variations[3].encode("utf-8")).hexdigest()

        # Forward the options by keyword: newer Django versions deprecate
        # passing them to Model.save() positionally.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # pk is None until the first save; %i would raise on it
        return "%s (%i) (%s)" % (self.openid, self.pk or 0, self.user)

537 

538 

class OpenIDNonce(models.Model):
    """
    Model holding OpenID Nonces
    See also: https://github.com/edx/django-openid-auth/
    """

    server_url = models.CharField(max_length=255)
    timestamp = models.IntegerField()
    salt = models.CharField(max_length=128)

    def __str__(self):
        # pk is None until the first save; %i would raise on it
        return "%s (%i) (timestamp: %i)" % (
            self.server_url,
            self.pk or 0,
            self.timestamp,
        )

551 

552 

class OpenIDAssociation(models.Model):
    """
    Model holding the relation/association about OpenIDs
    """

    server_url = models.TextField(max_length=2047)
    handle = models.CharField(max_length=255)
    secret = models.TextField(max_length=255)  # stored base64 encoded
    issued = models.IntegerField()
    lifetime = models.IntegerField()
    assoc_type = models.TextField(max_length=64)

    def __str__(self):
        # pk is None until the first save; %i would raise on it
        return "%s (%i) (%s, lifetime: %i)" % (
            self.server_url,
            self.pk or 0,
            self.assoc_type,
            self.lifetime,
        )

572 

573 

class DjangoOpenIDStore(OpenIDStore):
    """
    The Python openid library needs an OpenIDStore subclass to persist data
    related to OpenID authentications. This one uses our Django models.
    """

    @staticmethod
    def storeAssociation(server_url, association):  # pragma: no cover
        """
        Helper method to store associations

        The secret is stored as base64 text.
        """
        assoc = OpenIDAssociation(
            server_url=server_url,
            handle=association.handle,
            # Decode to text: handing bytes to the text column stores their
            # "b'...'" representation instead of the base64 data itself.
            secret=base64.encodebytes(association.secret).decode("ascii"),
            issued=association.issued,
            # Was association.issued, which made every stored lifetime wrong
            lifetime=association.lifetime,
            assoc_type=association.assoc_type,
        )
        assoc.save()

    @staticmethod
    def _decode_secret(stored):  # pragma: no cover
        """
        Return the raw secret bytes for a stored secret value

        Accepts bytes, clean base64 text, and the legacy text form that
        holds the "b'...'" representation of the base64 bytes.
        """
        if isinstance(stored, str):
            if stored.startswith("b'"):
                # Legacy row: unwrap the bytes repr and turn the escaped
                # newlines back into real ones before decoding.
                stored = stored.split("b'")[1].split("'")[0]
                stored = stored.replace("\\n", "\n")
            stored = bytes(stored, "utf-8")
        return base64.decodebytes(stored)

    def getAssociation(self, server_url, handle=None):  # pragma: no cover
        """
        Helper method to get associations

        Looks up the associations for server_url (and handle, when given),
        removes the expired ones and returns the most recently issued of
        the remaining ones, or None when there is none.
        """
        lookup = {"server_url": server_url}
        if handle is not None:
            lookup["handle"] = handle
        assocs = OpenIDAssociation.objects.filter(  # pylint: disable=no-member
            **lookup
        )
        if not assocs:
            return None
        associations = []
        for assoc in assocs:
            association = OIDAssociation(
                assoc.handle,
                self._decode_secret(assoc.secret),
                assoc.issued,
                assoc.lifetime,
                assoc.assoc_type,
            )
            expires = 0
            try:
                # pylint: disable=no-member
                expires = association.getExpiresIn()
            except AttributeError:
                expires = association.expiresIn
            if expires == 0:
                self.removeAssociation(server_url, assoc.handle)
            else:
                associations.append((association.issued, association))
        if not associations:
            return None
        # Pick by issue time rather than by the order rows came back in
        return max(associations, key=lambda pair: pair[0])[1]

    @staticmethod
    def removeAssociation(server_url, handle):  # pragma: no cover
        """
        Helper method to remove associations

        Returns True when at least one association was removed.
        """
        deleted_count, _per_model = OpenIDAssociation.objects.filter(  # pylint: disable=no-member
            server_url=server_url, handle=handle
        ).delete()
        return deleted_count > 0

    @staticmethod
    def useNonce(server_url, timestamp, salt):  # pragma: no cover
        """
        Helper method to 'use' nonces

        Returns True when the nonce is within the allowed clock skew and
        has not been seen before, False otherwise.
        """
        # Has nonce expired?
        if abs(timestamp - time.time()) > oidnonce.SKEW:
            return False
        already_used = OpenIDNonce.objects.filter(  # pylint: disable=no-member
            server_url__exact=server_url,
            timestamp__exact=timestamp,
            salt__exact=salt,
        ).exists()
        if already_used:
            # Keep the stored nonce: deleting it on a replay would let the
            # next replay of the same nonce be accepted as new.
            return False
        OpenIDNonce.objects.create(  # pylint: disable=no-member
            server_url=server_url, timestamp=timestamp, salt=salt
        )
        return True

    @staticmethod
    def cleanupNonces():  # pragma: no cover
        """
        Helper method to cleanup nonces

        Removes the nonces older than the allowed skew and returns how
        many were removed.
        """
        timestamp = int(time.time()) - oidnonce.SKEW
        # pylint: disable=no-member
        deleted_count, _per_model = OpenIDNonce.objects.filter(
            timestamp__lt=timestamp
        ).delete()
        return deleted_count

    @staticmethod
    def cleanupAssociations():  # pragma: no cover
        """
        Helper method to cleanup associations

        Removes the associations whose issue time plus lifetime has passed
        and returns how many were removed.
        """
        # Expressed through the ORM: the previous raw SQL referred to a
        # column named "lifetimeint", which the model does not define.
        # pylint: disable=no-member
        deleted_count, _per_model = (
            OpenIDAssociation.objects.annotate(
                expires_at=models.F("issued") + models.F("lifetime")
            )
            .filter(expires_at__lte=int(time.time()))
            .delete()
        )
        return deleted_count

687 ).delete()