Coverage for ivatar/views.py: 58%
272 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-26 00:11 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-26 00:11 +0000
1# -*- coding: utf-8 -*-
2"""
3views under /
4"""
5from io import BytesIO
6from os import path
7import hashlib
8from urllib.request import urlopen
9from urllib.error import HTTPError, URLError
10from ssl import SSLError
11from django.views.generic.base import TemplateView, View
12from django.http import HttpResponse, HttpResponseRedirect
13from django.http import HttpResponseNotFound, JsonResponse
14from django.core.exceptions import ObjectDoesNotExist
15from django.core.cache import cache, caches
16from django.utils.translation import gettext_lazy as _
17from django.urls import reverse_lazy
18from django.db.models import Q
19from django.contrib.auth.models import User
21from PIL import Image
23from monsterid.id import build_monster as BuildMonster
24import Identicon
25from pydenticon5 import Pydenticon5
26import pagan
27from robohash import Robohash
29from ivatar.settings import AVATAR_MAX_SIZE, JPEG_QUALITY, DEFAULT_AVATAR_SIZE
30from ivatar.settings import CACHE_RESPONSE
31from ivatar.settings import CACHE_IMAGES_MAX_AGE
32from ivatar.settings import TRUSTED_DEFAULT_URLS
33from .ivataraccount.models import ConfirmedEmail, ConfirmedOpenId
34from .ivataraccount.models import UnconfirmedEmail, UnconfirmedOpenId
35from .ivataraccount.models import Photo
36from .ivataraccount.models import pil_format, file_format
37from .utils import is_trusted_url, mm_ng, resize_animated_gif
39URL_TIMEOUT = 5 # in seconds
def get_size(request, size=DEFAULT_AVATAR_SIZE):
    """
    Work out the avatar size requested through the query string

    Reads ``s`` and ``size`` from ``request.GET``; when both are present
    ``size`` wins. A value that is empty, "0", not convertible to ``int``
    or not positive leaves the ``size`` argument untouched. The result is
    never larger than ``AVATAR_MAX_SIZE``.
    """
    requested = None
    # Later keys override earlier ones, so "size" takes precedence over "s"
    for key in ("s", "size"):
        if key in request.GET:
            requested = request.GET[key]

    if requested and requested != "0":
        try:
            parsed = int(requested)
        except ValueError:
            # Not a number: keep whatever size we were called with
            parsed = 0
        if parsed > 0:
            size = parsed

    return min(size, int(AVATAR_MAX_SIZE))
class CachingHttpResponse(HttpResponse):
    """
    HttpResponse that records its own constructor arguments in the cache

    When ``CACHE_RESPONSE`` is truthy, the content, content type, status,
    reason and charset are stored in the "filesystem" cache under ``uri``
    before the regular ``HttpResponse`` is initialised. Headers set on the
    response afterwards are not part of the stored entry.
    """

    def __init__(
        self,
        uri,
        content=b"",
        content_type=None,
        status=200,  # pylint: disable=too-many-arguments
        reason=None,
        charset=None,
    ):
        response_args = {
            "content": content,
            "content_type": content_type,
            "status": status,
            "reason": reason,
            "charset": charset,
        }
        if CACHE_RESPONSE:
            # Store a copy so the cached entry is independent of the dict
            # that is unpacked into the parent constructor below
            caches["filesystem"].set(uri, dict(response_args))
        super().__init__(**response_args)
