Coverage for ivatar/ivataraccount/models.py: 86%

307 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-02 00:07 +0000

1""" 

2Our models for ivatar.ivataraccount 

3""" 

4 

5import base64 

6import hashlib 

7import time 

8from io import BytesIO 

9from os import urandom 

10from urllib.error import HTTPError, URLError 

11from ivatar.utils import urlopen, Bluesky 

12from urllib.parse import urlsplit, urlunsplit, quote 

13import logging 

14 

15from PIL import Image 

16from django.contrib.auth.models import User 

17from django.contrib import messages 

18from django.db import models 

19from django.utils import timezone 

20from django.http import HttpResponseRedirect 

21from django.urls import reverse_lazy, reverse 

22from django.utils.translation import gettext_lazy as _ 

23from django.core.cache import cache 

24from django.core.exceptions import ObjectDoesNotExist 

25from django.core.mail import send_mail 

26from django.template.loader import render_to_string 

27from openid.association import Association as OIDAssociation 

28from openid.store import nonce as oidnonce 

29from openid.store.interface import OpenIDStore 

30 

31from libravatar import libravatar_url 

32 

33from ivatar.settings import MAX_LENGTH_EMAIL 

34from ivatar.settings import MAX_PIXELS, AVATAR_MAX_SIZE, JPEG_QUALITY 

35from ivatar.settings import MAX_LENGTH_URL 

36from ivatar.settings import SECURE_BASE_URL, SITE_NAME, DEFAULT_FROM_EMAIL 

37from ivatar.utils import openid_variations 

38from .gravatar import get_photo as get_gravatar_photo 

39 

40# Initialize logger 

41logger = logging.getLogger("ivatar") 

42 

43 

44def file_format(image_type): 

45 """ 

46 Helper method returning a short image type 

47 """ 

48 if image_type in ("JPEG", "MPO"): 

49 return "jpg" 

50 elif image_type == "PNG": 

51 return "png" 

52 elif image_type == "GIF": 

53 return "gif" 

54 elif image_type == "WEBP": 

55 return "webp" 

56 return None 

57 

58 

59def pil_format(image_type): 

60 """ 

61 Helper method returning the 'encoder name' for PIL 

62 """ 

63 if image_type in ("jpg", "jpeg", "mpo"): 

64 return "JPEG" 

65 elif image_type == "png": 

66 return "PNG" 

67 elif image_type == "gif": 

68 return "GIF" 

69 elif image_type == "webp": 

70 return "WEBP" 

71 

72 logger.info("Unsupported file format: %s", image_type) 

73 return None 

74 

75 

76class UserPreference(models.Model): 

77 """ 

78 Holds the user users preferences 

79 """ 

80 

81 THEMES = ( 

82 ("default", "Default theme"), 

83 ("clime", "climes theme"), 

84 ("green", "green theme"), 

85 ("red", "red theme"), 

86 ) 

87 

88 theme = models.CharField( 

89 max_length=10, 

90 choices=THEMES, 

91 default="default", 

92 ) 

93 

94 user = models.OneToOneField( 

95 User, 

96 on_delete=models.deletion.CASCADE, 

97 primary_key=True, 

98 ) 

99 

100 def __str__(self): 

101 return "Preference (%i) for %s" % (self.pk, self.user) 

102 

103 

104class BaseAccountModel(models.Model): 

105 """ 

106 Base, abstract model, holding fields we use in all cases 

107 """ 

108 

109 user = models.ForeignKey( 

110 User, 

111 on_delete=models.deletion.CASCADE, 

112 ) 

113 ip_address = models.GenericIPAddressField(unpack_ipv4=True, null=True) 

114 add_date = models.DateTimeField(default=timezone.now) 

115 

116 class Meta: # pylint: disable=too-few-public-methods 

117 """ 

118 Class attributes 

119 """ 

120 

121 abstract = True 

122 

123 

124class Photo(BaseAccountModel): 

