Coverage for ivatar/tools/views.py: 70%
149 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-26 00:11 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-26 00:11 +0000
1# -*- coding: utf-8 -*-
2"""
3View classes for ivatar/tools/
4"""
5from socket import inet_ntop, AF_INET6
6import hashlib
7import random
9from django.views.generic.edit import FormView
10from django.urls import reverse_lazy as reverse
11from django.shortcuts import render
13import DNS
15from libravatar import libravatar_url, parse_user_identity
16from libravatar import SECURE_BASE_URL as LIBRAVATAR_SECURE_BASE_URL
17from libravatar import BASE_URL as LIBRAVATAR_BASE_URL
19from ivatar.settings import SECURE_BASE_URL, BASE_URL
20from .forms import (
21 CheckDomainForm,
22 CheckForm,
23) # pylint: disable=relative-beyond-top-level
class CheckDomainView(FormView):
    """
    Form view reporting which avatar servers a domain advertises
    """

    template_name = "check_domain.html"
    form_class = CheckDomainForm
    success_url = reverse("tools_check_domain")

    def form_valid(self, form):
        """
        Look up the avatar servers (plain HTTP first, then HTTPS) for the
        submitted domain, resolve the IPv4 and IPv6 address of every server
        found and render the template again with the collected results.
        """
        super().form_valid(form)
        domain = form.cleaned_data["domain"]

        result = {}
        lookups = (
            ("avatar_server_http", False),
            ("avatar_server_https", True),
        )
        for server_key, use_https in lookups:
            server = lookup_avatar_server(domain, use_https)
            result[server_key] = server
            if not server:
                # Nothing advertised for this scheme, so nothing to resolve
                continue
            result[server_key + "_ipv4"] = lookup_ip_address(server, False)
            result[server_key + "_ipv6"] = lookup_ip_address(server, True)

        context = {
            "form": form,
            "result": result,
        }
        return render(self.request, self.template_name, context)
class CheckView(FormView):
    """
    View class for checking an e-mail or openid address
    """

    template_name = "check.html"
    form_class = CheckForm
    success_url = reverse("tools_check")

    def form_valid(self, form):
        """
        Build the avatar URLs and hashes for the submitted identity.

        For a mail address this computes the plain and secure avatar URLs,
        the hash returned by ``parse_user_identity`` and a SHA256 hash plus
        the secure URL using that SHA256 hash. For an OpenID it computes
        the plain and secure URLs and the hash. Values that do not apply
        stay ``None``. The result is rendered into ``template_name``.
        """
        data = form.cleaned_data

        mailurl = None
        openidurl = None
        mailurl_secure = None
        mailurl_secure_256 = None
        openidurl_secure = None
        mail_hash = None
        mail_hash256 = None
        openid_hash = None
        size = 80

        super().form_valid(form)

        # An explicit default URL wins over the selected default option;
        # the option "none" means no default at all.
        if data["default_url"]:
            default_url = data["default_url"]
        elif data["default_opt"] and data["default_opt"] != "none":
            default_url = data["default_opt"]
        else:
            default_url = None

        if "size" in data:
            size = data["size"]

        if data["mail"]:
            mail = data["mail"]
            mailurl = libravatar_url(email=mail, size=size, default=default_url)
            # Point the generated URL at this installation
            mailurl = mailurl.replace(LIBRAVATAR_BASE_URL, BASE_URL)
            mailurl_secure = libravatar_url(
                email=mail,
                size=size,
                https=True,
                default=default_url,
            )
            mailurl_secure = mailurl_secure.replace(
                LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL
            )
            mail_hash = parse_user_identity(email=mail, openid=None)[0]
            # Libravatar hashes are taken over the trimmed, lower-cased
            # address. Hashing the raw input would give a digest that does
            # not match any stored avatar when the user typed upper-case
            # letters or stray whitespace.
            # NOTE(review): presumably the same normalisation that
            # parse_user_identity applies - verify against the library.
            normalized_mail = mail.strip().lower()
            mail_hash256 = hashlib.sha256(
                normalized_mail.encode("utf-8")
            ).hexdigest()
            mailurl_secure_256 = mailurl_secure.replace(mail_hash, mail_hash256)

        if data["openid"]:
            # An OpenID given without a scheme is treated as plain http
            if not data["openid"].startswith(("http://", "https://")):
                data["openid"] = "http://%s" % data["openid"]
            openid = data["openid"]
            openidurl = libravatar_url(
                openid=openid, size=size, default=default_url
            )
            openidurl = openidurl.replace(LIBRAVATAR_BASE_URL, BASE_URL)
            openidurl_secure = libravatar_url(
                openid=openid,
                size=size,
                https=True,
                default=default_url,
            )
            openidurl_secure = openidurl_secure.replace(
                LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL
            )
            openid_hash = parse_user_identity(openid=openid, email=None)[0]

        return render(
            self.request,
            self.template_name,
            {
                "form": form,
                "mailurl": mailurl,
                "openidurl": openidurl,
                "mailurl_secure": mailurl_secure,
                "mailurl_secure_256": mailurl_secure_256,
                "openidurl_secure": openidurl_secure,
                "mail_hash": mail_hash,
                "mail_hash256": mail_hash256,
                "openid_hash": openid_hash,
                "size": size,
            },
        )
