Coverage for ivatar/ivataraccount/views.py: 76%

687 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 00:07 +0000

1""" 

2View classes for ivatar/ivataraccount/ 

3""" 

4 

5from io import BytesIO 

6from ivatar.utils import urlopen, Bluesky 

7import base64 

8import binascii 

9import contextlib 

10from xml.sax import saxutils 

11import gzip 

12import logging 

13 

14from PIL import Image 

15 

16from django.db.models import ProtectedError 

17from django.core.exceptions import ObjectDoesNotExist 

18from django.contrib.auth.decorators import login_required 

19from django.contrib.auth.models import User 

20from django.utils.decorators import method_decorator 

21from django.contrib.messages.views import SuccessMessageMixin 

22from django.contrib import messages 

23from django.views.generic.edit import FormView, UpdateView 

24from django.views.generic.base import View, TemplateView 

25from django.views.generic.detail import DetailView 

26from django.contrib.auth import authenticate, login 

27from django.contrib.auth.forms import UserCreationForm, SetPasswordForm 

28from django.contrib.auth.views import LoginView 

29from django.contrib.auth.views import ( 

30 PasswordResetView as PasswordResetViewOriginal, 

31) 

32from django.utils.crypto import get_random_string 

33from django.utils.translation import gettext_lazy as _ 

34from django.http import HttpResponseRedirect, HttpResponse 

35from django.urls import reverse_lazy, reverse 

36from django.shortcuts import render 

37from django_openid_auth.models import UserOpenID 

38 

39from openid import oidutil 

40from openid.consumer import consumer 

41 

42from ipware import get_client_ip 

43 

44from email_validator import validate_email 

45 

46from libravatar import libravatar_url 

47from ivatar.settings import ( 

48 MAX_NUM_PHOTOS, 

49 MAX_PHOTO_SIZE, 

50 JPEG_QUALITY, 

51 AVATAR_MAX_SIZE, 

52 SOCIAL_AUTH_FEDORA_KEY, 

53) 

54from .gravatar import get_photo as get_gravatar_photo 

55 

56from .forms import AddEmailForm, UploadPhotoForm, AddOpenIDForm 

57from .forms import UpdatePreferenceForm, UploadLibravatarExportForm 

58from .forms import DeleteAccountForm 

59from .models import UnconfirmedEmail, ConfirmedEmail, Photo 

60from .models import UnconfirmedOpenId, ConfirmedOpenId, DjangoOpenIDStore 

61from .models import UserPreference 

62from .models import file_format 

63from .read_libravatar_export import read_gzdata as libravatar_read_gzdata 

64 

65# Initialize loggers 

66logger = logging.getLogger("ivatar") 

67security_logger = logging.getLogger("ivatar.security") 

68 

69# Import OpenTelemetry with graceful degradation 

70from ..telemetry_utils import ( 

71 trace_file_upload, 

72 trace_authentication, 

73 get_telemetry_metrics, 

74) 

75 

76avatar_metrics = get_telemetry_metrics() 

77 

78 

79def openid_logging(message, level=0): 

80 """ 

81 Helper method for openid logging 

82 """ 

83 # Normal messages are not that important 

84 # No need for coverage here 

85 if level > 0: # pragma: no cover 

86 logger.debug(message) 

87 

88 

89class CreateView(SuccessMessageMixin, FormView): 

90 """ 

91 View class for creating a new user 

92 """ 

93 

94 template_name = "new.html" 

95 form_class = UserCreationForm 

96 

97 @trace_authentication("user_registration") 

98 def form_valid(self, form): 

99 form.save() 

100 user = authenticate( 

101 username=form.cleaned_data["username"], 

102 password=form.cleaned_data["password1"], 

103 ) 

104 if user is not None: 

105 # If the username looks like a mail address, automagically 

106 # add it as unconfirmed mail and set it also as user's 

107 # email address 

108 with contextlib.suppress(Exception): 

109 self._extracted_from_form_valid_(form, user) 

110 login(self.request, user) 

111 pref = UserPreference.objects.create( 

112 user_id=user.pk 

113 ) # pylint: disable=no-member 

114 pref.save() 

115 return HttpResponseRedirect(reverse_lazy("profile")) 

116 return HttpResponseRedirect(reverse_lazy("login")) # pragma: no cover 

117 

118 def _extracted_from_form_valid_(self, form, user): 

119 # This will error out if it's not a valid address 

120 valid = validate_email(form.cleaned_data["username"]) 

121 user.email = valid.email 

122 user.save() 

123 # The following will also error out if it already exists 

124 unconfirmed = UnconfirmedEmail() 

125 unconfirmed.email = valid.email 

126 unconfirmed.user = user 

127 unconfirmed.save() 

128 unconfirmed.send_confirmation_mail( 

129 url=self.request.build_absolute_uri("/")[:-1] 

130 ) 

131 

132 def get(self, request, *args, **kwargs): 

133 """ 

134 Handle get for create view 

135 """ 

136 if request.user and request.user.is_authenticated: 

137 return HttpResponseRedirect(reverse_lazy("profile")) 

138 return super().get(self, request, args, kwargs) 

139 

140 

141@method_decorator(login_required, name="dispatch") 

142class PasswordSetView(SuccessMessageMixin, FormView): 

143 """ 

144 View class for changing the password 

145 """ 

146 

147 template_name = "password_change.html" 

148 form_class = SetPasswordForm 

149 success_message = _("password changed successfully - please login again") 

150 success_url = reverse_lazy("profile") 

151 

152 def get_form_kwargs(self): 

153 kwargs = super().get_form_kwargs() 

154 kwargs["user"] = self.request.user 

155 return kwargs 

156 

157 def form_valid(self, form): 

158 form.save() 

159 super().form_valid(form) 

160 return HttpResponseRedirect(reverse_lazy("login")) 

161 

162 

163@method_decorator(login_required, name="dispatch") 

164class AddEmailView(SuccessMessageMixin, FormView): 

165 """ 

166 View class for adding email addresses 

167 """ 

168 

169 template_name = "add_email.html" 

170 form_class = AddEmailForm 

171 success_url = reverse_lazy("profile") 

172 

173 def form_valid(self, form): 

174 if not form.save(self.request): 

175 return render(self.request, self.template_name, {"form": form}) 

176 

177 messages.success(self.request, _("Address added successfully")) 

178 return super().form_valid(form) 

179 

180 

181@method_decorator(login_required, name="dispatch") 

182class RemoveUnconfirmedEmailView(SuccessMessageMixin, View): 

183 """ 

184 View class for removing a unconfirmed email address 

185 """ 

186 

187 @staticmethod 

188 def post(request, *args, **kwargs): # pylint: disable=unused-argument 

