Coverage for ivatar/tools/views.py: 66%
156 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-12 23:12 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-12 23:12 +0000
1# -*- coding: utf-8 -*-
2"""
3View classes for ivatar/tools/
4"""
5from socket import inet_ntop, AF_INET6
6import hashlib
7import random
9from django.views.generic.edit import FormView
10from django.urls import reverse_lazy as reverse
11from django.shortcuts import render
13import DNS
15from libravatar import libravatar_url, parse_user_identity
16from libravatar import SECURE_BASE_URL as LIBRAVATAR_SECURE_BASE_URL
17from libravatar import BASE_URL as LIBRAVATAR_BASE_URL
19from ivatar.settings import SECURE_BASE_URL, BASE_URL, SITE_NAME, DEBUG
20from .forms import (
21 CheckDomainForm,
22 CheckForm,
23) # pylint: disable=relative-beyond-top-level
class CheckDomainView(FormView):
    """
    Form view that reports which avatar servers a domain advertises
    """

    template_name = "check_domain.html"
    form_class = CheckDomainForm
    success_url = reverse("tools_check_domain")

    def form_valid(self, form):
        """
        Look up the HTTP and HTTPS avatar servers for the submitted domain
        and render the template again with the form and the lookup results.

        For every server that is found, its IPv4 and IPv6 addresses are
        looked up as well and stored under "<key>_ipv4" / "<key>_ipv6".
        """
        super().form_valid(form)
        domain = form.cleaned_data["domain"]

        result = {}
        # Plain HTTP is resolved first, HTTPS second.
        for key, secure in (
            ("avatar_server_http", False),
            ("avatar_server_https", True),
        ):
            server = lookup_avatar_server(domain, secure)
            result[key] = server
            if server:
                result[f"{key}_ipv4"] = lookup_ip_address(server, False)
                result[f"{key}_ipv6"] = lookup_ip_address(server, True)

        context = {
            "form": form,
            "result": result,
        }
        return render(self.request, self.template_name, context)
class CheckView(FormView):
    """
    View class for checking an e-mail or openid address
    """

    template_name = "check.html"
    form_class = CheckForm
    success_url = reverse("tools_check")

    def _use_request_host(self, url):
        """
        Rewrite *url* so it points at the host serving the current request.

        Used for development setups only. Unset or empty URLs are returned
        unchanged.
        """
        if not url:
            return url
        return url.replace(
            "https://avatars.linux-kernel.at",
            f"http://{self.request.get_host()}",
        )

    def form_valid(self, form):
        """
        Build the avatar URLs and hashes for the submitted mail address
        and/or OpenID and render them together with the form.

        The template context contains the plain and secure URLs, the
        SHA-256 variant of the secure mail URL, the hashes themselves and
        the requested size (80 when the form provides none).
        """
        mailurl = None
        openidurl = None
        mailurl_secure = None
        mailurl_secure_256 = None
        openidurl_secure = None
        mail_hash = None
        mail_hash256 = None
        openid_hash = None
        super().form_valid(form)

        data = form.cleaned_data

        # An explicit default URL wins over the default option; the option
        # value "none" means that no default is requested at all.
        if data["default_url"]:
            default_url = data["default_url"]
        elif data["default_opt"] and data["default_opt"] != "none":
            default_url = data["default_opt"]
        else:
            default_url = None

        size = data["size"] if "size" in data else 80

        mail = data["mail"]
        if mail:
            mailurl = libravatar_url(email=mail, size=size, default=default_url)
            mailurl = mailurl.replace(LIBRAVATAR_BASE_URL, BASE_URL)
            mailurl_secure = libravatar_url(
                email=mail,
                size=size,
                https=True,
                default=default_url,
            )
            mailurl_secure = mailurl_secure.replace(
                LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL
            )
            mail_hash = parse_user_identity(email=mail, openid=None)[0]
            # The Libravatar hashing rule is: trim, lower-case, then hash.
            # Hashing the raw input would give a SHA-256 digest that does not
            # match for addresses typed with capitals or stray whitespace.
            normalized_mail = mail.strip().lower()
            mail_hash256 = hashlib.sha256(
                normalized_mail.encode("utf-8")
            ).hexdigest()
            # Same URL as the secure one, with the digest swapped.
            mailurl_secure_256 = mailurl_secure.replace(mail_hash, mail_hash256)

        if data["openid"]:
            # An OpenID without a scheme is treated as plain http; the
            # completed value is written back into the cleaned form data.
            if not data["openid"].startswith(("http://", "https://")):
                data["openid"] = f'http://{data["openid"]}'
            openid = data["openid"]
            openidurl = libravatar_url(openid=openid, size=size, default=default_url)
            openidurl = openidurl.replace(LIBRAVATAR_BASE_URL, BASE_URL)
            openidurl_secure = libravatar_url(
                openid=openid,
                size=size,
                https=True,
                default=default_url,
            )
            openidurl_secure = openidurl_secure.replace(
                LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL
            )
            openid_hash = parse_user_identity(openid=openid, email=None)[0]

        if "DEVELOPMENT" in SITE_NAME and DEBUG:
            # Development setups serve the avatars themselves, so point the
            # URLs at the host of the current request.
            mailurl = self._use_request_host(mailurl)
            mailurl_secure = self._use_request_host(mailurl_secure)
            mailurl_secure_256 = self._use_request_host(mailurl_secure_256)
            openidurl = self._use_request_host(openidurl)
            openidurl_secure = self._use_request_host(openidurl_secure)
            # NOTE(review): debugging output, assumed to belong to the
            # development branch only - confirm against the original layout.
            print(mailurl, openidurl, mailurl_secure, mailurl_secure_256, openidurl_secure)

        return render(
            self.request,
            self.template_name,
            {
                "form": form,
                "mailurl": mailurl,
                "openidurl": openidurl,
                "mailurl_secure": mailurl_secure,
                "mailurl_secure_256": mailurl_secure_256,
                "openidurl_secure": openidurl_secure,
                "mail_hash": mail_hash,
                "mail_hash256": mail_hash256,
                "openid_hash": openid_hash,
                "size": size,
            },
        )
