Coverage for ivatar/tools/views.py: 66%

156 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-12 23:12 +0000

1# -*- coding: utf-8 -*- 

2""" 

3View classes for ivatar/tools/ 

4""" 

5from socket import inet_ntop, AF_INET6 

6import hashlib 

7import random 

8 

9from django.views.generic.edit import FormView 

10from django.urls import reverse_lazy as reverse 

11from django.shortcuts import render 

12 

13import DNS 

14 

15from libravatar import libravatar_url, parse_user_identity 

16from libravatar import SECURE_BASE_URL as LIBRAVATAR_SECURE_BASE_URL 

17from libravatar import BASE_URL as LIBRAVATAR_BASE_URL 

18 

19from ivatar.settings import SECURE_BASE_URL, BASE_URL, SITE_NAME, DEBUG 

20from .forms import ( 

21 CheckDomainForm, 

22 CheckForm, 

23) # pylint: disable=relative-beyond-top-level 

24 

25 

26class CheckDomainView(FormView): 

27 """ 

28 View class for checking a domain 

29 """ 

30 

31 template_name = "check_domain.html" 

32 form_class = CheckDomainForm 

33 success_url = reverse("tools_check_domain") 

34 

35 def form_valid(self, form): 

36 super().form_valid(form) 

37 domain = form.cleaned_data["domain"] 

38 result = {"avatar_server_http": lookup_avatar_server(domain, False)} 

39 if result["avatar_server_http"]: 

40 result["avatar_server_http_ipv4"] = lookup_ip_address( 

41 result["avatar_server_http"], False 

42 ) 

43 result["avatar_server_http_ipv6"] = lookup_ip_address( 

44 result["avatar_server_http"], True 

45 ) 

46 result["avatar_server_https"] = lookup_avatar_server(domain, True) 

47 if result["avatar_server_https"]: 

48 result["avatar_server_https_ipv4"] = lookup_ip_address( 

49 result["avatar_server_https"], False 

50 ) 

51 result["avatar_server_https_ipv6"] = lookup_ip_address( 

52 result["avatar_server_https"], True 

53 ) 

54 return render( 

55 self.request, 

56 self.template_name, 

57 { 

58 "form": form, 

59 "result": result, 

60 }, 

61 ) 

62 

63 

64class CheckView(FormView): 

65 """ 

66 View class for checking an e-mail or openid address 

67 """ 

68 

69 template_name = "check.html" 

70 form_class = CheckForm 

71 success_url = reverse("tools_check") 

72 

73 def form_valid(self, form): 

74 mailurl = None 

75 openidurl = None 

76 mailurl_secure = None 

77 mailurl_secure_256 = None 

78 openidurl_secure = None 

79 mail_hash = None 

80 mail_hash256 = None 

81 openid_hash = None 

82 super().form_valid(form) 

83 

84 if form.cleaned_data["default_url"]: 

85 default_url = form.cleaned_data["default_url"] 

86 elif ( 

87 form.cleaned_data["default_opt"] 

88 and form.cleaned_data["default_opt"] != "none" 

89 ): 

90 default_url = form.cleaned_data["default_opt"] 

91 else: 

92 default_url = None 

93 

94 size = form.cleaned_data["size"] if "size" in form.cleaned_data else 80 

95 if form.cleaned_data["mail"]: 

96 mailurl = libravatar_url( 

97 email=form.cleaned_data["mail"], size=size, default=default_url 

98 ) 

99 mailurl = mailurl.replace(LIBRAVATAR_BASE_URL, BASE_URL) 

100 mailurl_secure = libravatar_url( 

101 email=form.cleaned_data["mail"], 

102 size=size, 

103 https=True, 

104 default=default_url, 

105 ) 

106 mailurl_secure = mailurl_secure.replace( 

107 LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL 

108 ) 

109 mail_hash = parse_user_identity( 

110 email=form.cleaned_data["mail"], openid=None 

111 )[0] 

