Coverage for ivatar/ivataraccount/views.py: 75%
669 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 23:07 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-13 23:07 +0000
1# -*- coding: utf-8 -*-
2"""
3View classes for ivatar/ivataraccount/
4"""
6from io import BytesIO
7from ivatar.utils import urlopen, Bluesky
8import base64
9import binascii
10import contextlib
11from xml.sax import saxutils
12import gzip
14from PIL import Image
16from django.db.models import ProtectedError
17from django.core.exceptions import ObjectDoesNotExist
18from django.contrib.auth.decorators import login_required
19from django.contrib.auth.models import User
20from django.utils.decorators import method_decorator
21from django.contrib.messages.views import SuccessMessageMixin
22from django.contrib import messages
23from django.views.generic.edit import FormView, UpdateView
24from django.views.generic.base import View, TemplateView
25from django.views.generic.detail import DetailView
26from django.contrib.auth import authenticate, login
27from django.contrib.auth.forms import UserCreationForm, SetPasswordForm
28from django.contrib.auth.views import LoginView
29from django.contrib.auth.views import (
30 PasswordResetView as PasswordResetViewOriginal,
31)
32from django.utils.translation import gettext_lazy as _
33from django.http import HttpResponseRedirect, HttpResponse
34from django.urls import reverse_lazy, reverse
35from django.shortcuts import render
36from django_openid_auth.models import UserOpenID
38from openid import oidutil
39from openid.consumer import consumer
41from ipware import get_client_ip
43from email_validator import validate_email
45from libravatar import libravatar_url
46from ivatar.settings import (
47 MAX_NUM_PHOTOS,
48 MAX_PHOTO_SIZE,
49 JPEG_QUALITY,
50 AVATAR_MAX_SIZE,
51 SOCIAL_AUTH_FEDORA_KEY,
52)
53from .gravatar import get_photo as get_gravatar_photo
55from .forms import AddEmailForm, UploadPhotoForm, AddOpenIDForm
56from .forms import UpdatePreferenceForm, UploadLibravatarExportForm
57from .forms import DeleteAccountForm
58from .models import UnconfirmedEmail, ConfirmedEmail, Photo
59from .models import UnconfirmedOpenId, ConfirmedOpenId, DjangoOpenIDStore
60from .models import UserPreference
61from .models import file_format
62from .read_libravatar_export import read_gzdata as libravatar_read_gzdata
def openid_logging(message, level=0):
    """
    Logging callback for the OpenID library.

    Messages at the default level (0) or below are discarded; messages
    with a higher level are written to stdout.
    """
    # Routine messages are not worth reporting
    # No need for coverage here
    if level <= 0:
        return
    print(message)  # pragma: no cover
class CreateView(SuccessMessageMixin, FormView):
    """
    View class for creating a new user
    """

    template_name = "new.html"
    form_class = UserCreationForm

    def form_valid(self, form):
        """
        Persist the new account, log the user in and redirect to the profile.

        If authentication with the freshly stored credentials fails, the
        user is redirected to the login page instead.
        """
        form.save()
        user = authenticate(
            username=form.cleaned_data["username"],
            password=form.cleaned_data["password1"],
        )
        if user is None:
            return HttpResponseRedirect(reverse_lazy("login"))  # pragma: no cover

        # If the username looks like a mail address, automagically
        # add it as unconfirmed mail and set it also as user's
        # email address. This is deliberately best effort: any failure
        # (not a mail address, address already known, ...) is ignored.
        with contextlib.suppress(Exception):
            self._extracted_from_form_valid_(form, user)
        login(self.request, user)
        pref = UserPreference.objects.create(
            user_id=user.pk
        )  # pylint: disable=no-member
        pref.save()
        return HttpResponseRedirect(reverse_lazy("profile"))

    def _extracted_from_form_valid_(self, form, user):
        """
        Use the username as the user's mail address and send a confirmation.

        Raises whatever ``validate_email``, the model saves or the mail
        sending raise; the caller suppresses these.
        """
        # This will error out if it's not a valid address
        valid = validate_email(form.cleaned_data["username"])
        user.email = valid.email
        user.save()
        # The following will also error out if it already exists
        unconfirmed = UnconfirmedEmail()
        unconfirmed.email = valid.email
        unconfirmed.user = user
        unconfirmed.save()
        unconfirmed.send_confirmation_mail(
            # Site root without the trailing slash
            url=self.request.build_absolute_uri("/")[:-1]
        )

    def get(self, request, *args, **kwargs):
        """
        Handle get for create view

        Authenticated users are sent straight to their profile; everyone
        else gets the regular form page.
        """
        if request.user and request.user.is_authenticated:
            return HttpResponseRedirect(reverse_lazy("profile"))
        # Forward the real request and unpack args/kwargs; the bound
        # super() call already supplies ``self``.
        return super().get(request, *args, **kwargs)
@method_decorator(login_required, name="dispatch")
class PasswordSetView(SuccessMessageMixin, FormView):
    """
    View class for changing the password
    """

    template_name = "password_change.html"
    form_class = SetPasswordForm
    success_message = _("password changed successfully - please login again")
    success_url = reverse_lazy("profile")

    def get_form_kwargs(self):
        """
        Extend the default form kwargs with the requesting user.
        """
        form_kwargs = super().get_form_kwargs()
        form_kwargs["user"] = self.request.user
        return form_kwargs

    def form_valid(self, form):
        """
        Store the new password and send the user to the login page.
        """
        form.save()
        # The parent's response is discarded on purpose; we redirect to
        # the login page rather than to success_url.
        super().form_valid(form)
        login_page = reverse_lazy("login")
        return HttpResponseRedirect(login_page)
@method_decorator(login_required, name="dispatch")
class AddEmailView(SuccessMessageMixin, FormView):
    """
    View class for adding email addresses
    """

    template_name = "add_email.html"
    form_class = AddEmailForm
    success_url = reverse_lazy("profile")

    def form_valid(self, form):
        """
        Save the address; re-render the form when saving is refused.
        """
        if form.save(self.request):
            messages.success(self.request, _("Address added successfully"))
            return super().form_valid(form)

        return render(self.request, self.template_name, {"form": form})
