Coverage for ivatar/ivataraccount/models.py: 89%

261 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-18 23:09 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Our models for ivatar.ivataraccount 

4""" 

5 

6import base64 

7import hashlib 

8import time 

9from io import BytesIO 

10from os import urandom 

11from urllib.error import HTTPError, URLError 

12from urllib.request import urlopen 

13from urllib.parse import urlsplit, urlunsplit 

14 

15from PIL import Image 

16from django.contrib.auth.models import User 

17from django.contrib import messages 

18from django.db import models 

19from django.utils import timezone 

20from django.http import HttpResponseRedirect 

21from django.urls import reverse_lazy, reverse 

22from django.utils.translation import gettext_lazy as _ 

23from django.core.exceptions import ObjectDoesNotExist 

24from django.core.mail import send_mail 

25from django.template.loader import render_to_string 

26from openid.association import Association as OIDAssociation 

27from openid.store import nonce as oidnonce 

28from openid.store.interface import OpenIDStore 

29 

30from libravatar import libravatar_url 

31 

32from ivatar.settings import MAX_LENGTH_EMAIL, logger 

33from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY 

34from ivatar.settings import MAX_LENGTH_URL 

35from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL 

36from ivatar.utils import openid_variations 

37from .gravatar import get_photo as get_gravatar_photo 

38 

39 

def file_format(image_type):
    """
    Translate a PIL format name into the short type name we store

    "JPEG" and "MPO" both map to "jpg"; "PNG", "GIF" and "WEBP" map to
    their lowercase names. Anything else yields None.
    """
    known_formats = (
        ("JPEG", "jpg"),
        ("MPO", "jpg"),
        ("PNG", "png"),
        ("GIF", "gif"),
        ("WEBP", "webp"),
    )
    for pil_name, short_name in known_formats:
        if image_type == pil_name:
            return short_name
    return None

53 

54 

def pil_format(image_type):
    """
    Translate a short image type into the 'encoder name' PIL expects

    Accepts "jpg", "jpeg", "mpo", "png", "gif" and "webp". Any other
    value is logged at info level and None is returned.
    """
    encoders = (
        ("jpg", "JPEG"),
        ("jpeg", "JPEG"),
        ("mpo", "JPEG"),
        ("png", "PNG"),
        ("gif", "GIF"),
        ("webp", "WEBP"),
    )
    for short_name, encoder in encoders:
        if image_type == short_name:
            return encoder

    logger.info("Unsupported file format: %s", image_type)
    return None

70 

71 

class UserPreference(models.Model):
    """
    Holds the user's preferences
    """

    # Selectable themes as (stored value, human readable label) pairs
    THEMES = (
        ("default", "Default theme"),
        ("clime", "climes theme"),
        ("green", "green theme"),
        ("red", "red theme"),
    )

    theme = models.CharField(
        max_length=10,
        choices=THEMES,
        default="default",
    )

    # The user doubles as primary key: one preference row per user
    user = models.OneToOneField(
        User,
        on_delete=models.deletion.CASCADE,
        primary_key=True,
    )

    def __str__(self):
        # pk is None until a user is assigned; "%i" would raise TypeError
        # on None, so fall back to 0 like Photo.__str__ does.
        return "Preference (%i) for %s" % (self.pk or 0, self.user)

98 

99 

class BaseAccountModel(models.Model):
    """
    Abstract parent model carrying the fields every account model shares
    """

    # Owning user; rows are removed together with the user
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # Optional address the row was created from
    ip_address = models.GenericIPAddressField(null=True, unpack_ipv4=True)
    # Creation time, defaults to "now" when the row is created
    add_date = models.DateTimeField(default=timezone.now)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Model options: no table of its own, only used for inheritance
        """

        abstract = True

118 

119 