189 """ 

190 Handle post request - removing unconfirmed email 

191 """ 

192 try: 

193 email = UnconfirmedEmail.objects.get( # pylint: disable=no-member 

194 user=request.user, id=kwargs["email_id"] 

195 ) 

196 email.delete() 

197 messages.success(request, _("Address removed")) 

198 except UnconfirmedEmail.DoesNotExist: # pylint: disable=no-member 

199 messages.error(request, _("Address does not exist")) 

200 return HttpResponseRedirect(reverse_lazy("profile")) 

201 

202 

203class ConfirmEmailView(SuccessMessageMixin, TemplateView): 

204 """ 

205 View class for confirming an unconfirmed email address 

206 """ 

207 

208 template_name = "email_confirmed.html" 

209 

210 def get(self, request, *args, **kwargs): 

211 # be tolerant of extra crap added by mail clients 

212 key = kwargs["verification_key"].replace(" ", "") 

213 

214 if len(key) != 64: 

215 messages.error(request, _("Verification key incorrect")) 

216 return HttpResponseRedirect(reverse_lazy("profile")) 

217 

218 try: 

219 unconfirmed = UnconfirmedEmail.objects.get( 

220 verification_key=key 

221 ) # pylint: disable=no-member 

222 except UnconfirmedEmail.DoesNotExist: # pylint: disable=no-member 

223 messages.error(request, _("Verification key does not exist")) 

224 return HttpResponseRedirect(reverse_lazy("profile")) 

225 

226 if ConfirmedEmail.objects.filter(email=unconfirmed.email).count() > 0: 

227 messages.error( 

228 request, 

229 _("This mail address has been taken already and cannot be confirmed"), 

230 ) 

231 return HttpResponseRedirect(reverse_lazy("profile")) 

232 

233 # TODO: Check for a reasonable expiration time in unconfirmed email 

234 

235 (confirmed_id, external_photos) = ConfirmedEmail.objects.create_confirmed_email( 

236 unconfirmed.user, unconfirmed.email, not request.user.is_anonymous 

237 ) 

238 

239 unconfirmed.delete() 

240 

241 # if there's a single image in this user's profile, 

242 # assign it to the new email 

243 confirmed = ConfirmedEmail.objects.get(id=confirmed_id) 

244 if confirmed.user.photo_set.count() == 1: 

245 confirmed.set_photo(confirmed.user.photo_set.first()) 

246 kwargs["photos"] = external_photos 

247 kwargs["email_id"] = confirmed_id 

248 return super().get(request, *args, **kwargs) 

249 

250 

251@method_decorator(login_required, name="dispatch") 

252class RemoveConfirmedEmailView(SuccessMessageMixin, View): 

253 """ 

254 View class for removing a confirmed email address 

255 """ 

256 

257 @staticmethod 

258 def post(request, *args, **kwargs): # pylint: disable=unused-argument 

259 """ 

260 Handle post request - removing confirmed email 

261 """ 

262 try: 

263 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"]) 

264 email.delete() 

265 messages.success(request, _("Address removed")) 

266 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member 

267 messages.error(request, _("Address does not exist")) 

268 return HttpResponseRedirect(reverse_lazy("profile")) 

269 

270 

271@method_decorator(login_required, name="dispatch") 

272class AssignPhotoEmailView(SuccessMessageMixin, TemplateView): 

273 """ 

274 View class for assigning a photo to an email address 

275 """ 

276 

277 model = Photo 

278 template_name = "assign_photo_email.html" 

279 

280 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument 

281 """ 

282 Handle post request - assign photo to email 

283 """ 

284 photo = None 

285 

286 try: 

287 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"]) 

288 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member 

289 messages.error(request, _("Invalid request")) 

290 return HttpResponseRedirect(reverse_lazy("profile")) 

291 

292 if "photoNone" in request.POST: 

293 email.photo = None 

294 email.bluesky_handle = None 

295 elif "photoBluesky" in request.POST: 

296 # Keep the existing Bluesky handle, clear the photo 

297 email.photo = None 

298 # Don't clear bluesky_handle - keep it as is 

299 else: 

300 if "photo_id" not in request.POST: 

301 messages.error(request, _("Invalid request [photo_id] missing")) 

302 return HttpResponseRedirect(reverse_lazy("profile")) 

303 

304 if request.POST["photo_id"] == "bluesky": 

305 # Handle Bluesky photo selection 

306 email.photo = None 

307 # Don't clear bluesky_handle - keep it as is 

308 else: 

309 try: 

310 photo = self.model.objects.get( # pylint: disable=no-member 

311 id=request.POST["photo_id"], user=request.user 

312 ) 

313 except self.model.DoesNotExist: # pylint: disable=no-member 

314 messages.error(request, _("Photo does not exist")) 

315 return HttpResponseRedirect(reverse_lazy("profile")) 

316 email.photo = photo 

317 email.bluesky_handle = None 

318 email.save() 

319 

320 messages.success(request, _("Successfully changed photo")) 

321 return HttpResponseRedirect(reverse_lazy("profile")) 

322 

323 def get_context_data(self, **kwargs): 

324 data = super().get_context_data(**kwargs) 

325 data["email"] = ConfirmedEmail.objects.get(pk=kwargs["email_id"]) 

326 return data 

327 

328 

329@method_decorator(login_required, name="dispatch") 

330class AssignPhotoOpenIDView(SuccessMessageMixin, TemplateView): 

331 """ 

332 View class for assigning a photo to an openid address 

333 """ 

334 

335 model = Photo 

336 template_name = "assign_photo_openid.html" 

337 

338 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument 

339 """ 

340 Handle post - assign photo to openid 

341 """ 

342 photo = None 

343 

344 try: 

345 openid = ConfirmedOpenId.objects.get( # pylint: disable=no-member 

346 user=request.user, id=kwargs["openid_id"] 

347 ) 

348 except ConfirmedOpenId.DoesNotExist: # pylint: disable=no-member 

349 messages.error(request, _("Invalid request")) 

350 return HttpResponseRedirect(reverse_lazy("profile")) 

351 

352 if "photoNone" in request.POST: 

353 openid.photo = None 

354 else: 

355 if "photo_id" not in request.POST: 

356 messages.error(request, _("Invalid request [photo_id] missing")) 

357 return HttpResponseRedirect(reverse_lazy("profile")) 

358 

359 try: 

360 photo = self.model.objects.get( # pylint: disable=no-member 

361 id=request.POST["photo_id"], user=request.user 

362 ) 

363 except self.model.DoesNotExist: # pylint: disable=no-member 

364 messages.error(request, _("Photo does not exist")) 

