Coverage for ivatar / ivataraccount / models.py: 86%

307 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2025-12-30 00:08 +0000

1""" 

2Our models for ivatar.ivataraccount 

3""" 

4 

5import base64 

6import hashlib 

7import time 

8from io import BytesIO 

9from os import urandom 

10from urllib.error import HTTPError, URLError 

11from ivatar.utils import urlopen, Bluesky 

12from urllib.parse import urlsplit, urlunsplit, quote 

13import logging 

14 

15from PIL import Image 

16from django.contrib.auth.models import User 

17from django.contrib import messages 

18from django.db import models 

19from django.utils import timezone 

20from django.http import HttpResponseRedirect 

21from django.urls import reverse_lazy, reverse 

22from django.utils.translation import gettext_lazy as _ 

23from django.core.cache import cache 

24from django.core.exceptions import ObjectDoesNotExist 

25from django.core.mail import send_mail 

26from django.template.loader import render_to_string 

27from openid.association import Association as OIDAssociation 

28from openid.store import nonce as oidnonce 

29from openid.store.interface import OpenIDStore 

30 

31from libravatar import libravatar_url 

32 

33from ivatar.settings import MAX_LENGTH_EMAIL 

34from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY 

35from ivatar.settings import MAX_LENGTH_URL 

36from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL 

37from ivatar.utils import openid_variations 

38from .gravatar import get_photo as get_gravatar_photo 

39 

40# Initialize logger 

41logger = logging.getLogger("ivatar") 

42 

43 

def file_format(image_type):
    """
    Translate a PIL format name into the short file type we store.

    "JPEG" and "MPO" map to "jpg", "PNG" to "png", "GIF" to "gif" and
    "WEBP" to "webp". Any other value yields None.
    """
    known_formats = (
        (("JPEG", "MPO"), "jpg"),
        (("PNG",), "png"),
        (("GIF",), "gif"),
        (("WEBP",), "webp"),
    )
    for pil_names, short_name in known_formats:
        if image_type in pil_names:
            return short_name
    return None

57 

58 

def pil_format(image_type):
    """
    Translate a short file type into the encoder name PIL expects.

    "jpg", "jpeg" and "mpo" map to "JPEG", "png" to "PNG", "gif" to "GIF"
    and "webp" to "WEBP". Anything else is logged as unsupported and
    None is returned.
    """
    encoders = (
        (("jpg", "jpeg", "mpo"), "JPEG"),
        (("png",), "PNG"),
        (("gif",), "GIF"),
        (("webp",), "WEBP"),
    )
    for short_names, encoder_name in encoders:
        if image_type in short_names:
            return encoder_name

    logger.info("Unsupported file format: %s", image_type)
    return None

74 

75 

class UserPreference(models.Model):
    """
    Holds the preferences of a user
    """

    # (stored value, human readable label) pairs offered for ``theme``
    THEMES = (
        ("default", "Default theme"),
        ("clime", "climes theme"),
        ("green", "green theme"),
        ("red", "red theme"),
    )

    theme = models.CharField(
        max_length=10,
        choices=THEMES,
        default="default",
    )

    # The user doubles as primary key, so there is one row per user
    user = models.OneToOneField(
        User,
        on_delete=models.deletion.CASCADE,
        primary_key=True,
    )

    def __str__(self):
        # "%i" cannot format None; fall back to 0 while there is no primary
        # key yet, the same way Photo.__str__ does.
        return "Preference (%i) for %s" % (self.pk or 0, self.user)

102 

103 