125 """ 

126 Model holding the photos and information about them 

127 """ 

128 

129 ip_address = models.GenericIPAddressField(unpack_ipv4=True) 

130 data = models.BinaryField() 

131 format = models.CharField(max_length=4) 

132 access_count = models.BigIntegerField(default=0, editable=False) 

133 

134 class Meta: # pylint: disable=too-few-public-methods 

135 """ 

136 Class attributes 

137 """ 

138 

139 verbose_name = _("photo") 

140 verbose_name_plural = _("photos") 

141 indexes = [ 

142 models.Index(fields=["format"], name="idx_photo_format"), 

143 models.Index(fields=["access_count"], name="idx_photo_access_count"), 

144 models.Index(fields=["user_id", "format"], name="idx_photo_user_format"), 

145 ] 

146 

147 def import_image(self, service_name, email_address): 

148 """ 

149 Allow to import image from other (eg. Gravatar) service 

150 """ 

151 image_url = False 

152 

153 if service_name == "Gravatar": 

154 if gravatar := get_gravatar_photo(email_address): 

155 image_url = gravatar["image_url"] 

156 

157 if service_name == "Libravatar": 

158 image_url = libravatar_url(email_address, size=AVATAR_MAX_SIZE) 

159 

160 if not image_url: 

161 return False # pragma: no cover 

162 try: 

163 image = urlopen(image_url) 

164 except HTTPError as exc: 

165 logger.warning( 

166 f"{service_name} import failed with an HTTP error: {exc.code}" 

167 ) 

168 return False 

169 except URLError as exc: 

170 logger.warning(f"{service_name} import failed: {exc.reason}") 

171 return False 

172 data = image.read() 

173 

174 try: 

175 img = Image.open(BytesIO(data)) 

176 # How am I supposed to test this? 

177 except ValueError: # pragma: no cover 

178 return False # pragma: no cover 

179 

180 self.format = file_format(img.format) 

181 if not self.format: 

182 logger.warning(f"Unable to determine format: {img}") 

183 return False # pragma: no cover 

184 self.data = data 

185 super().save() 

186 return True 

187 

188 def save( 

189 self, force_insert=False, force_update=False, using=None, update_fields=None 

190 ): 

191 """ 

192 Override save from parent, taking care about the image 

193 """ 

194 # Use PIL to read the file format 

195 try: 

196 img = Image.open(BytesIO(self.data)) 

197 except Exception as exc: # pylint: disable=broad-except 

198 # For debugging only 

199 logger.error(f"Exception caught in Photo.save(): {exc}") 

200 return False 

201 self.format = file_format(img.format) 

202 if not self.format: 

203 logger.error("Format not recognized") 

204 return False 

205 return super().save(force_insert, force_update, using, update_fields) 

206 

207 def perform_crop(self, request, dimensions, email, openid): 

208 """ 

209 Helper to crop the image 

210 """ 

211 if request.user.photo_set.count() == 1: 

212 # This is the first photo, assign to all confirmed addresses 

213 for addr in request.user.confirmedemail_set.all(): 

214 addr.photo = self 

215 addr.save() 

216 

217 for addr in request.user.confirmedopenid_set.all(): 

218 addr.photo = self 

219 addr.save() 

220 

221 if email: 

222 # Explicitly asked 

223 email.photo = self 

224 email.save() 

225 

226 if openid: 

227 # Explicitly asked 

228 openid.photo = self 

229 openid.save() 

230 

231 # Do the real work cropping 

232 img = Image.open(BytesIO(self.data)) 

233 

234 # This should be anyway checked during save... 

235 dimensions["a"], dimensions["b"] = img.size # pylint: disable=invalid-name 

236 if dimensions["a"] > MAX_PIXELS or dimensions["b"] > MAX_PIXELS: 

237 messages.error( 

238 request, 

239 _( 

240 "Image dimensions are too big (max: %(max_pixels)s x %(max_pixels)s" 

241 % { 

242 "max_pixels": MAX_PIXELS, 

243 } 

244 ), 

245 ) 