365 return HttpResponseRedirect(reverse_lazy("profile")) 

366 openid.photo = photo 

367 openid.bluesky_handle = None 

368 openid.save() 

369 

370 messages.success(request, _("Successfully changed photo")) 

371 return HttpResponseRedirect(reverse_lazy("profile")) 

372 

373 def get_context_data(self, **kwargs): 

374 data = super().get_context_data(**kwargs) 

375 data["openid"] = ConfirmedOpenId.objects.get( 

376 pk=kwargs["openid_id"] 

377 ) # pylint: disable=no-member 

378 return data 

379 

380 

381@method_decorator(login_required, name="dispatch") 

382class AssignBlueskyHandleToEmailView(SuccessMessageMixin, TemplateView): 

383 """ 

384 View class for assigning a Bluesky handle to an email address 

385 """ 

386 

387 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument 

388 """ 

389 Handle post request - assign bluesky handle to email 

390 """ 

391 

392 try: 

393 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"]) 

394 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member 

395 messages.error(request, _("Invalid request")) 

396 return HttpResponseRedirect(reverse_lazy("profile")) 

397 

398 if "bluesky_handle" not in request.POST: 

399 messages.error(request, _("Invalid request [bluesky_handle] missing")) 

400 return HttpResponseRedirect(reverse_lazy("profile")) 

401 bluesky_handle = request.POST["bluesky_handle"] 

402 

403 try: 

404 bs = Bluesky() 

405 

406 bs.get_avatar(bluesky_handle) 

407 except Exception as e: 

408 messages.error(request, _(f"Handle '{bluesky_handle}' not found: {e}")) 

409 return HttpResponseRedirect( 

410 reverse_lazy( 

411 "assign_photo_email", kwargs={"email_id": int(kwargs["email_id"])} 

412 ) 

413 ) 

414 try: 

415 email.set_bluesky_handle(bluesky_handle) 

416 except Exception as e: 

417 messages.error(request, _(f"Error: {e}")) 

418 return HttpResponseRedirect( 

419 reverse_lazy( 

420 "assign_photo_email", kwargs={"email_id": int(kwargs["email_id"])} 

421 ) 

422 ) 

423 email.photo = None 

424 email.save() 

425 

426 messages.success(request, _("Successfully assigned Bluesky handle")) 

427 return HttpResponseRedirect(reverse_lazy("profile")) 

428 

429 def get_context_data(self, **kwargs): 

430 data = super().get_context_data(**kwargs) 

431 data["email"] = ConfirmedEmail.objects.get(pk=kwargs["email_id"]) 

432 return data 

433 

434 

435@method_decorator(login_required, name="dispatch") 

436class AssignBlueskyHandleToOpenIdView(SuccessMessageMixin, TemplateView): 

437 """ 

438 View class for assigning a Bluesky handle to an email address 

439 """ 

440 

441 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument 

442 """ 

443 Handle post request - assign bluesky handle to email 

444 """ 

445 

446 try: 

447 openid = ConfirmedOpenId.objects.get( 

448 user=request.user, id=kwargs["open_id"] 

449 ) 

450 except ConfirmedOpenId.DoesNotExist: # pylint: disable=no-member 

451 messages.error(request, _("Invalid request")) 

452 return HttpResponseRedirect(reverse_lazy("profile")) 

453 

454 if "bluesky_handle" not in request.POST: 

455 messages.error(request, _("Invalid request [bluesky_handle] missing")) 

456 return HttpResponseRedirect(reverse_lazy("profile")) 

457 bluesky_handle = request.POST["bluesky_handle"] 

458 

459 try: 

460 bs = Bluesky() 

461 

462 bs.get_avatar(bluesky_handle) 

463 except Exception as e: 

464 messages.error(request, _(f"Handle '{bluesky_handle}' not found: {e}")) 

465 return HttpResponseRedirect( 

466 reverse_lazy( 

467 "assign_photo_openid", kwargs={"openid_id": int(kwargs["open_id"])} 

468 ) 

469 ) 

470 try: 

471 openid.set_bluesky_handle(bluesky_handle) 

472 except Exception as e: 

473 messages.error(request, _(f"Error: {e}")) 

474 return HttpResponseRedirect( 

475 reverse_lazy( 

476 "assign_photo_openid", kwargs={"openid_id": int(kwargs["open_id"])} 

477 ) 

478 ) 

479 openid.photo = None 

480 openid.save() 

481 

482 messages.success(request, _("Successfully assigned Bluesky handle")) 

483 return HttpResponseRedirect(reverse_lazy("profile")) 

484 

485 def get_context_data(self, **kwargs): 

486 data = super().get_context_data(**kwargs) 

487 data["openid"] = ConfirmedOpenId.objects.get(pk=kwargs["open_id"]) 

488 return data 

489 

490 

491@method_decorator(login_required, name="dispatch") 

492class ImportPhotoView(SuccessMessageMixin, TemplateView): 

493 """ 

494 View class to import a photo from another service 

495 Currently only Gravatar is supported 

496 """ 

497 

498 template_name = "import_photo.html" 

499 

500 def get_context_data(self, **kwargs): 

501 context = super().get_context_data(**kwargs) 

502 context["photos"] = [] 

503 addr = None 

504 if "email_id" in kwargs: 

505 try: 

506 addr = ConfirmedEmail.objects.get(pk=kwargs["email_id"]).email 

507 except ConfirmedEmail.ObjectDoesNotExist: # pylint: disable=no-member 

508 messages.error(self.request, _("Address does not exist")) 

509 return context 

510 

511 if addr := kwargs.get("email_addr", None): 

512 if gravatar := get_gravatar_photo(addr): 

513 context["photos"].append(gravatar) 

514 

515 if libravatar_service_url := libravatar_url( 

516 email=addr, 

517 default=404, 

518 size=AVATAR_MAX_SIZE, 

519 ): 

520 try: 

521 urlopen(libravatar_service_url) 

522 except OSError as exc: 

523 logger.warning(f"Exception caught during photo import: {exc}") 

524 else: 

525 context["photos"].append( 

526 { 

527 "service_url": libravatar_service_url, 

528 "thumbnail_url": f"{libravatar_service_url}&s=80", 

529 "image_url": f"{libravatar_service_url}&s=512", 

530 "width": 80, 

531 "height": 80, 

532 "service_name": "Libravatar", 

533 } 

534 ) 

535 

536 return context 

537 

538 def post( 

539 self, request, *args, **kwargs 

540 ): # pylint: disable=no-self-use,unused-argument,too-many-branches,line-too-long 

541 """ 

542 Handle post to photo import 

543 """ 

544 

545 imported = None 

546 