class Photo(BaseAccountModel):
    """
    Model holding the photos and information about them
    """

    ip_address = models.GenericIPAddressField(unpack_ipv4=True)
    data = models.BinaryField()
    # NOTE(review): file_format() can return "webp" (4 characters) while
    # max_length is 3 - widening it needs a schema migration; confirm.
    format = models.CharField(max_length=3)
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("photo")
        verbose_name_plural = _("photos")

    def import_image(self, service_name, email_address):
        """
        Allow to import image from other (eg. Gravatar) service

        service_name is "Gravatar" or "Libravatar"; the image registered
        there for email_address is downloaded, its format detected and
        the photo stored. Returns True on success and False when no
        image URL is found, the download fails or the data is not a
        supported image.
        """
        image_url = False

        if service_name == "Gravatar":
            gravatar = get_gravatar_photo(email_address)
            if gravatar:
                image_url = gravatar["image_url"]

        if service_name == "Libravatar":
            image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE)

        if not image_url:
            return False  # pragma: no cover
        try:
            # The context manager closes the HTTP response even if
            # reading fails half way through.
            with urlopen(image_url) as image:
                data = image.read()
        # No idea how to test this
        # pragma: no cover
        except HTTPError as exc:
            logger.warning(
                "%s import failed with an HTTP error: %s", service_name, exc.code
            )
            return False
        # No idea how to test this
        # pragma: no cover
        except URLError as exc:
            logger.warning("%s import failed: %s", service_name, exc.reason)
            return False

        try:
            img = Image.open(BytesIO(data))
        # Pillow signals unreadable image data with an OSError subclass,
        # so ValueError alone would let broken downloads propagate.
        except (ValueError, OSError):  # pragma: no cover
            return False  # pragma: no cover

        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Unable to determine format: %s", img)  # pragma: no cover
            return False  # pragma: no cover
        self.data = data
        # The format is already validated above, so go straight to the
        # parent save instead of re-parsing the image in Photo.save().
        super().save()
        return True

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, taking care about the image

        The stored format is derived from the image data. Returns False
        without saving when the data cannot be opened or the format is
        not supported; otherwise returns the parent's save() result.
        """
        # Use PIL to read the file format
        try:
            img = Image.open(BytesIO(self.data))
        # Testing? Ideas anyone?
        except Exception as exc:  # pylint: disable=broad-except
            # Deliberately best-effort: any failure to parse means "do not store"
            logger.warning("Exception caught in Photo.save(): %s", exc)
            return False
        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Format not recognized")
            return False
        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django releases.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def perform_crop(self, request, dimensions, email, openid):
        """
        Helper to crop the image

        dimensions is a dict with the crop origin ("x", "y") and size
        ("w", "h"); the image size is written back into it as "a"
        (width) and "b" (height). A size of 0 x 0 selects the largest
        square that fits the image, starting at the given origin.
        email / openid, when given, get this photo assigned.
        Always returns a redirect to the profile page; problems are
        reported through the messages framework.
        """
        if request.user.photo_set.count() == 1:
            # This is the first photo, assign to all confirmed addresses
            for addr in request.user.confirmedemail_set.all():
                addr.photo = self
                addr.save()

            for addr in request.user.confirmedopenid_set.all():
                addr.photo = self
                addr.save()

        if email:
            # Explicitly asked
            email.photo = self
            email.save()

        if openid:
            # Explicitly asked
            openid.photo = self
            openid.save()

        # Do the real work cropping
        img = Image.open(BytesIO(self.data))

        # This should be anyway checked during save...
        dimensions["a"], dimensions["b"] = img.size  # pylint: disable=invalid-name
        if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS:
            # Translate the template first and interpolate afterwards;
            # formatting before the lookup can never match a catalog entry.
            messages.error(
                request,
                _("Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s")
                % {
                    "max_pixels": MAX_PIXELS,
                },
            )
            return HttpResponseRedirect(reverse_lazy("profile"))

        if dimensions["w"] == 0 and dimensions["h"] == 0:
            dimensions["w"], dimensions["h"] = dimensions["a"], dimensions["b"]
            min_from_w_h = min(dimensions["w"], dimensions["h"])
            dimensions["w"], dimensions["h"] = min_from_w_h, min_from_w_h
        elif (
            # A negative origin or an empty/negative size is just as much
            # outside the image as an overshooting right/bottom edge.
            (dimensions["x"] < 0)
            or (dimensions["y"] < 0)
            or (dimensions["w"] <= 0)
            or ((dimensions["x"] + dimensions["w"]) > dimensions["a"])
            or (dimensions["h"] <= 0)
            or ((dimensions["y"] + dimensions["h"]) > dimensions["b"])
        ):
            messages.error(request, _("Crop outside of original image bounding box"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        cropped = img.crop(
            (
                dimensions["x"],
                dimensions["y"],
                dimensions["x"] + dimensions["w"],
                dimensions["y"] + dimensions["h"],
            )
        )
        # Resize the image only if it's larger than the specified max width.
        cropped_w, cropped_h = cropped.size
        max_w = AVATAR_MAX_SIZE
        if cropped_w > max_w or cropped_h > max_w:
            cropped = cropped.resize((max_w, max_w), Image.LANCZOS)

        data = BytesIO()
        cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY)

        # Overwrite the existing image
        self.data = data.getvalue()
        self.save()

        return HttpResponseRedirect(reverse_lazy("profile"))

    def __str__(self):
        return "%s (%i) from %s" % (self.format, self.pk or 0, self.user)

283 

284 

285# pylint: disable=too-few-public-methods 

class ConfirmedEmailManager(models.Manager):
    """
    Manager used by the confirmed email address model
    """

    @staticmethod
    def create_confirmed_email(user, email_address, is_logged_in):
        """
        Store email_address as confirmed for user

        Returns a tuple of the new row's primary key and a list of
        external photos; the list holds the Gravatar lookup result when
        is_logged_in is true and a photo was found, else it is empty.
        """
        record = ConfirmedEmail(
            user=user,
            ip_address="0.0.0.0",
            email=email_address,
        )
        record.save()

        # Only logged in users trigger the external lookup
        gravatar = get_gravatar_photo(record.email) if is_logged_in else None
        found_photos = [gravatar] if gravatar else []

        return (record.pk, found_photos)

309 

310 

class ConfirmedEmail(BaseAccountModel):
    """
    Model holding our confirmed email addresses, as well as the relation
    to the assigned photo
    """

    email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL)
    photo = models.ForeignKey(
        Photo,
        related_name="emails",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # Hex MD5 / SHA256 of the stripped, lowercased address; set in save()
    digest = models.CharField(max_length=32)
    digest_sha256 = models.CharField(max_length=64)
    objects = ConfirmedEmailManager()
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("confirmed email")
        verbose_name_plural = _("confirmed emails")

    def set_photo(self, photo):
        """
        Helper method to set photo
        """
        self.photo = photo
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, add digest

        Both digests are recomputed from the current address on every
        save, so they always match the stored email.
        """
        # Normalise once; both digests must be derived from the same bytes
        normalized = self.email.strip().lower().encode("utf-8")
        self.digest = hashlib.md5(normalized).hexdigest()
        self.digest_sha256 = hashlib.sha256(normalized).hexdigest()
        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django releases.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # pk is None before the first save; "%i" would raise on None
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)