def lookup_avatar_server(domain, https):
    """
    Find the avatar server a domain advertises through DNS SRV records

    The SRV records should look like this:

    _avatars._tcp.example.com.     IN SRV 0 0 80  avatars.example.com
    _avatars-sec._tcp.example.com. IN SRV 0 0 443 avatars.example.com

    With ``https`` set the ``_avatars-sec`` service is queried, otherwise
    ``_avatars``. Returns the selected target, with ``:port`` appended when
    the port is not the default for the scheme (443 for https, 80
    otherwise). Returns None when the DNS request fails, the status is not
    NOERROR, or no target could be selected.
    """
    # Only the first 60 characters of the domain go into the query
    if domain and len(domain) > 60:
        domain = domain[:60]

    service_template = "_avatars-sec._tcp.%s" if https else "_avatars._tcp.%s"
    service_name = service_template % domain

    DNS.DiscoverNameServers()
    try:
        response = DNS.Request(name=service_name, qtype="SRV").req()
    except DNS.DNSError as message:
        print("DNS Error: %s (%s)" % (message, domain))
        return None

    status = response.header["status"]
    if status == "NXDOMAIN":
        # Not an error, the domain simply advertises nothing
        return None
    if status != "NOERROR":
        print("DNS Error: status=%s (%s)" % (status, domain))
        return None

    # Keep the SRV answers that carry data, as priority/weight/port/target
    records = [
        {
            "priority": int(answer["data"][0]),
            "weight": int(answer["data"][1]),
            "port": int(answer["data"][2]),
            "target": answer["data"][3],
        }
        for answer in response.answers
        if "data" in answer and answer["data"] and answer["typename"] == "SRV"
    ]

    target, port = srv_hostname(records)

    default_port = 443 if https else 80
    if target and port != default_port:
        return "%s:%s" % (target, port)
    return target
def srv_hostname(records):
    """
    Pick the (target, port) pair to use from a list of SRV records.

    Each record is a dict with the keys "priority", "weight", "port" and
    "target". Only records of the top priority (lowest number) take part;
    among those one is drawn at random using their weights. Returns
    (None, None) for an empty list.
    """
    count = len(records)
    if count < 1:
        return (None, None)
    if count == 1:
        only = records[0]
        return (only["target"], only["port"])

    # Collect (running weight sum, record) pairs for the best priority seen
    best_priority = records[0]["priority"]  # lower number = higher priority
    running_total = 0
    candidates = []

    for entry in records:
        priority = entry["priority"]
        if priority > best_priority:
            # Lower priority than what we already have: ignore it
            continue
        if priority < best_priority:
            # Better priority found: forget everything gathered so far
            best_priority = priority
            running_total = 0
            candidates = []

        weight = entry["weight"]
        running_total += weight
        if weight > 0:
            candidates.append((running_total, entry))
        else:
            # Zero-weight records must come first
            candidates.insert(0, (0, entry))

    if len(candidates) == 1:
        chosen = candidates[0][1]
        return (chosen["target"], chosen["port"])

    # First record whose running sum reaches the random number
    # (RFC2782 weight ordering algorithm, page 3)
    pick = random.randint(0, running_total)
    chosen = next(
        (entry for threshold, entry in candidates if threshold >= pick),
        None,
    )
    if chosen is None:
        print("There is something wrong with our SRV weight ordering algorithm")
        return (None, None)
    return (chosen["target"], chosen["port"])
def lookup_ip_address(hostname, ipv6):
    """
    Resolve the given hostname to a single IPv4 or IPv6 address

    Queries AAAA records when ``ipv6`` is set and A records otherwise.
    Returns the first matching answer (AAAA data is converted with
    ``inet_ntop``), or None when the request fails, the status is not
    NOERROR or no matching answer is present.
    """
    if ipv6:
        query_type = DNS.Type.AAAA
        wanted_typename = "AAAA"
    else:
        query_type = DNS.Type.A
        wanted_typename = "A"

    DNS.DiscoverNameServers()
    try:
        response = DNS.Request(name=hostname, qtype=query_type).req()
    except DNS.DNSError as message:
        print("DNS Error: %s (%s)" % (message, hostname))
        return None

    status = response.header["status"]
    if status != "NOERROR":
        print("DNS Error: status=%s (%s)" % (status, hostname))
        return None

    for answer in response.answers:
        if "data" not in answer or not answer["data"]:
            continue
        if answer["typename"] != wanted_typename:
            continue  # skip CNAME records

        address = answer["data"]
        return inet_ntop(AF_INET6, address) if ipv6 else address

    return None