246 return HttpResponseRedirect(reverse_lazy("profile")) 

247 

248 if dimensions["w"] == 0 and dimensions["h"] == 0: 

249 dimensions["w"], dimensions["h"] = dimensions["a"], dimensions["b"] 

250 min_from_w_h = min(dimensions["w"], dimensions["h"]) 

251 dimensions["w"], dimensions["h"] = min_from_w_h, min_from_w_h 

252 elif ( 

253 (dimensions["w"] < 0) 

254 or ((dimensions["x"] + dimensions["w"]) > dimensions["a"]) 

255 or (dimensions["h"] < 0) 

256 or ((dimensions["y"] + dimensions["h"]) > dimensions["b"]) 

257 ): 

258 messages.error(request, _("Crop outside of original image bounding box")) 

259 return HttpResponseRedirect(reverse_lazy("profile")) 

260 

261 cropped = img.crop( 

262 ( 

263 dimensions["x"], 

264 dimensions["y"], 

265 dimensions["x"] + dimensions["w"], 

266 dimensions["y"] + dimensions["h"], 

267 ) 

268 ) 

269 # cropped.load() 

270 # Resize the image only if it's larger than the specified max width. 

271 cropped_w, cropped_h = cropped.size 

272 max_w = AVATAR_MAX_SIZE 

273 if cropped_w > max_w or cropped_h > max_w: 

274 cropped = cropped.resize((max_w, max_w), Image.LANCZOS) 

275 

276 data = BytesIO() 

277 cropped.save(data, pil_format(self.format), quality=JPEG_QUALITY) 

278 data.seek(0) 

279 

280 # Overwrite the existing image 

281 self.data = data.read() 

282 self.save() 

283 

284 return HttpResponseRedirect(reverse_lazy("profile")) 

285 

286 def __str__(self): 

287 return "%s (%i) from %s" % (self.format, self.pk or 0, self.user) 

288 

289 

290# pylint: disable=too-few-public-methods 

291class ConfirmedEmailManager(models.Manager): 

292 """ 

293 Manager for our confirmed email addresses model 

294 """ 

295 

296 @staticmethod 

297 def create_confirmed_email(user, email_address, is_logged_in): 

298 """ 

299 Helper method to create confirmed email address 

300 """ 

301 confirmed = ConfirmedEmail() 

302 confirmed.user = user 

303 confirmed.ip_address = "0.0.0.0" 

304 confirmed.email = email_address 

305 confirmed.save() 

306 

307 external_photos = [] 

308 if is_logged_in: 

309 if gravatar := get_gravatar_photo(confirmed.email): 

310 external_photos.append(gravatar) 

311 

312 return (confirmed.pk, external_photos) 

313 

314 

315class ConfirmedEmail(BaseAccountModel): 

316 """ 

317 Model holding our confirmed email addresses, as well as the relation 

318 to the assigned photo 

319 """ 

320 

321 email = models.EmailField(unique=True, max_length=MAX_LENGTH_EMAIL) 

322 photo = models.ForeignKey( 

323 Photo, 

324 related_name="emails", 

325 blank=True, 

326 null=True, 

327 on_delete=models.deletion.SET_NULL, 

328 ) 

329 # Alternative assignment - use Bluesky handle 

330 bluesky_handle = models.CharField(max_length=256, null=True, blank=True) 

331 digest = models.CharField(max_length=32) 

332 digest_sha256 = models.CharField(max_length=64) 

333 objects = ConfirmedEmailManager() 

334 access_count = models.BigIntegerField(default=0, editable=False) 

335 

336 class Meta: # pylint: disable=too-few-public-methods 

337 """ 

338 Class attributes 

339 """ 

340 

341 verbose_name = _("confirmed email") 

342 verbose_name_plural = _("confirmed emails") 

