Coverage for ivatar/ivataraccount/views.py: 75%
661 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-12 23:12 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-12 23:12 +0000
1# -*- coding: utf-8 -*-
2"""
3View classes for ivatar/ivataraccount/
4"""
6from io import BytesIO
7from ivatar.utils import urlopen, Bluesky
8import base64
9import binascii
10import contextlib
11from xml.sax import saxutils
12import gzip
14from PIL import Image
16from django.db.models import ProtectedError
17from django.core.exceptions import ObjectDoesNotExist
18from django.contrib.auth.decorators import login_required
19from django.contrib.auth.models import User
20from django.utils.decorators import method_decorator
21from django.contrib.messages.views import SuccessMessageMixin
22from django.contrib import messages
23from django.views.generic.edit import FormView, UpdateView
24from django.views.generic.base import View, TemplateView
25from django.views.generic.detail import DetailView
26from django.contrib.auth import authenticate, login
27from django.contrib.auth.forms import UserCreationForm, SetPasswordForm
28from django.contrib.auth.views import LoginView
29from django.contrib.auth.views import (
30 PasswordResetView as PasswordResetViewOriginal,
31)
32from django.utils.translation import gettext_lazy as _
33from django.http import HttpResponseRedirect, HttpResponse
34from django.urls import reverse_lazy, reverse
35from django.shortcuts import render
36from django_openid_auth.models import UserOpenID
38from openid import oidutil
39from openid.consumer import consumer
41from ipware import get_client_ip
43from email_validator import validate_email
45from libravatar import libravatar_url
46from ivatar.settings import (
47 MAX_NUM_PHOTOS,
48 MAX_PHOTO_SIZE,
49 JPEG_QUALITY,
50 AVATAR_MAX_SIZE,
51 SOCIAL_AUTH_FEDORA_KEY,
52)
53from .gravatar import get_photo as get_gravatar_photo
55from .forms import AddEmailForm, UploadPhotoForm, AddOpenIDForm
56from .forms import UpdatePreferenceForm, UploadLibravatarExportForm
57from .forms import DeleteAccountForm
58from .models import UnconfirmedEmail, ConfirmedEmail, Photo
59from .models import UnconfirmedOpenId, ConfirmedOpenId, DjangoOpenIDStore
60from .models import UserPreference
61from .models import file_format
62from .read_libravatar_export import read_gzdata as libravatar_read_gzdata
65def openid_logging(message, level=0):
66 """
67 Helper method for openid logging
68 """
69 # Normal messages are not that important
70 # No need for coverage here
71 if level > 0: # pragma: no cover
72 print(message)
75class CreateView(SuccessMessageMixin, FormView):
76 """
77 View class for creating a new user
78 """
80 template_name = "new.html"
81 form_class = UserCreationForm
83 def form_valid(self, form):
84 form.save()
85 user = authenticate(
86 username=form.cleaned_data["username"],
87 password=form.cleaned_data["password1"],
88 )
89 if user is not None:
90 # If the username looks like a mail address, automagically
91 # add it as unconfirmed mail and set it also as user's
92 # email address
93 with contextlib.suppress(Exception):
94 self._extracted_from_form_valid_(form, user)
95 login(self.request, user)
96 pref = UserPreference.objects.create(
97 user_id=user.pk
98 ) # pylint: disable=no-member
99 pref.save()
100 return HttpResponseRedirect(reverse_lazy("profile"))
101 return HttpResponseRedirect(reverse_lazy("login")) # pragma: no cover
103 def _extracted_from_form_valid_(self, form, user):
104 # This will error out if it's not a valid address
105 valid = validate_email(form.cleaned_data["username"])
106 user.email = valid.email
107 user.save()
108 # The following will also error out if it already exists
109 unconfirmed = UnconfirmedEmail()
110 unconfirmed.email = valid.email
111 unconfirmed.user = user
112 unconfirmed.save()
113 unconfirmed.send_confirmation_mail(
114 url=self.request.build_absolute_uri("/")[:-1]
115 )
117 def get(self, request, *args, **kwargs):
118 """
119 Handle get for create view
120 """
121 if request.user and request.user.is_authenticated:
122 return HttpResponseRedirect(reverse_lazy("profile"))
123 return super().get(self, request, args, kwargs)
126@method_decorator(login_required, name="dispatch")
127class PasswordSetView(SuccessMessageMixin, FormView):
128 """
129 View class for changing the password
130 """
132 template_name = "password_change.html"
133 form_class = SetPasswordForm
134 success_message = _("password changed successfully - please login again")
135 success_url = reverse_lazy("profile")
137 def get_form_kwargs(self):
138 kwargs = super(PasswordSetView, self).get_form_kwargs()
139 kwargs["user"] = self.request.user
140 return kwargs
142 def form_valid(self, form):
143 form.save()
144 super().form_valid(form)
145 return HttpResponseRedirect(reverse_lazy("login"))
148@method_decorator(login_required, name="dispatch")
149class AddEmailView(SuccessMessageMixin, FormView):
150 """
151 View class for adding email addresses
152 """
154 template_name = "add_email.html"
155 form_class = AddEmailForm
156 success_url = reverse_lazy("profile")
158 def form_valid(self, form):
159 if not form.save(self.request):
160 return render(self.request, self.template_name, {"form": form})
162 messages.success(self.request, _("Address added successfully"))
163 return super().form_valid(form)
166@method_decorator(login_required, name="dispatch")
167class RemoveUnconfirmedEmailView(SuccessMessageMixin, View):
168 """
169 View class for removing a unconfirmed email address
170 """
172 @staticmethod
173 def post(request, *args, **kwargs): # pylint: disable=unused-argument
174 """
175 Handle post request - removing unconfirmed email
176 """
177 try:
178 email = UnconfirmedEmail.objects.get( # pylint: disable=no-member
179 user=request.user, id=kwargs["email_id"]
180 )
181 email.delete()
182 messages.success(request, _("Address removed"))
183 except UnconfirmedEmail.DoesNotExist: # pylint: disable=no-member
184 messages.error(request, _("Address does not exist"))
185 return HttpResponseRedirect(reverse_lazy("profile"))
188class ConfirmEmailView(SuccessMessageMixin, TemplateView):
189 """
190 View class for confirming an unconfirmed email address
191 """
193 template_name = "email_confirmed.html"
195 def get(self, request, *args, **kwargs):
196 # be tolerant of extra crap added by mail clients
197 key = kwargs["verification_key"].replace(" ", "")
199 if len(key) != 64:
200 messages.error(request, _("Verification key incorrect"))
201 return HttpResponseRedirect(reverse_lazy("profile"))
203 try:
204 unconfirmed = UnconfirmedEmail.objects.get(
205 verification_key=key
206 ) # pylint: disable=no-member
207 except UnconfirmedEmail.DoesNotExist: # pylint: disable=no-member
208 messages.error(request, _("Verification key does not exist"))
209 return HttpResponseRedirect(reverse_lazy("profile"))
211 if ConfirmedEmail.objects.filter(email=unconfirmed.email).count() > 0:
212 messages.error(
213 request,
214 _("This mail address has been taken already and cannot be confirmed"),
215 )
216 return HttpResponseRedirect(reverse_lazy("profile"))
218 # TODO: Check for a reasonable expiration time in unconfirmed email
220 (confirmed_id, external_photos) = ConfirmedEmail.objects.create_confirmed_email(
221 unconfirmed.user, unconfirmed.email, not request.user.is_anonymous
222 )
224 unconfirmed.delete()
226 # if there's a single image in this user's profile,
227 # assign it to the new email
228 confirmed = ConfirmedEmail.objects.get(id=confirmed_id)
229 if confirmed.user.photo_set.count() == 1:
230 confirmed.set_photo(confirmed.user.photo_set.first())
231 kwargs["photos"] = external_photos
232 kwargs["email_id"] = confirmed_id
233 return super().get(request, *args, **kwargs)
236@method_decorator(login_required, name="dispatch")
237class RemoveConfirmedEmailView(SuccessMessageMixin, View):
238 """
239 View class for removing a confirmed email address
240 """
242 @staticmethod
243 def post(request, *args, **kwargs): # pylint: disable=unused-argument
244 """
245 Handle post request - removing confirmed email
246 """
247 try:
248 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
249 email.delete()
250 messages.success(request, _("Address removed"))
251 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member
252 messages.error(request, _("Address does not exist"))
253 return HttpResponseRedirect(reverse_lazy("profile"))
256@method_decorator(login_required, name="dispatch")
257class AssignPhotoEmailView(SuccessMessageMixin, TemplateView):
258 """
259 View class for assigning a photo to an email address
260 """
262 model = Photo
263 template_name = "assign_photo_email.html"
265 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
266 """
267 Handle post request - assign photo to email
268 """
269 photo = None
271 try:
272 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
273 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member
274 messages.error(request, _("Invalid request"))
275 return HttpResponseRedirect(reverse_lazy("profile"))
277 if "photoNone" in request.POST:
278 email.photo = None
279 else:
280 if "photo_id" not in request.POST:
281 messages.error(request, _("Invalid request [photo_id] missing"))
282 return HttpResponseRedirect(reverse_lazy("profile"))
284 try:
285 photo = self.model.objects.get( # pylint: disable=no-member
286 id=request.POST["photo_id"], user=request.user
287 )
288 except self.model.DoesNotExist: # pylint: disable=no-member
289 messages.error(request, _("Photo does not exist"))
290 return HttpResponseRedirect(reverse_lazy("profile"))
291 email.photo = photo
292 email.bluesky_handle = None
293 email.save()
295 messages.success(request, _("Successfully changed photo"))
296 return HttpResponseRedirect(reverse_lazy("profile"))
298 def get_context_data(self, **kwargs):
299 data = super().get_context_data(**kwargs)
300 data["email"] = ConfirmedEmail.objects.get(pk=kwargs["email_id"])
301 return data
304@method_decorator(login_required, name="dispatch")
305class AssignPhotoOpenIDView(SuccessMessageMixin, TemplateView):
306 """
307 View class for assigning a photo to an openid address
308 """
310 model = Photo
311 template_name = "assign_photo_openid.html"
313 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
314 """
315 Handle post - assign photo to openid
316 """
317 photo = None
319 try:
320 openid = ConfirmedOpenId.objects.get( # pylint: disable=no-member
321 user=request.user, id=kwargs["openid_id"]
322 )
323 except ConfirmedOpenId.DoesNotExist: # pylint: disable=no-member
324 messages.error(request, _("Invalid request"))
325 return HttpResponseRedirect(reverse_lazy("profile"))
327 if "photoNone" in request.POST:
328 openid.photo = None
329 else:
330 if "photo_id" not in request.POST:
331 messages.error(request, _("Invalid request [photo_id] missing"))
332 return HttpResponseRedirect(reverse_lazy("profile"))
334 try:
335 photo = self.model.objects.get( # pylint: disable=no-member
336 id=request.POST["photo_id"], user=request.user
337 )
338 except self.model.DoesNotExist: # pylint: disable=no-member
339 messages.error(request, _("Photo does not exist"))
340 return HttpResponseRedirect(reverse_lazy("profile"))
341 openid.photo = photo
342 openid.bluesky_handle = None
343 openid.save()
345 messages.success(request, _("Successfully changed photo"))
346 return HttpResponseRedirect(reverse_lazy("profile"))
348 def get_context_data(self, **kwargs):
349 data = super().get_context_data(**kwargs)
350 data["openid"] = ConfirmedOpenId.objects.get(
351 pk=kwargs["openid_id"]
352 ) # pylint: disable=no-member
353 return data
356@method_decorator(login_required, name="dispatch")
357class AssignBlueskyHandleToEmailView(SuccessMessageMixin, TemplateView):
358 """
359 View class for assigning a Bluesky handle to an email address
360 """
362 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
363 """
364 Handle post request - assign bluesky handle to email
365 """
367 try:
368 email = ConfirmedEmail.objects.get(user=request.user, id=kwargs["email_id"])
369 except ConfirmedEmail.DoesNotExist: # pylint: disable=no-member
370 messages.error(request, _("Invalid request"))
371 return HttpResponseRedirect(reverse_lazy("profile"))
373 if "bluesky_handle" not in request.POST:
374 messages.error(request, _("Invalid request [bluesky_handle] missing"))
375 return HttpResponseRedirect(reverse_lazy("profile"))
376 bluesky_handle = request.POST["bluesky_handle"]
378 try:
379 bs = Bluesky()
381 bs.get_avatar(bluesky_handle)
382 except Exception as e:
383 messages.error(request, _(f"Handle '{bluesky_handle}' not found: {e}"))
384 return HttpResponseRedirect(
385 reverse_lazy(
386 "assign_photo_email", kwargs={"email_id": int(kwargs["email_id"])}
387 )
388 )
389 try:
390 email.set_bluesky_handle(bluesky_handle)
391 except Exception as e:
392 messages.error(request, _(f"Error: {e}"))
393 return HttpResponseRedirect(
394 reverse_lazy(
395 "assign_photo_email", kwargs={"email_id": int(kwargs["email_id"])}
396 )
397 )
398 email.photo = None
399 email.save()
401 messages.success(request, _("Successfully assigned Bluesky handle"))
402 return HttpResponseRedirect(reverse_lazy("profile"))
404 def get_context_data(self, **kwargs):
405 data = super().get_context_data(**kwargs)
406 data["email"] = ConfirmedEmail.objects.get(pk=kwargs["email_id"])
407 return data
410@method_decorator(login_required, name="dispatch")
411class AssignBlueskyHandleToOpenIdView(SuccessMessageMixin, TemplateView):
412 """
413 View class for assigning a Bluesky handle to an email address
414 """
416 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
417 """
418 Handle post request - assign bluesky handle to email
419 """
421 try:
422 openid = ConfirmedOpenId.objects.get(
423 user=request.user, id=kwargs["open_id"]
424 )
425 except ConfirmedOpenId.DoesNotExist: # pylint: disable=no-member
426 messages.error(request, _("Invalid request"))
427 return HttpResponseRedirect(reverse_lazy("profile"))
429 if "bluesky_handle" not in request.POST:
430 messages.error(request, _("Invalid request [bluesky_handle] missing"))
431 return HttpResponseRedirect(reverse_lazy("profile"))
432 bluesky_handle = request.POST["bluesky_handle"]
434 try:
435 bs = Bluesky()
437 bs.get_avatar(bluesky_handle)
438 except Exception as e:
439 messages.error(request, _(f"Handle '{bluesky_handle}' not found: {e}"))
440 return HttpResponseRedirect(
441 reverse_lazy(
442 "assign_photo_openid", kwargs={"openid_id": int(kwargs["open_id"])}
443 )
444 )
445 try:
446 openid.set_bluesky_handle(bluesky_handle)
447 except Exception as e:
448 messages.error(request, _(f"Error: {e}"))
449 return HttpResponseRedirect(
450 reverse_lazy(
451 "assign_photo_openid", kwargs={"openid_id": int(kwargs["open_id"])}
452 )
453 )
454 openid.photo = None
455 openid.save()
457 messages.success(request, _("Successfully assigned Bluesky handle"))
458 return HttpResponseRedirect(reverse_lazy("profile"))
460 def get_context_data(self, **kwargs):
461 data = super().get_context_data(**kwargs)
462 data["openid"] = ConfirmedOpenId.objects.get(pk=kwargs["open_id"])
463 return data
466@method_decorator(login_required, name="dispatch")
467class ImportPhotoView(SuccessMessageMixin, TemplateView):
468 """
469 View class to import a photo from another service
470 Currently only Gravatar is supported
471 """
473 template_name = "import_photo.html"
475 def get_context_data(self, **kwargs):
476 context = super().get_context_data(**kwargs)
477 context["photos"] = []
478 addr = None
479 if "email_id" in kwargs:
480 try:
481 addr = ConfirmedEmail.objects.get(pk=kwargs["email_id"]).email
482 except ConfirmedEmail.ObjectDoesNotExist: # pylint: disable=no-member
483 messages.error(self.request, _("Address does not exist"))
484 return context
486 if addr := kwargs.get("email_addr", None):
487 if gravatar := get_gravatar_photo(addr):
488 context["photos"].append(gravatar)
490 if libravatar_service_url := libravatar_url(
491 email=addr,
492 default=404,
493 size=AVATAR_MAX_SIZE,
494 ):
495 try:
496 urlopen(libravatar_service_url)
497 except OSError as exc:
498 print(f"Exception caught during photo import: {exc}")
499 else:
500 context["photos"].append(
501 {
502 "service_url": libravatar_service_url,
503 "thumbnail_url": f"{libravatar_service_url}&s=80",
504 "image_url": f"{libravatar_service_url}&s=512",
505 "width": 80,
506 "height": 80,
507 "service_name": "Libravatar",
508 }
509 )
511 return context
513 def post(
514 self, request, *args, **kwargs
515 ): # pylint: disable=no-self-use,unused-argument,too-many-branches,line-too-long
516 """
517 Handle post to photo import
518 """
520 imported = None
522 email_id = kwargs.get("email_id", request.POST.get("email_id", None))
523 addr = kwargs.get("email", request.POST.get("email_addr", None))
525 if email_id:
526 email = ConfirmedEmail.objects.filter(id=email_id, user=request.user)
527 if email.exists():
528 addr = email.first().email
529 else:
530 messages.error(request, _("Address does not exist"))
531 return HttpResponseRedirect(reverse_lazy("profile"))
533 if "photo_Gravatar" in request.POST:
534 photo = Photo()
535 photo.user = request.user
536 photo.ip_address = get_client_ip(request)[0]
537 if photo.import_image("Gravatar", addr):
538 messages.success(request, _("Gravatar image successfully imported"))
539 else:
540 # Honestly, I'm not sure how to test this...
541 messages.error(
542 request, _("Gravatar image import not successful")
543 ) # pragma: no cover
544 imported = True
546 if "photo_Libravatar" in request.POST:
547 photo = Photo()
548 photo.user = request.user
549 photo.ip_address = get_client_ip(request)[0]
550 if photo.import_image("Libravatar", addr):
551 messages.success(request, _("Libravatar image successfully imported"))
552 else:
553 # Honestly, I'm not sure how to test this...
554 messages.error(
555 request, _("Libravatar image import not successful")
556 ) # pragma: no cover
557 imported = True
558 if not imported:
559 messages.warning(request, _("Nothing importable"))
560 return HttpResponseRedirect(reverse_lazy("profile"))
563@method_decorator(login_required, name="dispatch")
564class RawImageView(DetailView):
565 """
566 View to return (binary) raw image data, for use in <img/>-tags
567 """
569 model = Photo
571 def get(self, request, *args, **kwargs):
572 photo = self.model.objects.get(pk=kwargs["pk"]) # pylint: disable=no-member
573 if photo.user.id != request.user.id and not request.user.is_staff:
574 return HttpResponseRedirect(reverse_lazy("home"))
575 return HttpResponse(BytesIO(photo.data), content_type=f"image/{photo.format}")
578@method_decorator(login_required, name="dispatch")
579class DeletePhotoView(SuccessMessageMixin, View):
580 """
581 View class for deleting a photo
582 """
584 model = Photo
586 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument
587 """
588 Handle get - delete photo
589 """
590 try:
591 photo = self.model.objects.get( # pylint: disable=no-member
592 pk=kwargs["pk"], user=request.user
593 )
594 photo.delete()
595 except (self.model.DoesNotExist, ProtectedError): # pylint: disable=no-member
596 messages.error(request, _("No such image or no permission to delete it"))
597 return HttpResponseRedirect(reverse_lazy("profile"))
598 messages.success(request, _("Photo deleted successfully"))
599 return HttpResponseRedirect(reverse_lazy("profile"))
602@method_decorator(login_required, name="dispatch")
603class UploadPhotoView(SuccessMessageMixin, FormView):
604 """
605 View class responsible for photo upload
606 """
608 model = Photo
609 template_name = "upload_photo.html"
610 form_class = UploadPhotoForm
611 success_message = _("Successfully uploaded")
612 success_url = reverse_lazy("profile")
614 def post(self, request, *args, **kwargs):
615 num_photos = request.user.photo_set.count()
616 if num_photos >= MAX_NUM_PHOTOS:
617 messages.error(
618 request, _("Maximum number of photos (%i) reached" % MAX_NUM_PHOTOS)
619 )
620 return HttpResponseRedirect(reverse_lazy("profile"))
621 return super().post(request, *args, **kwargs)
623 def form_valid(self, form):
624 photo_data = self.request.FILES["photo"]
625 if photo_data.size > MAX_PHOTO_SIZE:
626 messages.error(self.request, _("Image too big"))
627 return HttpResponseRedirect(reverse_lazy("profile"))
629 photo = form.save(self.request, photo_data)
631 if not photo:
632 messages.error(self.request, _("Invalid Format"))
633 return HttpResponseRedirect(reverse_lazy("profile"))
635 # Override success URL -> Redirect to crop page.
636 self.success_url = reverse_lazy("crop_photo", args=[photo.pk])
637 return super().form_valid(form)
640@method_decorator(login_required, name="dispatch")
641class AddOpenIDView(SuccessMessageMixin, FormView):
642 """
643 View class for adding OpenID
644 """
646 template_name = "add_openid.html"
647 form_class = AddOpenIDForm
648 success_url = reverse_lazy("profile")
650 def form_valid(self, form):
651 if openid_id := form.save(self.request.user):
652 # At this point we have an unconfirmed OpenID, but
653 # we do not add the message, that we successfully added it,
654 # since this is misleading
655 return HttpResponseRedirect(
656 reverse_lazy("openid_redirection", args=[openid_id])
657 )
658 else:
659 return render(self.request, self.template_name, {"form": form})
662@method_decorator(login_required, name="dispatch")
663class RemoveUnconfirmedOpenIDView(View):
664 """
665 View class for removing a unconfirmed OpenID
666 """
668 model = UnconfirmedOpenId
670 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
671 """
672 Handle post - remove unconfirmed openid
673 """
674 try:
675 openid = self.model.objects.get( # pylint: disable=no-member
676 user=request.user, id=kwargs["openid_id"]
677 )
678 openid.delete()
679 messages.success(request, _("ID removed"))
680 except self.model.DoesNotExist: # pragma: no cover pylint: disable=no-member,line-too-long
681 messages.error(request, _("ID does not exist"))
682 return HttpResponseRedirect(reverse_lazy("profile"))
685@method_decorator(login_required, name="dispatch")
686class RemoveConfirmedOpenIDView(View):
687 """
688 View class for removing a confirmed OpenID
689 """
691 model = ConfirmedOpenId
693 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
694 """
695 Handle post - remove confirmed openid
696 """
697 try:
698 openid = self.model.objects.get( # pylint: disable=no-member
699 user=request.user, id=kwargs["openid_id"]
700 )
701 try:
702 openidobj = (
703 UserOpenID.objects.get( # pylint: disable=no-member,line-too-long
704 user_id=request.user.id, claimed_id=openid.openid
705 )
706 )
707 openidobj.delete()
708 except Exception as exc: # pylint: disable=broad-except
709 # Why it is not there?
710 print(f"How did we get here: {exc}")
711 openid.delete()
712 messages.success(request, _("ID removed"))
713 except self.model.DoesNotExist: # pylint: disable=no-member
714 messages.error(request, _("ID does not exist"))
715 return HttpResponseRedirect(reverse_lazy("profile"))
718@method_decorator(login_required, name="dispatch")
719class RedirectOpenIDView(View):
720 """
721 Redirect view for OpenID
722 """
724 model = UnconfirmedOpenId
726 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument
727 """
728 Handle get for OpenID redirect view
729 """
730 try:
731 unconfirmed = self.model.objects.get( # pylint: disable=no-member
732 user=request.user, id=kwargs["openid_id"]
733 )
734 except self.model.DoesNotExist: # pragma: no cover pylint: disable=no-member,line-too-long
735 messages.error(request, _("ID does not exist"))
736 return HttpResponseRedirect(reverse_lazy("profile"))
738 user_url = unconfirmed.openid
739 session = {"id": request.session.session_key}
741 oidutil.log = openid_logging
742 openid_consumer = consumer.Consumer(session, DjangoOpenIDStore())
744 try:
745 auth_request = openid_consumer.begin(user_url)
746 except consumer.DiscoveryFailure as exc:
747 messages.error(request, _(f"OpenID discovery failed: {exc}"))
748 return HttpResponseRedirect(reverse_lazy("profile"))
749 except UnicodeDecodeError as exc: # pragma: no cover
750 msg = _(
751 "OpenID discovery failed (userid=%(userid)s) for "
752 "%(userurl)s: %(message)s"
753 % {
754 "userid": request.user.id,
755 "userurl": user_url.encode("utf-8"),
756 "message": exc,
757 }
758 )
759 print(f"message: {msg}")
760 messages.error(request, msg)
762 if auth_request is None: # pragma: no cover
763 messages.error(request, _("OpenID discovery failed"))
764 return HttpResponseRedirect(reverse_lazy("profile"))
766 realm = request.build_absolute_uri("/")[:-1] # pragma: no cover
767 return_url = realm + reverse( # pragma: no cover
768 "confirm_openid", args=[kwargs["openid_id"]]
769 )
770 return HttpResponseRedirect( # pragma: no cover
771 auth_request.redirectURL(realm, return_url)
772 )
775@method_decorator(login_required, name="dispatch")
776class ConfirmOpenIDView(View): # pragma: no cover
777 """
778 Confirm OpenID view
779 """
781 model = UnconfirmedOpenId
782 model_confirmed = ConfirmedOpenId
784 def do_request(self, data, *args, **kwargs): # pylint: disable=unused-argument
785 """
786 Handle request, called by get() or post()
787 """
788 session = {"id": self.request.session.session_key}
789 current_url = self.request.build_absolute_uri("/")[:-1] + self.request.path
790 openid_consumer = consumer.Consumer(session, DjangoOpenIDStore())
791 info = openid_consumer.complete(data, current_url)
792 if info.status == consumer.FAILURE:
793 messages.error(
794 self.request, _('Confirmation failed: "') + str(info.message) + '"'
795 )
796 return HttpResponseRedirect(reverse_lazy("profile"))
798 if info.status == consumer.CANCEL:
799 messages.error(self.request, _("Cancelled by user"))
800 return HttpResponseRedirect(reverse_lazy("profile"))
802 if info.status != consumer.SUCCESS:
803 messages.error(self.request, _("Unknown verification error"))
804 return HttpResponseRedirect(reverse_lazy("profile"))
806 try:
807 unconfirmed = self.model.objects.get( # pylint: disable=no-member
808 user=self.request.user, id=kwargs["openid_id"]
809 )
810 except self.model.DoesNotExist: # pylint: disable=no-member
811 messages.error(self.request, _("ID does not exist"))
812 return HttpResponseRedirect(reverse_lazy("profile"))
814 # TODO: Check for a reasonable expiration time
815 confirmed = self.model_confirmed()
816 confirmed.user = unconfirmed.user
817 confirmed.ip_address = get_client_ip(self.request)[0]
818 confirmed.openid = unconfirmed.openid
819 confirmed.save()
821 unconfirmed.delete()
823 # If there is a single image in this user's profile
824 # assign it to the new id
825 if self.request.user.photo_set.count() == 1:
826 confirmed.set_photo(self.request.user.photo_set.first())
828 # Also allow user to login using this OpenID (if not already taken)
829 if not UserOpenID.objects.filter( # pylint: disable=no-member
830 claimed_id=confirmed.openid
831 ).exists():
832 user_openid = UserOpenID()
833 user_openid.user = self.request.user
834 user_openid.claimed_id = confirmed.openid
835 user_openid.display_id = confirmed.openid
836 user_openid.save()
837 return HttpResponseRedirect(reverse_lazy("profile"))
839 def get(self, request, *args, **kwargs):
840 """
841 Handle get - confirm openid
842 """
843 return self.do_request(request.GET, *args, **kwargs)
845 def post(self, request, *args, **kwargs):
846 """
847 Handle post - confirm openid
848 """
849 return self.do_request(request.POST, *args, **kwargs)
852@method_decorator(login_required, name="dispatch")
853class CropPhotoView(TemplateView):
854 """
855 View class for cropping photos
856 """
858 template_name = "crop_photo.html"
859 success_url = reverse_lazy("profile")
860 model = Photo
862 def get(self, request, *args, **kwargs):
863 photo = self.model.objects.get(
864 pk=kwargs["pk"], user=request.user
865 ) # pylint: disable=no-member
866 email = request.GET.get("email")
867 openid = request.GET.get("openid")
868 return render(
869 self.request,
870 self.template_name,
871 {
872 "photo": photo,
873 "email": email,
874 "openid": openid,
875 },
876 )
878 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
879 """
880 Handle post - crop photo
881 """
882 photo = self.model.objects.get(
883 pk=kwargs["pk"], user=request.user
884 ) # pylint: disable=no-member
885 dimensions = {
886 "x": int(float(request.POST["x"])),
887 "y": int(float(request.POST["y"])),
888 "w": int(float(request.POST["w"])),
889 "h": int(float(request.POST["h"])),
890 }
891 email = openid = None
892 if "email" in request.POST:
893 with contextlib.suppress(ConfirmedEmail.DoesNotExist):
894 email = ConfirmedEmail.objects.get(email=request.POST["email"])
895 if "openid" in request.POST:
896 with contextlib.suppress(ConfirmedOpenId.DoesNotExist):
897 openid = ConfirmedOpenId.objects.get( # pylint: disable=no-member
898 openid=request.POST["openid"]
899 )
900 return photo.perform_crop(request, dimensions, email, openid)
903@method_decorator(login_required, name="dispatch") # pylint: disable=too-many-ancestors
904class UserPreferenceView(FormView, UpdateView):
905 """
906 View class for user preferences view/update
907 """
909 template_name = "preferences.html"
910 model = UserPreference
911 form_class = UpdatePreferenceForm
912 success_url = reverse_lazy("user_preference")
914 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
915 """
916 Process POST-ed data from this form
917 """
918 userpref = None
919 try:
920 userpref = self.request.user.userpreference
921 except ObjectDoesNotExist:
922 userpref = UserPreference(user=self.request.user)
923 userpref.theme = request.POST["theme"]
924 userpref.save()
925 try:
926 if request.POST["email"] != self.request.user.email:
927 addresses = list(
928 self.request.user.confirmedemail_set.all().values_list(
929 "email", flat=True
930 )
931 )
932 if request.POST["email"] not in addresses:
933 messages.error(
934 self.request,
935 _(f'Mail address not allowed: {request.POST["email"]}'),
936 )
937 else:
938 self.request.user.email = request.POST["email"]
939 self.request.user.save()
940 messages.info(self.request, _("Mail address changed."))
941 except Exception as e: # pylint: disable=broad-except
942 messages.error(self.request, _(f"Error setting new mail address: {e}"))
944 try:
945 if request.POST["first_name"] or request.POST["last_name"]:
946 if request.POST["first_name"] != self.request.user.first_name:
947 self.request.user.first_name = request.POST["first_name"]
948 messages.info(self.request, _("First name changed."))
949 if request.POST["last_name"] != self.request.user.last_name:
950 self.request.user.last_name = request.POST["last_name"]
951 messages.info(self.request, _("Last name changed."))
952 self.request.user.save()
953 except Exception as e: # pylint: disable=broad-except
954 messages.error(self.request, _(f"Error setting names: {e}"))
956 return HttpResponseRedirect(reverse_lazy("user_preference"))
958 def get(self, request, *args, **kwargs):
959 return render(
960 self.request,
961 self.template_name,
962 {
963 "THEMES": UserPreference.THEMES,
964 },
965 )
967 def get_object(self, queryset=None):
968 (obj, created) = UserPreference.objects.get_or_create(
969 user=self.request.user
970 ) # pylint: disable=no-member,unused-variable
971 return obj
974@method_decorator(login_required, name="dispatch")
975class UploadLibravatarExportView(SuccessMessageMixin, FormView):
976 """
977 View class responsible for libravatar user data export upload
978 """
980 template_name = "upload_libravatar_export.html"
981 form_class = UploadLibravatarExportForm
982 success_message = _("Successfully uploaded")
983 success_url = reverse_lazy("profile")
984 model = User
986 def post(self, request, *args, **kwargs): # pylint: disable=unused-argument
987 """
988 Handle post request - choose items to import
989 """
990 if "save" in kwargs: # pylint: disable=too-many-nested-blocks
991 if kwargs["save"] == "save":
992 for arg in request.POST:
993 if arg.startswith("email_"):
994 email = request.POST[arg]
995 if not ConfirmedEmail.objects.filter(
996 email=email
997 ) and not UnconfirmedEmail.objects.filter(
998 email=email
999 ): # pylint: disable=no-member
1000 try:
1001 unconfirmed = UnconfirmedEmail.objects.create( # pylint: disable=no-member
1002 user=request.user, email=email
1003 )
1004 unconfirmed.save()
1005 unconfirmed.send_confirmation_mail(
1006 url=request.build_absolute_uri("/")[:-1]
1007 )
1008 messages.info(
1009 request,
1010 "%s: %s"
1011 % (
1012 email,
1013 _(
1014 "address added successfully,\
1015 confirmation mail sent"
1016 ),
1017 ),
1018 )
1019 except Exception as exc: # pylint: disable=broad-except
1020 # DEBUG
1021 print(
1022 f"Exception during adding mail address ({email}): {exc}"
1023 )
1025 if arg.startswith("photo"):
1026 try:
1027 data = base64.decodebytes(bytes(request.POST[arg], "utf-8"))
1028 except binascii.Error as exc:
1029 print(f"Cannot decode photo: {exc}")
1030 continue
1031 try:
1032 pilobj = Image.open(BytesIO(data))
1033 out = BytesIO()
1034 pilobj.save(out, pilobj.format, quality=JPEG_QUALITY)
1035 out.seek(0)
1036 photo = Photo()
1037 photo.user = request.user
1038 photo.ip_address = get_client_ip(request)[0]
1039 photo.format = file_format(pilobj.format)
1040 photo.data = out.read()
1041 photo.save()
1042 except Exception as exc: # pylint: disable=broad-except
1043 print(f"Exception during save: {exc}")
1044 continue
1046 return HttpResponseRedirect(reverse_lazy("profile"))
1047 return super().post(request, args, kwargs)
1049 def form_valid(self, form):
1050 data = self.request.FILES["export_file"]
1051 try:
1052 items = libravatar_read_gzdata(data.read())
1053 # DEBUG print(items)
1054 return render(
1055 self.request,
1056 "choose_libravatar_export.html",
1057 {
1058 "emails": items["emails"],
1059 "photos": items["photos"],
1060 },
1061 )
1062 except Exception as e:
1063 messages.error(self.request, _(f"Unable to parse file: {e}"))
1064 return HttpResponseRedirect(reverse_lazy("upload_export"))
1067@method_decorator(login_required, name="dispatch")
1068class ResendConfirmationMailView(View):
1069 """
1070 View class for resending confirmation mail
1071 """
1073 model = UnconfirmedEmail
1075 def get(self, request, *args, **kwargs): # pylint: disable=unused-argument
1076 """
1077 Handle post - resend confirmation mail for unconfirmed e-mail address
1078 """
1079 try:
1080 email = self.model.objects.get( # pylint: disable=no-member
1081 user=request.user, id=kwargs["email_id"]
1082 )
1083 except self.model.DoesNotExist: # pragma: no cover pylint: disable=no-member
1084 messages.error(request, _("ID does not exist"))
1085 else:
1086 try:
1087 email.send_confirmation_mail(url=request.build_absolute_uri("/")[:-1])
1088 messages.success(
1089 request, f'{_("Confirmation mail sent to")}: {email.email}'
1090 )
1091 except Exception as exc: # pylint: disable=broad-except
1092 messages.error(
1093 request,
1094 f'{_("Unable to send confirmation email for")} {email.email}: {exc}',
1095 )
1096 return HttpResponseRedirect(reverse_lazy("profile"))
1099class IvatarLoginView(LoginView):
1100 """
1101 View class for login
1102 """
1104 template_name = "login.html"
1106 def get(self, request, *args, **kwargs):
1107 """
1108 Handle get for login view
1109 """
1110 if request.user:
1111 if request.user.is_authenticated:
1112 return HttpResponseRedirect(reverse_lazy("profile"))
1113 return super().get(self, request, args, kwargs)
1115 def get_context_data(self, **kwargs):
1116 context = super().get_context_data(**kwargs)
1117 context["with_fedora"] = SOCIAL_AUTH_FEDORA_KEY is not None
1118 return context
1121@method_decorator(login_required, name="dispatch")
1122class ProfileView(TemplateView):
1123 """
1124 View class for profile
1125 """
1127 template_name = "profile.html"
1129 def get(self, request, *args, **kwargs):
1130 if "profile_username" in kwargs:
1131 if not request.user.is_staff:
1132 return HttpResponseRedirect(reverse_lazy("profile"))
1133 with contextlib.suppress(Exception):
1134 u = User.objects.get(username=kwargs["profile_username"])
1135 request.user = u
1136 self._confirm_claimed_openid()
1137 return super().get(self, request, args, kwargs)
1139 def get_context_data(self, **kwargs):
1140 """
1141 Provide additional context data, like if max_photos is reached
1142 already or not.
1143 """
1144 context = super().get_context_data(**kwargs)
1145 context["max_photos"] = False
1146 if self.request.user:
1147 if self.request.user.photo_set.all().count() >= MAX_NUM_PHOTOS:
1148 context["max_photos"] = True
1149 return context
1151 def _confirm_claimed_openid(self):
1152 openids = self.request.user.useropenid_set.all()
1153 # If there is only one OpenID, we eventually need to add it to
1154 # the user account
1155 if openids.count() == 1:
1156 # Already confirmed, skip
1157 if ConfirmedOpenId.objects.filter( # pylint: disable=no-member
1158 openid=openids.first().claimed_id
1159 ).exists():
1160 return
1161 # For whatever reason, this is in unconfirmed state, skip
1162 if UnconfirmedOpenId.objects.filter( # pylint: disable=no-member
1163 openid=openids.first().claimed_id
1164 ).exists():
1165 return
1166 print(f"need to confirm: {openids.first()}")
1167 confirmed = ConfirmedOpenId()
1168 confirmed.user = self.request.user
1169 confirmed.ip_address = get_client_ip(self.request)[0]
1170 confirmed.openid = openids.first().claimed_id
1171 confirmed.save()
1174class PasswordResetView(PasswordResetViewOriginal):
1175 """
1176 View class for password reset
1177 """
1179 def post(self, request, *args, **kwargs):
1180 """
1181 Since we have the mail addresses in ConfirmedEmail model,
1182 we need to set the email on the user object in order for the
1183 PasswordResetView class to pick up the correct user.
1184 In case we have the mail address in the User object, we still
1185 need to assign a random password in order for PasswordResetView
1186 class to pick up the user - else it will silently do nothing.
1187 """
1188 if "email" in request.POST:
1189 user = None
1191 # Try to find the user via the normal user class
1192 # TODO: How to handle the case that multiple user accounts
1193 # could have the same password set?
1194 user = User.objects.filter(email=request.POST["email"]).first()
1196 # If we didn't find the user in the previous step,
1197 # try the ConfirmedEmail class instead.
1198 # If we find the user there, we need to set the mail
1199 # attribute on the user object accordingly
1200 if not user:
1201 with contextlib.suppress(ObjectDoesNotExist):
1202 confirmed_email = ConfirmedEmail.objects.get(
1203 email=request.POST["email"]
1204 )
1205 user = confirmed_email.user
1206 user.email = confirmed_email.email
1207 user.save()
1208 # If we found the user, set a random password. Else, the
1209 # ResetPasswordView class will silently ignore the password
1210 # reset request
1211 if user:
1212 if not user.password or user.password.startswith("!"):
1213 random_pass = User.objects.make_random_password()
1214 user.set_password(random_pass)
1215 user.save()
1217 # Whatever happens above, let the original function handle the rest
1218 return super().post(self, request, args, kwargs)
1221@method_decorator(login_required, name="dispatch")
1222class DeleteAccountView(SuccessMessageMixin, FormView):
1223 """
1224 View class for account deletion
1225 """
1227 template_name = "delete.html"
1228 form_class = DeleteAccountForm
1229 success_url = reverse_lazy("home")
1231 def get(self, request, *args, **kwargs):
1232 return super().get(self, request, args, kwargs)
1234 def post(self, request, *args, **kwargs):
1235 """
1236 Handle account deletion
1237 """
1238 if request.user.password:
1239 if "password" in request.POST:
1240 if not request.user.check_password(request.POST["password"]):
1241 messages.error(request, _("Incorrect password"))
1242 return HttpResponseRedirect(reverse_lazy("delete"))
1243 else:
1244 messages.error(request, _("No password given"))
1245 return HttpResponseRedirect(reverse_lazy("delete"))
1247 # should delete all confirmed/unconfirmed/photo objects
1248 request.user.delete()
1249 return super().post(self, request, args, kwargs)
1252@method_decorator(login_required, name="dispatch")
1253class ExportView(SuccessMessageMixin, TemplateView):
1254 """
1255 View class responsible for libravatar user data export
1256 """
1258 template_name = "export.html"
1259 model = User
1261 def get(self, request, *args, **kwargs):
1262 return super().get(self, request, args, kwargs)
1264 def post(self, request, *args, **kwargs):
1265 """
1266 Handle real export
1267 """
1268 SCHEMA_ROOT = "https://www.libravatar.org/schemas/export/0.2"
1269 SCHEMA_XSD = f"{SCHEMA_ROOT}/export.xsd"
1271 def xml_header():
1272 return (
1273 """<?xml version="1.0" encoding="UTF-8"?>"""
1274 '''<user xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"'''
1275 ''' xsi:schemaLocation="%s %s"'''
1276 """ xmlns="%s">\n""" % (SCHEMA_ROOT, SCHEMA_XSD, SCHEMA_ROOT)
1277 )
1279 def xml_footer():
1280 return "</user>\n"
1282 def xml_account(user):
1283 escaped_username = saxutils.quoteattr(user.username)
1284 escaped_password = saxutils.quoteattr(user.password)
1285 return " <account username=%s password=%s/>\n" % (
1286 escaped_username,
1287 escaped_password,
1288 )
1290 def xml_email(user):
1291 returnstring = " <emails>\n"
1292 for email in user.confirmedemail_set.all():
1293 returnstring += (
1294 ' <email photo_id="'
1295 + str(email.photo_id)
1296 + '">'
1297 + str(email.email)
1298 + "</email>"
1299 + "\n"
1300 )
1301 returnstring += " </emails>\n"
1302 return returnstring
1304 def xml_openid(user):
1305 returnstring = " <openids>\n"
1306 for openid in user.confirmedopenid_set.all():
1307 returnstring += (
1308 ' <openid photo_id="'
1309 + str(openid.photo_id)
1310 + '">'
1311 + str(openid.openid)
1312 + "</openid>"
1313 + "\n"
1314 )
1315 returnstring += " </openids>\n"
1316 return returnstring
1318 def xml_photos(user):
1319 s = " <photos>\n"
1320 for photo in user.photo_set.all():
1321 encoded_photo = base64.b64encode(photo.data)
1322 if encoded_photo:
1323 s += (
1324 """ <photo id="%s" encoding="base64" format=%s>"""
1325 """%s"""
1326 """</photo>\n"""
1327 % (photo.id, saxutils.quoteattr(photo.format), encoded_photo)
1328 )
1329 s += " </photos>\n"
1330 return s
1332 user = request.user
1334 photos = []
1335 for photo in user.photo_set.all():
1336 photo_details = {"data": photo.data, "format": photo.format}
1337 photos.append(photo_details)
1339 bytesobj = BytesIO()
1340 data = gzip.GzipFile(fileobj=bytesobj, mode="w")
1341 data.write(bytes(xml_header(), "utf-8"))
1342 data.write(bytes(xml_account(user), "utf-8"))
1343 data.write(bytes(xml_email(user), "utf-8"))
1344 data.write(bytes(xml_openid(user), "utf-8"))
1345 data.write(bytes(xml_photos(user), "utf-8"))
1346 data.write(bytes(xml_footer(), "utf-8"))
1347 data.close()
1348 bytesobj.seek(0)
1350 response = HttpResponse(content_type="application/gzip")
1351 response[
1352 "Content-Disposition"
1353 ] = f'attachment; filename="libravatar-export_{user.username}.xml.gz"'
1354 response.write(bytesobj.read())
1355 return response