547 email_id = kwargs.get("email_id", request.POST.get("email_id", None)) 

548 addr = kwargs.get("email", request.POST.get("email_addr", None)) 

549 

550 if email_id: 

551 email = ConfirmedEmail.objects.filter(id=email_id, user=request.user) 

552 if email.exists(): 

553 addr = email.first().email 

554 else: 

555 messages.error(request, _("Address does not exist")) 

556 return HttpResponseRedirect(reverse_lazy("profile")) 

557 

558 if "photo_Gravatar" in request.POST: 

559 photo = Photo() 

560 photo.user = request.user 

561 photo.ip_address = get_client_ip(request)[0] 

562 if photo.import_image("Gravatar", addr): 

563 messages.success(request, _("Gravatar image successfully imported")) 

564 else: 

565 # Honestly, I'm not sure how to test this... 

566 messages.error( 

567 request, _("Gravatar image import not successful") 

568 ) # pragma: no cover 

569 imported = True 

570 

571 if "photo_Libravatar" in request.POST: 

572 photo = Photo() 

573 photo.user = request.user 

574 photo.ip_address = get_client_ip(request)[0] 

575 if photo.import_image("Libravatar", addr): 

576 messages.success(request, _("Libravatar image successfully imported")) 

577 else: 

578 # Honestly, I'm not sure how to test this... 

579 messages.error( 

580 request, _("Libravatar image import not successful") 

581 ) # pragma: no cover 

582 imported = True 

583 if not imported: 

584 messages.warning(request, _("Nothing importable")) 

585 return HttpResponseRedirect(reverse_lazy("profile")) 

586 

587 

588@method_decorator(login_required, name="dispatch") 

589class RawImageView(DetailView): 

590 """ 

591 View to return (binary) raw image data, for use in <img/>-tags 

592 """ 

593 

594 model = Photo 

595 

596 def get(self, request, *args, **kwargs): 

597 photo = self.model.objects.get(pk=kwargs["pk"]) # pylint: disable=no-member 

598 if photo.user.id != request.user.id and not request.user.is_staff: 

599 return HttpResponseRedirect(reverse_lazy("home")) 

600 return HttpResponse(BytesIO(photo.data), content_type=f"image/{photo.format}") 

601 

602 

603@method_decorator(login_required, name="dispatch") 

604class DeletePhotoView(SuccessMessageMixin, View): 

605 """ 

606 View class for deleting a photo 

607 """ 

608 

609 model = Photo 

610 

611 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument 

612 """ 

613 Handle get - delete photo 

614 """ 

615 try: 

616 photo = self.model.objects.get( # pylint: disable=no-member 

617 pk=kwargs["pk"], user=request.user 

618 ) 

619 photo.delete() 

620 except (self.model.DoesNotExist, ProtectedError): # pylint: disable=no-member 

621 messages.error(request, _("No such image or no permission to delete it")) 

622 return HttpResponseRedirect(reverse_lazy("profile")) 

623 messages.success(request, _("Photo deleted successfully")) 

624 return HttpResponseRedirect(reverse_lazy("profile")) 

625 

626 

627@method_decorator(login_required, name="dispatch") 

628class UploadPhotoView(SuccessMessageMixin, FormView): 

629 """ 

630 View class responsible for photo upload with enhanced security 

631 """ 

632 

633 model = Photo 

634 template_name = "upload_photo.html" 

635 form_class = UploadPhotoForm 

636 success_message = _("Successfully uploaded") 

637 success_url = reverse_lazy("profile") 

638 

639 def post(self, request, *args, **kwargs): 

640 # Check maximum number of photos 

641 num_photos = request.user.photo_set.count() 

642 if num_photos >= MAX_NUM_PHOTOS: 

643 messages.error( 

644 request, _("Maximum number of photos (%i) reached" % MAX_NUM_PHOTOS) 

645 ) 

646 return HttpResponseRedirect(reverse_lazy("profile")) 

647 

648 return super().post(request, *args, **kwargs) 

649 

650 @trace_file_upload("photo_upload") 

651 def form_valid(self, form): 

652 photo_data = self.request.FILES["photo"] 

653 

654 # Additional size check (redundant but good for security) 

655 if photo_data.size > MAX_PHOTO_SIZE: 

656 messages.error(self.request, _("Image too big")) 

657 avatar_metrics.record_file_upload( 

658 file_size=photo_data.size, 

659 content_type=photo_data.content_type, 

660 success=False, 

661 ) 

662 return HttpResponseRedirect(reverse_lazy("profile")) 

663 

664 # Enhanced security logging 

665 security_logger.info( 

666 f"Photo upload attempt by user {self.request.user.id} " 

667 f"from IP {get_client_ip(self.request)[0]}, " 

668 f"file size: {photo_data.size} bytes" 

669 ) 

670 

671 photo = form.save(self.request, photo_data) 

672 

673 if not photo: 

674 security_logger.warning( 

675 f"Photo upload failed for user {self.request.user.id} - invalid format" 

676 ) 

677 messages.error(self.request, _("Invalid Format")) 

678 avatar_metrics.record_file_upload( 

679 file_size=photo_data.size, 

680 content_type=photo_data.content_type, 

681 success=False, 

682 ) 

683 return HttpResponseRedirect(reverse_lazy("profile")) 

684 

685 # Log successful upload 

686 security_logger.info( 

687 f"Photo uploaded successfully by user {self.request.user.id}, " 

688 f"photo ID: {photo.pk}" 

689 ) 

690 

691 # Record successful file upload metrics 

692 avatar_metrics.record_file_upload( 

693 file_size=photo_data.size, 

694 content_type=photo_data.content_type, 

695 success=True, 

696 ) 

697 

698 # Override success URL -> Redirect to crop page. 

699 self.success_url = reverse_lazy("crop_photo", args=[photo.pk]) 

700 return super().form_valid(form) 

701 

702 

703@method_decorator(login_required, name="dispatch") 

704class AddOpenIDView(SuccessMessageMixin, FormView): 

705 """ 

706 View class for adding OpenID 

707 """ 

708 

709 template_name = "add_openid.html" 

710 form_class = AddOpenIDForm 

711 success_url = reverse_lazy("profile") 

712 

713 def form_valid(self, form): 

714 if openid_id := form.save(self.request.user): 

715 # At this point we have an unconfirmed OpenID, but 

716 # we do not add the message, that we successfully added it, 

717 # since this is misleading 

718 return HttpResponseRedirect( 

719 reverse_lazy("openid_redirection", args=[openid_id]) 

720 ) 

721 else: 

722 return render(self.request, self.template_name, {"form": form}) 

723 

724 

725@method_decorator(login_required, name="dispatch") 