361 

362 

class UnconfirmedEmail(BaseAccountModel):
    """
    Model holding unconfirmed email addresses as well as the verification key
    """

    email = models.EmailField(max_length=MAX_LENGTH_EMAIL)
    verification_key = models.CharField(max_length=64)
    # Bookkeeping of the most recent confirmation mail attempt
    last_send_date = models.DateTimeField(null=True, blank=True)
    last_status = models.TextField(max_length=2047, null=True, blank=True)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("unconfirmed email")
        verbose_name_plural = _("unconfirmed emails")

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, create the verification key if missing

        The key is the SHA256 hex digest of 1024 random bytes plus the
        owner's username; an existing key is never replaced.
        """
        if not self.verification_key:
            hash_object = hashlib.new("sha256")
            hash_object.update(
                urandom(1024)
                + self.user.username.encode("utf-8")  # pylint: disable=no-member
            )
            self.verification_key = hash_object.hexdigest()
        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django releases.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def send_confirmation_mail(self, url=SECURE_BASE_URL):
        """
        Send confirmation mail to that mail address

        url is the site base URL the confirmation path is appended to.
        The outcome is recorded in last_status ("OK" or the error text)
        together with last_send_date. Always returns True, also when
        sending failed - check last_status for the result.
        """
        link = url + reverse(
            "confirm_email", kwargs={"verification_key": self.verification_key}
        )
        email_subject = _("Confirm your email address on %s") % SITE_NAME
        email_body = render_to_string(
            "email_confirmation.txt",
            {
                "verification_link": link,
                "site_name": SITE_NAME,
            },
        )
        self.last_send_date = timezone.now()
        self.last_status = "OK"
        try:
            send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email])
        except Exception as exc:  # pylint: disable=broad-except
            # Best effort on purpose: a mail failure must not abort the
            # request, it is stored so it can be inspected later.
            self.last_status = "%s" % exc
        self.save()
        return True

    def __str__(self):
        # pk is None before the first save; "%i" would raise on None
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)

423 

424 

class UnconfirmedOpenId(BaseAccountModel):
    """
    Model holding unconfirmed OpenIDs
    """

    # Not unique: the same OpenID may be pending more than once
    openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("unconfirmed OpenID")
        # NOTE(review): unlike the other models this label is neither
        # translated nor free of the underscore - left untouched because
        # changing it alters model options; confirm before fixing.
        verbose_name_plural = "unconfirmed_OpenIDs"

    def __str__(self):
        # pk is None before the first save; "%i" would raise on None
        return "%s (%i) from %s" % (self.openid, self.pk or 0, self.user)

442 

443 

class ConfirmedOpenId(BaseAccountModel):
    """
    Model holding confirmed OpenIDs, as well as the relation to
    the assigned photo
    """

    openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL)
    photo = models.ForeignKey(
        Photo,
        related_name="openids",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # http://<id>/ base version - http w/ trailing slash
    digest = models.CharField(max_length=64)
    # http://<id> - http w/o trailing slash
    alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id>/ - https w/ trailing slash
    alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id> - https w/o trailing slash
    alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)

    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("confirmed OpenID")
        verbose_name_plural = _("confirmed OpenIDs")

    def set_photo(self, photo):
        """
        Helper method to save photo
        """
        self.photo = photo
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, normalise the URL and add the digests

        Scheme and host name are lowercased before storing; the four
        digests are the SHA256 hex digests of the first four entries
        returned by openid_variations() for the normalised URL.
        """
        url = urlsplit(self.openid)
        # NOTE(review): url.hostname carries no port, so an explicit port
        # in the OpenID is dropped here - confirm this is intended.
        if url.username:  # pragma: no cover
            password = url.password or ""
            # hostname is None for URLs without a host; avoid str + None
            netloc = url.username + ":" + password + "@" + (url.hostname or "")
        else:
            netloc = url.hostname
        lowercase_url = urlunsplit(
            (url.scheme.lower(), netloc, url.path, url.query, url.fragment)
        )
        self.openid = lowercase_url

        # Compute the variations once instead of once per digest
        variations = openid_variations(lowercase_url)
        self.digest = hashlib.sha256(variations[0].encode("utf-8")).hexdigest()
        self.alt_digest1 = hashlib.sha256(variations[1].encode("utf-8")).hexdigest()
        self.alt_digest2 = hashlib.sha256(variations[2].encode("utf-8")).hexdigest()
        self.alt_digest3 = hashlib.sha256(variations[3].encode("utf-8")).hexdigest()

        # Keyword arguments: positional arguments to Model.save() are
        # deprecated in recent Django releases.
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # pk is None before the first save; "%i" would raise on None
        return "%s (%i) (%s)" % (self.openid, self.pk or 0, self.user)