class BaseAccountModel(models.Model):
    """
    Abstract base model carrying the fields shared by the account models:
    the owning user, an optional IP address and the creation date.
    """

    # Rows are removed together with the user that owns them
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    # IPv4-mapped IPv6 addresses are unpacked to plain IPv4; may be NULL
    ip_address = models.GenericIPAddressField(null=True, unpack_ipv4=True)
    # Defaults to the time the instance is created
    add_date = models.DateTimeField(default=timezone.now)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Model options: abstract, so no table is created for this class
        """

        abstract = True

122 

123 

class Photo(BaseAccountModel):
    """
    Model holding the photos and information about them
    """

    # Redeclared without null=True: a photo always carries an IP address
    ip_address = models.GenericIPAddressField(unpack_ipv4=True)
    # Raw image bytes
    data = models.BinaryField()
    # Short file type as returned by file_format() ("jpg", "png", ...)
    format = models.CharField(max_length=4)
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("photo")
        verbose_name_plural = _("photos")
        indexes = [
            models.Index(fields=["format"], name="idx_photo_format"),
            models.Index(fields=["access_count"], name="idx_photo_access_count"),
            models.Index(fields=["user_id", "format"], name="idx_photo_user_format"),
        ]

    def import_image(self, service_name, email_address):
        """
        Allow to import image from other (eg. Gravatar) service

        ``service_name`` is either "Gravatar" or "Libravatar"; any other
        value leaves no URL to fetch. Returns True when the image was
        downloaded, recognised and stored, False otherwise.
        """
        image_url = False

        if service_name == "Gravatar":
            if gravatar := get_gravatar_photo(email_address):
                image_url = gravatar["image_url"]

        if service_name == "Libravatar":
            image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE)

        if not image_url:
            return False  # pragma: no cover
        try:
            image = urlopen(image_url)
        except HTTPError as exc:
            logger.warning(
                "%s import failed with an HTTP error: %s", service_name, exc.code
            )
            return False
        except URLError as exc:
            logger.warning("%s import failed: %s", service_name, exc.reason)
            return False

        # Always release the connection, even when reading fails
        try:
            data = image.read()
        finally:
            close = getattr(image, "close", None)
            if callable(close):
                close()

        try:
            img = Image.open(BytesIO(data))
        except (OSError, ValueError) as exc:
            # PIL reports unreadable image data as UnidentifiedImageError,
            # which is an OSError, so ValueError alone is not sufficient.
            logger.warning("%s import returned an unreadable image: %s", service_name, exc)
            return False

        self.format = file_format(img.format)
        if not self.format:
            logger.warning("Unable to determine format: %s", img)
            return False  # pragma: no cover
        self.data = data
        # The format was determined above, so Photo.save() and its second
        # image parse are skipped by going to the parent directly.
        super().save()
        return True

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, taking care about the image

        The format is re-derived from ``self.data`` on every save. Returns
        False without saving when the data cannot be opened by PIL or the
        format is not one file_format() knows.
        """
        # Use PIL to read the file format
        try:
            img = Image.open(BytesIO(self.data))
        except Exception as exc:  # pylint: disable=broad-except
            # For debugging only
            logger.error("Exception caught in Photo.save(): %s", exc)
            return False
        self.format = file_format(img.format)
        if not self.format:
            logger.error("Format not recognized")
            return False
        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def perform_crop(self, request, dimensions, email, openid):
        """
        Helper to crop the image

        ``dimensions`` is a dict whose "x", "y", "w" and "h" entries are
        read; its "a" and "b" entries are overwritten with the width and
        height of the stored image. A width and height of 0 selects the
        largest square anchored at (x, y) = as given. ``email`` and
        ``openid``, when truthy, get this photo assigned. Always returns a
        redirect to the profile page; problems are reported via messages.
        """
        if request.user.photo_set.count() == 1:
            # This is the first photo, assign to all confirmed addresses
            for addr in request.user.confirmedemail_set.all():
                addr.photo = self
                addr.save()

            for addr in request.user.confirmedopenid_set.all():
                addr.photo = self
                addr.save()

        if email:
            # Explicitly asked
            email.photo = self
            email.save()

        if openid:
            # Explicitly asked
            openid.photo = self
            openid.save()

        # Do the real work cropping
        img = Image.open(BytesIO(self.data))

        # This should be anyway checked during save...
        dimensions["a"], dimensions["b"] = img.size  # pylint: disable=invalid-name
        if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS:
            # Interpolate after the translation lookup, otherwise the
            # already formatted text never matches the catalog entry.
            messages.error(
                request,
                _("Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s")
                % {
                    "max_pixels": MAX_PIXELS,
                },
            )
            return HttpResponseRedirect(reverse_lazy("profile"))

        if dimensions["w"] == 0 and dimensions["h"] == 0:
            dimensions["w"], dimensions["h"] = dimensions["a"], dimensions["b"]
            min_from_w_h = min(dimensions["w"], dimensions["h"])
            dimensions["w"], dimensions["h"] = min_from_w_h, min_from_w_h
        elif (
            (dimensions["x"] < 0)
            or (dimensions["y"] < 0)
            or (dimensions["w"] < 0)
            or ((dimensions["x"] + dimensions["w"]) > dimensions["a"])
            or (dimensions["h"] < 0)
            or ((dimensions["y"] + dimensions["h"]) > dimensions["b"])
        ):
            # A negative offset is just as much outside the image as an
            # overshooting width or height.
            messages.error(request, _("Crop outside of original image bounding box"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        cropped = img.crop(
            (
                dimensions["x"],
                dimensions["y"],
                dimensions["x"] + dimensions["w"],
                dimensions["y"] + dimensions["h"],
            )
        )
        # Resize the image only if it's larger than the specified max width.
        cropped_w, cropped_h = cropped.size
        max_w = AVATAR_MAX_SIZE
        if cropped_w > max_w or cropped_h > max_w:
            cropped = cropped.resize((max_w, max_w), Image.LANCZOS)

        data = BytesIO()
        cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY)

        # Overwrite the existing image
        self.data = data.getvalue()
        self.save()

        return HttpResponseRedirect(reverse_lazy("profile"))

    def __str__(self):
        return "%s (%i) from %s" % (self.format, self.pk or 0, self.user)

293 

294 

295# pylint: disable=too-few-public-methods 

class ConfirmedEmailManager(models.Manager):
    """
    Manager used by the confirmed email address model
    """

    @staticmethod
    def create_confirmed_email(user, email_address, is_logged_in):
        """
        Create and save a confirmed email address for ``user``.

        Returns a tuple of the new primary key and a list of external
        photos; the list holds the Gravatar result when the user is logged
        in and a Gravatar photo was found, and is empty otherwise.
        """
        confirmed = ConfirmedEmail(
            user=user,
            ip_address="0.0.0.0",
            email=email_address,
        )
        confirmed.save()

        external_photos = []
        if is_logged_in:
            gravatar = get_gravatar_photo(confirmed.email)
            if gravatar:
                external_photos.append(gravatar)

        return (confirmed.pk, external_photos)

318 

319 

class ConfirmedEmail(BaseAccountModel):
    """
    Model holding our confirmed email addresses, as well as the relation
    to the assigned photo
    """

    email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL)
    photo = models.ForeignKey(
        Photo,
        related_name="emails",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)
    # Hex MD5 / SHA256 of the stripped, lowercased address; set in save()
    digest = models.CharField(max_length=32)
    digest_sha256 = models.CharField(max_length=64)
    objects = ConfirmedEmailManager()
    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("confirmed email")
        verbose_name_plural = _("confirmed emails")
        indexes = [
            models.Index(fields=["digest"], name="idx_cemail_digest"),
            models.Index(fields=["digest_sha256"], name="idx_cemail_digest_sha256"),
            models.Index(fields=["access_count"], name="idx_cemail_access_count"),
            models.Index(fields=["bluesky_handle"], name="idx_cemail_bluesky_handle"),
            models.Index(
                fields=["user_id", "access_count"],
                name="idx_cemail_user_access",
            ),
            models.Index(
                fields=["photo_id", "access_count"],
                name="idx_cemail_photo_access",
            ),
        ]

    def set_photo(self, photo):
        """
        Helper method to set photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        The handle is normalised first. Raises ValueError when no profile
        can be fetched for it.
        """
        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, add digest

        Both digests are recomputed from the current address on every
        save. For rows that already have a primary key the cached
        assign-photo page is dropped, and the cache entry stored under the
        Bluesky handle is dropped whenever a handle is set.
        """
        # Normalise once; both digests hash the very same bytes
        normalized = self.email.strip().lower().encode("utf-8")
        self.digest = hashlib.md5(normalized).hexdigest()
        self.digest_sha256 = hashlib.sha256(normalized).hexdigest()

        # We need to manually expire the page caches
        # TODO: Verify this works as expected
        # First check if we already have an ID
        if self.pk:
            cache_url = reverse_lazy(
                "assign_photo_email", kwargs={"email_id": int(self.pk)}
            )

            cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}"
            try:
                if cache.has_key(cache_key):
                    cache.delete(cache_key)
                    logger.debug("Successfully cleaned up cached page: %s", cache_key)
            except Exception as exc:  # pylint: disable=broad-except
                # Cache trouble must not prevent saving the address
                logger.warning("Failed to clean up cached page %s: %s", cache_key, exc)

            # Invalidate the Bluesky avatar URL cache whenever a handle is
            # set; whether it actually changed is not tracked here.
            if self.bluesky_handle:
                try:
                    cache.delete(self.bluesky_handle)
                    logger.debug(
                        "Successfully cleaned up Bluesky avatar cache for handle: %s",
                        self.bluesky_handle,
                    )
                except Exception as exc:  # pylint: disable=broad-except
                    logger.warning(
                        "Failed to clean up Bluesky avatar cache for handle %s: %s",
                        self.bluesky_handle,
                        exc,
                    )

        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # "%i" cannot format None; fall back to 0 for unsaved instances,
        # the same way Photo.__str__ does.
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)

437 

438 

class UnconfirmedEmail(BaseAccountModel):
    """
    Model holding unconfirmed email addresses as well as the verification key
    """

    email = models.EmailField(max_length=MAX_LENGTH_EMAIL)
    # Hex SHA256 digest, generated on first save
    verification_key = models.CharField(max_length=64)
    # Time and outcome ("OK" or the error text) of the last mail attempt
    last_send_date = models.DateTimeField(null=True, blank=True)
    last_status = models.TextField(max_length=2047, null=True, blank=True)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Class attributes
        """

        verbose_name = _("unconfirmed email")
        verbose_name_plural = _("unconfirmed emails")

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, generating the verification key once

        An existing key is never replaced.
        """
        if not self.verification_key:
            hash_object = hashlib.new("sha256")
            # 1024 random bytes followed by the user name
            hash_object.update(
                urandom(1024)
                + self.user.username.encode("utf-8")  # pylint: disable=no-member
            )
            self.verification_key = hash_object.hexdigest()
        super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def send_confirmation_mail(self, url=SECURE_BASE_URL):
        """
        Send confirmation mail to that mail address

        ``url`` is prefixed to the confirmation path. The send time and
        the outcome are stored on the instance: ``last_status`` is "OK" on
        success or the text of the exception raised while sending. Always
        returns True, also when sending failed.
        """
        link = url + reverse(
            "confirm_email", kwargs={"verification_key": self.verification_key}
        )
        email_subject = _("Confirm your email address on %s") % SITE_NAME
        email_body = render_to_string(
            "email_confirmation.txt",
            {
                "verification_link": link,
                "site_name": SITE_NAME,
            },
        )
        self.last_send_date = timezone.now()
        self.last_status = "OK"
        try:
            send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email])
        except Exception as e:  # pylint: disable=broad-except
            # Deliberately best effort: record the failure instead of raising
            self.last_status = f"{e}"
        self.save()
        return True

    def __str__(self):
        # "%i" cannot format None; fall back to 0 for unsaved instances,
        # the same way Photo.__str__ does.
        return "%s (%i) from %s" % (self.email, self.pk or 0, self.user)

502 

503 

class UnconfirmedOpenId(BaseAccountModel):
    """
    Model holding unconfirmed OpenIDs
    """

    # Not unique: the same URL may be pending more than once
    openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("unconfirmed OpenID")
        verbose_name_plural = "unconfirmed_OpenIDs"

    def __str__(self):
        # "%i" cannot format None; fall back to 0 for unsaved instances,
        # the same way Photo.__str__ does.
        return "%s (%i) from %s" % (self.openid, self.pk or 0, self.user)

521 

522 

class ConfirmedOpenId(BaseAccountModel):
    """
    Model holding confirmed OpenIDs, as well as the relation to
    the assigned photo
    """

    openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL)
    photo = models.ForeignKey(
        Photo,
        related_name="openids",
        blank=True,
        null=True,
        on_delete=models.deletion.SET_NULL,
    )
    # http://<id>/ base version - http w/ trailing slash
    digest = models.CharField(max_length=64)
    # http://<id> - http w/o trailing slash
    alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id>/ - https w/ trailing slash
    alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # https://<id> - https w/o trailing slash
    alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None)
    # Alternative assignment - use Bluesky handle
    bluesky_handle = models.CharField(max_length=256, null=True, blank=True)

    access_count = models.BigIntegerField(default=0, editable=False)

    class Meta:  # pylint: disable=too-few-public-methods
        """
        Meta class
        """

        verbose_name = _("confirmed OpenID")
        verbose_name_plural = _("confirmed OpenIDs")

    def set_photo(self, photo):
        """
        Helper method to save photo
        """
        self.photo = photo
        self.save()

    def set_bluesky_handle(self, handle):
        """
        Helper method to set Bluesky handle

        The handle is normalised first. Raises ValueError when no profile
        can be fetched for it.
        """
        bs = Bluesky()
        handle = bs.normalize_handle(handle)
        avatar = bs.get_profile(handle)
        if not avatar:
            raise ValueError("Invalid Bluesky handle")
        self.bluesky_handle = handle
        self.save()

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Override save from parent, normalising the URL and adding digests

        The scheme and host of ``openid`` are lowercased and the four
        SHA256 digests are recomputed from the first four results of
        openid_variations(). For rows that already have a primary key the
        cached assign-photo page is dropped, and the cache entry stored
        under the Bluesky handle is dropped whenever a handle is set.
        """
        url = urlsplit(self.openid)
        # NOTE(review): url.hostname is lowercased by urlsplit but carries
        # no port, so an explicit port in the OpenID is dropped here.
        # Confirm this is intended before changing it, since stored
        # digests depend on it.
        if url.username:  # pragma: no cover
            password = url.password or ""
            netloc = f"{url.username}:{password}@{url.hostname}"
        else:
            netloc = url.hostname
        lowercase_url = urlunsplit(
            (url.scheme.lower(), netloc, url.path, url.query, url.fragment)
        )
        self.openid = lowercase_url

        # Compute the variations once instead of once per digest
        variations = openid_variations(lowercase_url)
        self.digest = hashlib.sha256(variations[0].encode("utf-8")).hexdigest()
        self.alt_digest1 = hashlib.sha256(variations[1].encode("utf-8")).hexdigest()
        self.alt_digest2 = hashlib.sha256(variations[2].encode("utf-8")).hexdigest()
        self.alt_digest3 = hashlib.sha256(variations[3].encode("utf-8")).hexdigest()

        # Invalidate page caches and Bluesky avatar cache
        if self.pk:
            # Invalidate assign_photo_openid page cache
            cache_url = reverse_lazy(
                "assign_photo_openid", kwargs={"openid_id": int(self.pk)}
            )
            cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}"
            try:
                if cache.has_key(cache_key):
                    cache.delete(cache_key)
                    logger.debug("Successfully cleaned up cached page: %s", cache_key)
            except Exception as exc:  # pylint: disable=broad-except
                # Cache trouble must not prevent saving the OpenID
                logger.warning("Failed to clean up cached page %s: %s", cache_key, exc)

            # Invalidate Bluesky avatar URL cache if bluesky_handle exists
            if self.bluesky_handle:
                try:
                    cache.delete(self.bluesky_handle)
                    logger.debug(
                        "Successfully cleaned up Bluesky avatar cache for handle: %s",
                        self.bluesky_handle,
                    )
                except Exception as exc:  # pylint: disable=broad-except
                    logger.warning(
                        "Failed to clean up Bluesky avatar cache for handle %s: %s",
                        self.bluesky_handle,
                        exc,
                    )

        return super().save(
            force_insert=force_insert,
            force_update=force_update,
            using=using,
            update_fields=update_fields,
        )

    def __str__(self):
        # "%i" cannot format None; fall back to 0 for unsaved instances,
        # the same way Photo.__str__ does.
        return "%s (%i) (%s)" % (self.openid, self.pk or 0, self.user)