726class RemoveUnconfirmedOpenIDView(View): 

727 """ 

728 View class for removing a unconfirmed OpenID 

729 """ 

730 

731 model = UnconfirmedOpenId 

732 

733 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument 

734 """ 

735 Handle post - remove unconfirmed openid 

736 """ 

737 try: 

738 openid = self.model.objects.get( # pylint: disable=no-member 

739 user=request.user, id=kwargs["openid_id"] 

740 ) 

741 openid.delete() 

742 messages.success(request, _("ID removed")) 

743 except ( 

744 self.model.DoesNotExist 

745 ): # pragma: no cover pylint: disable=no-member,line-too-long 

746 messages.error(request, _("ID does not exist")) 

747 return HttpResponseRedirect(reverse_lazy("profile")) 

748 

749 

750@method_decorator(login_required, name="dispatch") 

751class RemoveConfirmedOpenIDView(View): 

752 """ 

753 View class for removing a confirmed OpenID 

754 """ 

755 

756 model = ConfirmedOpenId 

757 

758 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument 

759 """ 

760 Handle post - remove confirmed openid 

761 """ 

762 try: 

763 openid = self.model.objects.get( # pylint: disable=no-member 

764 user=request.user, id=kwargs["openid_id"] 

765 ) 

766 try: 

767 openidobj = ( 

768 UserOpenID.objects.get( # pylint: disable=no-member,line-too-long 

769 user_id=request.user.id, claimed_id=openid.openid 

770 ) 

771 ) 

772 openidobj.delete() 

773 except Exception as exc: # pylint: disable=broad-except 

774 # Why it is not there? 

775 logger.warning(f"How did we get here: {exc}") 

776 openid.delete() 

777 messages.success(request, _("ID removed")) 

778 except self.model.DoesNotExist: # pylint: disable=no-member 

779 messages.error(request, _("ID does not exist")) 

780 return HttpResponseRedirect(reverse_lazy("profile")) 

781 

782 

783@method_decorator(login_required, name="dispatch") 

784class RedirectOpenIDView(View): 

785 """ 

786 Redirect view for OpenID 

787 """ 

788 

789 model = UnconfirmedOpenId 

790 

791 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument 

792 """ 

793 Handle get for OpenID redirect view 

794 """ 

795 try: 

796 unconfirmed = self.model.objects.get( # pylint: disable=no-member 

797 user=request.user, id=kwargs["openid_id"] 

798 ) 

799 except ( 

800 self.model.DoesNotExist 

801 ): # pragma: no cover pylint: disable=no-member,line-too-long 

802 messages.error(request, _("ID does not exist")) 

803 return HttpResponseRedirect(reverse_lazy("profile")) 

804 

805 user_url = unconfirmed.openid 

806 session = {"id": request.session.session_key} 

807 

808 oidutil.log = openid_logging 

809 openid_consumer = consumer.Consumer(session, DjangoOpenIDStore()) 

810 

811 try: 

812 auth_request = openid_consumer.begin(user_url) 

813 except consumer.DiscoveryFailure as exc: 

814 messages.error(request, _(f"OpenID discovery failed: {exc}")) 

815 return HttpResponseRedirect(reverse_lazy("profile")) 

816 except UnicodeDecodeError as exc: # pragma: no cover 

817 msg = _( 

818 "OpenID discovery failed (userid=%(userid)s) for " 

819 "%(userurl)s: %(message)s" 

820 % { 

821 "userid": request.user.id, 

822 "userurl": user_url.encode("utf-8"), 

823 "message": exc, 

824 } 

825 ) 

826 logger.error(f"message: {msg}") 

827 messages.error(request, msg) 

828 

829 if auth_request is None: # pragma: no cover 

830 messages.error(request, _("OpenID discovery failed")) 

831 return HttpResponseRedirect(reverse_lazy("profile")) 

832 

833 realm = request.build_absolute_uri("/")[:-1] # pragma: no cover 

834 return_url = realm + reverse( # pragma: no cover 

835 "confirm_openid", args=[kwargs["openid_id"]] 

836 ) 

837 return HttpResponseRedirect( # pragma: no cover 

838 auth_request.redirectURL(realm, return_url) 

839 ) 

840 

841 

842@method_decorator(login_required, name="dispatch") 

843class ConfirmOpenIDView(View): # pragma: no cover 

844 """ 

845 Confirm OpenID view 

846 """ 

847 

848 model = UnconfirmedOpenId 

849 model_confirmed = ConfirmedOpenId 

850 

851 def do_request(self, data, *args, **kwargs): # pylint: disable=unused-argument 

852 """ 

853 Handle request, called by get() or post() 

854 """ 

855 session = {"id": self.request.session.session_key} 

856 current_url = self.request.build_absolute_uri("/")[:-1] + self.request.path 

857 openid_consumer = consumer.Consumer(session, DjangoOpenIDStore()) 

858 info = openid_consumer.complete(data, current_url) 

859 if info.status == consumer.FAILURE: 

860 messages.error( 

861 self.request, _('Confirmation failed: "') + str(info.message) + '"' 

862 ) 

863 return HttpResponseRedirect(reverse_lazy("profile")) 

864 

865 if info.status == consumer.CANCEL: 

866 messages.error(self.request, _("Cancelled by user")) 

867 return HttpResponseRedirect(reverse_lazy("profile")) 

868 

869 if info.status != consumer.SUCCESS: 

870 messages.error(self.request, _("Unknown verification error")) 

871 return HttpResponseRedirect(reverse_lazy("profile")) 

872 

873 try: 

874 unconfirmed = self.model.objects.get( # pylint: disable=no-member 

875 user=self.request.user, id=kwargs["openid_id"] 

876 ) 

877 except self.model.DoesNotExist: # pylint: disable=no-member 

878 messages.error(self.request, _("ID does not exist")) 

879 return HttpResponseRedirect(reverse_lazy("profile")) 

880 

881 # TODO: Check for a reasonable expiration time 

882 confirmed = self.model_confirmed() 

883 confirmed.user = unconfirmed.user 

884 confirmed.ip_address = get_client_ip(self.request)[0] 

885 confirmed.openid = unconfirmed.openid 

886 confirmed.save() 

887 

888 unconfirmed.delete() 

889 

890 # If there is a single image in this user's profile 

891 # assign it to the new id 

892 if self.request.user.photo_set.count() == 1: 

893 confirmed.set_photo(self.request.user.photo_set.first()) 

894 

895 # Also allow user to login using this OpenID (if not already taken) 

896 if not UserOpenID.objects.filter( # pylint: disable=no-member 

897 claimed_id=confirmed.openid 

898 ).exists(): 