@method_decorator(login_required, name="dispatch")
class RemoveUnconfirmedEmailView(SuccessMessageMixin, View):
    """
    View class for removing a unconfirmed email address
    """

    @staticmethod
    def post(request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Delete the requesting user's unconfirmed address given by
        ``email_id`` and return to the profile page.
        """
        try:
            address = UnconfirmedEmail.objects.get(  # pylint: disable=no-member
                user=request.user, id=kwargs["email_id"]
            )
        except UnconfirmedEmail.DoesNotExist:  # pylint: disable=no-member
            messages.error(request, _("Address does not exist"))
        else:
            address.delete()
            messages.success(request, _("Address removed"))
        return HttpResponseRedirect(reverse_lazy("profile"))
class ConfirmEmailView(SuccessMessageMixin, TemplateView):
    """
    View class for confirming an unconfirmed email address
    """

    template_name = "email_confirmed.html"

    def get(self, request, *args, **kwargs):
        """
        Turn the unconfirmed address matching ``verification_key`` into a
        confirmed one and render the confirmation page.

        Any validation failure queues an error message and redirects to
        the profile page.
        """
        # be tolerant of extra crap added by mail clients
        key = kwargs["verification_key"].replace(" ", "")

        if len(key) != 64:
            messages.error(request, _("Verification key incorrect"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        try:
            pending = UnconfirmedEmail.objects.get(
                verification_key=key
            )  # pylint: disable=no-member
        except UnconfirmedEmail.DoesNotExist:  # pylint: disable=no-member
            messages.error(request, _("Verification key does not exist"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        already_taken = ConfirmedEmail.objects.filter(email=pending.email).exists()
        if already_taken:
            messages.error(
                request,
                _("This mail address has been taken already and cannot be confirmed"),
            )
            return HttpResponseRedirect(reverse_lazy("profile"))

        # TODO: Check for a reasonable expiration time in unconfirmed email

        is_logged_in = not request.user.is_anonymous
        confirmed_id, external_photos = ConfirmedEmail.objects.create_confirmed_email(
            pending.user, pending.email, is_logged_in
        )

        pending.delete()

        # if there's a single image in this user's profile,
        # assign it to the new email
        confirmed = ConfirmedEmail.objects.get(id=confirmed_id)
        owner_photos = confirmed.user.photo_set
        if owner_photos.count() == 1:
            confirmed.set_photo(confirmed.user.photo_set.first())
        kwargs["photos"] = external_photos
        kwargs["email_id"] = confirmed_id
        return super().get(request, *args, **kwargs)
@method_decorator(login_required, name="dispatch")
class RemoveConfirmedEmailView(SuccessMessageMixin, View):
    """
    View class for removing a confirmed email address
    """

    @staticmethod
    def post(request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Delete the requesting user's confirmed address given by
        ``email_id`` and return to the profile page.
        """
        try:
            address = ConfirmedEmail.objects.get(
                user=request.user, id=kwargs["email_id"]
            )
        except ConfirmedEmail.DoesNotExist:  # pylint: disable=no-member
            messages.error(request, _("Address does not exist"))
        else:
            address.delete()
            messages.success(request, _("Address removed"))
        return HttpResponseRedirect(reverse_lazy("profile"))
@method_decorator(login_required, name="dispatch")
class AssignPhotoEmailView(SuccessMessageMixin, TemplateView):
    """
    View class for assigning a photo to an email address
    """

    model = Photo
    template_name = "assign_photo_email.html"

    def post(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Apply the photo choice submitted in the form to the requesting
        user's confirmed address ``email_id``.

        ``photoNone`` clears photo and Bluesky handle, ``photoBluesky``
        (or ``photo_id`` equal to ``bluesky``) clears only the photo, and
        any other ``photo_id`` selects one of the user's photos.
        """
        try:
            email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
        except ConfirmedEmail.DoesNotExist:  # pylint: disable=no-member
            messages.error(request, _("Invalid request"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        form_data = request.POST
        if "photoNone" in form_data:
            email.photo = None
            email.bluesky_handle = None
        elif "photoBluesky" in form_data:
            # Keep the existing Bluesky handle, clear the photo
            email.photo = None
        elif "photo_id" not in form_data:
            messages.error(request, _("Invalid request [photo_id] missing"))
            return HttpResponseRedirect(reverse_lazy("profile"))
        elif form_data["photo_id"] == "bluesky":
            # Bluesky picked through the photo selector:
            # keep the handle untouched, clear the photo
            email.photo = None
        else:
            try:
                chosen = self.model.objects.get(  # pylint: disable=no-member
                    id=form_data["photo_id"], user=request.user
                )
            except self.model.DoesNotExist:  # pylint: disable=no-member
                messages.error(request, _("Photo does not exist"))
                return HttpResponseRedirect(reverse_lazy("profile"))
            email.photo = chosen
            email.bluesky_handle = None
        email.save()

        messages.success(request, _("Successfully changed photo"))
        return HttpResponseRedirect(reverse_lazy("profile"))

    def get_context_data(self, **kwargs):
        """
        Add the confirmed address ``email_id`` to the template context.
        """
        context = super().get_context_data(**kwargs)
        context["email"] = ConfirmedEmail.objects.get(pk=kwargs["email_id"])
        return context
@method_decorator(login_required, name="dispatch")
class AssignPhotoOpenIDView(SuccessMessageMixin, TemplateView):
    """
    View class for assigning a photo to an openid address
    """

    model = Photo
    template_name = "assign_photo_openid.html"

    def post(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Apply the photo choice submitted in the form to the requesting
        user's confirmed OpenID ``openid_id``.

        ``photoNone`` clears the photo; otherwise ``photo_id`` must name
        one of the user's photos, which also clears the Bluesky handle.
        """
        try:
            openid = ConfirmedOpenId.objects.get(  # pylint: disable=no-member
                user=request.user, id=kwargs["openid_id"]
            )
        except ConfirmedOpenId.DoesNotExist:  # pylint: disable=no-member
            messages.error(request, _("Invalid request"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        form_data = request.POST
        if "photoNone" in form_data:
            openid.photo = None
        elif "photo_id" not in form_data:
            messages.error(request, _("Invalid request [photo_id] missing"))
            return HttpResponseRedirect(reverse_lazy("profile"))
        else:
            try:
                chosen = self.model.objects.get(  # pylint: disable=no-member
                    id=form_data["photo_id"], user=request.user
                )
            except self.model.DoesNotExist:  # pylint: disable=no-member
                messages.error(request, _("Photo does not exist"))
                return HttpResponseRedirect(reverse_lazy("profile"))
            openid.photo = chosen
            openid.bluesky_handle = None
        openid.save()

        messages.success(request, _("Successfully changed photo"))
        return HttpResponseRedirect(reverse_lazy("profile"))

    def get_context_data(self, **kwargs):
        """
        Add the confirmed OpenID ``openid_id`` to the template context.
        """
        context = super().get_context_data(**kwargs)
        context["openid"] = ConfirmedOpenId.objects.get(
            pk=kwargs["openid_id"]
        )  # pylint: disable=no-member
        return context
@method_decorator(login_required, name="dispatch")
class AssignBlueskyHandleToEmailView(SuccessMessageMixin, TemplateView):
    """
    View class for assigning a Bluesky handle to an email address
    """

    def post(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Attach the posted ``bluesky_handle`` to the requesting user's
        confirmed address ``email_id`` and clear its photo.

        The handle is probed by fetching its avatar first; if that or
        storing the handle fails, the user is sent back to the photo
        assignment page with an error message.
        """

        def back_to_assignment():
            # Built only on failure, exactly where it is needed
            return HttpResponseRedirect(
                reverse_lazy(
                    "assign_photo_email", kwargs={"email_id": int(kwargs["email_id"])}
                )
            )

        try:
            email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
        except ConfirmedEmail.DoesNotExist:  # pylint: disable=no-member
            messages.error(request, _("Invalid request"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        if "bluesky_handle" not in request.POST:
            messages.error(request, _("Invalid request [bluesky_handle] missing"))
            return HttpResponseRedirect(reverse_lazy("profile"))
        bluesky_handle = request.POST["bluesky_handle"]

        # Result is unused: the call only checks the avatar can be fetched
        try:
            Bluesky().get_avatar(bluesky_handle)
        except Exception as e:
            messages.error(request, _(f"Handle '{bluesky_handle}' not found: {e}"))
            return back_to_assignment()

        try:
            email.set_bluesky_handle(bluesky_handle)
        except Exception as e:
            messages.error(request, _(f"Error: {e}"))
            return back_to_assignment()

        email.photo = None
        email.save()

        messages.success(request, _("Successfully assigned Bluesky handle"))
        return HttpResponseRedirect(reverse_lazy("profile"))

    def get_context_data(self, **kwargs):
        """
        Add the confirmed address ``email_id`` to the template context.
        """
        context = super().get_context_data(**kwargs)
        context["email"] = ConfirmedEmail.objects.get(pk=kwargs["email_id"])
        return context
@method_decorator(login_required, name="dispatch")
class AssignBlueskyHandleToOpenIdView(SuccessMessageMixin, TemplateView):
    """
    View class for assigning a Bluesky handle to a confirmed OpenID
    """

    def post(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Attach the posted ``bluesky_handle`` to the requesting user's
        confirmed OpenID ``open_id`` and clear its photo.

        The handle is probed by fetching its avatar first; if that or
        storing the handle fails, the user is sent back to the photo
        assignment page with an error message.
        """

        def back_to_assignment():
            # Built only on failure, exactly where it is needed
            return HttpResponseRedirect(
                reverse_lazy(
                    "assign_photo_openid", kwargs={"openid_id": int(kwargs["open_id"])}
                )
            )

        try:
            openid = ConfirmedOpenId.objects.get(
                user=request.user, id=kwargs["open_id"]
            )
        except ConfirmedOpenId.DoesNotExist:  # pylint: disable=no-member
            messages.error(request, _("Invalid request"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        if "bluesky_handle" not in request.POST:
            messages.error(request, _("Invalid request [bluesky_handle] missing"))
            return HttpResponseRedirect(reverse_lazy("profile"))
        bluesky_handle = request.POST["bluesky_handle"]

        # Result is unused: the call only checks the avatar can be fetched
        try:
            Bluesky().get_avatar(bluesky_handle)
        except Exception as e:
            messages.error(request, _(f"Handle '{bluesky_handle}' not found: {e}"))
            return back_to_assignment()

        try:
            openid.set_bluesky_handle(bluesky_handle)
        except Exception as e:
            messages.error(request, _(f"Error: {e}"))
            return back_to_assignment()

        openid.photo = None
        openid.save()

        messages.success(request, _("Successfully assigned Bluesky handle"))
        return HttpResponseRedirect(reverse_lazy("profile"))

    def get_context_data(self, **kwargs):
        """
        Add the confirmed OpenID ``open_id`` to the template context.
        """
        context = super().get_context_data(**kwargs)
        context["openid"] = ConfirmedOpenId.objects.get(pk=kwargs["open_id"])
        return context
@method_decorator(login_required, name="dispatch")
class ImportPhotoView(SuccessMessageMixin, TemplateView):
    """
    View class to import a photo from another service
    Gravatar and Libravatar are offered
    """

    template_name = "import_photo.html"

    def get_context_data(self, **kwargs):
        """
        Collect the importable photos for the requested address.

        The address comes from the ``email_addr`` URL kwarg or, when that
        is absent, from the confirmed address given by ``email_id``.
        ``context["photos"]`` holds one entry per service that has an
        image for the address and stays empty if no address is known.
        """
        context = super().get_context_data(**kwargs)
        context["photos"] = []
        addr = None
        if "email_id" in kwargs:
            try:
                addr = ConfirmedEmail.objects.get(pk=kwargs["email_id"]).email
            # Model classes expose DoesNotExist, not ObjectDoesNotExist
            except ConfirmedEmail.DoesNotExist:  # pylint: disable=no-member
                messages.error(self.request, _("Address does not exist"))
                return context

        # An explicit address wins; otherwise keep the one resolved from
        # email_id instead of discarding it.
        addr = kwargs.get("email_addr") or addr
        if not addr:
            return context

        if gravatar := get_gravatar_photo(addr):
            context["photos"].append(gravatar)

        if libravatar_service_url := libravatar_url(
            email=addr,
            default=404,
            size=AVATAR_MAX_SIZE,
        ):
            # default=404 is requested so that a failing fetch is taken
            # to mean there is no image to offer.
            try:
                urlopen(libravatar_service_url)
            except OSError as exc:
                print(f"Exception caught during photo import: {exc}")
            else:
                context["photos"].append(
                    {
                        "service_url": libravatar_service_url,
                        "thumbnail_url": f"{libravatar_service_url}&s=80",
                        "image_url": f"{libravatar_service_url}&s=512",
                        "width": 80,
                        "height": 80,
                        "service_name": "Libravatar",
                    }
                )

        return context

    def post(
        self, request, *args, **kwargs
    ):  # pylint: disable=no-self-use,unused-argument,too-many-branches,line-too-long
        """
        Handle post to photo import

        Imports from every service whose ``photo_<Service>`` field was
        posted, reports the outcome through messages and redirects to
        the profile page.
        """
        imported = False

        email_id = kwargs.get("email_id", request.POST.get("email_id", None))
        addr = kwargs.get("email", request.POST.get("email_addr", None))

        if email_id:
            # Only addresses owned by the requesting user are accepted
            email = ConfirmedEmail.objects.filter(id=email_id, user=request.user)
            if email.exists():
                addr = email.first().email
            else:
                messages.error(request, _("Address does not exist"))
                return HttpResponseRedirect(reverse_lazy("profile"))

        if "photo_Gravatar" in request.POST:
            photo = Photo()
            photo.user = request.user
            photo.ip_address = get_client_ip(request)[0]
            if photo.import_image("Gravatar", addr):
                messages.success(request, _("Gravatar image successfully imported"))
            else:
                # Honestly, I'm not sure how to test this...
                messages.error(
                    request, _("Gravatar image import not successful")
                )  # pragma: no cover
            imported = True

        if "photo_Libravatar" in request.POST:
            photo = Photo()
            photo.user = request.user
            photo.ip_address = get_client_ip(request)[0]
            if photo.import_image("Libravatar", addr):
                messages.success(request, _("Libravatar image successfully imported"))
            else:
                # Honestly, I'm not sure how to test this...
                messages.error(
                    request, _("Libravatar image import not successful")
                )  # pragma: no cover
            imported = True

        if not imported:
            messages.warning(request, _("Nothing importable"))
        return HttpResponseRedirect(reverse_lazy("profile"))
@method_decorator(login_required, name="dispatch")
class RawImageView(DetailView):
    """
    View to return (binary) raw image data, for use in <img/>-tags
    """

    model = Photo

    def get(self, request, *args, **kwargs):
        """
        Serve the stored image bytes of photo ``pk``.

        Only the owner or staff may fetch it; everyone else is
        redirected to the home page.
        """
        photo = self.model.objects.get(pk=kwargs["pk"])  # pylint: disable=no-member
        is_owner = photo.user.id == request.user.id
        if not is_owner and not request.user.is_staff:
            return HttpResponseRedirect(reverse_lazy("home"))
        payload = BytesIO(photo.data)
        return HttpResponse(payload, content_type=f"image/{photo.format}")
@method_decorator(login_required, name="dispatch")
class DeletePhotoView(SuccessMessageMixin, View):
    """
    View class for deleting a photo
    """

    model = Photo

    def get(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Delete the requesting user's photo ``pk`` and return to the
        profile page with a success or error message.
        """
        profile_page = HttpResponseRedirect(reverse_lazy("profile"))
        try:
            self.model.objects.get(  # pylint: disable=no-member
                pk=kwargs["pk"], user=request.user
            ).delete()
        except (self.model.DoesNotExist, ProtectedError):  # pylint: disable=no-member
            messages.error(request, _("No such image or no permission to delete it"))
        else:
            messages.success(request, _("Photo deleted successfully"))
        return profile_page
@method_decorator(login_required, name="dispatch")
class UploadPhotoView(SuccessMessageMixin, FormView):
    """
    View class responsible for photo upload
    """

    model = Photo
    template_name = "upload_photo.html"
    form_class = UploadPhotoForm
    success_message = _("Successfully uploaded")
    success_url = reverse_lazy("profile")

    def post(self, request, *args, **kwargs):
        """
        Refuse the upload once the user holds MAX_NUM_PHOTOS photos,
        otherwise continue with regular form processing.
        """
        num_photos = request.user.photo_set.count()
        if num_photos >= MAX_NUM_PHOTOS:
            # Interpolate AFTER the translation lookup; formatting first
            # produces a string that never matches the catalog msgid.
            messages.error(
                request, _("Maximum number of photos (%i) reached") % MAX_NUM_PHOTOS
            )
            return HttpResponseRedirect(reverse_lazy("profile"))
        return super().post(request, *args, **kwargs)

    def form_valid(self, form):
        """
        Store the uploaded photo and continue to the crop page.

        Oversized uploads and uploads the form cannot turn into a photo
        are rejected with an error message and a redirect to the profile.
        """
        photo_data = self.request.FILES["photo"]
        if photo_data.size > MAX_PHOTO_SIZE:
            messages.error(self.request, _("Image too big"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        photo = form.save(self.request, photo_data)

        if not photo:
            messages.error(self.request, _("Invalid Format"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        # Override success URL -> Redirect to crop page.
        self.success_url = reverse_lazy("crop_photo", args=[photo.pk])
        return super().form_valid(form)
@method_decorator(login_required, name="dispatch")
class AddOpenIDView(SuccessMessageMixin, FormView):
    """
    View class for adding OpenID
    """

    template_name = "add_openid.html"
    form_class = AddOpenIDForm
    success_url = reverse_lazy("profile")

    def form_valid(self, form):
        """
        Store the OpenID as unconfirmed and start the redirection step;
        re-render the form when saving yields nothing.
        """
        openid_id = form.save(self.request.user)
        if not openid_id:
            return render(self.request, self.template_name, {"form": form})

        # At this point we have an unconfirmed OpenID, but
        # we do not add the message, that we successfully added it,
        # since this is misleading
        target = reverse_lazy("openid_redirection", args=[openid_id])
        return HttpResponseRedirect(target)
@method_decorator(login_required, name="dispatch")
class RemoveUnconfirmedOpenIDView(View):
    """
    View class for removing a unconfirmed OpenID
    """

    model = UnconfirmedOpenId

    def post(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Delete the requesting user's unconfirmed OpenID ``openid_id``
        and return to the profile page.
        """
        try:
            pending = self.model.objects.get(  # pylint: disable=no-member
                user=request.user, id=kwargs["openid_id"]
            )
        except self.model.DoesNotExist:  # pragma: no cover pylint: disable=no-member,line-too-long
            messages.error(request, _("ID does not exist"))
        else:
            pending.delete()
            messages.success(request, _("ID removed"))
        return HttpResponseRedirect(reverse_lazy("profile"))
@method_decorator(login_required, name="dispatch")
class RemoveConfirmedOpenIDView(View):
    """
    View class for removing a confirmed OpenID
    """

    model = ConfirmedOpenId

    def post(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Delete the requesting user's confirmed OpenID ``openid_id``
        together with the matching login record, then return to the
        profile page.
        """
        profile_page = HttpResponseRedirect(reverse_lazy("profile"))
        try:
            confirmed = self.model.objects.get(  # pylint: disable=no-member
                user=request.user, id=kwargs["openid_id"]
            )
        except self.model.DoesNotExist:  # pylint: disable=no-member
            messages.error(request, _("ID does not exist"))
            return profile_page

        # Best effort: the login record may be missing
        try:
            UserOpenID.objects.get(  # pylint: disable=no-member,line-too-long
                user_id=request.user.id, claimed_id=confirmed.openid
            ).delete()
        except Exception as exc:  # pylint: disable=broad-except
            # Why it is not there?
            print(f"How did we get here: {exc}")
        confirmed.delete()
        messages.success(request, _("ID removed"))
        return profile_page
@method_decorator(login_required, name="dispatch")
class RedirectOpenIDView(View):
    """
    Redirect view for OpenID
    """

    model = UnconfirmedOpenId

    def get(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Handle get for OpenID redirect view

        Starts OpenID discovery for the requesting user's unconfirmed
        OpenID ``openid_id`` and redirects the browser to the provider.
        Any discovery failure queues an error message and redirects to
        the profile page.
        """
        try:
            unconfirmed = self.model.objects.get(  # pylint: disable=no-member
                user=request.user, id=kwargs["openid_id"]
            )
        except self.model.DoesNotExist:  # pragma: no cover pylint: disable=no-member,line-too-long
            messages.error(request, _("ID does not exist"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        user_url = unconfirmed.openid
        session = {"id": request.session.session_key}

        # Route the OpenID library's log output through our helper
        oidutil.log = openid_logging
        openid_consumer = consumer.Consumer(session, DjangoOpenIDStore())

        # Bound up front so no handler can leave the name undefined
        auth_request = None
        try:
            auth_request = openid_consumer.begin(user_url)
        except consumer.DiscoveryFailure as exc:
            messages.error(request, _(f"OpenID discovery failed: {exc}"))
            return HttpResponseRedirect(reverse_lazy("profile"))
        except UnicodeDecodeError as exc:  # pragma: no cover
            # Translate the template first, interpolate afterwards
            msg = _(
                "OpenID discovery failed (userid=%(userid)s) for "
                "%(userurl)s: %(message)s"
            ) % {
                "userid": request.user.id,
                "userurl": user_url.encode("utf-8"),
                "message": exc,
            }
            print(f"message: {msg}")
            messages.error(request, msg)
            # Nothing to redirect to without an auth request
            return HttpResponseRedirect(reverse_lazy("profile"))

        if auth_request is None:  # pragma: no cover
            messages.error(request, _("OpenID discovery failed"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        # Site root without the trailing slash
        realm = request.build_absolute_uri("/")[:-1]  # pragma: no cover
        return_url = realm + reverse(  # pragma: no cover
            "confirm_openid", args=[kwargs["openid_id"]]
        )
        return HttpResponseRedirect(  # pragma: no cover
            auth_request.redirectURL(realm, return_url)
        )
@method_decorator(login_required, name="dispatch")
class ConfirmOpenIDView(View):  # pragma: no cover
    """
    Confirm OpenID view
    """

    model = UnconfirmedOpenId
    model_confirmed = ConfirmedOpenId

    def do_request(self, data, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Handle request, called by get() or post()

        Completes the OpenID exchange with ``data`` (the request
        parameters) and, on success, converts the user's unconfirmed
        OpenID ``openid_id`` into a confirmed one. Always ends in a
        redirect to the profile page.
        """
        request = self.request
        session = {"id": request.session.session_key}
        current_url = request.build_absolute_uri("/")[:-1] + request.path
        openid_consumer = consumer.Consumer(session, DjangoOpenIDStore())
        info = openid_consumer.complete(data, current_url)

        if info.status != consumer.SUCCESS:
            if info.status == consumer.FAILURE:
                problem = _('Confirmation failed: "') + str(info.message) + '"'
            elif info.status == consumer.CANCEL:
                problem = _("Cancelled by user")
            else:
                problem = _("Unknown verification error")
            messages.error(request, problem)
            return HttpResponseRedirect(reverse_lazy("profile"))

        try:
            unconfirmed = self.model.objects.get(  # pylint: disable=no-member
                user=request.user, id=kwargs["openid_id"]
            )
        except self.model.DoesNotExist:  # pylint: disable=no-member
            messages.error(request, _("ID does not exist"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        # TODO: Check for a reasonable expiration time
        confirmed = self.model_confirmed()
        confirmed.user = unconfirmed.user
        confirmed.ip_address = get_client_ip(request)[0]
        confirmed.openid = unconfirmed.openid
        confirmed.save()

        unconfirmed.delete()

        # If there is a single image in this user's profile
        # assign it to the new id
        if request.user.photo_set.count() == 1:
            confirmed.set_photo(request.user.photo_set.first())

        # Also allow user to login using this OpenID (if not already taken)
        already_claimed = UserOpenID.objects.filter(  # pylint: disable=no-member
            claimed_id=confirmed.openid
        ).exists()
        if not already_claimed:
            user_openid = UserOpenID()
            user_openid.user = request.user
            user_openid.claimed_id = confirmed.openid
            user_openid.display_id = confirmed.openid
            user_openid.save()
        return HttpResponseRedirect(reverse_lazy("profile"))

    def get(self, request, *args, **kwargs):
        """
        Handle get - confirm openid
        """
        return self.do_request(request.GET, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        """
        Handle post - confirm openid
        """
        return self.do_request(request.POST, *args, **kwargs)
@method_decorator(login_required, name="dispatch")
class CropPhotoView(TemplateView):
    """
    View class for cropping photos
    """

    template_name = "crop_photo.html"
    success_url = reverse_lazy("profile")
    model = Photo

    def get(self, request, *args, **kwargs):
        """
        Render the crop page for the requesting user's photo ``pk``.

        The optional ``email`` and ``openid`` query parameters are
        passed through to the template unchanged.
        """
        photo = self.model.objects.get(
            pk=kwargs["pk"], user=request.user
        )  # pylint: disable=no-member
        email = request.GET.get("email")
        openid = request.GET.get("openid")
        return render(
            self.request,
            self.template_name,
            {
                "photo": photo,
                "email": email,
                "openid": openid,
            },
        )

    def post(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Handle post - crop photo

        Reads the crop rectangle (``x``, ``y``, ``w``, ``h``) from the
        form and hands it to ``Photo.perform_crop`` together with the
        optional confirmed email / OpenID named in the form.
        """
        photo = self.model.objects.get(
            pk=kwargs["pk"], user=request.user
        )  # pylint: disable=no-member
        # Values may arrive as decimal strings, hence int(float(...))
        dimensions = {
            "x": int(float(request.POST["x"])),
            "y": int(float(request.POST["y"])),
            "w": int(float(request.POST["w"])),
            "h": int(float(request.POST["h"])),
        }
        email = openid = None
        # Only identities owned by the requesting user are resolved, as
        # in the other views of this module; unknown or foreign values
        # are ignored.
        # NOTE(review): perform_crop presumably assigns the photo to
        # these identities - confirm against the model.
        if "email" in request.POST:
            with contextlib.suppress(ConfirmedEmail.DoesNotExist):
                email = ConfirmedEmail.objects.get(
                    email=request.POST["email"], user=request.user
                )
        if "openid" in request.POST:
            with contextlib.suppress(ConfirmedOpenId.DoesNotExist):
                openid = ConfirmedOpenId.objects.get(  # pylint: disable=no-member
                    openid=request.POST["openid"], user=request.user
                )
        return photo.perform_crop(request, dimensions, email, openid)
@method_decorator(login_required, name="dispatch")  # pylint: disable=too-many-ancestors
class UserPreferenceView(FormView, UpdateView):
    """
    View class for user preferences view/update
    """

    template_name = "preferences.html"
    model = UserPreference
    form_class = UpdatePreferenceForm
    success_url = reverse_lazy("user_preference")

    def post(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Process POST-ed data from this form

        Stores the theme, then updates the mail address (only to one of
        the user's confirmed addresses) and the names. Problems in the
        latter two steps are reported as messages.
        """
        user = self.request.user
        try:
            userpref = user.userpreference
        except ObjectDoesNotExist:
            userpref = UserPreference(user=user)
        userpref.theme = request.POST["theme"]
        userpref.save()

        try:
            wanted_mail = request.POST["email"]
            if wanted_mail != user.email:
                own_addresses = list(
                    user.confirmedemail_set.all().values_list("email", flat=True)
                )
                if wanted_mail in own_addresses:
                    user.email = wanted_mail
                    user.save()
                    messages.info(self.request, _("Mail address changed."))
                else:
                    messages.error(
                        self.request,
                        _(f'Mail address not allowed: {request.POST["email"]}'),
                    )
        except Exception as e:  # pylint: disable=broad-except
            messages.error(self.request, _(f"Error setting new mail address: {e}"))

        try:
            first_name = request.POST["first_name"]
            # last_name is read lazily, in the same order as the checks
            if first_name or request.POST["last_name"]:
                if first_name != user.first_name:
                    user.first_name = first_name
                    messages.info(self.request, _("First name changed."))
                last_name = request.POST["last_name"]
                if last_name != user.last_name:
                    user.last_name = last_name
                    messages.info(self.request, _("Last name changed."))
                user.save()
        except Exception as e:  # pylint: disable=broad-except
            messages.error(self.request, _(f"Error setting names: {e}"))

        return HttpResponseRedirect(reverse_lazy("user_preference"))

    def get(self, request, *args, **kwargs):
        """
        Render the preferences page with the available themes.
        """
        context = {"THEMES": UserPreference.THEMES}
        return render(self.request, self.template_name, context)

    def get_object(self, queryset=None):
        """
        Return the requesting user's preference object, creating it if
        it does not exist yet.
        """
        preference, _was_created = UserPreference.objects.get_or_create(
            user=self.request.user
        )  # pylint: disable=no-member,unused-variable
        return preference
@method_decorator(login_required, name="dispatch")
class UploadLibravatarExportView(SuccessMessageMixin, FormView):
    """
    View class responsible for libravatar user data export upload
    """

    template_name = "upload_libravatar_export.html"
    form_class = UploadLibravatarExportForm
    success_message = _("Successfully uploaded")
    success_url = reverse_lazy("profile")
    model = User

    def post(self, request, *args, **kwargs):
        """
        Handle post request - choose items to import

        When the URL keyword argument ``save`` equals ``"save"``, every
        posted ``email_*`` and ``photo*`` field is imported and the user is
        redirected to the profile page. Any other POST is handed to the
        regular form handling of the parent class.
        """
        if kwargs.get("save") != "save":
            # Unpack args/kwargs; passing them as two positional values
            # would hand the parent a tuple and a dict as extra arguments
            return super().post(request, *args, **kwargs)

        for arg in request.POST:
            if arg.startswith("email_"):
                self._import_email(request, request.POST[arg])
            if arg.startswith("photo"):
                self._import_photo(request, request.POST[arg])
        return HttpResponseRedirect(reverse_lazy("profile"))

    @staticmethod
    def _import_email(request, email):
        """
        Add ``email`` as unconfirmed address of the current user and send
        the confirmation mail; addresses already known are skipped.
        """
        # .exists() asks the database for a boolean instead of loading rows
        if (
            ConfirmedEmail.objects.filter(email=email).exists()  # pylint: disable=no-member
            or UnconfirmedEmail.objects.filter(email=email).exists()  # pylint: disable=no-member
        ):
            return
        try:
            unconfirmed = UnconfirmedEmail.objects.create(  # pylint: disable=no-member
                user=request.user, email=email
            )
            unconfirmed.save()
            unconfirmed.send_confirmation_mail(
                url=request.build_absolute_uri("/")[:-1]
            )
            # The msgid is written on one line: a backslash continuation
            # inside the literal would embed the source indentation in it
            messages.info(
                request,
                "%s: %s"
                % (
                    email,
                    _("address added successfully, confirmation mail sent"),
                ),
            )
        except Exception as exc:  # pylint: disable=broad-except
            # Best effort: one failing address must not abort the import
            print(f"Exception during adding mail address ({email}): {exc}")

    @staticmethod
    def _import_photo(request, encoded):
        """
        Decode the base64 string ``encoded``, re-encode the image with PIL
        and store it as a photo of the current user.
        """
        try:
            data = base64.decodebytes(bytes(encoded, "utf-8"))
        except binascii.Error as exc:
            print(f"Cannot decode photo: {exc}")
            return
        try:
            pilobj = Image.open(BytesIO(data))
            out = BytesIO()
            pilobj.save(out, pilobj.format, quality=JPEG_QUALITY)
            photo = Photo()
            photo.user = request.user
            photo.ip_address = get_client_ip(request)[0]
            photo.format = file_format(pilobj.format)
            photo.data = out.getvalue()
            photo.save()
        except Exception as exc:  # pylint: disable=broad-except
            # Best effort: one broken image must not abort the import
            print(f"Exception during save: {exc}")

    def form_valid(self, form):
        """
        Parse the uploaded export file and render the page on which the
        user chooses the e-mail addresses and photos to import. On any
        failure an error message is queued and the upload page is shown
        again.
        """
        upload = self.request.FILES["export_file"]
        try:
            items = libravatar_read_gzdata(upload.read())
            return render(
                self.request,
                "choose_libravatar_export.html",
                {
                    "emails": items["emails"],
                    "photos": items["photos"],
                },
            )
        except Exception as exc:  # pylint: disable=broad-except
            # Interpolate after the translation lookup; an f-string inside
            # _() is formatted first and can never match a catalog entry
            messages.error(
                self.request,
                _("Unable to parse file: %(error)s") % {"error": exc},
            )
            return HttpResponseRedirect(reverse_lazy("upload_export"))
@method_decorator(login_required, name="dispatch")
class ResendConfirmationMailView(View):
    """
    View class for resending confirmation mail
    """

    model = UnconfirmedEmail

    def get(self, request, *args, **kwargs):  # pylint: disable=unused-argument
        """
        Handle get - resend the confirmation mail for one of the current
        user's unconfirmed e-mail addresses, then go back to the profile
        """
        lookup = {"user": request.user, "id": kwargs["email_id"]}
        try:
            unconfirmed = self.model.objects.get(**lookup)  # pylint: disable=no-member
        except self.model.DoesNotExist:  # pragma: no cover pylint: disable=no-member
            messages.error(request, _("ID does not exist"))
            return HttpResponseRedirect(reverse_lazy("profile"))

        address = unconfirmed.email
        try:
            unconfirmed.send_confirmation_mail(url=request.build_absolute_uri("/")[:-1])
            messages.success(request, f'{_("Confirmation mail sent to")}: {address}')
        except Exception as exc:  # pylint: disable=broad-except
            messages.error(
                request,
                f'{_("Unable to send confirmation email for")} {address}: {exc}',
            )
        return HttpResponseRedirect(reverse_lazy("profile"))
class IvatarLoginView(LoginView):
    """
    View class for login
    """

    template_name = "login.html"

    def get(self, request, *args, **kwargs):
        """
        Handle get for login view

        Authenticated users are redirected right away: to the ``next``
        target when it is safe, otherwise to the profile page. Everybody
        else gets the regular login form.
        """
        if request.user.is_authenticated:
            # get_redirect_url() returns the requested target only when it
            # points to an allowed host, and "" otherwise. Redirecting to
            # the raw 'next' parameter would be an open redirect.
            next_url = self.get_redirect_url()
            if next_url:
                return HttpResponseRedirect(next_url)
            return HttpResponseRedirect(reverse_lazy("profile"))
        # super() is already bound: passing self again would shift every
        # argument by one and hand the view object over as the request
        return super().get(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """
        Add ``with_fedora`` to the context: True when a Fedora social auth
        key is configured
        """
        context = super().get_context_data(**kwargs)
        context["with_fedora"] = SOCIAL_AUTH_FEDORA_KEY is not None
        return context
@method_decorator(login_required, name="dispatch")
class ProfileView(TemplateView):
    """
    View class for profile
    """

    template_name = "profile.html"

    def get(self, request, *args, **kwargs):
        """
        Render the profile page.

        With a ``profile_username`` URL argument, staff users see the
        profile of that user; non-staff users are redirected to their own
        profile. An unknown username leaves the staff user on their own
        profile.
        """
        if "profile_username" in kwargs:
            if not request.user.is_staff:
                return HttpResponseRedirect(reverse_lazy("profile"))
            # Only the "no such user" case is expected here; anything else
            # (e.g. database errors) should surface instead of being hidden
            with contextlib.suppress(ObjectDoesNotExist):
                request.user = User.objects.get(username=kwargs["profile_username"])
        self._confirm_claimed_openid()
        # super() is already bound: passing self again would shift every
        # argument by one and hand the view object over as the request
        return super().get(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """
        Provide additional context data, like if max_photos is reached
        already or not.
        """
        context = super().get_context_data(**kwargs)
        context["max_photos"] = (
            self.request.user.photo_set.count() >= MAX_NUM_PHOTOS
        )
        return context

    def _confirm_claimed_openid(self):
        """
        If the user has exactly one OpenID attached that is neither
        confirmed nor unconfirmed yet, store it as confirmed OpenID.
        """
        # Fetch at most two rows once: enough to tell "exactly one" apart,
        # and the row cannot vanish between a count() and a later first()
        openids = list(self.request.user.useropenid_set.all()[:2])
        if len(openids) != 1:
            return
        user_openid = openids[0]
        claimed_id = user_openid.claimed_id

        # Already confirmed, skip
        if ConfirmedOpenId.objects.filter(  # pylint: disable=no-member
            openid=claimed_id
        ).exists():
            return
        # For whatever reason, this is in unconfirmed state, skip
        if UnconfirmedOpenId.objects.filter(  # pylint: disable=no-member
            openid=claimed_id
        ).exists():
            return

        print(f"need to confirm: {user_openid}")
        confirmed = ConfirmedOpenId()
        confirmed.user = self.request.user
        confirmed.ip_address = get_client_ip(self.request)[0]
        confirmed.openid = claimed_id
        confirmed.save()
class PasswordResetView(PasswordResetViewOriginal):
    """
    View class for password reset
    """

    def post(self, request, *args, **kwargs):
        """
        Since we have the mail addresses in ConfirmedEmail model,
        we need to set the email on the user object in order for the
        PasswordResetView class to pick up the correct user.
        In case we have the mail address in the User object, we still
        need to assign a random password in order for PasswordResetView
        class to pick up the user - else it will silently do nothing.
        """
        # Function-scope import keeps this block self-contained
        import secrets

        email = request.POST.get("email")
        # A blank address must not be looked up: it would match whichever
        # account happens to have an empty email field
        if email:
            # Try to find the user via the normal user class
            # TODO: How to handle the case that multiple user accounts
            # could have the same e-mail address set?
            user = User.objects.filter(email=email).first()

            # If we didn't find the user in the previous step,
            # try the ConfirmedEmail class instead.
            # If we find the user there, we need to set the mail
            # attribute on the user object accordingly
            if user is None:
                confirmed_email = ConfirmedEmail.objects.filter(  # pylint: disable=no-member
                    email=email
                ).first()
                if confirmed_email is not None:
                    user = confirmed_email.user
                    user.email = confirmed_email.email
                    user.save()

            # If we found the user, set a random password. Else, the
            # ResetPasswordView class will silently ignore the password
            # reset request
            if user is not None and (
                not user.password or user.password.startswith("!")
            ):
                # The value is never shown to anybody; it only has to be
                # unguessable. make_random_password() is gone in Django 5.1.
                user.set_password(secrets.token_urlsafe(32))
                user.save()

        # Whatever happens above, let the original function handle the rest.
        # super() is already bound: passing self again would shift every
        # argument by one and hand the view object over as the request
        return super().post(request, *args, **kwargs)
@method_decorator(login_required, name="dispatch")
class DeleteAccountView(SuccessMessageMixin, FormView):
    """
    View class for account deletion
    """

    template_name = "delete.html"
    form_class = DeleteAccountForm
    success_url = reverse_lazy("home")

    def get(self, request, *args, **kwargs):
        """
        Show the account deletion form
        """
        # super() is already bound: passing self again would shift every
        # argument by one and hand the view object over as the request
        return super().get(request, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        """
        Handle account deletion

        Users with a password have to provide it; a missing or wrong
        password queues an error message and redirects back to the
        deletion page. Otherwise the user object is deleted and the
        regular form handling takes over.
        """
        # NOTE(review): an unusable password ("!"-prefixed, as handled in
        # PasswordResetView) is truthy as well, so such users can
        # presumably never pass this check - confirm whether
        # has_usable_password() is what is meant here.
        if request.user.password:
            if "password" not in request.POST:
                messages.error(request, _("No password given"))
                return HttpResponseRedirect(reverse_lazy("delete"))
            if not request.user.check_password(request.POST["password"]):
                messages.error(request, _("Incorrect password"))
                return HttpResponseRedirect(reverse_lazy("delete"))

        # should delete all confirmed/unconfirmed/photo objects
        request.user.delete()
        return super().post(request, *args, **kwargs)
@method_decorator(login_required, name="dispatch")
class ExportView(SuccessMessageMixin, TemplateView):
    """
    View class responsible for libravatar user data export
    """

    template_name = "export.html"
    model = User

    def get(self, request, *args, **kwargs):
        """
        Show the export page
        """
        # super() is already bound: passing self again would shift every
        # argument by one and hand the view object over as the request
        return super().get(request, *args, **kwargs)

    def post(self, request, *args, **kwargs):
        """
        Handle real export

        Builds the libravatar export XML (account, confirmed e-mail
        addresses, confirmed OpenIDs, photos) for the current user and
        returns it gzip-compressed as file download.
        """
        SCHEMA_ROOT = "https://www.libravatar.org/schemas/export/0.2"
        SCHEMA_XSD = f"{SCHEMA_ROOT}/export.xsd"

        def xml_header():
            return (
                """<?xml version="1.0" encoding="UTF-8"?>"""
                '''<user xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"'''
                ''' xsi:schemaLocation="%s %s"'''
                """ xmlns="%s">\n""" % (SCHEMA_ROOT, SCHEMA_XSD, SCHEMA_ROOT)
            )

        def xml_footer():
            return "</user>\n"

        def xml_account(user):
            return "  <account username=%s password=%s/>\n" % (
                saxutils.quoteattr(user.username),
                saxutils.quoteattr(user.password),
            )

        def xml_identities(tag, entries):
            # Shared by e-mail addresses and OpenIDs. The text node is
            # escaped because e.g. OpenID URLs may contain '&', which
            # would otherwise produce invalid XML.
            rows = [
                "    <%s photo_id=%s>%s</%s>\n"
                % (
                    tag,
                    saxutils.quoteattr(str(photo_id)),
                    saxutils.escape(str(value)),
                    tag,
                )
                for photo_id, value in entries
            ]
            return "  <%ss>\n%s  </%ss>\n" % (tag, "".join(rows), tag)

        def xml_email(user):
            return xml_identities(
                "email",
                ((mail.photo_id, mail.email) for mail in user.confirmedemail_set.all()),
            )

        def xml_openid(user):
            return xml_identities(
                "openid",
                ((oid.photo_id, oid.openid) for oid in user.confirmedopenid_set.all()),
            )

        def xml_photos(user):
            rows = []
            for photo in user.photo_set.all():
                # b64encode() returns bytes; decode so the document holds
                # the base64 text itself and not the b'...' repr of it
                encoded_photo = base64.b64encode(photo.data).decode("ascii")
                if encoded_photo:
                    rows.append(
                        """    <photo id="%s" encoding="base64" format=%s>"""
                        """%s"""
                        """</photo>\n"""
                        % (photo.id, saxutils.quoteattr(photo.format), encoded_photo)
                    )
            return "  <photos>\n" + "".join(rows) + "  </photos>\n"

        user = request.user

        document = "".join(
            (
                xml_header(),
                xml_account(user),
                xml_email(user),
                xml_openid(user),
                xml_photos(user),
                xml_footer(),
            )
        )

        bytesobj = BytesIO()
        # The context manager closes the gzip stream (writing its trailer)
        # even if the write fails
        with gzip.GzipFile(fileobj=bytesobj, mode="w") as data:
            data.write(document.encode("utf-8"))

        response = HttpResponse(content_type="application/gzip")
        response[
            "Content-Disposition"
        ] = f'attachment; filename="libravatar-export_{user.username}.xml.gz"'
        response.write(bytesobj.getvalue())
        return response