643 

644 

class OpenIDNonce(models.Model):
    """
    Model holding OpenID Nonces
    See also: https://github.com/edx/django-openid-auth/
    """

    server_url = models.CharField(max_length=255)
    timestamp = models.IntegerField()
    salt = models.CharField(max_length=128)

    def __str__(self):
        # "%i" cannot format None; fall back to 0 for unsaved instances,
        # the same way Photo.__str__ does.
        return "%s (%i) (timestamp: %i)" % (
            self.server_url,
            self.pk or 0,
            self.timestamp,
        )

657 

658 

class OpenIDAssociation(models.Model):
    """
    Model holding the relation/association about OpenIDs
    """

    server_url = models.TextField(max_length=2047)
    handle = models.CharField(max_length=255)
    secret = models.TextField(max_length=255)  # stored base64 encoded
    issued = models.IntegerField()
    lifetime = models.IntegerField()
    assoc_type = models.TextField(max_length=64)

    def __str__(self):
        # "%i" cannot format None; fall back to 0 for unsaved instances,
        # the same way Photo.__str__ does.
        return "%s (%i) (%s, lifetime: %i)" % (
            self.server_url,
            self.pk or 0,
            self.assoc_type,
            self.lifetime,
        )

678 

679 

680class DjangoOpenIDStore(OpenIDStore): 