343 indexes = [ 

344 models.Index(fields=["digest"], name="idx_cemail_digest"), 

345 models.Index(fields=["digest_sha256"], name="idx_cemail_digest_sha256"), 

346 models.Index(fields=["access_count"], name="idx_cemail_access_count"), 

347 models.Index(fields=["bluesky_handle"], name="idx_cemail_bluesky_handle"), 

348 models.Index( 

349 fields=["user_id", "access_count"], 

350 name="idx_cemail_user_access", 

351 ), 

352 models.Index( 

353 fields=["photo_id", "access_count"], 

354 name="idx_cemail_photo_access", 

355 ), 

356 ] 

357 

358 def set_photo(self, photo): 

359 """ 

360 Helper method to set photo 

361 """ 

362 self.photo = photo 

363 self.save() 

364 

365 def set_bluesky_handle(self, handle): 

366 """ 

367 Helper method to set Bluesky handle 

368 """ 

369 

370 bs = Bluesky() 

371 handle = bs.normalize_handle(handle) 

372 avatar = bs.get_profile(handle) 

373 if not avatar: 

374 raise ValueError("Invalid Bluesky handle") 

375 self.bluesky_handle = handle 

376 self.save() 

377 

378 def save( 

379 self, force_insert=False, force_update=False, using=None, update_fields=None 

380 ): 

381 """ 

382 Override save from parent, add digest 

383 """ 

384 self.digest = hashlib.md5( 

385 self.email.strip().lower().encode("utf-8") 

386 ).hexdigest() 

387 self.digest_sha256 = hashlib.sha256( 

388 self.email.strip().lower().encode("utf-8") 

389 ).hexdigest() 

390 

391 # We need to manually expire the page caches 

392 # TODO: Verify this works as expected 

393 # First check if we already have an ID 

394 if self.pk: 

395 cache_url = reverse_lazy( 

396 "assign_photo_email", kwargs={"email_id": int(self.pk)} 

397 ) 

398 

399 cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}" 

400 try: 

401 if cache.has_key(cache_key): 

402 cache.delete(cache_key) 

403 logger.debug("Successfully cleaned up cached page: %s" % cache_key) 

404 except Exception as exc: 

405 logger.warning( 

406 "Failed to clean up cached page {}: {}".format(cache_key, exc) 

407 ) 

408 

409 # Invalidate Bluesky avatar URL cache if bluesky_handle changed 

410 if hasattr(self, "bluesky_handle") and self.bluesky_handle: 

411 try: 

412 cache.delete(self.bluesky_handle) 

413 logger.debug( 

414 "Successfully cleaned up Bluesky avatar cache for handle: %s" 

415 % self.bluesky_handle 

416 ) 

417 except Exception as exc: 

418 logger.warning( 

419 "Failed to clean up Bluesky avatar cache for handle %s: %s" 

420 % (self.bluesky_handle, exc) 

421 ) 

422 

423 return super().save(force_insert, force_update, using, update_fields) 

424 

425 def __str__(self): 

426 return "%s (%i) from %s" % (self.email, self.pk, self.user) 

427 

428 

429class UnconfirmedEmail(BaseAccountModel): 

430 """ 

431 Model holding unconfirmed email addresses as well as the verification key 

432 """ 

433 

434 email = models.EmailField(max_length=MAX_LENGTH_EMAIL) 

435 verification_key = models.CharField(max_length=64) 

436 last_send_date = models.DateTimeField(null=True, blank=True) 

437 last_status = models.TextField(max_length=2047, null=True, blank=True) 

438 

439 class Meta: # pylint: disable=too-few-public-methods 

440 """ 

441 Class attributes 

442 """ 

443 

444 verbose_name = _("unconfirmed email") 

445 verbose_name_plural = _("unconfirmed emails") 

446 

447 def save( 

448 self, force_insert=False, force_update=False, using=None, update_fields=None 

449 ): 

450 if not self.verification_key: 

451 hash_object = hashlib.new("sha256") 