899 user_openid = UserOpenID() 

900 user_openid.user = self.request.user 

901 user_openid.claimed_id = confirmed.openid 

902 user_openid.display_id = confirmed.openid 

903 user_openid.save() 

904 return HttpResponseRedirect(reverse_lazy("profile")) 

905 

906 def get(self, request, *args, **kwargs): 

907 """ 

908 Handle get - confirm openid 

909 """ 

910 return self.do_request(request.GET, *args, **kwargs) 

911 

912 def post(self, request, *args, **kwargs): 

913 """ 

914 Handle post - confirm openid 

915 """ 

916 return self.do_request(request.POST, *args, **kwargs) 

917 

918 

919@method_decorator(login_required, name="dispatch") 

920class CropPhotoView(TemplateView): 

921 """ 

922 View class for cropping photos 

923 """ 

924 

925 template_name = "crop_photo.html" 

926 success_url = reverse_lazy("profile") 

927 model = Photo 

928 

929 def get(self, request, *args, **kwargs): 

930 photo = self.model.objects.get( 

931 pk=kwargs["pk"], user=request.user 

932 ) # pylint: disable=no-member 

933 email = request.GET.get("email") 

934 openid = request.GET.get("openid") 

935 return render( 

936 self.request, 

937 self.template_name, 

938 { 

939 "photo": photo, 

940 "email": email, 

941 "openid": openid, 

942 }, 

943 ) 

944 

945 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument 

946 """ 

947 Handle post - crop photo 

948 """ 

949 photo = self.model.objects.get( 

950 pk=kwargs["pk"], user=request.user 

951 ) # pylint: disable=no-member 

952 dimensions = { 

953 "x": int(float(request.POST["x"])), 

954 "y": int(float(request.POST["y"])), 

955 "w": int(float(request.POST["w"])), 

956 "h": int(float(request.POST["h"])), 

957 } 

958 email = openid = None 

959 if "email" in request.POST: 

960 with contextlib.suppress(ConfirmedEmail.DoesNotExist): 

961 email = ConfirmedEmail.objects.get(email=request.POST["email"]) 

962 if "openid" in request.POST: 

963 with contextlib.suppress(ConfirmedOpenId.DoesNotExist): 

964 openid = ConfirmedOpenId.objects.get( # pylint: disable=no-member 

965 openid=request.POST["openid"] 

966 ) 

967 return photo.perform_crop(request, dimensions, email, openid) 

968 

969 

970@method_decorator(login_required, name="dispatch") # pylint: disable=too-many-ancestors 

971class UserPreferenceView(FormView, UpdateView): 

972 """ 

973 View class for user preferences view/update 

974 """ 

975 

976 template_name = "preferences.html" 

977 model = UserPreference 

978 form_class = UpdatePreferenceForm 

979 success_url = reverse_lazy("user_preference") 

980 

981 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument 

982 """ 

983 Process POST-ed data from this form 

984 """ 

985 userpref = None 

986 try: 

987 userpref = self.request.user.userpreference 

988 except ObjectDoesNotExist: 

989 userpref = UserPreference(user=self.request.user) 

990 userpref.theme = request.POST["theme"] 

991 userpref.save() 

992 try: 

993 if request.POST["email"] != self.request.user.email: 

994 addresses = list( 

995 self.request.user.confirmedemail_set.all().values_list( 

996 "email", flat=True 

997 ) 

998 ) 

999 if request.POST["email"] not in addresses: 

1000 messages.error( 

1001 self.request, 

1002 _(f'Mail address not allowed: {request.POST["email"]}'), 

1003 ) 

1004 else: 

1005 self.request.user.email = request.POST["email"] 

1006 self.request.user.save() 

1007 messages.info(self.request, _("Mail address changed.")) 

1008 except Exception as e: # pylint: disable=broad-except 

1009 messages.error(self.request, _(f"Error setting new mail address: {e}")) 

1010 

1011 try: 

1012 if request.POST["first_name"] or request.POST["last_name"]: 

1013 if request.POST["first_name"] != self.request.user.first_name: 

1014 self.request.user.first_name = request.POST["first_name"] 

1015 messages.info(self.request, _("First name changed.")) 

1016 if request.POST["last_name"] != self.request.user.last_name: 

1017 self.request.user.last_name = request.POST["last_name"] 

1018 messages.info(self.request, _("Last name changed.")) 

1019 self.request.user.save() 

1020 except Exception as e: # pylint: disable=broad-except 

1021 messages.error(self.request, _(f"Error setting names: {e}")) 

1022 

1023 return HttpResponseRedirect(reverse_lazy("user_preference")) 

1024 

1025 def get(self, request, *args, **kwargs): 

1026 return render( 

1027 self.request, 

1028 self.template_name, 

1029 { 

1030 "THEMES": UserPreference.THEMES, 

1031 }, 

1032 ) 

1033 

1034 def get_object(self, queryset=None): 

1035 (obj, created) = UserPreference.objects.get_or_create( 

1036 user=self.request.user 

1037 ) # pylint: disable=no-member,unused-variable 

1038 return obj 

1039 

1040 

1041@method_decorator(login_required, name="dispatch") 

1042class UploadLibravatarExportView(SuccessMessageMixin, FormView): 

1043 """ 

1044 View class responsible for libravatar user data export upload 

1045 """ 

1046 

1047 template_name = "upload_libravatar_export.html" 

1048 form_class = UploadLibravatarExportForm 

1049 success_message = _("Successfully uploaded") 

1050 success_url = reverse_lazy("profile") 

1051 model = User 

1052 

1053 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument 

1054 """ 

1055 Handle post request - choose items to import 

1056 """ 

1057 if "save" in kwargs: # pylint: disable=too-many-nested-blocks 

1058 if kwargs["save"] == "save": 

1059 for arg in request.POST: 

1060 if arg.startswith("email_"): 

1061 email = request.POST[arg] 

1062 if not ConfirmedEmail.objects.filter( 

1063 email=email 

1064 ) and not UnconfirmedEmail.objects.filter( 

1065 email=email 

1066 ): # pylint: disable=no-member 

1067 try: 

1068 unconfirmed = UnconfirmedEmail.objects.create( # pylint: disable=no-member 

1069 user=request.user, email=email 

1070 ) 

1071 unconfirmed.save() 

1072 unconfirmed.send_confirmation_mail( 

1073 url=request.build_absolute_uri("/")[:-1] 

1074 ) 

1075 messages.info( 

1076 request, 

1077 "%s: %s" 

1078 % ( 

1079 email, 

1080 _( 

1081 "address added successfully,\ 

1082 confirmation mail sent" 

1083 ), 

1084 ), 

1085 ) 