112 hash_obj = hashlib.new("sha256") 

113 hash_obj.update(form.cleaned_data["mail"].encode("utf-8")) 

114 mail_hash256 = hash_obj.hexdigest() 

115 mailurl_secure_256 = mailurl_secure.replace(mail_hash, mail_hash256) 

116 if form.cleaned_data["openid"]: 

117 if not form.cleaned_data["openid"].startswith( 

118 "http://" 

119 ) and not form.cleaned_data["openid"].startswith("https://"): 

120 form.cleaned_data["openid"] = f'http://{form.cleaned_data["openid"]}' 

121 openidurl = libravatar_url( 

122 openid=form.cleaned_data["openid"], size=size, default=default_url 

123 ) 

124 openidurl = openidurl.replace(LIBRAVATAR_BASE_URL, BASE_URL) 

125 openidurl_secure = libravatar_url( 

126 openid=form.cleaned_data["openid"], 

127 size=size, 

128 https=True, 

129 default=default_url, 

130 ) 

131 openidurl_secure = openidurl_secure.replace( 

132 LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL 

133 ) 

134 openid_hash = parse_user_identity( 

135 openid=form.cleaned_data["openid"], email=None 

136 )[0] 

137 

138 if "DEVELOPMENT" in SITE_NAME and DEBUG: 

139 if mailurl: 

140 mailurl = mailurl.replace( 

141 "https://avatars.linux-kernel.at", 

142 f"http://{self.request.get_host()}", 

143 ) 

144 if mailurl_secure: 

145 mailurl_secure = mailurl_secure.replace( 

146 "https://avatars.linux-kernel.at", 

147 f"http://{self.request.get_host()}", 

148 ) 

149 if mailurl_secure_256: 

150 mailurl_secure_256 = mailurl_secure_256.replace( 

151 "https://avatars.linux-kernel.at", 

152 f"http://{self.request.get_host()}", 

153 ) 

154 

155 if openidurl: 

156 openidurl = openidurl.replace( 

157 "https://avatars.linux-kernel.at", 

158 f"http://{self.request.get_host()}", 

159 ) 

160 if openidurl_secure: 

161 openidurl_secure = openidurl_secure.replace( 

162 "https://avatars.linux-kernel.at", 

163 f"http://{self.request.get_host()}", 

164 ) 

165 print(mailurl, openidurl, mailurl_secure, mailurl_secure_256, openidurl_secure) 

166 

167 return render( 

168 self.request, 

169 self.template_name, 

170 { 

171 "form": form, 

172 "mailurl": mailurl, 

173 "openidurl": openidurl, 

174 "mailurl_secure": mailurl_secure, 

175 "mailurl_secure_256": mailurl_secure_256, 

176 "openidurl_secure": openidurl_secure, 

177 "mail_hash": mail_hash, 

178 "mail_hash256": mail_hash256, 

179 "openid_hash": openid_hash, 

180 "size": size, 

181 }, 

182 ) 

183 

184 

185def lookup_avatar_server(domain, https): 

186 """ 

187 Extract the avatar server from an SRV record in the DNS zone 

188 

189 The SRV records should look like this: 

190 

191 _avatars._tcp.example.com. IN SRV 0 0 80 avatars.example.com 

192 _avatars-sec._tcp.example.com. IN SRV 0 0 443 avatars.example.com 

193 """ 

194 

195 if domain and len(domain) > 60: 

196 domain = domain[:60] 

197 

198 service_name = None 

199 if https: 

200 service_name = f"_avatars-sec._tcp.{domain}" 

201 else: 

202 service_name = f"_avatars._tcp.{domain}" 

203 

204 DNS.DiscoverNameServers() 

205 try: 

206 dns_request = DNS.Request(name=service_name, qtype="SRV").req() 

207 except DNS.DNSError as message: 

208 print(f"DNS Error: {message} ({domain})") 

209 return None 

210 