class AvatarImageView(TemplateView):
    """
    View to return (binary) image, based on OpenID/Email (both by digest)

    Query parameters read by ``get``: ``s``/``size`` (through ``get_size``),
    ``d``/``default``, ``f``/``forcedefault``, ``gravatarredirect``,
    ``gravatarproxy`` and ``robohash``.
    """

    # TODO: Do cache resize images!! Memcached?

    def options(self, request, *args, **kwargs):
        """
        Answer OPTIONS with the list of ``default`` keywords in ``Allow``
        """
        response = HttpResponse("", content_type="text/plain")
        response["Allow"] = "404 mm mp retro pagan wavatar monsterid robohash identicon"
        return response

    @staticmethod
    def _cached_response(uri):
        """
        Return the response stored for ``uri`` in the filesystem cache, or None
        """
        if not CACHE_RESPONSE:
            return None
        centry = caches["filesystem"].get(uri)
        if not centry:
            return None
        response = HttpResponse(
            centry["content"],
            content_type=centry["content_type"],
            status=centry["status"],
            reason=centry["reason"],
            charset=centry["charset"],
        )
        # Headers are not part of the cached entry, so the Cache-Control
        # header set on the original response has to be restored here.
        response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
        return response

    @staticmethod
    def _image_response(uri, data, content_type):
        """
        Wrap the buffer ``data`` in a cached response with Cache-Control set
        """
        data.seek(0)
        response = CachingHttpResponse(uri, data, content_type=content_type)
        response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
        return response

    @classmethod
    def _png_response(cls, uri, image, **save_options):
        """
        Encode ``image`` as PNG and return it as a cached image response
        """
        data = BytesIO()
        image.save(data, "PNG", **save_options)
        return cls._image_response(uri, data, "image/png")

    @staticmethod
    def _static_redirect(folder, size):
        """
        Redirect to static/img/<folder>/<size>.png, or 512.png if missing
        """
        static_img = path.join("static", "img", folder, "%s%s" % (str(size), ".png"))
        if not path.isfile(static_img):
            # We trust this exists!!!
            static_img = path.join("static", "img", folder, "512.png")
        # We trust static/ is mapped to /static/
        return HttpResponseRedirect("/" + static_img)

    @staticmethod
    def _requested_default(request):
        """
        Return the ``default`` query value, or None if absent or not trusted
        """
        # "default" takes precedence over the short form "d"
        default = request.GET.get("default", request.GET.get("d"))
        if default is None:
            return None
        if TRUSTED_DEFAULT_URLS is None:
            print("Query parameter `default` is disabled.")
            return None
        if default.find("://") > 0:
            # Check if it's trusted, if not, reset to None
            if not is_trusted_url(default, TRUSTED_DEFAULT_URLS):
                print(
                    "Default URL is not in trusted URLs: '%s' ; Kicking it!"
                    % default
                )
                return None
        return default

    @staticmethod
    def _find_identity(digest):
        """
        Look up the confirmed mail or OpenID for ``digest``; None if not found
        """
        try:
            return ConfirmedEmail.objects.get(digest=digest)
        except ObjectDoesNotExist:
            pass
        try:
            return ConfirmedEmail.objects.get(digest_sha256=digest)
        except ObjectDoesNotExist:
            pass
        try:
            # OpenID is tricky. http vs. https, versus trailing slash or not
            # However, some users eventually have added their variations already
            # and therefore we need to use filter() and first()
            return ConfirmedOpenId.objects.filter(
                Q(digest=digest)
                | Q(alt_digest1=digest)
                | Q(alt_digest2=digest)
                | Q(alt_digest3=digest)
            ).first()
        except Exception as exc:  # pylint: disable=broad-except
            # Best effort: a failing lookup is treated as "not found", but
            # the failure is reported instead of being dropped silently.
            print("OpenID lookup failed for digest '%s': %s" % (digest, exc))
            return None

    def _default_response(
        self, request, uri, digest, size, default
    ):  # pylint: disable=too-many-arguments,too-many-return-statements
        """
        Build the response for an explicitly requested ``default`` value
        """
        kind = str(default)

        # Proxy to gravatar to generate wavatar - lazy me
        if kind == "wavatar":
            url = (
                reverse_lazy("gravatarproxy", args=[digest])
                + "?s=%i" % size
                + "&default=%s&f=y" % default
            )
            return HttpResponseRedirect(url)

        if kind == str(404):
            return HttpResponseNotFound(_("<h1>Image not found</h1>"))

        if kind == "monsterid":
            monsterdata = BuildMonster(seed=digest, size=(size, size))
            return self._png_response(uri, monsterdata, quality=JPEG_QUALITY)

        if kind == "robohash":
            roboset = request.GET.get("robohash") or "any"
            robohash = Robohash(digest)
            robohash.assemble(roboset=roboset, sizex=size, sizey=size)
            return self._png_response(uri, robohash.img)

        if kind == "retro":
            identicon = Identicon.render(digest)
            img = Image.open(BytesIO(identicon))
            img = img.resize((size, size), Image.LANCZOS)
            return self._png_response(uri, img, quality=JPEG_QUALITY)

        if kind == "pagan":
            paganobj = pagan.Avatar(digest)
            img = paganobj.img.resize((size, size), Image.LANCZOS)
            return self._png_response(uri, img, quality=JPEG_QUALITY)

        if kind == "identicon":
            p = Pydenticon5()  # pylint: disable=invalid-name
            # In order to make use of the whole 32 bytes digest, we need to redigest them.
            newdigest = hashlib.md5(bytes(digest, "utf-8")).hexdigest()
            img = p.draw(newdigest, size, 0)
            return self._png_response(uri, img, quality=JPEG_QUALITY)

        if kind == "mmng":
            mmngimg = mm_ng(idhash=digest, size=size)
            return self._png_response(uri, mmngimg, quality=JPEG_QUALITY)

        if kind in ("mm", "mp"):
            # If mm is explicitly given, we need to catch that
            return self._static_redirect("mm", size)

        # NOTE(review): is_trusted_url() is only consulted when the value
        # contains "://" after its first character; any other unrecognised
        # value reaches this redirect unchecked - confirm this is intended.
        return HttpResponseRedirect(default)

    def _photo_response(self, uri, obj, size):
        """
        Scale the photo linked to ``obj``, count the access and return it
        """
        imgformat = obj.photo.format
        photodata = Image.open(BytesIO(obj.photo.data))

        data = BytesIO()

        # Animated GIFs need additional handling. getattr() guards against
        # a stored format of "gif" whose decoded image has no is_animated.
        if imgformat == "gif" and getattr(photodata, "is_animated", False):
            data = resize_animated_gif(photodata, (size, size))
        else:
            # If the image is smaller than what was requested, we need
            # to use the function resize
            if photodata.size[0] < size or photodata.size[1] < size:
                photodata = photodata.resize((size, size), Image.LANCZOS)
            else:
                photodata.thumbnail((size, size), Image.LANCZOS)
            photodata.save(data, pil_format(imgformat), quality=JPEG_QUALITY)

        obj.photo.access_count += 1
        obj.photo.save()
        obj.access_count += 1
        obj.save()

        if imgformat == "jpg":
            imgformat = "jpeg"
        return self._image_response(uri, data, "image/%s" % imgformat)

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,too-many-return-statements
        """
        Override get from parent class

        Serves the cached response for this URI if there is one. Otherwise
        the digest from ``kwargs`` is looked up; when there is no object, no
        photo linked to it, or the default is forced, the request is
        redirected to Gravatar / the gravatar proxy or answered with the
        requested default. Without a ``digest`` it redirects to "home".
        """
        # Local import keeps this block self-contained
        from urllib.parse import quote  # pylint: disable=import-outside-toplevel

        size = get_size(request)
        uri = request.build_absolute_uri()

        # Check the cache first
        cached = self._cached_response(uri)
        if cached is not None:
            return cached

        # In case no digest at all is provided, return to home page
        if "digest" not in kwargs:
            return HttpResponseRedirect(reverse_lazy("home"))
        digest = kwargs["digest"]

        default = self._requested_default(request)
        forcedefault = "y" in (
            request.GET.get("f"),
            request.GET.get("forcedefault"),
        )
        gravatarredirect = request.GET.get("gravatarredirect") == "y"
        gravatarproxy = request.GET.get("gravatarproxy") != "n"

        obj = self._find_identity(digest)

        # If that mail/openid doesn't exist, or has no photo linked to it
        if not obj or not obj.photo or forcedefault:
            if not forcedefault:
                # If we have redirection to Gravatar enabled, this overrides
                # all default= settings, except forcedefault!
                if gravatarredirect:
                    gravatar_url = (
                        "https://secure.gravatar.com/avatar/"
                        + digest
                        + "?s=%i" % size
                    )
                    return HttpResponseRedirect(gravatar_url)

                # Request to proxy Gravatar image - only if not forcedefault
                if gravatarproxy:
                    url = (
                        reverse_lazy("gravatarproxy", args=[digest])
                        + "?s=%i" % size
                    )
                    # Ensure we do not convert None to string 'None'
                    if default:
                        # Percent-encode so that a default URL carrying its
                        # own "?", "&" or "#" survives as a single value
                        url += "&default=%s" % quote(default, safe="")
                    return HttpResponseRedirect(url)

            # Return the default URL, as specified, or 404 Not Found, if default=404
            if default:
                return self._default_response(request, uri, digest, size, default)

            return self._static_redirect("nobody", size)

        return self._photo_response(uri, obj, size)