1086 except Exception as exc: # pylint: disable=broad-except 

1087 # DEBUG 

1088 print( 

1089 f"Exception during adding mail address ({email}): {exc}" 

1090 ) 

1091 

1092 if arg.startswith("photo"): 

1093 try: 

1094 data = base64.decodebytes(bytes(request.POST[arg], "utf-8")) 

1095 except binascii.Error as exc: 

1096 logger.warning(f"Cannot decode photo: {exc}") 

1097 continue 

1098 try: 

1099 pilobj = Image.open(BytesIO(data)) 

1100 out = BytesIO() 

1101 pilobj.save(out, pilobj.format, quality=JPEG_QUALITY) 

1102 out.seek(0) 

1103 photo = Photo() 

1104 photo.user = request.user 

1105 photo.ip_address = get_client_ip(request)[0] 

1106 photo.format = file_format(pilobj.format) 

1107 photo.data = out.read() 

1108 photo.save() 

1109 except Exception as exc: # pylint: disable=broad-except 

1110 logger.error(f"Exception during save: {exc}") 

1111 continue 

1112 

1113 return HttpResponseRedirect(reverse_lazy("profile")) 

1114 return super().post(request, args, kwargs) 

1115 

1116 def form_valid(self, form): 

1117 data = self.request.FILES["export_file"] 

1118 try: 

1119 items = libravatar_read_gzdata(data.read()) 

1120 # DEBUG print(items) 

1121 return render( 

1122 self.request, 

1123 "choose_libravatar_export.html", 

1124 { 

1125 "emails": items["emails"], 

1126 "photos": items["photos"], 

1127 }, 

1128 ) 

1129 except Exception as e: 

1130 messages.error(self.request, _(f"Unable to parse file: {e}")) 

1131 return HttpResponseRedirect(reverse_lazy("upload_export")) 

1132 

1133 

1134@method_decorator(login_required, name="dispatch") 

1135class ResendConfirmationMailView(View): 

1136 """ 

1137 View class for resending confirmation mail 

1138 """ 

1139 

1140 model = UnconfirmedEmail 

1141 

1142 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument 

1143 """ 

1144 Handle post - resend confirmation mail for unconfirmed e-mail address 

1145 """ 

1146 try: 

1147 email = self.model.objects.get( # pylint: disable=no-member 

1148 user=request.user, id=kwargs["email_id"] 

1149 ) 

1150 except self.model.DoesNotExist: # pragma: no cover pylint: disable=no-member 

1151 messages.error(request, _("ID does not exist")) 

1152 else: 

1153 try: 

1154 email.send_confirmation_mail(url=request.build_absolute_uri("/")[:-1]) 

1155 messages.success( 

1156 request, f'{_("Confirmation mail sent to")}: {email.email}' 

1157 ) 

1158 except Exception as exc: # pylint: disable=broad-except 

1159 messages.error( 

1160 request, 

1161 f'{_("Unable to send confirmation email for")} {email.email}: {exc}', 

1162 ) 

1163 return HttpResponseRedirect(reverse_lazy("profile")) 

1164 

1165 

1166class IvatarLoginView(LoginView): 

1167 """ 

1168 View class for login 

1169 """ 

1170 

1171 template_name = "login.html" 

1172 

1173 @trace_authentication("login_attempt") 

1174 def get(self, request, *args, **kwargs): 

1175 """ 

1176 Handle get for login view 

1177 """ 

1178 if request.user: 

1179 if request.user.is_authenticated: 

1180 # Respect the 'next' parameter if present 

1181 next_url = request.GET.get("next") 

1182 if next_url: 

1183 return HttpResponseRedirect(next_url) 

1184 return HttpResponseRedirect(reverse_lazy("profile")) 

1185 return super().get(self, request, args, kwargs) 

1186 

1187 @trace_authentication("login_post") 

1188 def post(self, request, *args, **kwargs): 

1189 """ 

1190 Handle login form submission 

1191 """ 

1192 return super().post(request, *args, **kwargs) 

1193 

1194 def get_context_data(self, **kwargs): 

1195 context = super().get_context_data(**kwargs) 

1196 context["with_fedora"] = SOCIAL_AUTH_FEDORA_KEY is not None 

1197 return context 

1198 

1199 

1200@method_decorator(login_required, name="dispatch") 

1201class ProfileView(TemplateView): 

1202 """ 

1203 View class for profile 

1204 """ 

1205 

1206 template_name = "profile.html" 

1207 

1208 def get(self, request, *args, **kwargs): 

1209 if "profile_username" in kwargs: 

1210 if not request.user.is_staff: 

1211 return HttpResponseRedirect(reverse_lazy("profile")) 

1212 with contextlib.suppress(Exception): 

1213 u = User.objects.get(username=kwargs["profile_username"]) 

1214 request.user = u 

1215 self._confirm_claimed_openid() 

1216 return super().get(self, request, args, kwargs) 

1217 

1218 def get_context_data(self, **kwargs): 

1219 """ 

1220 Provide additional context data, like if max_photos is reached 

1221 already or not. 

1222 """ 

1223 context = super().get_context_data(**kwargs) 

1224 context["max_photos"] = False 

1225 if self.request.user: 

1226 if self.request.user.photo_set.all().count() >= MAX_NUM_PHOTOS: 

1227 context["max_photos"] = True 

1228 return context 

1229 

1230 def _confirm_claimed_openid(self): 

1231 openids = self.request.user.useropenid_set.all() 

1232 # If there is only one OpenID, we eventually need to add it to 

1233 # the user account 

1234 if openids.count() == 1: 

1235 # Already confirmed, skip 

1236 if ConfirmedOpenId.objects.filter( # pylint: disable=no-member 

1237 openid=openids.first().claimed_id 

1238 ).exists(): 

1239 return 

1240 # For whatever reason, this is in unconfirmed state, skip 

1241 if UnconfirmedOpenId.objects.filter( # pylint: disable=no-member 

1242 openid=openids.first().claimed_id 

1243 ).exists(): 

1244 return 

1245 logger.debug(f"need to confirm: {openids.first()}") 

1246 confirmed = ConfirmedOpenId() 

1247 confirmed.user = self.request.user 

1248 confirmed.ip_address = get_client_ip(self.request)[0] 

1249 confirmed.openid = openids.first().claimed_id 

1250 confirmed.save() 

1251 

1252 

1253class PasswordResetView(PasswordResetViewOriginal): 

1254 """ 

1255 View class for password reset 

1256 """ 

1257 

1258 def post(self, request, *args, **kwargs): 