452 hash_object.update( 

453 urandom(1024) 

454 + self.user.username.encode("utf-8") # pylint: disable=no-member 

455 ) # pylint: disable=no-member 

456 self.verification_key = hash_object.hexdigest() 

457 super().save(force_insert, force_update, using, update_fields) 

458 

459 def send_confirmation_mail(self, url=SECURE_BASE_URL): 

460 """ 

461 Send confirmation mail to that mail address 

462 """ 

463 link = url + reverse( 

464 "confirm_email", kwargs={"verification_key": self.verification_key} 

465 ) 

466 email_subject = _("Confirm your email address on %s") % SITE_NAME 

467 email_body = render_to_string( 

468 "email_confirmation.txt", 

469 { 

470 "verification_link": link, 

471 "site_name": SITE_NAME, 

472 }, 

473 ) 

474 self.last_send_date = timezone.now() 

475 self.last_status = "OK" 

476 # if settings.DEBUG: 

477 # print('DEBUG: %s' % link) 

478 try: 

479 send_mail(email_subject, email_body, DEFAULT_FROM_EMAIL, [self.email]) 

480 except Exception as e: 

481 self.last_status = f"{e}" 

482 self.save() 

483 return True 

484 

485 def __str__(self): 

486 return "%s (%i) from %s" % (self.email, self.pk, self.user) 

487 

488 

489class UnconfirmedOpenId(BaseAccountModel): 

490 """ 

491 Model holding unconfirmed OpenIDs 

492 """ 

493 

494 openid = models.URLField(unique=False, max_length=MAX_LENGTH_URL) 

495 

496 class Meta: # pylint: disable=too-few-public-methods 

497 """ 

498 Meta class 

499 """ 

500 

501 verbose_name = _("unconfirmed OpenID") 

502 verbose_name_plural = "unconfirmed_OpenIDs" 

503 

504 def __str__(self): 

505 return "%s (%i) from %s" % (self.openid, self.pk, self.user) 

506 

507 

508class ConfirmedOpenId(BaseAccountModel): 

509 """ 

510 Model holding confirmed OpenIDs, as well as the relation to 

511 the assigned photo 

512 """ 

513 

514 openid = models.URLField(unique=True, max_length=MAX_LENGTH_URL) 

515 photo = models.ForeignKey( 

516 Photo, 

517 related_name="openids", 

518 blank=True, 

519 null=True, 

520 on_delete=models.deletion.SET_NULL, 

521 ) 

522 # http://<id>/ base version - http w/ trailing slash 

523 digest = models.CharField(max_length=64) 

524 # http://<id> - http w/o trailing slash 

525 alt_digest1 = models.CharField(max_length=64, null=True, blank=True, default=None) 

526 # https://<id>/ - https w/ trailing slash 

527 alt_digest2 = models.CharField(max_length=64, null=True, blank=True, default=None) 

528 # https://<id> - https w/o trailing slash 

529 alt_digest3 = models.CharField(max_length=64, null=True, blank=True, default=None) 

530 # Alternative assignment - use Bluesky handle 

531 bluesky_handle = models.CharField(max_length=256, null=True, blank=True) 

532 

533 access_count = models.BigIntegerField(default=0, editable=False) 

534 

535 class Meta: # pylint: disable=too-few-public-methods 

536 """ 

537 Meta class 

538 """ 

539 

540 verbose_name = _("confirmed OpenID") 

541 verbose_name_plural = _("confirmed OpenIDs") 

542 

543 def set_photo(self, photo): 

544 """ 

545 Helper method to save photo 

546 """ 

547 self.photo = photo 

548 self.save() 

549 

550 def set_bluesky_handle(self, handle): 

551 """ 

552 Helper method to set Bluesky handle 

553 """ 

554 bs = Bluesky() 

555 handle = bs.normalize_handle(handle) 

556 avatar = bs.get_profile(handle) 

557 if not avatar: 