515 

516 

class OpenIDNonce(models.Model):
    """
    Model holding OpenID Nonces
    See also: https://github.com/edx/django-openid-auth/
    """

    # The three values that together identify one nonce
    server_url = models.CharField(max_length=255)
    timestamp = models.IntegerField()
    salt = models.CharField(max_length=128)

    def __str__(self):
        # pk is None before the first save; "%i" would raise on None
        return "%s (%i) (timestamp: %i)" % (
            self.server_url,
            self.pk or 0,
            self.timestamp,
        )

529 

530 

class OpenIDAssociation(models.Model):
    """
    Model holding the relation/association about OpenIDs
    """

    server_url = models.TextField(max_length=2047)
    handle = models.CharField(max_length=255)
    secret = models.TextField(max_length=255)  # stored base64 encoded
    # issued and lifetime are plain integers, as handed over by the
    # OpenID library's association object
    issued = models.IntegerField()
    lifetime = models.IntegerField()
    assoc_type = models.TextField(max_length=64)

    def __str__(self):
        # pk is None before the first save; "%i" would raise on None
        return "%s (%i) (%s, lifetime: %i)" % (
            self.server_url,
            self.pk or 0,
            self.assoc_type,
            self.lifetime,
        )

550 

551 

class DjangoOpenIDStore(OpenIDStore):
    """
    The Python openid library needs an OpenIDStore subclass to persist data
    related to OpenID authentications. This one uses our Django models.
    """

    @staticmethod
    def storeAssociation(server_url, association):  # pragma: no cover
        """
        Helper method to store associations

        Persists handle, base64 encoded secret, issue time, lifetime and
        type of association for server_url.
        """
        assoc = OpenIDAssociation(
            server_url=server_url,
            handle=association.handle,
            secret=base64.encodebytes(association.secret),
            issued=association.issued,
            # Was association.issued, which made every stored association
            # carry its issue timestamp as lifetime.
            lifetime=association.lifetime,
            assoc_type=association.assoc_type,
        )
        assoc.save()

    def getAssociation(self, server_url, handle=None):  # pragma: no cover
        """
        Helper method to get associations

        Looks up the stored associations for server_url (restricted to
        handle when given), removes the ones that report 0 remaining
        lifetime and returns the most recently issued of the rest, or
        None when nothing usable is left.
        """
        lookup = {"server_url": server_url}
        if handle is not None:
            lookup["handle"] = handle
        stored = OpenIDAssociation.objects.filter(  # pylint: disable=no-member
            **lookup
        )

        usable = []
        for assoc in stored:
            secret = assoc.secret
            if isinstance(secret, str):
                # Secrets saved as bytes into the text column come back as
                # the text "b'...'"; unwrap that form, but also accept a
                # plain base64 string instead of failing with IndexError.
                if secret.startswith("b'"):
                    secret = secret.split("b'")[1].split("'")[0]
                secret = bytes(secret, "utf-8")
            association = OIDAssociation(
                assoc.handle,
                base64.decodebytes(secret),
                assoc.issued,
                assoc.lifetime,
                assoc.assoc_type,
            )
            try:
                # pylint: disable=no-member
                expires = association.getExpiresIn()
            except AttributeError:
                expires = association.expiresIn
            if expires == 0:
                self.removeAssociation(server_url, assoc.handle)
            else:
                usable.append(association)
        if not usable:
            return None
        # The query has no ordering, so pick the newest explicitly rather
        # than whatever row happened to come last.
        return max(usable, key=lambda candidate: candidate.issued)

    @staticmethod
    def removeAssociation(server_url, handle):  # pragma: no cover
        """
        Helper method to remove associations

        Returns True when at least one matching association existed.
        """
        assocs = list(
            OpenIDAssociation.objects.filter(  # pylint: disable=no-member
                server_url=server_url, handle=handle
            )
        )
        for assoc in assocs:
            assoc.delete()
        return bool(assocs)

    @staticmethod
    def useNonce(server_url, timestamp, salt):  # pragma: no cover
        """
        Helper method to 'use' nonces

        Returns True the first time a nonce is presented within the
        allowed clock skew and False for expired or already seen nonces.
        """
        # Has nonce expired?
        if abs(timestamp - time.time()) > oidnonce.SKEW:
            return False
        # A seen nonce must stay stored: deleting it on the second use (as
        # done before) made the third presentation look fresh again.
        # NOTE(review): lookup and insert are not protected by a unique
        # constraint visible here, so concurrent requests may still race.
        _nonce, created = OpenIDNonce.objects.get_or_create(  # pylint: disable=no-member
            server_url=server_url, timestamp=timestamp, salt=salt
        )
        return created

    @staticmethod
    def cleanupNonces():  # pragma: no cover
        """
        Helper method to cleanup nonces

        Deletes nonces older than the allowed skew and returns how many
        rows were removed.
        """
        timestamp = int(time.time()) - oidnonce.SKEW
        # pylint: disable=no-member
        return OpenIDNonce.objects.filter(timestamp__lt=timestamp).delete()[0]

    @staticmethod
    def cleanupAssociations():  # pragma: no cover
        """
        Helper method to cleanup associations

        Deletes associations whose issue time plus lifetime lies in the
        past and returns how many rows were removed.
        """
        now = int(time.time())
        # issued + lifetime < now, written with an F expression; the raw
        # SQL used before referred to a non-existent column "lifetimeint".
        expired = OpenIDAssociation.objects.filter(  # pylint: disable=no-member
            issued__lt=now - models.F("lifetime")
        )
        return expired.delete()[0]