1259 """ 

1260 Since we have the mail addresses in ConfirmedEmail model, 

1261 we need to set the email on the user object in order for the 

1262 PasswordResetView class to pick up the correct user. 

1263 In case we have the mail address in the User object, we still 

1264 need to assign a random password in order for PasswordResetView 

1265 class to pick up the user - else it will silently do nothing. 

1266 """ 

1267 if "email" in request.POST: 

1268 user = None 

1269 

1270 # Try to find the user via the normal user class 

1271 # TODO: How to handle the case that multiple user accounts 

1272 # could have the same password set? 

1273 user = User.objects.filter(email=request.POST["email"]).first() 

1274 

1275 # If we didn't find the user in the previous step, 

1276 # try the ConfirmedEmail class instead. 

1277 # If we find the user there, we need to set the mail 

1278 # attribute on the user object accordingly 

1279 if not user: 

1280 with contextlib.suppress(ObjectDoesNotExist): 

1281 confirmed_email = ConfirmedEmail.objects.get( 

1282 email=request.POST["email"] 

1283 ) 

1284 user = confirmed_email.user 

1285 user.email = confirmed_email.email 

1286 user.save() 

1287 # If we found the user, set a random password. Else, the 

1288 # ResetPasswordView class will silently ignore the password 

1289 # reset request 

1290 if user: 

1291 if not user.password or user.password.startswith("!"): 

1292 random_pass = get_random_string(12) 

1293 user.set_password(random_pass) 

1294 user.save() 

1295 

1296 # Whatever happens above, let the original function handle the rest 

1297 return super().post(self, request, args, kwargs) 

1298 

1299 

1300@method_decorator(login_required, name="dispatch") 

1301class DeleteAccountView(SuccessMessageMixin, FormView): 

1302 """ 

1303 View class for account deletion 

1304 """ 

1305 

1306 template_name = "delete.html" 

1307 form_class = DeleteAccountForm 

1308 success_url = reverse_lazy("home") 

1309 

1310 def get(self, request, *args, **kwargs): 

1311 return super().get(self, request, args, kwargs) 

1312 

1313 def post(self, request, *args, **kwargs): 

1314 """ 

1315 Handle account deletion 

1316 """ 

1317 if request.user.password: 

1318 if "password" in request.POST: 

1319 if not request.user.check_password(request.POST["password"]): 

1320 messages.error(request, _("Incorrect password")) 

1321 return HttpResponseRedirect(reverse_lazy("delete")) 

1322 else: 

1323 messages.error(request, _("No password given")) 

1324 return HttpResponseRedirect(reverse_lazy("delete")) 

1325 

1326 # should delete all confirmed/unconfirmed/photo objects 

1327 request.user.delete() 

1328 return super().post(self, request, args, kwargs) 

1329 

1330 

1331@method_decorator(login_required, name="dispatch") 

1332class ExportView(SuccessMessageMixin, TemplateView): 

1333 """ 

1334 View class responsible for libravatar user data export 

1335 """ 

1336 

1337 template_name = "export.html" 

1338 model = User 

1339 

1340 def get(self, request, *args, **kwargs): 

1341 return super().get(self, request, args, kwargs) 

1342 

1343 def post(self, request, *args, **kwargs): 

1344 """ 

1345 Handle real export 

1346 """ 

1347 SCHEMA_ROOT = "https://www.libravatar.org/schemas/export/0.2" 

1348 SCHEMA_XSD = f"{SCHEMA_ROOT}/export.xsd" 

1349 

1350 def xml_header(): 

1351 return ( 

1352 """<?xml version="1.0" encoding="UTF-8"?>""" 

1353 '''<user xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"''' 

1354 ''' xsi:schemaLocation="%s %s"''' 

1355 """ xmlns="%s">\n""" % (SCHEMA_ROOT, SCHEMA_XSD, SCHEMA_ROOT) 

1356 ) 

1357 

1358 def xml_footer(): 

1359 return "</user>\n" 

1360 

1361 def xml_account(user): 

1362 escaped_username = saxutils.quoteattr(user.username) 

1363 escaped_password = saxutils.quoteattr(user.password) 

1364 return " <account username={} password={}/>\n".format( 

1365 escaped_username, 

1366 escaped_password, 

1367 ) 

1368 

1369 def xml_email(user): 

1370 returnstring = " <emails>\n" 

1371 for email in user.confirmedemail_set.all(): 

1372 returnstring += ( 

1373 ' <email photo_id="' 

1374 + str(email.photo_id) 

1375 + '">' 

1376 + str(email.email) 

1377 + "</email>" 

1378 + "\n" 

1379 ) 

1380 returnstring += " </emails>\n" 

1381 return returnstring 

1382 

1383 def xml_openid(user): 

1384 returnstring = " <openids>\n" 

1385 for openid in user.confirmedopenid_set.all(): 

1386 returnstring += ( 

1387 ' <openid photo_id="' 

1388 + str(openid.photo_id) 

1389 + '">' 

1390 + str(openid.openid) 

1391 + "</openid>" 

1392 + "\n" 

1393 ) 

1394 returnstring += " </openids>\n" 

1395 return returnstring 

1396 

1397 def xml_photos(user): 

1398 s = " <photos>\n" 

1399 for photo in user.photo_set.all(): 

1400 encoded_photo = base64.b64encode(photo.data) 

1401 if encoded_photo: 

1402 s += ( 

1403 """ <photo id="%s" encoding="base64" format=%s>""" 

1404 """%s""" 

1405 """</photo>\n""" 

1406 % (photo.id, saxutils.quoteattr(photo.format), encoded_photo) 

1407 ) 

1408 s += " </photos>\n" 

1409 return s 

1410 

1411 user = request.user 

1412 

1413 photos = [] 

1414 for photo in user.photo_set.all(): 

1415 photo_details = {"data": photo.data, "format": photo.format} 

1416 photos.append(photo_details) 

1417 

1418 bytesobj = BytesIO() 

1419 data = gzip.GzipFile(fileobj=bytesobj, mode="w") 

1420 data.write(bytes(xml_header(), "utf-8")) 

1421 data.write(bytes(xml_account(user), "utf-8")) 

1422 data.write(bytes(xml_email(user), "utf-8")) 

1423 data.write(bytes(xml_openid(user), "utf-8")) 

1424 data.write(bytes(xml_photos(user), "utf-8")) 

1425 data.write(bytes(xml_footer(), "utf-8")) 

1426 data.close() 

1427 bytesobj.seek(0) 

1428 

1429 response = HttpResponse(content_type="application/gzip") 

1430 response["Content-Disposition"] = ( 

1431 f'attachment; filename="libravatar-export_{user.username}.xml.gz"' 

1432 ) 

1433 response.write(bytesobj.read()) 

1434 return response