681 """ 

682 The Python openid library needs an OpenIDStore subclass to persist data 

683 related to OpenID authentications. This one uses our Django models. 

684 """ 

685 

686 @staticmethod 

687 def storeAssociation(server_url, association): # pragma: no cover 

688 """ 

689 Helper method to store associations 

690 """ 

691 assoc = OpenIDAssociation( 

692 server_url=server_url, 

693 handle=association.handle, 

694 secret=base64.encodebytes(association.secret), 

695 issued=association.issued, 

696 lifetime=association.issued, 

697 assoc_type=association.assoc_type, 

698 ) 

699 assoc.save() 

700 

701 def getAssociation(self, server_url, handle=None): # pragma: no cover 

702 """ 

703 Helper method to get associations 

704 """ 

705 assocs = [] 

706 if handle is not None: 

707 assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member 

708 server_url=server_url, handle=handle 

709 ) 

710 else: 

711 assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member 

712 server_url=server_url 

713 ) 

714 if not assocs: 

715 return None 

716 associations = [] 

717 for assoc in assocs: 

718 if isinstance(assoc.secret, str): 

719 assoc.secret = assoc.secret.split("b'")[1].split("'")[0] 

720 assoc.secret = bytes(assoc.secret, "utf-8") 

721 association = OIDAssociation( 

722 assoc.handle, 

723 base64.decodebytes(assoc.secret), 

724 assoc.issued, 

725 assoc.lifetime, 

726 assoc.assoc_type, 

727 ) 