558 raise ValueError("Invalid Bluesky handle") 

559 self.bluesky_handle = handle 

560 self.save() 

561 

562 def save( 

563 self, force_insert=False, force_update=False, using=None, update_fields=None 

564 ): 

565 url = urlsplit(self.openid) 

566 if url.username: # pragma: no cover 

567 password = url.password or "" 

568 netloc = f"{url.username}:{password}@{url.hostname}" 

569 else: 

570 netloc = url.hostname 

571 lowercase_url = urlunsplit( 

572 (url.scheme.lower(), netloc, url.path, url.query, url.fragment) 

573 ) 

574 self.openid = lowercase_url 

575 

576 self.digest = hashlib.sha256( 

577 openid_variations(lowercase_url)[0].encode("utf-8") 

578 ).hexdigest() 

579 self.alt_digest1 = hashlib.sha256( 

580 openid_variations(lowercase_url)[1].encode("utf-8") 

581 ).hexdigest() 

582 self.alt_digest2 = hashlib.sha256( 

583 openid_variations(lowercase_url)[2].encode("utf-8") 

584 ).hexdigest() 

585 self.alt_digest3 = hashlib.sha256( 

586 openid_variations(lowercase_url)[3].encode("utf-8") 

587 ).hexdigest() 

588 

589 # Invalidate page caches and Bluesky avatar cache 

590 if self.pk: 

591 # Invalidate assign_photo_openid page cache 

592 cache_url = reverse_lazy( 

593 "assign_photo_openid", kwargs={"openid_id": int(self.pk)} 

594 ) 

595 cache_key = f"views.decorators.cache.cache_page.{quote(str(cache_url))}" 

596 try: 

597 if cache.has_key(cache_key): 

598 cache.delete(cache_key) 

599 logger.debug("Successfully cleaned up cached page: %s" % cache_key) 

600 except Exception as exc: 

601 logger.warning( 

602 "Failed to clean up cached page {}: {}".format(cache_key, exc) 

603 ) 

604 

605 # Invalidate Bluesky avatar URL cache if bluesky_handle exists 

606 if hasattr(self, "bluesky_handle") and self.bluesky_handle: 

607 try: 

608 cache.delete(self.bluesky_handle) 

609 logger.debug( 

610 "Successfully cleaned up Bluesky avatar cache for handle: %s" 

611 % self.bluesky_handle 

612 ) 

613 except Exception as exc: 

614 logger.warning( 

615 "Failed to clean up Bluesky avatar cache for handle %s: %s" 

616 % (self.bluesky_handle, exc) 

617 ) 

618 

619 return super().save(force_insert, force_update, using, update_fields) 

620 

621 def __str__(self): 

622 return "%s (%i) (%s)" % (self.openid, self.pk, self.user) 

623 

624 

625class OpenIDNonce(models.Model): 

626 """ 

627 Model holding OpenID Nonces 

628 See also: https://github.com/edx/django-openid-auth/ 

629 """ 

630 

631 server_url = models.CharField(max_length=255) 

632 timestamp = models.IntegerField() 

633 salt = models.CharField(max_length=128) 

634 

635 def __str__(self): 

636 return "%s (%i) (timestamp: %i)" % (self.server_url, self.pk, self.timestamp) 

637 

638 

639class OpenIDAssociation(models.Model): 

640 """ 

641 Model holding the relation/association about OpenIDs 

642 """ 

643 

644 server_url = models.TextField(max_length=2047) 

645 handle = models.CharField(max_length=255) 

646 secret = models.TextField(max_length=255) # stored base64 encoded 

647 issued = models.IntegerField() 

648 lifetime = models.IntegerField() 

649 assoc_type = models.TextField(max_length=64) 

650 

651 def __str__(self): 

652 return "%s (%i) (%s, lifetime: %i)" % ( 

653 self.server_url, 

654 self.pk, 

655 self.assoc_type, 

656 self.lifetime, 

657 ) 

658 

659 