def lookup_avatar_server(domain, https):
    """
    Find the avatar server a domain advertises through DNS SRV records.

    The SRV records should look like this:

    _avatars._tcp.example.com.     IN SRV 0 0 80  avatars.example.com
    _avatars-sec._tcp.example.com. IN SRV 0 0 443 avatars.example.com

    Returns the target host name, with ":<port>" appended when the port is
    not the default one for the scheme (443 for https, 80 otherwise).
    Returns None when the lookup fails or yields no usable record.
    """
    # Only the first 60 characters of the domain are used for the query.
    if domain and len(domain) > 60:
        domain = domain[:60]

    prefix = "_avatars-sec" if https else "_avatars"
    service_name = f"{prefix}._tcp.{domain}"

    DNS.DiscoverNameServers()
    try:
        response = DNS.Request(name=service_name, qtype="SRV").req()
    except DNS.DNSError as message:
        print(f"DNS Error: {message} ({domain})")
        return None

    status = response.header["status"]
    if status == "NXDOMAIN":
        # Not an error, but no point in going any further
        return None
    if status != "NOERROR":
        print(f'DNS Error: status={status} ({domain})')
        return None

    # Keep only well-formed SRV answers: (priority, weight, port, target).
    records = [
        {
            "priority": int(answer["data"][0]),
            "weight": int(answer["data"][1]),
            "port": int(answer["data"][2]),
            "target": answer["data"][3],
        }
        for answer in response.answers
        if "data" in answer and answer["data"] and answer["typename"] == "SRV"
    ]

    target, port = srv_hostname(records)

    default_port = 443 if https else 80
    if target and port != default_port:
        return f"{target}:{port}"

    return target
def srv_hostname(records):
    """
    Pick one (target, port) pair out of a list of SRV records.

    Each record is a dict with the keys "priority", "weight", "port" and
    "target". Only records sharing the lowest priority number are
    considered; among those one is drawn at random according to its weight.
    Returns (None, None) when there are no records.
    """
    count = len(records)
    if count == 0:
        return (None, None)
    if count == 1:
        only = records[0]
        return (only["target"], only["port"])

    # Single pass: collect the records of the best priority seen so far,
    # each paired with the running sum of weights.
    best_priority = records[0]["priority"]  # best priority = lowest number
    candidates = []
    weight_sum = 0

    for entry in records:
        priority = entry["priority"]
        if priority > best_priority:
            # worse than what we already have
            continue
        if priority < best_priority:
            # better priority found: start collecting from scratch
            best_priority = priority
            candidates = []
            weight_sum = 0

        weight = entry["weight"]
        weight_sum += weight
        if weight > 0:
            candidates.append((weight_sum, entry))
        else:
            # zero-weight elements must come first
            candidates.insert(0, (0, entry))

    if len(candidates) == 1:
        chosen = candidates[0][1]
        return (chosen["target"], chosen["port"])

    # Select first record according to RFC2782 weight ordering algorithm (page 3)
    threshold = random.randint(0, weight_sum)
    chosen = next(
        (entry for cumulative, entry in candidates if cumulative >= threshold),
        None,
    )
    if chosen is not None:
        return (chosen["target"], chosen["port"])

    print("There is something wrong with our SRV weight ordering algorithm")
    return (None, None)
def lookup_ip_address(hostname, ipv6):
    """
    Resolve the given hostname to an IPv6 (ipv6 true) or IPv4 address.

    Returns the data of the first matching A/AAAA answer - for IPv6
    converted to its textual form - or None when the lookup fails or no
    matching answer is present.
    """
    wanted_type = "AAAA" if ipv6 else "A"

    DNS.DiscoverNameServers()
    try:
        query_type = DNS.Type.AAAA if ipv6 else DNS.Type.A
        response = DNS.Request(name=hostname, qtype=query_type).req()
    except DNS.DNSError as message:
        print(f"DNS Error: {message} ({hostname})")
        return None

    status = response.header["status"]
    if status != "NOERROR":
        print(f'DNS Error: status={status} ({hostname})')
        return None

    for answer in response.answers:
        if "data" not in answer or not answer["data"]:
            continue
        if answer["typename"] != wanted_type:
            continue  # skip CNAME records
        address = answer["data"]
        if ipv6:
            # AAAA data is converted to the textual address form
            return inet_ntop(AF_INET6, address)
        return address
    return None