class GravatarProxyView(View):
    """
    Proxy request to Gravatar and return the image from there
    """

    # TODO: Do cache images!! Memcached?

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Override get from parent class

        Fetches the avatar for ``kwargs["digest"]`` from Gravatar in the
        size given by ``get_size`` and returns the image bytes. Whenever
        Gravatar has no image of its own, or the fetch fails, the request
        is redirected to "avatar_view" with ``forcedefault=y`` (and the
        ``default`` query value, if one was given).
        """
        # Local import keeps this block self-contained
        from urllib.parse import quote  # pylint: disable=import-outside-toplevel

        def redir_default(default=None):
            """Redirect to avatar_view, forcing the default image"""
            url = (
                reverse_lazy("avatar_view", args=[kwargs["digest"]])
                + "?s=%i" % size
                + "&forcedefault=y"
            )
            if default is not None:
                # Percent-encode: the value may itself be a URL with a query
                url += "&default=%s" % quote(default, safe="")
            return HttpResponseRedirect(url)

        size = get_size(request)

        # A literal "None" is treated the same as no default at all
        default = request.GET.get("default")
        if str(default) == "None":
            default = None

        if str(default) != "wavatar":
            # This part is special/hackish
            # Check if the image returned by Gravatar is their default image, if so,
            # redirect to our default instead.
            gravatar_test_url = (
                "https://secure.gravatar.com/avatar/"
                + kwargs["digest"]
                + "?s=%i&d=%i" % (50, 404)
            )
            if cache.get(gravatar_test_url) == "default":
                return redir_default(default)
            try:
                # Only the status matters; close the connection right away
                with urlopen(gravatar_test_url, timeout=URL_TIMEOUT):
                    pass
            except HTTPError as exc:
                if exc.code == 404:
                    cache.set(gravatar_test_url, "default", 60)
                else:
                    print("Gravatar test url fetch failed: %s" % exc)
                return redir_default(default)
            except (URLError, SSLError) as exc:
                # Network level failure on the probe: fall back to our
                # default instead of failing the whole request
                print("Gravatar test url fetch failed: %s" % exc)
                return redir_default(default)

        gravatar_url = (
            "https://secure.gravatar.com/avatar/" + kwargs["digest"] + "?s=%i" % size
        )
        if default:
            gravatar_url += "&d=%s" % quote(default, safe="")

        if cache.get(gravatar_url) == "err":
            print("Cached Gravatar fetch failed with URL error: %s" % gravatar_url)
            return redir_default(default)

        try:
            gravatarimagedata = urlopen(gravatar_url, timeout=URL_TIMEOUT)
        except HTTPError as exc:
            if exc.code != 404 and exc.code != 503:
                print(
                    "Gravatar fetch failed with an unexpected %s HTTP error: %s"
                    % (exc.code, gravatar_url)
                )
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except URLError as exc:
            print("Gravatar fetch failed with URL error: %s" % exc.reason)
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)
        except SSLError as exc:
            print("Gravatar fetch failed with SSL error: %s" % exc.reason)
            cache.set(gravatar_url, "err", 30)
            return redir_default(default)

        try:
            # The context manager closes the connection once the body is read
            with gravatarimagedata:
                raw = gravatarimagedata.read()
            img = Image.open(BytesIO(raw))
            response = HttpResponse(
                raw, content_type="image/%s" % file_format(img.format)
            )
        except ValueError as exc:
            print("Value error: %s" % exc)
            return redir_default(default)
        except OSError as exc:
            # Covers read failures and image data that cannot be decoded
            print("Gravatar image could not be read: %s" % exc)
            return redir_default(default)

        response["Cache-Control"] = "max-age=%i" % CACHE_IMAGES_MAX_AGE
        return response
class StatsView(TemplateView, JsonResponse):
    """
    Report object counts as a JSON document
    """

    def get(
        self, request, *args, **kwargs
    ):  # pylint: disable=too-many-branches,too-many-statements,too-many-locals,no-self-use,unused-argument,too-many-return-statements
        """
        Count users, mails, OpenIDs and photos and return them as JSON
        """
        # (JSON key, model) pairs; the order here is the order in the output
        counted_models = (
            ("users", User),
            ("mails", ConfirmedEmail),
            ("openids", ConfirmedOpenId),
            ("unconfirmed_mails", UnconfirmedEmail),
            ("unconfirmed_openids", UnconfirmedOpenId),
            ("avatars", Photo),
        )
        stats = {}
        for label, model in counted_models:
            stats[label] = model.objects.count()  # pylint: disable=no-member

        return JsonResponse(stats)