Coverage for ivatar/ivataraccount/views.py: 75%
664 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-14 23:13 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-14 23:13 +0000
1# -*- coding: utf-8 -*-
2"""
3View classes for ivatar/ivataraccount/
4"""
6from io import BytesIO
7from ivatar.utils import urlopen, Bluesky
8import base64
9import binascii
10import contextlib
11from xml.sax import saxutils
12import gzip
14from PIL import Image
16from django.db.models import ProtectedError
17from django.core.exceptions import ObjectDoesNotExist
18from django.contrib.auth.decorators import login_required
19from django.contrib.auth.models import User
20from django.utils.decorators import method_decorator
21from django.contrib.messages.views import SuccessMessageMixin
22from django.contrib import messages
23from django.views.generic.edit import FormView, UpdateView
24from django.views.generic.base import View, TemplateView
25from django.views.generic.detail import DetailView
26from django.contrib.auth import authenticate, login
27from django.contrib.auth.forms import UserCreationForm, SetPasswordForm
28from django.contrib.auth.views import LoginView
29from django.contrib.auth.views import (
30 PasswordResetView as PasswordResetViewOriginal,
31)
32from django.utils.translation import gettext_lazy as _
33from django.http import HttpResponseRedirect, HttpResponse
34from django.urls import reverse_lazy, reverse
35from django.shortcuts import render
36from django_openid_auth.models import UserOpenID
38from openid import oidutil
39from openid.consumer import consumer
41from ipware import get_client_ip
43from email_validator import validate_email
45from libravatar import libravatar_url
46from ivatar.settings import (
47 MAX_NUM_PHOTOS,
48 MAX_PHOTO_SIZE,
49 JPEG_QUALITY,
50 AVATAR_MAX_SIZE,
51 SOCIAL_AUTH_FEDORA_KEY,
52)
53from .gravatar import get_photo as get_gravatar_photo
55from .forms import AddEmailForm, UploadPhotoForm, AddOpenIDForm
56from .forms import UpdatePreferenceForm, UploadLibravatarExportForm
57from .forms import DeleteAccountForm
58from .models import UnconfirmedEmail, ConfirmedEmail, Photo
59from .models import UnconfirmedOpenId, ConfirmedOpenId, DjangoOpenIDStore
60from .models import UserPreference
61from .models import file_format
62from .read_libravatar_export import read_gzdata as libravatar_read_gzdata
65def openid_logging(message, level=0):
66 """
67 Helper method for openid logging
68 """
69 # Normal messages are not that important
70 # No need for coverage here
71 if level > 0: # pragma: no cover
72 print(message)
75class CreateView(SuccessMessageMixin, FormView):
76 """
77 View class for creating a new user
78 """
80 template_name = "new.html"
81 form_class = UserCreationForm
83 def form_valid(self, form):
84 form.save()
85 user = authenticate(
86 username=form.cleaned_data["username"],
87 password=form.cleaned_data["password1"],
88 )
89 if user is not None:
90 # If the username looks like a mail address, automagically
91 # add it as unconfirmed mail and set it also as user's
92 # email address
93 with contextlib.suppress(Exception):
94 self._extracted_from_form_valid_(form, user)
95 login(self.request, user)
96 pref = UserPreference.objects.create(
97 user_id=user.pk
98 ) # pylint: disable=no-member
99 pref.save()
100 return HttpResponseRedirect(reverse_lazy("profile"))
101 return HttpResponseRedirect(reverse_lazy("login")) # pragma: no cover
103 def _extracted_from_form_valid_(self, form, user):
104 # This will error out if it's not a valid address
105 valid = validate_email(form.cleaned_data["username"])
106 user.email = valid.email
107 user.save()
108 # The following will also error out if it already exists
109 unconfirmed = UnconfirmedEmail()
110 unconfirmed.email = valid.email
111 unconfirmed.user = user
112 unconfirmed.save()
113 unconfirmed.send_confirmation_mail(
114 url=self.request.build_absolute_uri("/")[:-1]
115 )
117 def get(self, request, *args, **kwargs):
118 """
119 Handle get for create view
120 """
121 if request.user and request.user.is_authenticated:
122 return HttpResponseRedirect(reverse_lazy("profile"))
123 return super().get(self, request, args, kwargs)
126@method_decorator(login_required, name="dispatch")
127class PasswordSetView(SuccessMessageMixin, FormView):
128 """
129 View class for changing the password
130 """
132 template_name = "password_change.html"
133 form_class = SetPasswordForm
134 success_message = _("password changed successfully - please login again")
135 success_url = reverse_lazy("profile")
137 def get_form_kwargs(self):
138 kwargs = super(PasswordSetView, self).get_form_kwargs()
139 kwargs["user"] = self.request.user
140 return kwargs
142 def form_valid(self, form):
143 form.save()
144 super().form_valid(form)
145 return HttpResponseRedirect(reverse_lazy("login"))
148@method_decorator(login_required, name="dispatch")
149class AddEmailView(SuccessMessageMixin, FormView):
150 """
151 View class for adding email addresses
152 """
154 template_name = "add_email.html"
155 form_class = AddEmailForm
156 success_url = reverse_lazy("profile")
158 def form_valid(self, form):
159 if not form.save(self.request):
160 return render(self.request, self.template_name, {"form": form})
162 messages.success(self.request, _("Address added successfully"))
163 return super().form_valid(form)
166@method_decorator(login_required, name="dispatch")
167class RemoveUnconfirmedEmailView(SuccessMessageMixin, View):
168 """
169 View class for removing a unconfirmed email address
170 """
172 @staticmethod
173 def post(request, *args, **kwargs): # pylint: disable=unused-argument
174 """
175 Handle post request - removing unconfirmed email
176 """
177 try:
178 email = UnconfirmedEmail.objects.get( # pylint: disable=no-member
179 user=request.user, id=kwargs["email_id"]
180 )
181 email.delete()
182 messages.success(request, _("Address removed"))
183 except UnconfirmedEmail.DoesNotExist: # pylint: disable=no-member
184 messages.error(request, _("Address does not exist"))
185 return HttpResponseRedirect(reverse_lazy("profile"))
188class ConfirmEmailView(SuccessMessageMixin, TemplateView):
189 """
190 View class for confirming an unconfirmed email address
191 """
193 template_name = "email_confirmed.html"
195 def get(self, request, *args, **kwargs):
196 # be tolerant of extra crap added by mail clients
197 key = kwargs["verification_key"].replace(" ", "")
199 if len(key) != 64:
200 messages.error(request, _("Verification key incorrect"))
201 return HttpResponseRedirect(reverse_lazy("profile"))
203 try:
204 unconfirmed = UnconfirmedEmail.objects.get(
205 verification_key=key
206 ) # pylint: disable=no-member
207 except UnconfirmedEmail.DoesNotExist: # pylint: disable=no-member
208 messages.error(request, _("Verification key does not exist"))
209 return HttpResponseRedirect(reverse_lazy("profile"))
211 if ConfirmedEmail.objects.filter(email=unconfirmed.email).count() > 0:
212 messages.error(
213 request,
214 _("This mail address has been taken already and cannot be confirmed"),
215 )
216 return HttpResponseRedirect(reverse_lazy("profile"))
218 # TODO: Check for a reasonable expiration time in unconfirmed email
220 (confirmed_id, external_photos) = ConfirmedEmail.objects.create_confirmed_email(
221 unconfirmed.user, unconfirmed.email, not request.user.is_anonymous
222 )
224 unconfirmed.delete()
226 # if there's a single image in this user's profile,
227 # assign it to the new email
228 confirmed = ConfirmedEmail.objects.get(id=confirmed_id)
229 if confirmed.user.photo_set.count() == 1:
230 confirmed.set_photo(confirmed.user.photo_set.first())
231 kwargs["photos"] = external_photos
232 kwargs["email_id"] = confirmed_id
233 return super().get(request, *args, **kwargs)
236@method_decorator(login_required, name="dispatch")
237class RemoveConfirmedEmailView(SuccessMessageMixin, View):
238 """
239 View class for removing a confirmed email address
240 """
242 @staticmethod
243 def post(request, *args, **kwargs): # pylint: disable=unused-argument
244 """
245 Handle post request - removing confirmed email
246 """
247 try:
248 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
249 email.delete()
250 messages.success(request, _("Address removed"))
251 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member
252 messages.error(request, _("Address does not exist"))
253 return HttpResponseRedirect(reverse_lazy("profile"))
256@method_decorator(login_required, name="dispatch")
257class AssignPhotoEmailView(SuccessMessageMixin, TemplateView):
258 """
259 View class for assigning a photo to an email address
260 """
262 model = Photo
263 template_name = "assign_photo_email.html"
265 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
266 """
267 Handle post request - assign photo to email
268 """
269 photo = None
271 try:
272 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
273 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member
274 messages.error(request, _("Invalid request"))
275 return HttpResponseRedirect(reverse_lazy("profile"))
277 if "photoNone" in request.POST:
278 email.photo = None
279 else:
280 if "photo_id" not in request.POST:
281 messages.error(request, _("Invalid request [photo_id] missing"))
282 return HttpResponseRedirect(reverse_lazy("profile"))
284 try:
285 photo = self.model.objects.get( # pylint: disable=no-member
286 id=request.POST["photo_id"], user=request.user
287 )
288 except self.model.DoesNotExist: # pylint: disable=no-member
289 messages.error(request, _("Photo does not exist"))
290 return HttpResponseRedirect(reverse_lazy("profile"))
291 email.photo = photo
292 email.bluesky_handle = None
293 email.save()
295 messages.success(request, _("Successfully changed photo"))
296 return HttpResponseRedirect(reverse_lazy("profile"))
298 def get_context_data(self, **kwargs):
299 data = super().get_context_data(**kwargs)
300 data["email"] = ConfirmedEmail.objects.get(pk=kwargs["email_id"])
301 return data
304@method_decorator(login_required, name="dispatch")
305class AssignPhotoOpenIDView(SuccessMessageMixin, TemplateView):
306 """
307 View class for assigning a photo to an openid address
308 """
310 model = Photo
311 template_name = "assign_photo_openid.html"
313 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
314 """
315 Handle post - assign photo to openid
316 """
317 photo = None
319 try:
320 openid = ConfirmedOpenId.objects.get( # pylint: disable=no-member
321 user=request.user, id=kwargs["openid_id"]
322 )
323 except ConfirmedOpenId.DoesNotExist: # pylint: disable=no-member
324 messages.error(request, _("Invalid request"))
325 return HttpResponseRedirect(reverse_lazy("profile"))
327 if "photoNone" in request.POST:
328 openid.photo = None
329 else:
330 if "photo_id" not in request.POST:
331 messages.error(request, _("Invalid request [photo_id] missing"))
332 return HttpResponseRedirect(reverse_lazy("profile"))
334 try:
335 photo = self.model.objects.get( # pylint: disable=no-member
336 id=request.POST["photo_id"], user=request.user
337 )
338 except self.model.DoesNotExist: # pylint: disable=no-member
339 messages.error(request, _("Photo does not exist"))
340 return HttpResponseRedirect(reverse_lazy("profile"))
341 openid.photo = photo
342 openid.bluesky_handle = None
343 openid.save()
345 messages.success(request, _("Successfully changed photo"))
346 return HttpResponseRedirect(reverse_lazy("profile"))
348 def get_context_data(self, **kwargs):
349 data = super().get_context_data(**kwargs)
350 data["openid"] = ConfirmedOpenId.objects.get(
351 pk=kwargs["openid_id"]
352 ) # pylint: disable=no-member
353 return data
356@method_decorator(login_required, name="dispatch")
357class AssignBlueskyHandleToEmailView(SuccessMessageMixin, TemplateView):
358 """
359 View class for assigning a Bluesky handle to an email address
360 """
362 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
363 """
364 Handle post request - assign bluesky handle to email
365 """
367 try:
368 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
369 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member
370 messages.error(request, _("Invalid request"))
371 return HttpResponseRedirect(reverse_lazy("profile"))
373 if "bluesky_handle" not in request.POST:
374 messages.error(request, _("Invalid request [bluesky_handle] missing"))
375 return HttpResponseRedirect(reverse_lazy("profile"))
376 bluesky_handle = request.POST["bluesky_handle"]
378 try:
379 bs = Bluesky()
381 bs.get_avatar(bluesky_handle)
382 except Exception as e:
383 messages.error(request, _(f"Handle '{bluesky_handle}' not found: {e}"))
384 return HttpResponseRedirect(
385 reverse_lazy(
386 "assign_photo_email", kwargs={"email_id": int(kwargs["email_id"])}
387 )
388 )
389 try:
390 email.set_bluesky_handle(bluesky_handle)
391 except Exception as e:
392 messages.error(request, _(f"Error: {e}"))
393 return HttpResponseRedirect(
394 reverse_lazy(
395 "assign_photo_email", kwargs={"email_id": int(kwargs["email_id"])}
396 )
397 )
398 email.photo = None
399 email.save()
401 messages.success(request, _("Successfully assigned Bluesky handle"))
402 return HttpResponseRedirect(reverse_lazy("profile"))
404 def get_context_data(self, **kwargs):
405 data = super().get_context_data(**kwargs)
406 data["email"] = ConfirmedEmail.objects.get(pk=kwargs["email_id"])
407 return data
410@method_decorator(login_required, name="dispatch")
411class AssignBlueskyHandleToOpenIdView(SuccessMessageMixin, TemplateView):
412 """
413 View class for assigning a Bluesky handle to an email address
414 """
416 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
417 """
418 Handle post request - assign bluesky handle to email
419 """
421 try:
422 openid = ConfirmedOpenId.objects.get(
423 user=request.user, id=kwargs["open_id"]
424 )
425 except ConfirmedOpenId.DoesNotExist: # pylint: disable=no-member
426 messages.error(request, _("Invalid request"))
427 return HttpResponseRedirect(reverse_lazy("profile"))
429 if "bluesky_handle" not in request.POST:
430 messages.error(request, _("Invalid request [bluesky_handle] missing"))
431 return HttpResponseRedirect(reverse_lazy("profile"))
432 bluesky_handle = request.POST["bluesky_handle"]
434 try:
435 bs = Bluesky()
437 bs.get_avatar(bluesky_handle)
438 except Exception as e:
439 messages.error(request, _(f"Handle '{bluesky_handle}' not found: {e}"))
440 return HttpResponseRedirect(
441 reverse_lazy(
442 "assign_photo_openid", kwargs={"openid_id": int(kwargs["open_id"])}
443 )
444 )
445 try:
446 openid.set_bluesky_handle(bluesky_handle)
447 except Exception as e:
448 messages.error(request, _(f"Error: {e}"))
449 return HttpResponseRedirect(
450 reverse_lazy(
451 "assign_photo_openid", kwargs={"openid_id": int(kwargs["open_id"])}
452 )
453 )
454 openid.photo = None
455 openid.save()
457 messages.success(request, _("Successfully assigned Bluesky handle"))
458 return HttpResponseRedirect(reverse_lazy("profile"))
460 def get_context_data(self, **kwargs):
461 data = super().get_context_data(**kwargs)
462 data["openid"] = ConfirmedOpenId.objects.get(pk=kwargs["open_id"])
463 return data
466@method_decorator(login_required, name="dispatch")
467class ImportPhotoView(SuccessMessageMixin, TemplateView):
468 """
469 View class to import a photo from another service
470 Currently only Gravatar is supported
471 """
473 template_name = "import_photo.html"
475 def get_context_data(self, **kwargs):
476 context = super().get_context_data(**kwargs)
477 context["photos"] = []
478 addr = None
479 if "email_id" in kwargs:
480 try:
481 addr = ConfirmedEmail.objects.get(pk=kwargs["email_id"]).email
482 except ConfirmedEmail.ObjectDoesNotExist: # pylint: disable=no-member
483 messages.error(self.request, _("Address does not exist"))
484 return context
486 if addr := kwargs.get("email_addr", None):
487 if gravatar := get_gravatar_photo(addr):
488 context["photos"].append(gravatar)
490 if libravatar_service_url := libravatar_url(
491 email=addr,
492 default=404,
493 size=AVATAR_MAX_SIZE,
494 ):
495 try:
496 urlopen(libravatar_service_url)
497 except OSError as exc:
498 print(f"Exception caught during photo import: {exc}")
499 else:
500 context["photos"].append(
501 {
502 "service_url": libravatar_service_url,
503 "thumbnail_url": f"{libravatar_service_url}&s=80",
504 "image_url": f"{libravatar_service_url}&s=512",
505 "width": 80,
506 "height": 80,
507 "service_name": "Libravatar",
508 }
509 )
511 return context
513 def post(
514 self, request, *args, **kwargs
515 ): # pylint: disable=no-self-use,unused-argument,too-many-branches,line-too-long
516 """
517 Handle post to photo import
518 """
520 imported = None
522 email_id = kwargs.get("email_id", request.POST.get("email_id", None))
523 addr = kwargs.get("email", request.POST.get("email_addr", None))
525 if email_id:
526 email = ConfirmedEmail.objects.filter(id=email_id, user=request.user)
527 if email.exists():
528 addr = email.first().email
529 else:
530 messages.error(request, _("Address does not exist"))
531 return HttpResponseRedirect(reverse_lazy("profile"))
533 if "photo_Gravatar" in request.POST:
534 photo = Photo()
535 photo.user = request.user
536 photo.ip_address = get_client_ip(request)[0]
537 if photo.import_image("Gravatar", addr):
538 messages.success(request, _("Gravatar image successfully imported"))
539 else:
540 # Honestly, I'm not sure how to test this...
541 messages.error(
542 request, _("Gravatar image import not successful")
543 ) # pragma: no cover
544 imported = True
546 if "photo_Libravatar" in request.POST:
547 photo = Photo()
548 photo.user = request.user
549 photo.ip_address = get_client_ip(request)[0]
550 if photo.import_image("Libravatar", addr):
551 messages.success(request, _("Libravatar image successfully imported"))
552 else:
553 # Honestly, I'm not sure how to test this...
554 messages.error(
555 request, _("Libravatar image import not successful")
556 ) # pragma: no cover
557 imported = True
558 if not imported:
559 messages.warning(request, _("Nothing importable"))
560 return HttpResponseRedirect(reverse_lazy("profile"))
563@method_decorator(login_required, name="dispatch")
564class RawImageView(DetailView):
565 """
566 View to return (binary) raw image data, for use in <img/>-tags
567 """
569 model = Photo
571 def get(self, request, *args, **kwargs):
572 photo = self.model.objects.get(pk=kwargs["pk"]) # pylint: disable=no-member
573 if photo.user.id != request.user.id and not request.user.is_staff:
574 return HttpResponseRedirect(reverse_lazy("home"))
575 return HttpResponse(BytesIO(photo.data), content_type=f"image/{photo.format}")
578@method_decorator(login_required, name="dispatch")
579class DeletePhotoView(SuccessMessageMixin, View):
580 """
581 View class for deleting a photo
582 """
584 model = Photo
586 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument
587 """
588 Handle get - delete photo
589 """
590 try:
591 photo = self.model.objects.get( # pylint: disable=no-member
592 pk=kwargs["pk"], user=request.user
593 )
594 photo.delete()
595 except (self.model.DoesNotExist, ProtectedError): # pylint: disable=no-member
596 messages.error(request, _("No such image or no permission to delete it"))
597 return HttpResponseRedirect(reverse_lazy("profile"))
598 messages.success(request, _("Photo deleted successfully"))
599 return HttpResponseRedirect(reverse_lazy("profile"))
602@method_decorator(login_required, name="dispatch")
603class UploadPhotoView(SuccessMessageMixin, FormView):
604 """
605 View class responsible for photo upload
606 """
608 model = Photo
609 template_name = "upload_photo.html"
610 form_class = UploadPhotoForm
611 success_message = _("Successfully uploaded")
612 success_url = reverse_lazy("profile")
614 def post(self, request, *args, **kwargs):
615 num_photos = request.user.photo_set.count()
616 if num_photos >= MAX_NUM_PHOTOS:
617 messages.error(
618 request, _("Maximum number of photos (%i) reached" % MAX_NUM_PHOTOS)
619 )
620 return HttpResponseRedirect(reverse_lazy("profile"))
621 return super().post(request, *args, **kwargs)
623 def form_valid(self, form):
624 photo_data = self.request.FILES["photo"]
625 if photo_data.size > MAX_PHOTO_SIZE:
626 messages.error(self.request, _("Image too big"))
627 return HttpResponseRedirect(reverse_lazy("profile"))
629 photo = form.save(self.request, photo_data)
631 if not photo:
632 messages.error(self.request, _("Invalid Format"))
633 return HttpResponseRedirect(reverse_lazy("profile"))
635 # Override success URL -> Redirect to crop page.
636 self.success_url = reverse_lazy("crop_photo", args=[photo.pk])
637 return super().form_valid(form)
640@method_decorator(login_required, name="dispatch")
641class AddOpenIDView(SuccessMessageMixin, FormView):
642 """
643 View class for adding OpenID
644 """
646 template_name = "add_openid.html"
647 form_class = AddOpenIDForm
648 success_url = reverse_lazy("profile")
650 def form_valid(self, form):
651 if openid_id := form.save(self.request.user):
652 # At this point we have an unconfirmed OpenID, but
653 # we do not add the message, that we successfully added it,
654 # since this is misleading
655 return HttpResponseRedirect(
656 reverse_lazy("openid_redirection", args=[openid_id])
657 )
658 else:
659 return render(self.request, self.template_name, {"form": form})
662@method_decorator(login_required, name="dispatch")
663class RemoveUnconfirmedOpenIDView(View):
664 """
665 View class for removing a unconfirmed OpenID
666 """
668 model = UnconfirmedOpenId
670 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
671 """
672 Handle post - remove unconfirmed openid
673 """
674 try:
675 openid = self.model.objects.get( # pylint: disable=no-member
676 user=request.user, id=kwargs["openid_id"]
677 )
678 openid.delete()
679 messages.success(request, _("ID removed"))
680 except self.model.DoesNotExist: # pragma: no cover pylint: disable=no-member,line-too-long
681 messages.error(request, _("ID does not exist"))
682 return HttpResponseRedirect(reverse_lazy("profile"))
685@method_decorator(login_required, name="dispatch")
686class RemoveConfirmedOpenIDView(View):
687 """
688 View class for removing a confirmed OpenID
689 """
691 model = ConfirmedOpenId
693 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
694 """
695 Handle post - remove confirmed openid
696 """
697 try:
698 openid = self.model.objects.get( # pylint: disable=no-member
699 user=request.user, id=kwargs["openid_id"]
700 )
701 try:
702 openidobj = (
703 UserOpenID.objects.get( # pylint: disable=no-member,line-too-long
704 user_id=request.user.id, claimed_id=openid.openid
705 )
706 )
707 openidobj.delete()
708 except Exception as exc: # pylint: disable=broad-except
709 # Why it is not there?
710 print(f"How did we get here: {exc}")
711 openid.delete()
712 messages.success(request, _("ID removed"))
713 except self.model.DoesNotExist: # pylint: disable=no-member
714 messages.error(request, _("ID does not exist"))
715 return HttpResponseRedirect(reverse_lazy("profile"))
718@method_decorator(login_required, name="dispatch")
719class RedirectOpenIDView(View):
720 """
721 Redirect view for OpenID
722 """
724 model = UnconfirmedOpenId
726 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument
727 """
728 Handle get for OpenID redirect view
729 """
730 try:
731 unconfirmed = self.model.objects.get( # pylint: disable=no-member
732 user=request.user, id=kwargs["openid_id"]
733 )
734 except self.model.DoesNotExist: # pragma: no cover pylint: disable=no-member,line-too-long
735 messages.error(request, _("ID does not exist"))
736 return HttpResponseRedirect(reverse_lazy("profile"))
738 user_url = unconfirmed.openid
739 session = {"id": request.session.session_key}
741 oidutil.log = openid_logging
742 openid_consumer = consumer.Consumer(session, DjangoOpenIDStore())
744 try:
745 auth_request = openid_consumer.begin(user_url)
746 except consumer.DiscoveryFailure as exc:
747 messages.error(request, _(f"OpenID discovery failed: {exc}"))
748 return HttpResponseRedirect(reverse_lazy("profile"))
749 except UnicodeDecodeError as exc: # pragma: no cover
750 msg = _(
751 "OpenID discovery failed (userid=%(userid)s) for "
752 "%(userurl)s: %(message)s"
753 % {
754 "userid": request.user.id,
755 "userurl": user_url.encode("utf-8"),
756 "message": exc,
757 }
758 )
759 print(f"message: {msg}")
760 messages.error(request, msg)
762 if auth_request is None: # pragma: no cover
763 messages.error(request, _("OpenID discovery failed"))
764 return HttpResponseRedirect(reverse_lazy("profile"))
766 realm = request.build_absolute_uri("/")[:-1] # pragma: no cover
767 return_url = realm + reverse( # pragma: no cover
768 "confirm_openid", args=[kwargs["openid_id"]]
769 )
770 return HttpResponseRedirect( # pragma: no cover
771 auth_request.redirectURL(realm, return_url)
772 )
775@method_decorator(login_required, name="dispatch")
776class ConfirmOpenIDView(View): # pragma: no cover
777 """
778 Confirm OpenID view
779 """
781 model = UnconfirmedOpenId
782 model_confirmed = ConfirmedOpenId
784 def do_request(self, data, *args, **kwargs): # pylint: disable=unused-argument
785 """
786 Handle request, called by get() or post()
787 """
788 session = {"id": self.request.session.session_key}
789 current_url = self.request.build_absolute_uri("/")[:-1] + self.request.path
790 openid_consumer = consumer.Consumer(session, DjangoOpenIDStore())
791 info = openid_consumer.complete(data, current_url)
792 if info.status == consumer.FAILURE:
793 messages.error(
794 self.request, _('Confirmation failed: "') + str(info.message) + '"'
795 )
796 return HttpResponseRedirect(reverse_lazy("profile"))
798 if info.status == consumer.CANCEL:
799 messages.error(self.request, _("Cancelled by user"))
800 return HttpResponseRedirect(reverse_lazy("profile"))
802 if info.status != consumer.SUCCESS:
803 messages.error(self.request, _("Unknown verification error"))
804 return HttpResponseRedirect(reverse_lazy("profile"))
806 try:
807 unconfirmed = self.model.objects.get( # pylint: disable=no-member
808 user=self.request.user, id=kwargs["openid_id"]
809 )
810 except self.model.DoesNotExist: # pylint: disable=no-member
811 messages.error(self.request, _("ID does not exist"))
812 return HttpResponseRedirect(reverse_lazy("profile"))
814 # TODO: Check for a reasonable expiration time
815 confirmed = self.model_confirmed()
816 confirmed.user = unconfirmed.user
817 confirmed.ip_address = get_client_ip(self.request)[0]
818 confirmed.openid = unconfirmed.openid
819 confirmed.save()
821 unconfirmed.delete()
823 # If there is a single image in this user's profile
824 # assign it to the new id
825 if self.request.user.photo_set.count() == 1:
826 confirmed.set_photo(self.request.user.photo_set.first())
828 # Also allow user to login using this OpenID (if not already taken)
829 if not UserOpenID.objects.filter( # pylint: disable=no-member
830 claimed_id=confirmed.openid
831 ).exists():
832 user_openid = UserOpenID()
833 user_openid.user = self.request.user
834 user_openid.claimed_id = confirmed.openid
835 user_openid.display_id = confirmed.openid
836 user_openid.save()
837 return HttpResponseRedirect(reverse_lazy("profile"))
839 def get(self, request, *args, **kwargs):
840 """
841 Handle get - confirm openid
842 """
843 return self.do_request(request.GET, *args, **kwargs)
845 def post(self, request, *args, **kwargs):
846 """
847 Handle post - confirm openid
848 """
849 return self.do_request(request.POST, *args, **kwargs)
852@method_decorator(login_required, name="dispatch")
853class CropPhotoView(TemplateView):
854 """
855 View class for cropping photos
856 """
858 template_name = "crop_photo.html"
859 success_url = reverse_lazy("profile")
860 model = Photo
862 def get(self, request, *args, **kwargs):
863 photo = self.model.objects.get(
864 pk=kwargs["pk"], user=request.user
865 ) # pylint: disable=no-member
866 email = request.GET.get("email")
867 openid = request.GET.get("openid")
868 return render(
869 self.request,
870 self.template_name,
871 {
872 "photo": photo,
873 "email": email,
874 "openid": openid,
875 },
876 )
878 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
879 """
880 Handle post - crop photo
881 """
882 photo = self.model.objects.get(
883 pk=kwargs["pk"], user=request.user
884 ) # pylint: disable=no-member
885 dimensions = {
886 "x": int(float(request.POST["x"])),
887 "y": int(float(request.POST["y"])),
888 "w": int(float(request.POST["w"])),
889 "h": int(float(request.POST["h"])),
890 }
891 email = openid = None
892 if "email" in request.POST:
893 with contextlib.suppress(ConfirmedEmail.DoesNotExist):
894 email = ConfirmedEmail.objects.get(email=request.POST["email"])
895 if "openid" in request.POST:
896 with contextlib.suppress(ConfirmedOpenId.DoesNotExist):
897 openid = ConfirmedOpenId.objects.get( # pylint: disable=no-member
898 openid=request.POST["openid"]
899 )
900 return photo.perform_crop(request, dimensions, email, openid)
903@method_decorator(login_required, name="dispatch") # pylint: disable=too-many-ancestors
904class UserPreferenceView(FormView, UpdateView):
905 """
906 View class for user preferences view/update
907 """
909 template_name = "preferences.html"
910 model = UserPreference
911 form_class = UpdatePreferenceForm
912 success_url = reverse_lazy("user_preference")
914 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
915 """
916 Process POST-ed data from this form
917 """
918 userpref = None
919 try:
920 userpref = self.request.user.userpreference
921 except ObjectDoesNotExist:
922 userpref = UserPreference(user=self.request.user)
923 userpref.theme = request.POST["theme"]
924 userpref.save()
925 try:
926 if request.POST["email"] != self.request.user.email:
927 addresses = list(
928 self.request.user.confirmedemail_set.all().values_list(
929 "email", flat=True
930 )
931 )
932 if request.POST["email"] not in addresses:
933 messages.error(
934 self.request,
935 _(f'Mail address not allowed: {request.POST["email"]}'),
936 )
937 else:
938 self.request.user.email = request.POST["email"]
939 self.request.user.save()
940 messages.info(self.request, _("Mail address changed."))
941 except Exception as e: # pylint: disable=broad-except
942 messages.error(self.request, _(f"Error setting new mail address: {e}"))
944 try:
945 if request.POST["first_name"] or request.POST["last_name"]:
946 if request.POST["first_name"] != self.request.user.first_name:
947 self.request.user.first_name = request.POST["first_name"]
948 messages.info(self.request, _("First name changed."))
949 if request.POST["last_name"] != self.request.user.last_name:
950 self.request.user.last_name = request.POST["last_name"]
951 messages.info(self.request, _("Last name changed."))
952 self.request.user.save()
953 except Exception as e: # pylint: disable=broad-except
954 messages.error(self.request, _(f"Error setting names: {e}"))
956 return HttpResponseRedirect(reverse_lazy("user_preference"))
958 def get(self, request, *args, **kwargs):
959 return render(
960 self.request,
961 self.template_name,
962 {
963 "THEMES": UserPreference.THEMES,
964 },
965 )
967 def get_object(self, queryset=None):
968 (obj, created) = UserPreference.objects.get_or_create(
969 user=self.request.user
970 ) # pylint: disable=no-member,unused-variable
971 return obj
974@method_decorator(login_required, name="dispatch")
975class UploadLibravatarExportView(SuccessMessageMixin, FormView):
976 """
977 View class responsible for libravatar user data export upload
978 """
980 template_name = "upload_libravatar_export.html"
981 form_class = UploadLibravatarExportForm
982 success_message = _("Successfully uploaded")
983 success_url = reverse_lazy("profile")
984 model = User
986 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
987 """
988 Handle post request - choose items to import
989 """
990 if "save" in kwargs: # pylint: disable=too-many-nested-blocks
991 if kwargs["save"] == "save":
992 for arg in request.POST:
993 if arg.startswith("email_"):
994 email = request.POST[arg]
995 if not ConfirmedEmail.objects.filter(
996 email=email
997 ) and not UnconfirmedEmail.objects.filter(
998 email=email
999 ): # pylint: disable=no-member
1000 try:
1001 unconfirmed = UnconfirmedEmail.objects.create( # pylint: disable=no-member
1002 user=request.user, email=email
1003 )
1004 unconfirmed.save()
1005 unconfirmed.send_confirmation_mail(
1006 url=request.build_absolute_uri("/")[:-1]
1007 )
1008 messages.info(
1009 request,
1010 "%s: %s"
1011 % (
1012 email,
1013 _(
1014 "address added successfully,\
1015 confirmation mail sent"
1016 ),
1017 ),
1018 )
1019 except Exception as exc: # pylint: disable=broad-except
1020 # DEBUG
1021 print(
1022 f"Exception during adding mail address ({email}): {exc}"
1023 )
1025 if arg.startswith("photo"):
1026 try:
1027 data = base64.decodebytes(bytes(request.POST[arg], "utf-8"))
1028 except binascii.Error as exc:
1029 print(f"Cannot decode photo: {exc}")
1030 continue
1031 try:
1032 pilobj = Image.open(BytesIO(data))
1033 out = BytesIO()
1034 pilobj.save(out, pilobj.format, quality=JPEG_QUALITY)
1035 out.seek(0)
1036 photo = Photo()
1037 photo.user = request.user
1038 photo.ip_address = get_client_ip(request)[0]
1039 photo.format = file_format(pilobj.format)
1040 photo.data = out.read()
1041 photo.save()
1042 except Exception as exc: # pylint: disable=broad-except
1043 print(f"Exception during save: {exc}")
1044 continue
1046 return HttpResponseRedirect(reverse_lazy("profile"))
1047 return super().post(request, args, kwargs)
1049 def form_valid(self, form):
1050 data = self.request.FILES["export_file"]
1051 try:
1052 items = libravatar_read_gzdata(data.read())
1053 # DEBUG print(items)
1054 return render(
1055 self.request,
1056 "choose_libravatar_export.html",
1057 {
1058 "emails": items["emails"],
1059 "photos": items["photos"],
1060 },
1061 )
1062 except Exception as e:
1063 messages.error(self.request, _(f"Unable to parse file: {e}"))
1064 return HttpResponseRedirect(reverse_lazy("upload_export"))
1067@method_decorator(login_required, name="dispatch")
1068class ResendConfirmationMailView(View):
1069 """
1070 View class for resending confirmation mail
1071 """
1073 model = UnconfirmedEmail
1075 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument
1076 """
1077 Handle post - resend confirmation mail for unconfirmed e-mail address
1078 """
1079 try:
1080 email = self.model.objects.get( # pylint: disable=no-member
1081 user=request.user, id=kwargs["email_id"]
1082 )
1083 except self.model.DoesNotExist: # pragma: no cover pylint: disable=no-member
1084 messages.error(request, _("ID does not exist"))
1085 else:
1086 try:
1087 email.send_confirmation_mail(url=request.build_absolute_uri("/")[:-1])
1088 messages.success(
1089 request, f'{_("Confirmation mail sent to")}: {email.email}'
1090 )
1091 except Exception as exc: # pylint: disable=broad-except
1092 messages.error(
1093 request,
1094 f'{_("Unable to send confirmation email for")} {email.email}: {exc}',
1095 )
1096 return HttpResponseRedirect(reverse_lazy("profile"))
1099class IvatarLoginView(LoginView):
1100 """
1101 View class for login
1102 """
1104 template_name = "login.html"
1106 def get(self, request, *args, **kwargs):
1107 """
1108 Handle get for login view
1109 """
1110 if request.user:
1111 if request.user.is_authenticated:
1112 # Respect the 'next' parameter if present
1113 next_url = request.GET.get("next")
1114 if next_url:
1115 return HttpResponseRedirect(next_url)
1116 return HttpResponseRedirect(reverse_lazy("profile"))
1117 return super().get(self, request, args, kwargs)
1119 def get_context_data(self, **kwargs):
1120 context = super().get_context_data(**kwargs)
1121 context["with_fedora"] = SOCIAL_AUTH_FEDORA_KEY is not None
1122 return context
1125@method_decorator(login_required, name="dispatch")
1126class ProfileView(TemplateView):
1127 """
1128 View class for profile
1129 """
1131 template_name = "profile.html"
1133 def get(self, request, *args, **kwargs):
1134 if "profile_username" in kwargs:
1135 if not request.user.is_staff:
1136 return HttpResponseRedirect(reverse_lazy("profile"))
1137 with contextlib.suppress(Exception):
1138 u = User.objects.get(username=kwargs["profile_username"])
1139 request.user = u
1140 self._confirm_claimed_openid()
1141 return super().get(self, request, args, kwargs)
1143 def get_context_data(self, **kwargs):
1144 """
1145 Provide additional context data, like if max_photos is reached
1146 already or not.
1147 """
1148 context = super().get_context_data(**kwargs)
1149 context["max_photos"] = False
1150 if self.request.user:
1151 if self.request.user.photo_set.all().count() >= MAX_NUM_PHOTOS:
1152 context["max_photos"] = True
1153 return context
1155 def _confirm_claimed_openid(self):
1156 openids = self.request.user.useropenid_set.all()
1157 # If there is only one OpenID, we eventually need to add it to
1158 # the user account
1159 if openids.count() == 1:
1160 # Already confirmed, skip
1161 if ConfirmedOpenId.objects.filter( # pylint: disable=no-member
1162 openid=openids.first().claimed_id
1163 ).exists():
1164 return
1165 # For whatever reason, this is in unconfirmed state, skip
1166 if UnconfirmedOpenId.objects.filter( # pylint: disable=no-member
1167 openid=openids.first().claimed_id
1168 ).exists():
1169 return
1170 print(f"need to confirm: {openids.first()}")
1171 confirmed = ConfirmedOpenId()
1172 confirmed.user = self.request.user
1173 confirmed.ip_address = get_client_ip(self.request)[0]
1174 confirmed.openid = openids.first().claimed_id
1175 confirmed.save()
1178class PasswordResetView(PasswordResetViewOriginal):
1179 """
1180 View class for password reset
1181 """
1183 def post(self, request, *args, **kwargs):
1184 """
1185 Since we have the mail addresses in ConfirmedEmail model,
1186 we need to set the email on the user object in order for the
1187 PasswordResetView class to pick up the correct user.
1188 In case we have the mail address in the User object, we still
1189 need to assign a random password in order for PasswordResetView
1190 class to pick up the user - else it will silently do nothing.
1191 """
1192 if "email" in request.POST:
1193 user = None
1195 # Try to find the user via the normal user class
1196 # TODO: How to handle the case that multiple user accounts
1197 # could have the same password set?
1198 user = User.objects.filter(email=request.POST["email"]).first()
1200 # If we didn't find the user in the previous step,
1201 # try the ConfirmedEmail class instead.
1202 # If we find the user there, we need to set the mail
1203 # attribute on the user object accordingly
1204 if not user:
1205 with contextlib.suppress(ObjectDoesNotExist):
1206 confirmed_email = ConfirmedEmail.objects.get(
1207 email=request.POST["email"]
1208 )
1209 user = confirmed_email.user
1210 user.email = confirmed_email.email
1211 user.save()
1212 # If we found the user, set a random password. Else, the
1213 # ResetPasswordView class will silently ignore the password
1214 # reset request
1215 if user:
1216 if not user.password or user.password.startswith("!"):
1217 random_pass = User.objects.make_random_password()
1218 user.set_password(random_pass)
1219 user.save()
1221 # Whatever happens above, let the original function handle the rest
1222 return super().post(self, request, args, kwargs)
1225@method_decorator(login_required, name="dispatch")
1226class DeleteAccountView(SuccessMessageMixin, FormView):
1227 """
1228 View class for account deletion
1229 """
1231 template_name = "delete.html"
1232 form_class = DeleteAccountForm
1233 success_url = reverse_lazy("home")
1235 def get(self, request, *args, **kwargs):
1236 return super().get(self, request, args, kwargs)
1238 def post(self, request, *args, **kwargs):
1239 """
1240 Handle account deletion
1241 """
1242 if request.user.password:
1243 if "password" in request.POST:
1244 if not request.user.check_password(request.POST["password"]):
1245 messages.error(request, _("Incorrect password"))
1246 return HttpResponseRedirect(reverse_lazy("delete"))
1247 else:
1248 messages.error(request, _("No password given"))
1249 return HttpResponseRedirect(reverse_lazy("delete"))
1251 # should delete all confirmed/unconfirmed/photo objects
1252 request.user.delete()
1253 return super().post(self, request, args, kwargs)
1256@method_decorator(login_required, name="dispatch")
1257class ExportView(SuccessMessageMixin, TemplateView):
1258 """
1259 View class responsible for libravatar user data export
1260 """
1262 template_name = "export.html"
1263 model = User
1265 def get(self, request, *args, **kwargs):
1266 return super().get(self, request, args, kwargs)
1268 def post(self, request, *args, **kwargs):
1269 """
1270 Handle real export
1271 """
1272 SCHEMA_ROOT = "https://www.libravatar.org/schemas/export/0.2"
1273 SCHEMA_XSD = f"{SCHEMA_ROOT}/export.xsd"
1275 def xml_header():
1276 return (
1277 """<?xml version="1.0" encoding="UTF-8"?>"""
1278 '''<user xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"'''
1279 ''' xsi:schemaLocation="%s %s"'''
1280 """ xmlns="%s">\n""" % (SCHEMA_ROOT, SCHEMA_XSD, SCHEMA_ROOT)
1281 )
1283 def xml_footer():
1284 return "</user>\n"
1286 def xml_account(user):
1287 escaped_username = saxutils.quoteattr(user.username)
1288 escaped_password = saxutils.quoteattr(user.password)
1289 return " <account username=%s password=%s/>\n" % (
1290 escaped_username,
1291 escaped_password,
1292 )
1294 def xml_email(user):
1295 returnstring = " <emails>\n"
1296 for email in user.confirmedemail_set.all():
1297 returnstring += (
1298 ' <email photo_id="'
1299 + str(email.photo_id)
1300 + '">'
1301 + str(email.email)
1302 + "</email>"
1303 + "\n"
1304 )
1305 returnstring += " </emails>\n"
1306 return returnstring
1308 def xml_openid(user):
1309 returnstring = " <openids>\n"
1310 for openid in user.confirmedopenid_set.all():
1311 returnstring += (
1312 ' <openid photo_id="'
1313 + str(openid.photo_id)
1314 + '">'
1315 + str(openid.openid)
1316 + "</openid>"
1317 + "\n"
1318 )
1319 returnstring += " </openids>\n"
1320 return returnstring
1322 def xml_photos(user):
1323 s = " <photos>\n"
1324 for photo in user.photo_set.all():
1325 encoded_photo = base64.b64encode(photo.data)
1326 if encoded_photo:
1327 s += (
1328 """ <photo id="%s" encoding="base64" format=%s>"""
1329 """%s"""
1330 """</photo>\n"""
1331 % (photo.id, saxutils.quoteattr(photo.format), encoded_photo)
1332 )
1333 s += " </photos>\n"
1334 return s
1336 user = request.user
1338 photos = []
1339 for photo in user.photo_set.all():
1340 photo_details = {"data": photo.data, "format": photo.format}
1341 photos.append(photo_details)
1343 bytesobj = BytesIO()
1344 data = gzip.GzipFile(fileobj=bytesobj, mode="w")
1345 data.write(bytes(xml_header(), "utf-8"))
1346 data.write(bytes(xml_account(user), "utf-8"))
1347 data.write(bytes(xml_email(user), "utf-8"))
1348 data.write(bytes(xml_openid(user), "utf-8"))
1349 data.write(bytes(xml_photos(user), "utf-8"))
1350 data.write(bytes(xml_footer(), "utf-8"))
1351 data.close()
1352 bytesobj.seek(0)
1354 response = HttpResponse(content_type="application/gzip")
1355 response[
1356 "Content-Disposition"
1357 ] = f'attachment; filename="libravatar-export_{user.username}.xml.gz"'
1358 response.write(bytesobj.read())
1359 return response