211 if dns_request.header["status"] == "NXDOMAIN": 

212 # Not an error, but no point in going any further 

213 return None 

214 

215 if dns_request.header["status"] != "NOERROR": 

216 print(f'DNS Error: status={dns_request.header["status"]} ({domain})') 

217 return None 

218 

219 records = [] 

220 for answer in dns_request.answers: 

221 if ( 

222 ("data" not in answer) 

223 or (not answer["data"]) 

224 or (not answer["typename"]) 

225 or (answer["typename"] != "SRV") 

226 ): 

227 continue 

228 

229 record = { 

230 "priority": int(answer["data"][0]), 

231 "weight": int(answer["data"][1]), 

232 "port": int(answer["data"][2]), 

233 "target": answer["data"][3], 

234 } 

235 

236 records.append(record) 

237 

238 target, port = srv_hostname(records) 

239 

240 if target and ((https and port != 443) or (not https and port != 80)): 

241 return f"{target}:{port}" 

242 

243 return target 

244 

245 

246def srv_hostname(records): 

247 """ 

248 Return the right (target, port) pair from a list of SRV records. 

249 """ 

250 

251 if len(records) < 1: 

252 return (None, None) 

253 

254 if len(records) == 1: 

255 ret = records[0] 

256 return (ret["target"], ret["port"]) 

257 

258 # Keep only the servers in the top priority 

259 priority_records = [] 

260 total_weight = 0 

261 top_priority = records[0]["priority"] # highest priority = lowest number 

262 

263 for ret in records: 

264 if ret["priority"] > top_priority: 

265 # ignore the record (ret has lower priority) 

266 continue 

267 

268 # Take care - this if is only a if, if the above if 

269 # uses continue at the end. else it should be an elsif 

270 if ret["priority"] < top_priority: 

271 # reset the priority (ret has higher priority) 

272 top_priority = ret["priority"] 

273 total_weight = 0 

274 priority_records = [] 

275 

276 total_weight += ret["weight"] 

277 

278 if ret["weight"] > 0: 

279 priority_records.append((total_weight, ret)) 

280 else: 

281 # zero-weight elements must come first 

282 priority_records.insert(0, (0, ret)) 

283 

284 if len(priority_records) == 1: 

285 unused, ret = priority_records[0] # pylint: disable=unused-variable 

286 return (ret["target"], ret["port"]) 

287 

288 # Select first record according to RFC2782 weight ordering algorithm (page 3) 

289 random_number = random.randint(0, total_weight) 

290 

291 for record in priority_records: 

292 weighted_index, ret = record 

293 

294 if weighted_index >= random_number: 

295 return (ret["target"], ret["port"]) 

296 

297 print("There is something wrong with our SRV weight ordering algorithm") 

298 return (None, None) 

299 

300 

301def lookup_ip_address(hostname, ipv6): 

302 """ 

303 Try to get IPv4 or IPv6 addresses for the given hostname 

304 """ 

305 

306 DNS.DiscoverNameServers() 

307 try: 

308 if ipv6: 

309 dns_request = DNS.Request(name=hostname, qtype=DNS.Type.AAAA).req() 

310 else: 

311 dns_request = DNS.Request(name=hostname, qtype=DNS.Type.A).req() 

312 except DNS.DNSError as message: 

313 print(f"DNS Error: {message} ({hostname})") 

314 return None 

315 

316 if dns_request.header["status"] != "NOERROR": 

317 print(f'DNS Error: status={dns_request.header["status"]} ({hostname})') 

318 return None 

319 

320 for answer in dns_request.answers: 

321 if ("data" not in answer) or (not answer["data"]): 

322 continue 

323 if (ipv6 and answer["typename"] != "AAAA") or ( 

324 not ipv6 and answer["typename"] != "A" 

325 ): 

326 continue # skip CNAME records 

327 

328 return inet_ntop(AF_INET6, answer["data"]) if ipv6 else answer["data"] 

329 return None