728 expires = 0 

729 try: 

730 # pylint: disable=no-member 

731 expires = association.getExpiresIn() 

732 except AttributeError: 

733 expires = association.expiresIn 

734 if expires == 0: 

735 self.removeAssociation(server_url, assoc.handle) 

736 else: 

737 associations.append((association.issued, association)) 

738 return associations[-1][1] if associations else None 

739 

740 @staticmethod 

741 def removeAssociation(server_url, handle): # pragma: no cover 

742 """ 

743 Helper method to remove associations 

744 """ 

745 assocs = list( 

746 OpenIDAssociation.objects.filter( # pylint: disable=no-member 

747 server_url=server_url, handle=handle 

748 ) 

749 ) 

750 assocs_exist = len(assocs) > 0 

751 for assoc in assocs: 

752 assoc.delete() 

753 return assocs_exist 

754 

755 @staticmethod 

756 def useNonce(server_url, timestamp, salt): # pragma: no cover 

757 """ 

758 Helper method to 'use' nonces 

759 """ 

760 # Has nonce expired? 

761 if abs(timestamp - time.time()) > oidnonce.SKEW: 

762 return False 

763 try: 

764 nonce = OpenIDNonce.objects.get( # pylint: disable=no-member 

765 server_url__exact=server_url, 

766 timestamp__exact=timestamp, 

767 salt__exact=salt, 

768 ) 

769 except ObjectDoesNotExist: 

770 nonce = OpenIDNonce.objects.create( # pylint: disable=no-member 

771 server_url=server_url, timestamp=timestamp, salt=salt 

772 ) 

773 return True 

774 nonce.delete() 

775 return False 

776 

777 @staticmethod 

778 def cleanupNonces(): # pragma: no cover 

779 """ 

780 Helper method to cleanup nonces 

781 """ 

782 timestamp = int(time.time()) - oidnonce.SKEW 

783 # pylint: disable=no-member 

784 OpenIDNonce.objects.filter(timestamp__lt=timestamp).delete() 

785 

786 @staticmethod 

787 def cleanupAssociations(): # pragma: no cover 

788 """ 

789 Helper method to cleanup associations 

790 """ 

791 OpenIDAssociation.objects.extra( 

792 where=[f"issued + lifetimeint < ({time.time()})"] 

793 ).delete()