Coverage for ivatar/ivataraccount/views.py: 76%
687 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 00:07 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 00:07 +0000
1"""
2View classes for ivatar/ivataraccount/
3"""
5from io import BytesIO
6from ivatar.utils import urlopen, Bluesky
7import base64
8import binascii
9import contextlib
10from xml.sax import saxutils
11import gzip
12import logging
14from PIL import Image
16from django.db.models import ProtectedError
17from django.core.exceptions import ObjectDoesNotExist
18from django.contrib.auth.decorators import login_required
19from django.contrib.auth.models import User
20from django.utils.decorators import method_decorator
21from django.contrib.messages.views import SuccessMessageMixin
22from django.contrib import messages
23from django.views.generic.edit import FormView, UpdateView
24from django.views.generic.base import View, TemplateView
25from django.views.generic.detail import DetailView
26from django.contrib.auth import authenticate, login
27from django.contrib.auth.forms import UserCreationForm, SetPasswordForm
28from django.contrib.auth.views import LoginView
29from django.contrib.auth.views import (
30 PasswordResetView as PasswordResetViewOriginal,
31)
32from django.utils.crypto import get_random_string
33from django.utils.translation import gettext_lazy as _
34from django.http import HttpResponseRedirect, HttpResponse
35from django.urls import reverse_lazy, reverse
36from django.shortcuts import render
37from django_openid_auth.models import UserOpenID
39from openid import oidutil
40from openid.consumer import consumer
42from ipware import get_client_ip
44from email_validator import validate_email
46from libravatar import libravatar_url
47from ivatar.settings import (
48 MAX_NUM_PHOTOS,
49 MAX_PHOTO_SIZE,
50 JPEG_QUALITY,
51 AVATAR_MAX_SIZE,
52 SOCIAL_AUTH_FEDORA_KEY,
53)
54from .gravatar import get_photo as get_gravatar_photo
56from .forms import AddEmailForm, UploadPhotoForm, AddOpenIDForm
57from .forms import UpdatePreferenceForm, UploadLibravatarExportForm
58from .forms import DeleteAccountForm
59from .models import UnconfirmedEmail, ConfirmedEmail, Photo
60from .models import UnconfirmedOpenId, ConfirmedOpenId, DjangoOpenIDStore
61from .models import UserPreference
62from .models import file_format
63from .read_libravatar_export import read_gzdata as libravatar_read_gzdata
65# Initialize loggers
66logger = logging.getLogger("ivatar")
67security_logger = logging.getLogger("ivatar.security")
69# Import OpenTelemetry with graceful degradation
70from ..telemetry_utils import (
71 trace_file_upload,
72 trace_authentication,
73 get_telemetry_metrics,
74)
76avatar_metrics = get_telemetry_metrics()
79def openid_logging(message, level=0):
80 """
81 Helper method for openid logging
82 """
83 # Normal messages are not that important
84 # No need for coverage here
85 if level > 0: # pragma: no cover
86 logger.debug(message)
89class CreateView(SuccessMessageMixin, FormView):
90 """
91 View class for creating a new user
92 """
94 template_name = "new.html"
95 form_class = UserCreationForm
97 @trace_authentication("user_registration")
98 def form_valid(self, form):
99 form.save()
100 user = authenticate(
101 username=form.cleaned_data["username"],
102 password=form.cleaned_data["password1"],
103 )
104 if user is not None:
105 # If the username looks like a mail address, automagically
106 # add it as unconfirmed mail and set it also as user's
107 # email address
108 with contextlib.suppress(Exception):
109 self._extracted_from_form_valid_(form, user)
110 login(self.request, user)
111 pref = UserPreference.objects.create(
112 user_id=user.pk
113 ) # pylint: disable=no-member
114 pref.save()
115 return HttpResponseRedirect(reverse_lazy("profile"))
116 return HttpResponseRedirect(reverse_lazy("login")) # pragma: no cover
118 def _extracted_from_form_valid_(self, form, user):
119 # This will error out if it's not a valid address
120 valid = validate_email(form.cleaned_data["username"])
121 user.email = valid.email
122 user.save()
123 # The following will also error out if it already exists
124 unconfirmed = UnconfirmedEmail()
125 unconfirmed.email = valid.email
126 unconfirmed.user = user
127 unconfirmed.save()
128 unconfirmed.send_confirmation_mail(
129 url=self.request.build_absolute_uri("/")[:-1]
130 )
132 def get(self, request, *args, **kwargs):
133 """
134 Handle get for create view
135 """
136 if request.user and request.user.is_authenticated:
137 return HttpResponseRedirect(reverse_lazy("profile"))
138 return super().get(self, request, args, kwargs)
141@method_decorator(login_required, name="dispatch")
142class PasswordSetView(SuccessMessageMixin, FormView):
143 """
144 View class for changing the password
145 """
147 template_name = "password_change.html"
148 form_class = SetPasswordForm
149 success_message = _("password changed successfully - please login again")
150 success_url = reverse_lazy("profile")
152 def get_form_kwargs(self):
153 kwargs = super().get_form_kwargs()
154 kwargs["user"] = self.request.user
155 return kwargs
157 def form_valid(self, form):
158 form.save()
159 super().form_valid(form)
160 return HttpResponseRedirect(reverse_lazy("login"))
163@method_decorator(login_required, name="dispatch")
164class AddEmailView(SuccessMessageMixin, FormView):
165 """
166 View class for adding email addresses
167 """
169 template_name = "add_email.html"
170 form_class = AddEmailForm
171 success_url = reverse_lazy("profile")
173 def form_valid(self, form):
174 if not form.save(self.request):
175 return render(self.request, self.template_name, {"form": form})
177 messages.success(self.request, _("Address added successfully"))
178 return super().form_valid(form)
181@method_decorator(login_required, name="dispatch")
182class RemoveUnconfirmedEmailView(SuccessMessageMixin, View):
183 """
184 View class for removing a unconfirmed email address
185 """
187 @staticmethod
188 def post(request, *args, **kwargs): # pylint: disable=unused-argument
189 """
190 Handle post request - removing unconfirmed email
191 """
192 try:
193 email = UnconfirmedEmail.objects.get( # pylint: disable=no-member
194 user=request.user, id=kwargs["email_id"]
195 )
196 email.delete()
197 messages.success(request, _("Address removed"))
198 except UnconfirmedEmail.DoesNotExist: # pylint: disable=no-member
199 messages.error(request, _("Address does not exist"))
200 return HttpResponseRedirect(reverse_lazy("profile"))
203class ConfirmEmailView(SuccessMessageMixin, TemplateView):
204 """
205 View class for confirming an unconfirmed email address
206 """
208 template_name = "email_confirmed.html"
210 def get(self, request, *args, **kwargs):
211 # be tolerant of extra crap added by mail clients
212 key = kwargs["verification_key"].replace(" ", "")
214 if len(key) != 64:
215 messages.error(request, _("Verification key incorrect"))
216 return HttpResponseRedirect(reverse_lazy("profile"))
218 try:
219 unconfirmed = UnconfirmedEmail.objects.get(
220 verification_key=key
221 ) # pylint: disable=no-member
222 except UnconfirmedEmail.DoesNotExist: # pylint: disable=no-member
223 messages.error(request, _("Verification key does not exist"))
224 return HttpResponseRedirect(reverse_lazy("profile"))
226 if ConfirmedEmail.objects.filter(email=unconfirmed.email).count() > 0:
227 messages.error(
228 request,
229 _("This mail address has been taken already and cannot be confirmed"),
230 )
231 return HttpResponseRedirect(reverse_lazy("profile"))
233 # TODO: Check for a reasonable expiration time in unconfirmed email
235 (confirmed_id, external_photos) = ConfirmedEmail.objects.create_confirmed_email(
236 unconfirmed.user, unconfirmed.email, not request.user.is_anonymous
237 )
239 unconfirmed.delete()
241 # if there's a single image in this user's profile,
242 # assign it to the new email
243 confirmed = ConfirmedEmail.objects.get(id=confirmed_id)
244 if confirmed.user.photo_set.count() == 1:
245 confirmed.set_photo(confirmed.user.photo_set.first())
246 kwargs["photos"] = external_photos
247 kwargs["email_id"] = confirmed_id
248 return super().get(request, *args, **kwargs)
251@method_decorator(login_required, name="dispatch")
252class RemoveConfirmedEmailView(SuccessMessageMixin, View):
253 """
254 View class for removing a confirmed email address
255 """
257 @staticmethod
258 def post(request, *args, **kwargs): # pylint: disable=unused-argument
259 """
260 Handle post request - removing confirmed email
261 """
262 try:
263 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
264 email.delete()
265 messages.success(request, _("Address removed"))
266 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member
267 messages.error(request, _("Address does not exist"))
268 return HttpResponseRedirect(reverse_lazy("profile"))
271@method_decorator(login_required, name="dispatch")
272class AssignPhotoEmailView(SuccessMessageMixin, TemplateView):
273 """
274 View class for assigning a photo to an email address
275 """
277 model = Photo
278 template_name = "assign_photo_email.html"
280 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
281 """
282 Handle post request - assign photo to email
283 """
284 photo = None
286 try:
287 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
288 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member
289 messages.error(request, _("Invalid request"))
290 return HttpResponseRedirect(reverse_lazy("profile"))
292 if "photoNone" in request.POST:
293 email.photo = None
294 email.bluesky_handle = None
295 elif "photoBluesky" in request.POST:
296 # Keep the existing Bluesky handle, clear the photo
297 email.photo = None
298 # Don't clear bluesky_handle - keep it as is
299 else:
300 if "photo_id" not in request.POST:
301 messages.error(request, _("Invalid request [photo_id] missing"))
302 return HttpResponseRedirect(reverse_lazy("profile"))
304 if request.POST["photo_id"] == "bluesky":
305 # Handle Bluesky photo selection
306 email.photo = None
307 # Don't clear bluesky_handle - keep it as is
308 else:
309 try:
310 photo = self.model.objects.get( # pylint: disable=no-member
311 id=request.POST["photo_id"], user=request.user
312 )
313 except self.model.DoesNotExist: # pylint: disable=no-member
314 messages.error(request, _("Photo does not exist"))
315 return HttpResponseRedirect(reverse_lazy("profile"))
316 email.photo = photo
317 email.bluesky_handle = None
318 email.save()
320 messages.success(request, _("Successfully changed photo"))
321 return HttpResponseRedirect(reverse_lazy("profile"))
323 def get_context_data(self, **kwargs):
324 data = super().get_context_data(**kwargs)
325 data["email"] = ConfirmedEmail.objects.get(pk=kwargs["email_id"])
326 return data
329@method_decorator(login_required, name="dispatch")
330class AssignPhotoOpenIDView(SuccessMessageMixin, TemplateView):
331 """
332 View class for assigning a photo to an openid address
333 """
335 model = Photo
336 template_name = "assign_photo_openid.html"
338 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
339 """
340 Handle post - assign photo to openid
341 """
342 photo = None
344 try:
345 openid = ConfirmedOpenId.objects.get( # pylint: disable=no-member
346 user=request.user, id=kwargs["openid_id"]
347 )
348 except ConfirmedOpenId.DoesNotExist: # pylint: disable=no-member
349 messages.error(request, _("Invalid request"))
350 return HttpResponseRedirect(reverse_lazy("profile"))
352 if "photoNone" in request.POST:
353 openid.photo = None
354 else:
355 if "photo_id" not in request.POST:
356 messages.error(request, _("Invalid request [photo_id] missing"))
357 return HttpResponseRedirect(reverse_lazy("profile"))
359 try:
360 photo = self.model.objects.get( # pylint: disable=no-member
361 id=request.POST["photo_id"], user=request.user
362 )
363 except self.model.DoesNotExist: # pylint: disable=no-member
364 messages.error(request, _("Photo does not exist"))
365 return HttpResponseRedirect(reverse_lazy("profile"))
366 openid.photo = photo
367 openid.bluesky_handle = None
368 openid.save()
370 messages.success(request, _("Successfully changed photo"))
371 return HttpResponseRedirect(reverse_lazy("profile"))
373 def get_context_data(self, **kwargs):
374 data = super().get_context_data(**kwargs)
375 data["openid"] = ConfirmedOpenId.objects.get(
376 pk=kwargs["openid_id"]
377 ) # pylint: disable=no-member
378 return data
381@method_decorator(login_required, name="dispatch")
382class AssignBlueskyHandleToEmailView(SuccessMessageMixin, TemplateView):
383 """
384 View class for assigning a Bluesky handle to an email address
385 """
387 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
388 """
389 Handle post request - assign bluesky handle to email
390 """
392 try:
393 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
394 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member
395 messages.error(request, _("Invalid request"))
396 return HttpResponseRedirect(reverse_lazy("profile"))
398 if "bluesky_handle" not in request.POST:
399 messages.error(request, _("Invalid request [bluesky_handle] missing"))
400 return HttpResponseRedirect(reverse_lazy("profile"))
401 bluesky_handle = request.POST["bluesky_handle"]
403 try:
404 bs = Bluesky()
406 bs.get_avatar(bluesky_handle)
407 except Exception as e:
408 messages.error(request, _(f"Handle '{bluesky_handle}' not found: {e}"))
409 return HttpResponseRedirect(
410 reverse_lazy(
411 "assign_photo_email", kwargs={"email_id": int(kwargs["email_id"])}
412 )
413 )
414 try:
415 email.set_bluesky_handle(bluesky_handle)
416 except Exception as e:
417 messages.error(request, _(f"Error: {e}"))
418 return HttpResponseRedirect(
419 reverse_lazy(
420 "assign_photo_email", kwargs={"email_id": int(kwargs["email_id"])}
421 )
422 )
423 email.photo = None
424 email.save()
426 messages.success(request, _("Successfully assigned Bluesky handle"))
427 return HttpResponseRedirect(reverse_lazy("profile"))
429 def get_context_data(self, **kwargs):
430 data = super().get_context_data(**kwargs)
431 data["email"] = ConfirmedEmail.objects.get(pk=kwargs["email_id"])
432 return data
435@method_decorator(login_required, name="dispatch")
436class AssignBlueskyHandleToOpenIdView(SuccessMessageMixin, TemplateView):
437 """
438 View class for assigning a Bluesky handle to an email address
439 """
441 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
442 """
443 Handle post request - assign bluesky handle to email
444 """
446 try:
447 openid = ConfirmedOpenId.objects.get(
448 user=request.user, id=kwargs["open_id"]
449 )
450 except ConfirmedOpenId.DoesNotExist: # pylint: disable=no-member
451 messages.error(request, _("Invalid request"))
452 return HttpResponseRedirect(reverse_lazy("profile"))
454 if "bluesky_handle" not in request.POST:
455 messages.error(request, _("Invalid request [bluesky_handle] missing"))
456 return HttpResponseRedirect(reverse_lazy("profile"))
457 bluesky_handle = request.POST["bluesky_handle"]
459 try:
460 bs = Bluesky()
462 bs.get_avatar(bluesky_handle)
463 except Exception as e:
464 messages.error(request, _(f"Handle '{bluesky_handle}' not found: {e}"))
465 return HttpResponseRedirect(
466 reverse_lazy(
467 "assign_photo_openid", kwargs={"openid_id": int(kwargs["open_id"])}
468 )
469 )
470 try:
471 openid.set_bluesky_handle(bluesky_handle)
472 except Exception as e:
473 messages.error(request, _(f"Error: {e}"))
474 return HttpResponseRedirect(
475 reverse_lazy(
476 "assign_photo_openid", kwargs={"openid_id": int(kwargs["open_id"])}
477 )
478 )
479 openid.photo = None
480 openid.save()
482 messages.success(request, _("Successfully assigned Bluesky handle"))
483 return HttpResponseRedirect(reverse_lazy("profile"))
485 def get_context_data(self, **kwargs):
486 data = super().get_context_data(**kwargs)
487 data["openid"] = ConfirmedOpenId.objects.get(pk=kwargs["open_id"])
488 return data
491@method_decorator(login_required, name="dispatch")
492class ImportPhotoView(SuccessMessageMixin, TemplateView):
493 """
494 View class to import a photo from another service
495 Currently only Gravatar is supported
496 """
498 template_name = "import_photo.html"
500 def get_context_data(self, **kwargs):
501 context = super().get_context_data(**kwargs)
502 context["photos"] = []
503 addr = None
504 if "email_id" in kwargs:
505 try:
506 addr = ConfirmedEmail.objects.get(pk=kwargs["email_id"]).email
507 except ConfirmedEmail.ObjectDoesNotExist: # pylint: disable=no-member
508 messages.error(self.request, _("Address does not exist"))
509 return context
511 if addr := kwargs.get("email_addr", None):
512 if gravatar := get_gravatar_photo(addr):
513 context["photos"].append(gravatar)
515 if libravatar_service_url := libravatar_url(
516 email=addr,
517 default=404,
518 size=AVATAR_MAX_SIZE,
519 ):
520 try:
521 urlopen(libravatar_service_url)
522 except OSError as exc:
523 logger.warning(f"Exception caught during photo import: {exc}")
524 else:
525 context["photos"].append(
526 {
527 "service_url": libravatar_service_url,
528 "thumbnail_url": f"{libravatar_service_url}&s=80",
529 "image_url": f"{libravatar_service_url}&s=512",
530 "width": 80,
531 "height": 80,
532 "service_name": "Libravatar",
533 }
534 )
536 return context
538 def post(
539 self, request, *args, **kwargs
540 ): # pylint: disable=no-self-use,unused-argument,too-many-branches,line-too-long
541 """
542 Handle post to photo import
543 """
545 imported = None
547 email_id = kwargs.get("email_id", request.POST.get("email_id", None))
548 addr = kwargs.get("email", request.POST.get("email_addr", None))
550 if email_id:
551 email = ConfirmedEmail.objects.filter(id=email_id, user=request.user)
552 if email.exists():
553 addr = email.first().email
554 else:
555 messages.error(request, _("Address does not exist"))
556 return HttpResponseRedirect(reverse_lazy("profile"))
558 if "photo_Gravatar" in request.POST:
559 photo = Photo()
560 photo.user = request.user
561 photo.ip_address = get_client_ip(request)[0]
562 if photo.import_image("Gravatar", addr):
563 messages.success(request, _("Gravatar image successfully imported"))
564 else:
565 # Honestly, I'm not sure how to test this...
566 messages.error(
567 request, _("Gravatar image import not successful")
568 ) # pragma: no cover
569 imported = True
571 if "photo_Libravatar" in request.POST:
572 photo = Photo()
573 photo.user = request.user
574 photo.ip_address = get_client_ip(request)[0]
575 if photo.import_image("Libravatar", addr):
576 messages.success(request, _("Libravatar image successfully imported"))
577 else:
578 # Honestly, I'm not sure how to test this...
579 messages.error(
580 request, _("Libravatar image import not successful")
581 ) # pragma: no cover
582 imported = True
583 if not imported:
584 messages.warning(request, _("Nothing importable"))
585 return HttpResponseRedirect(reverse_lazy("profile"))
588@method_decorator(login_required, name="dispatch")
589class RawImageView(DetailView):
590 """
591 View to return (binary) raw image data, for use in <img/>-tags
592 """
594 model = Photo
596 def get(self, request, *args, **kwargs):
597 photo = self.model.objects.get(pk=kwargs["pk"]) # pylint: disable=no-member
598 if photo.user.id != request.user.id and not request.user.is_staff:
599 return HttpResponseRedirect(reverse_lazy("home"))
600 return HttpResponse(BytesIO(photo.data), content_type=f"image/{photo.format}")
603@method_decorator(login_required, name="dispatch")
604class DeletePhotoView(SuccessMessageMixin, View):
605 """
606 View class for deleting a photo
607 """
609 model = Photo
611 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument
612 """
613 Handle get - delete photo
614 """
615 try:
616 photo = self.model.objects.get( # pylint: disable=no-member
617 pk=kwargs["pk"], user=request.user
618 )
619 photo.delete()
620 except (self.model.DoesNotExist, ProtectedError): # pylint: disable=no-member
621 messages.error(request, _("No such image or no permission to delete it"))
622 return HttpResponseRedirect(reverse_lazy("profile"))
623 messages.success(request, _("Photo deleted successfully"))
624 return HttpResponseRedirect(reverse_lazy("profile"))
627@method_decorator(login_required, name="dispatch")
628class UploadPhotoView(SuccessMessageMixin, FormView):
629 """
630 View class responsible for photo upload with enhanced security
631 """
633 model = Photo
634 template_name = "upload_photo.html"
635 form_class = UploadPhotoForm
636 success_message = _("Successfully uploaded")
637 success_url = reverse_lazy("profile")
639 def post(self, request, *args, **kwargs):
640 # Check maximum number of photos
641 num_photos = request.user.photo_set.count()
642 if num_photos >= MAX_NUM_PHOTOS:
643 messages.error(
644 request, _("Maximum number of photos (%i) reached" % MAX_NUM_PHOTOS)
645 )
646 return HttpResponseRedirect(reverse_lazy("profile"))
648 return super().post(request, *args, **kwargs)
650 @trace_file_upload("photo_upload")
651 def form_valid(self, form):
652 photo_data = self.request.FILES["photo"]
654 # Additional size check (redundant but good for security)
655 if photo_data.size > MAX_PHOTO_SIZE:
656 messages.error(self.request, _("Image too big"))
657 avatar_metrics.record_file_upload(
658 file_size=photo_data.size,
659 content_type=photo_data.content_type,
660 success=False,
661 )
662 return HttpResponseRedirect(reverse_lazy("profile"))
664 # Enhanced security logging
665 security_logger.info(
666 f"Photo upload attempt by user {self.request.user.id} "
667 f"from IP {get_client_ip(self.request)[0]}, "
668 f"file size: {photo_data.size} bytes"
669 )
671 photo = form.save(self.request, photo_data)
673 if not photo:
674 security_logger.warning(
675 f"Photo upload failed for user {self.request.user.id} - invalid format"
676 )
677 messages.error(self.request, _("Invalid Format"))
678 avatar_metrics.record_file_upload(
679 file_size=photo_data.size,
680 content_type=photo_data.content_type,
681 success=False,
682 )
683 return HttpResponseRedirect(reverse_lazy("profile"))
685 # Log successful upload
686 security_logger.info(
687 f"Photo uploaded successfully by user {self.request.user.id}, "
688 f"photo ID: {photo.pk}"
689 )
691 # Record successful file upload metrics
692 avatar_metrics.record_file_upload(
693 file_size=photo_data.size,
694 content_type=photo_data.content_type,
695 success=True,
696 )
698 # Override success URL -> Redirect to crop page.
699 self.success_url = reverse_lazy("crop_photo", args=[photo.pk])
700 return super().form_valid(form)
703@method_decorator(login_required, name="dispatch")
704class AddOpenIDView(SuccessMessageMixin, FormView):
705 """
706 View class for adding OpenID
707 """
709 template_name = "add_openid.html"
710 form_class = AddOpenIDForm
711 success_url = reverse_lazy("profile")
713 def form_valid(self, form):
714 if openid_id := form.save(self.request.user):
715 # At this point we have an unconfirmed OpenID, but
716 # we do not add the message, that we successfully added it,
717 # since this is misleading
718 return HttpResponseRedirect(
719 reverse_lazy("openid_redirection", args=[openid_id])
720 )
721 else:
722 return render(self.request, self.template_name, {"form": form})
725@method_decorator(login_required, name="dispatch")
726class RemoveUnconfirmedOpenIDView(View):
727 """
728 View class for removing a unconfirmed OpenID
729 """
731 model = UnconfirmedOpenId
733 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
734 """
735 Handle post - remove unconfirmed openid
736 """
737 try:
738 openid = self.model.objects.get( # pylint: disable=no-member
739 user=request.user, id=kwargs["openid_id"]
740 )
741 openid.delete()
742 messages.success(request, _("ID removed"))
743 except (
744 self.model.DoesNotExist
745 ): # pragma: no cover pylint: disable=no-member,line-too-long
746 messages.error(request, _("ID does not exist"))
747 return HttpResponseRedirect(reverse_lazy("profile"))
750@method_decorator(login_required, name="dispatch")
751class RemoveConfirmedOpenIDView(View):
752 """
753 View class for removing a confirmed OpenID
754 """
756 model = ConfirmedOpenId
758 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
759 """
760 Handle post - remove confirmed openid
761 """
762 try:
763 openid = self.model.objects.get( # pylint: disable=no-member
764 user=request.user, id=kwargs["openid_id"]
765 )
766 try:
767 openidobj = (
768 UserOpenID.objects.get( # pylint: disable=no-member,line-too-long
769 user_id=request.user.id, claimed_id=openid.openid
770 )
771 )
772 openidobj.delete()
773 except Exception as exc: # pylint: disable=broad-except
774 # Why it is not there?
775 logger.warning(f"How did we get here: {exc}")
776 openid.delete()
777 messages.success(request, _("ID removed"))
778 except self.model.DoesNotExist: # pylint: disable=no-member
779 messages.error(request, _("ID does not exist"))
780 return HttpResponseRedirect(reverse_lazy("profile"))
783@method_decorator(login_required, name="dispatch")
784class RedirectOpenIDView(View):
785 """
786 Redirect view for OpenID
787 """
789 model = UnconfirmedOpenId
791 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument
792 """
793 Handle get for OpenID redirect view
794 """
795 try:
796 unconfirmed = self.model.objects.get( # pylint: disable=no-member
797 user=request.user, id=kwargs["openid_id"]
798 )
799 except (
800 self.model.DoesNotExist
801 ): # pragma: no cover pylint: disable=no-member,line-too-long
802 messages.error(request, _("ID does not exist"))
803 return HttpResponseRedirect(reverse_lazy("profile"))
805 user_url = unconfirmed.openid
806 session = {"id": request.session.session_key}
808 oidutil.log = openid_logging
809 openid_consumer = consumer.Consumer(session, DjangoOpenIDStore())
811 try:
812 auth_request = openid_consumer.begin(user_url)
813 except consumer.DiscoveryFailure as exc:
814 messages.error(request, _(f"OpenID discovery failed: {exc}"))
815 return HttpResponseRedirect(reverse_lazy("profile"))
816 except UnicodeDecodeError as exc: # pragma: no cover
817 msg = _(
818 "OpenID discovery failed (userid=%(userid)s) for "
819 "%(userurl)s: %(message)s"
820 % {
821 "userid": request.user.id,
822 "userurl": user_url.encode("utf-8"),
823 "message": exc,
824 }
825 )
826 logger.error(f"message: {msg}")
827 messages.error(request, msg)
829 if auth_request is None: # pragma: no cover
830 messages.error(request, _("OpenID discovery failed"))
831 return HttpResponseRedirect(reverse_lazy("profile"))
833 realm = request.build_absolute_uri("/")[:-1] # pragma: no cover
834 return_url = realm + reverse( # pragma: no cover
835 "confirm_openid", args=[kwargs["openid_id"]]
836 )
837 return HttpResponseRedirect( # pragma: no cover
838 auth_request.redirectURL(realm, return_url)
839 )
842@method_decorator(login_required, name="dispatch")
843class ConfirmOpenIDView(View): # pragma: no cover
844 """
845 Confirm OpenID view
846 """
848 model = UnconfirmedOpenId
849 model_confirmed = ConfirmedOpenId
851 def do_request(self, data, *args, **kwargs): # pylint: disable=unused-argument
852 """
853 Handle request, called by get() or post()
854 """
855 session = {"id": self.request.session.session_key}
856 current_url = self.request.build_absolute_uri("/")[:-1] + self.request.path
857 openid_consumer = consumer.Consumer(session, DjangoOpenIDStore())
858 info = openid_consumer.complete(data, current_url)
859 if info.status == consumer.FAILURE:
860 messages.error(
861 self.request, _('Confirmation failed: "') + str(info.message) + '"'
862 )
863 return HttpResponseRedirect(reverse_lazy("profile"))
865 if info.status == consumer.CANCEL:
866 messages.error(self.request, _("Cancelled by user"))
867 return HttpResponseRedirect(reverse_lazy("profile"))
869 if info.status != consumer.SUCCESS:
870 messages.error(self.request, _("Unknown verification error"))
871 return HttpResponseRedirect(reverse_lazy("profile"))
873 try:
874 unconfirmed = self.model.objects.get( # pylint: disable=no-member
875 user=self.request.user, id=kwargs["openid_id"]
876 )
877 except self.model.DoesNotExist: # pylint: disable=no-member
878 messages.error(self.request, _("ID does not exist"))
879 return HttpResponseRedirect(reverse_lazy("profile"))
881 # TODO: Check for a reasonable expiration time
882 confirmed = self.model_confirmed()
883 confirmed.user = unconfirmed.user
884 confirmed.ip_address = get_client_ip(self.request)[0]
885 confirmed.openid = unconfirmed.openid
886 confirmed.save()
888 unconfirmed.delete()
890 # If there is a single image in this user's profile
891 # assign it to the new id
892 if self.request.user.photo_set.count() == 1:
893 confirmed.set_photo(self.request.user.photo_set.first())
895 # Also allow user to login using this OpenID (if not already taken)
896 if not UserOpenID.objects.filter( # pylint: disable=no-member
897 claimed_id=confirmed.openid
898 ).exists():
899 user_openid = UserOpenID()
900 user_openid.user = self.request.user
901 user_openid.claimed_id = confirmed.openid
902 user_openid.display_id = confirmed.openid
903 user_openid.save()
904 return HttpResponseRedirect(reverse_lazy("profile"))
906 def get(self, request, *args, **kwargs):
907 """
908 Handle get - confirm openid
909 """
910 return self.do_request(request.GET, *args, **kwargs)
912 def post(self, request, *args, **kwargs):
913 """
914 Handle post - confirm openid
915 """
916 return self.do_request(request.POST, *args, **kwargs)
919@method_decorator(login_required, name="dispatch")
920class CropPhotoView(TemplateView):
921 """
922 View class for cropping photos
923 """
925 template_name = "crop_photo.html"
926 success_url = reverse_lazy("profile")
927 model = Photo
929 def get(self, request, *args, **kwargs):
930 photo = self.model.objects.get(
931 pk=kwargs["pk"], user=request.user
932 ) # pylint: disable=no-member
933 email = request.GET.get("email")
934 openid = request.GET.get("openid")
935 return render(
936 self.request,
937 self.template_name,
938 {
939 "photo": photo,
940 "email": email,
941 "openid": openid,
942 },
943 )
945 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
946 """
947 Handle post - crop photo
948 """
949 photo = self.model.objects.get(
950 pk=kwargs["pk"], user=request.user
951 ) # pylint: disable=no-member
952 dimensions = {
953 "x": int(float(request.POST["x"])),
954 "y": int(float(request.POST["y"])),
955 "w": int(float(request.POST["w"])),
956 "h": int(float(request.POST["h"])),
957 }
958 email = openid = None
959 if "email" in request.POST:
960 with contextlib.suppress(ConfirmedEmail.DoesNotExist):
961 email = ConfirmedEmail.objects.get(email=request.POST["email"])
962 if "openid" in request.POST:
963 with contextlib.suppress(ConfirmedOpenId.DoesNotExist):
964 openid = ConfirmedOpenId.objects.get( # pylint: disable=no-member
965 openid=request.POST["openid"]
966 )
967 return photo.perform_crop(request, dimensions, email, openid)
970@method_decorator(login_required, name="dispatch") # pylint: disable=too-many-ancestors
971class UserPreferenceView(FormView, UpdateView):
972 """
973 View class for user preferences view/update
974 """
976 template_name = "preferences.html"
977 model = UserPreference
978 form_class = UpdatePreferenceForm
979 success_url = reverse_lazy("user_preference")
981 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
982 """
983 Process POST-ed data from this form
984 """
985 userpref = None
986 try:
987 userpref = self.request.user.userpreference
988 except ObjectDoesNotExist:
989 userpref = UserPreference(user=self.request.user)
990 userpref.theme = request.POST["theme"]
991 userpref.save()
992 try:
993 if request.POST["email"] != self.request.user.email:
994 addresses = list(
995 self.request.user.confirmedemail_set.all().values_list(
996 "email", flat=True
997 )
998 )
999 if request.POST["email"] not in addresses:
1000 messages.error(
1001 self.request,
1002 _(f'Mail address not allowed: {request.POST["email"]}'),
1003 )
1004 else:
1005 self.request.user.email = request.POST["email"]
1006 self.request.user.save()
1007 messages.info(self.request, _("Mail address changed."))
1008 except Exception as e: # pylint: disable=broad-except
1009 messages.error(self.request, _(f"Error setting new mail address: {e}"))
1011 try:
1012 if request.POST["first_name"] or request.POST["last_name"]:
1013 if request.POST["first_name"] != self.request.user.first_name:
1014 self.request.user.first_name = request.POST["first_name"]
1015 messages.info(self.request, _("First name changed."))
1016 if request.POST["last_name"] != self.request.user.last_name:
1017 self.request.user.last_name = request.POST["last_name"]
1018 messages.info(self.request, _("Last name changed."))
1019 self.request.user.save()
1020 except Exception as e: # pylint: disable=broad-except
1021 messages.error(self.request, _(f"Error setting names: {e}"))
1023 return HttpResponseRedirect(reverse_lazy("user_preference"))
1025 def get(self, request, *args, **kwargs):
1026 return render(
1027 self.request,
1028 self.template_name,
1029 {
1030 "THEMES": UserPreference.THEMES,
1031 },
1032 )
1034 def get_object(self, queryset=None):
1035 (obj, created) = UserPreference.objects.get_or_create(
1036 user=self.request.user
1037 ) # pylint: disable=no-member,unused-variable
1038 return obj
1041@method_decorator(login_required, name="dispatch")
1042class UploadLibravatarExportView(SuccessMessageMixin, FormView):
1043 """
1044 View class responsible for libravatar user data export upload
1045 """
1047 template_name = "upload_libravatar_export.html"
1048 form_class = UploadLibravatarExportForm
1049 success_message = _("Successfully uploaded")
1050 success_url = reverse_lazy("profile")
1051 model = User
1053 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
1054 """
1055 Handle post request - choose items to import
1056 """
1057 if "save" in kwargs: # pylint: disable=too-many-nested-blocks
1058 if kwargs["save"] == "save":
1059 for arg in request.POST:
1060 if arg.startswith("email_"):
1061 email = request.POST[arg]
1062 if not ConfirmedEmail.objects.filter(
1063 email=email
1064 ) and not UnconfirmedEmail.objects.filter(
1065 email=email
1066 ): # pylint: disable=no-member
1067 try:
1068 unconfirmed = UnconfirmedEmail.objects.create( # pylint: disable=no-member
1069 user=request.user, email=email
1070 )
1071 unconfirmed.save()
1072 unconfirmed.send_confirmation_mail(
1073 url=request.build_absolute_uri("/")[:-1]
1074 )
1075 messages.info(
1076 request,
1077 "%s: %s"
1078 % (
1079 email,
1080 _(
1081 "address added successfully,\
1082 confirmation mail sent"
1083 ),
1084 ),
1085 )
1086 except Exception as exc: # pylint: disable=broad-except
1087 # DEBUG
1088 print(
1089 f"Exception during adding mail address ({email}): {exc}"
1090 )
1092 if arg.startswith("photo"):
1093 try:
1094 data = base64.decodebytes(bytes(request.POST[arg], "utf-8"))
1095 except binascii.Error as exc:
1096 logger.warning(f"Cannot decode photo: {exc}")
1097 continue
1098 try:
1099 pilobj = Image.open(BytesIO(data))
1100 out = BytesIO()
1101 pilobj.save(out, pilobj.format, quality=JPEG_QUALITY)
1102 out.seek(0)
1103 photo = Photo()
1104 photo.user = request.user
1105 photo.ip_address = get_client_ip(request)[0]
1106 photo.format = file_format(pilobj.format)
1107 photo.data = out.read()
1108 photo.save()
1109 except Exception as exc: # pylint: disable=broad-except
1110 logger.error(f"Exception during save: {exc}")
1111 continue
1113 return HttpResponseRedirect(reverse_lazy("profile"))
1114 return super().post(request, args, kwargs)
1116 def form_valid(self, form):
1117 data = self.request.FILES["export_file"]
1118 try:
1119 items = libravatar_read_gzdata(data.read())
1120 # DEBUG print(items)
1121 return render(
1122 self.request,
1123 "choose_libravatar_export.html",
1124 {
1125 "emails": items["emails"],
1126 "photos": items["photos"],
1127 },
1128 )
1129 except Exception as e:
1130 messages.error(self.request, _(f"Unable to parse file: {e}"))
1131 return HttpResponseRedirect(reverse_lazy("upload_export"))
1134@method_decorator(login_required, name="dispatch")
1135class ResendConfirmationMailView(View):
1136 """
1137 View class for resending confirmation mail
1138 """
1140 model = UnconfirmedEmail
1142 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument
1143 """
1144 Handle post - resend confirmation mail for unconfirmed e-mail address
1145 """
1146 try:
1147 email = self.model.objects.get( # pylint: disable=no-member
1148 user=request.user, id=kwargs["email_id"]
1149 )
1150 except self.model.DoesNotExist: # pragma: no cover pylint: disable=no-member
1151 messages.error(request, _("ID does not exist"))
1152 else:
1153 try:
1154 email.send_confirmation_mail(url=request.build_absolute_uri("/")[:-1])
1155 messages.success(
1156 request, f'{_("Confirmation mail sent to")}: {email.email}'
1157 )
1158 except Exception as exc: # pylint: disable=broad-except
1159 messages.error(
1160 request,
1161 f'{_("Unable to send confirmation email for")} {email.email}: {exc}',
1162 )
1163 return HttpResponseRedirect(reverse_lazy("profile"))
1166class IvatarLoginView(LoginView):
1167 """
1168 View class for login
1169 """
1171 template_name = "login.html"
1173 @trace_authentication("login_attempt")
1174 def get(self, request, *args, **kwargs):
1175 """
1176 Handle get for login view
1177 """
1178 if request.user:
1179 if request.user.is_authenticated:
1180 # Respect the 'next' parameter if present
1181 next_url = request.GET.get("next")
1182 if next_url:
1183 return HttpResponseRedirect(next_url)
1184 return HttpResponseRedirect(reverse_lazy("profile"))
1185 return super().get(self, request, args, kwargs)
1187 @trace_authentication("login_post")
1188 def post(self, request, *args, **kwargs):
1189 """
1190 Handle login form submission
1191 """
1192 return super().post(request, *args, **kwargs)
1194 def get_context_data(self, **kwargs):
1195 context = super().get_context_data(**kwargs)
1196 context["with_fedora"] = SOCIAL_AUTH_FEDORA_KEY is not None
1197 return context
1200@method_decorator(login_required, name="dispatch")
1201class ProfileView(TemplateView):
1202 """
1203 View class for profile
1204 """
1206 template_name = "profile.html"
1208 def get(self, request, *args, **kwargs):
1209 if "profile_username" in kwargs:
1210 if not request.user.is_staff:
1211 return HttpResponseRedirect(reverse_lazy("profile"))
1212 with contextlib.suppress(Exception):
1213 u = User.objects.get(username=kwargs["profile_username"])
1214 request.user = u
1215 self._confirm_claimed_openid()
1216 return super().get(self, request, args, kwargs)
1218 def get_context_data(self, **kwargs):
1219 """
1220 Provide additional context data, like if max_photos is reached
1221 already or not.
1222 """
1223 context = super().get_context_data(**kwargs)
1224 context["max_photos"] = False
1225 if self.request.user:
1226 if self.request.user.photo_set.all().count() >= MAX_NUM_PHOTOS:
1227 context["max_photos"] = True
1228 return context
1230 def _confirm_claimed_openid(self):
1231 openids = self.request.user.useropenid_set.all()
1232 # If there is only one OpenID, we eventually need to add it to
1233 # the user account
1234 if openids.count() == 1:
1235 # Already confirmed, skip
1236 if ConfirmedOpenId.objects.filter( # pylint: disable=no-member
1237 openid=openids.first().claimed_id
1238 ).exists():
1239 return
1240 # For whatever reason, this is in unconfirmed state, skip
1241 if UnconfirmedOpenId.objects.filter( # pylint: disable=no-member
1242 openid=openids.first().claimed_id
1243 ).exists():
1244 return
1245 logger.debug(f"need to confirm: {openids.first()}")
1246 confirmed = ConfirmedOpenId()
1247 confirmed.user = self.request.user
1248 confirmed.ip_address = get_client_ip(self.request)[0]
1249 confirmed.openid = openids.first().claimed_id
1250 confirmed.save()
1253class PasswordResetView(PasswordResetViewOriginal):
1254 """
1255 View class for password reset
1256 """
1258 def post(self, request, *args, **kwargs):
1259 """
1260 Since we have the mail addresses in ConfirmedEmail model,
1261 we need to set the email on the user object in order for the
1262 PasswordResetView class to pick up the correct user.
1263 In case we have the mail address in the User object, we still
1264 need to assign a random password in order for PasswordResetView
1265 class to pick up the user - else it will silently do nothing.
1266 """
1267 if "email" in request.POST:
1268 user = None
1270 # Try to find the user via the normal user class
1271 # TODO: How to handle the case that multiple user accounts
1272 # could have the same password set?
1273 user = User.objects.filter(email=request.POST["email"]).first()
1275 # If we didn't find the user in the previous step,
1276 # try the ConfirmedEmail class instead.
1277 # If we find the user there, we need to set the mail
1278 # attribute on the user object accordingly
1279 if not user:
1280 with contextlib.suppress(ObjectDoesNotExist):
1281 confirmed_email = ConfirmedEmail.objects.get(
1282 email=request.POST["email"]
1283 )
1284 user = confirmed_email.user
1285 user.email = confirmed_email.email
1286 user.save()
1287 # If we found the user, set a random password. Else, the
1288 # ResetPasswordView class will silently ignore the password
1289 # reset request
1290 if user:
1291 if not user.password or user.password.startswith("!"):
1292 random_pass = get_random_string(12)
1293 user.set_password(random_pass)
1294 user.save()
1296 # Whatever happens above, let the original function handle the rest
1297 return super().post(self, request, args, kwargs)
1300@method_decorator(login_required, name="dispatch")
1301class DeleteAccountView(SuccessMessageMixin, FormView):
1302 """
1303 View class for account deletion
1304 """
1306 template_name = "delete.html"
1307 form_class = DeleteAccountForm
1308 success_url = reverse_lazy("home")
1310 def get(self, request, *args, **kwargs):
1311 return super().get(self, request, args, kwargs)
1313 def post(self, request, *args, **kwargs):
1314 """
1315 Handle account deletion
1316 """
1317 if request.user.password:
1318 if "password" in request.POST:
1319 if not request.user.check_password(request.POST["password"]):
1320 messages.error(request, _("Incorrect password"))
1321 return HttpResponseRedirect(reverse_lazy("delete"))
1322 else:
1323 messages.error(request, _("No password given"))
1324 return HttpResponseRedirect(reverse_lazy("delete"))
1326 # should delete all confirmed/unconfirmed/photo objects
1327 request.user.delete()
1328 return super().post(self, request, args, kwargs)
1331@method_decorator(login_required, name="dispatch")
1332class ExportView(SuccessMessageMixin, TemplateView):
1333 """
1334 View class responsible for libravatar user data export
1335 """
1337 template_name = "export.html"
1338 model = User
1340 def get(self, request, *args, **kwargs):
1341 return super().get(self, request, args, kwargs)
1343 def post(self, request, *args, **kwargs):
1344 """
1345 Handle real export
1346 """
1347 SCHEMA_ROOT = "https://www.libravatar.org/schemas/export/0.2"
1348 SCHEMA_XSD = f"{SCHEMA_ROOT}/export.xsd"
1350 def xml_header():
1351 return (
1352 """<?xml version="1.0" encoding="UTF-8"?>"""
1353 '''<user xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"'''
1354 ''' xsi:schemaLocation="%s %s"'''
1355 """ xmlns="%s">\n""" % (SCHEMA_ROOT, SCHEMA_XSD, SCHEMA_ROOT)
1356 )
1358 def xml_footer():
1359 return "</user>\n"
1361 def xml_account(user):
1362 escaped_username = saxutils.quoteattr(user.username)
1363 escaped_password = saxutils.quoteattr(user.password)
1364 return " <account username={} password={}/>\n".format(
1365 escaped_username,
1366 escaped_password,
1367 )
1369 def xml_email(user):
1370 returnstring = " <emails>\n"
1371 for email in user.confirmedemail_set.all():
1372 returnstring += (
1373 ' <email photo_id="'
1374 + str(email.photo_id)
1375 + '">'
1376 + str(email.email)
1377 + "</email>"
1378 + "\n"
1379 )
1380 returnstring += " </emails>\n"
1381 return returnstring
1383 def xml_openid(user):
1384 returnstring = " <openids>\n"
1385 for openid in user.confirmedopenid_set.all():
1386 returnstring += (
1387 ' <openid photo_id="'
1388 + str(openid.photo_id)
1389 + '">'
1390 + str(openid.openid)
1391 + "</openid>"
1392 + "\n"
1393 )
1394 returnstring += " </openids>\n"
1395 return returnstring
1397 def xml_photos(user):
1398 s = " <photos>\n"
1399 for photo in user.photo_set.all():
1400 encoded_photo = base64.b64encode(photo.data)
1401 if encoded_photo:
1402 s += (
1403 """ <photo id="%s" encoding="base64" format=%s>"""
1404 """%s"""
1405 """</photo>\n"""
1406 % (photo.id, saxutils.quoteattr(photo.format), encoded_photo)
1407 )
1408 s += " </photos>\n"
1409 return s
1411 user = request.user
1413 photos = []
1414 for photo in user.photo_set.all():
1415 photo_details = {"data": photo.data, "format": photo.format}
1416 photos.append(photo_details)
1418 bytesobj = BytesIO()
1419 data = gzip.GzipFile(fileobj=bytesobj, mode="w")
1420 data.write(bytes(xml_header(), "utf-8"))
1421 data.write(bytes(xml_account(user), "utf-8"))
1422 data.write(bytes(xml_email(user), "utf-8"))
1423 data.write(bytes(xml_openid(user), "utf-8"))
1424 data.write(bytes(xml_photos(user), "utf-8"))
1425 data.write(bytes(xml_footer(), "utf-8"))
1426 data.close()
1427 bytesobj.seek(0)
1429 response = HttpResponse(content_type="application/gzip")
1430 response["Content-Disposition"] = (
1431 f'attachment; filename="libravatar-export_{user.username}.xml.gz"'
1432 )
1433 response.write(bytesobj.read())
1434 return response