660class DjangoOpenIDStore(OpenIDStore): 

661 """ 

662 The Python openid library needs an OpenIDStore subclass to persist data 

663 related to OpenID authentications. This one uses our Django models. 

664 """ 

665 

666 @staticmethod 

667 def storeAssociation(server_url, association): # pragma: no cover 

668 """ 

669 Helper method to store associations 

670 """ 

671 assoc = OpenIDAssociation( 

672 server_url=server_url, 

673 handle=association.handle, 

674 secret=base64.encodebytes(association.secret), 

675 issued=association.issued, 

676 lifetime=association.issued, 

677 assoc_type=association.assoc_type, 

678 ) 

679 assoc.save() 

680 

681 def getAssociation(self, server_url, handle=None): # pragma: no cover 

682 """ 

683 Helper method to get associations 

684 """ 

685 assocs = [] 

686 if handle is not None: 

687 assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member 

688 server_url=server_url, handle=handle 

689 ) 

690 else: 

691 assocs = OpenIDAssociation.objects.filter( # pylint: disable=no-member 

692 server_url=server_url 

693 ) 

694 if not assocs: 

695 return None 

696 associations = [] 

697 for assoc in assocs: 

698 if isinstance(assoc.secret, str): 

699 assoc.secret = assoc.secret.split("b'")[1].split("'")[0] 

700 assoc.secret = bytes(assoc.secret, "utf-8") 

701 association = OIDAssociation( 

702 assoc.handle, 

703 base64.decodebytes(assoc.secret), 

704 assoc.issued, 

705 assoc.lifetime, 

706 assoc.assoc_type, 

707 ) 

708 expires = 0 

709 try: 

710 # pylint: disable=no-member 

711 expires = association.getExpiresIn() 

712 except AttributeError: 

713 expires = association.expiresIn 

714 if expires == 0: 

715 self.removeAssociation(server_url, assoc.handle) 

716 else: 

717 associations.append((association.issued, association)) 

718 return associations[-1][1] if associations else None 

719 

720 @staticmethod 

721 def removeAssociation(server_url, handle): # pragma: no cover 

722 """ 

723 Helper method to remove associations 

724 """ 

725 assocs = list( 

726 OpenIDAssociation.objects.filter( # pylint: disable=no-member 

727 server_url=server_url, handle=handle 

728 ) 

729 ) 

730 assocs_exist = len(assocs) > 0 

731 for assoc in assocs: 

732 assoc.delete() 

733 return assocs_exist 

734 

735 @staticmethod 

736 def useNonce(server_url, timestamp, salt): # pragma: no cover 

737 """ 

738 Helper method to 'use' nonces 

739 """ 

740 # Has nonce expired? 

741 if abs(timestamp - time.time()) > oidnonce.SKEW: 

742 return False 

743 try: 

744 nonce = OpenIDNonce.objects.get( # pylint: disable=no-member 

745 server_url__exact=server_url, 

746 timestamp__exact=timestamp, 

747 salt__exact=salt, 

748 ) 

749 except ObjectDoesNotExist: 

750 nonce = OpenIDNonce.objects.create( # pylint: disable=no-member 

751 server_url=server_url, timestamp=timestamp, salt=salt 

752 ) 

753 return True 

754 nonce.delete() 

755 return False 

756 

757 @staticmethod 

758 def cleanupNonces(): # pragma: no cover 

759 """ 

760 Helper method to cleanup nonces 

761 """ 

762 timestamp = int(time.time()) - oidnonce.SKEW 

763 # pylint: disable=no-member 

764 OpenIDNonce.objects.filter(timestamp__lt=timestamp).delete() 

765 

766 @staticmethod 

767 def cleanupAssociations(): # pragma: no cover 

768 """ 

769 Helper method to cleanup associations 

770 """ 

771 OpenIDAssociation.objects.extra( 

772 where=[f"issued + lifetimeint < ({time.time()})"] 

773 ).delete()