Coverage for ivatar/tools/views.py: 71%

150 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-18 23:10 +0000

1# -*- coding: utf-8 -*- 

2""" 

3View classes for ivatar/tools/ 

4""" 

5from socket import inet_ntop, AF_INET6 

6import hashlib 

7import random 

8 

9from django.views.generic.edit import FormView 

10from django.urls import reverse_lazy as reverse 

11from django.shortcuts import render 

12 

13import DNS 

14 

15from libravatar import libravatar_url, parse_user_identity 

16from libravatar import SECURE_BASE_URL as LIBRAVATAR_SECURE_BASE_URL 

17from libravatar import BASE_URL as LIBRAVATAR_BASE_URL 

18 

19from ivatar.settings import SECURE_BASE_URL, BASE_URL 

20from .forms import ( 

21 CheckDomainForm, 

22 CheckForm, 

23) # pylint: disable=relative-beyond-top-level 

24 

25 

26class CheckDomainView(FormView): 

27 """ 

28 View class for checking a domain 

29 """ 

30 

31 template_name = "check_domain.html" 

32 form_class = CheckDomainForm 

33 success_url = reverse("tools_check_domain") 

34 

35 def form_valid(self, form): 

36 result = {} 

37 super().form_valid(form) 

38 domain = form.cleaned_data["domain"] 

39 result["avatar_server_http"] = lookup_avatar_server(domain, False) 

40 if result["avatar_server_http"]: 

41 result["avatar_server_http_ipv4"] = lookup_ip_address( 

42 result["avatar_server_http"], False 

43 ) 

44 result["avatar_server_http_ipv6"] = lookup_ip_address( 

45 result["avatar_server_http"], True 

46 ) 

47 result["avatar_server_https"] = lookup_avatar_server(domain, True) 

48 if result["avatar_server_https"]: 

49 result["avatar_server_https_ipv4"] = lookup_ip_address( 

50 result["avatar_server_https"], False 

51 ) 

52 result["avatar_server_https_ipv6"] = lookup_ip_address( 

53 result["avatar_server_https"], True 

54 ) 

55 return render( 

56 self.request, 

57 self.template_name, 

58 { 

59 "form": form, 

60 "result": result, 

61 }, 

62 ) 

63 

64 

65class CheckView(FormView): 

66 """ 

67 View class for checking an e-mail or openid address 

68 """ 

69 

70 template_name = "check.html" 

71 form_class = CheckForm 

72 success_url = reverse("tools_check") 

73 

74 def form_valid(self, form): 

75 mailurl = None 

76 openidurl = None 

77 mailurl_secure = None 

78 mailurl_secure_256 = None 

79 openidurl_secure = None 

80 mail_hash = None 

81 mail_hash256 = None 

82 openid_hash = None 

83 size = 80 

84 

85 super().form_valid(form) 

86 

87 if form.cleaned_data["default_url"]: 

88 default_url = form.cleaned_data["default_url"] 

89 elif ( 

90 form.cleaned_data["default_opt"] 

91 and form.cleaned_data["default_opt"] != "none" 

92 ): 

93 default_url = form.cleaned_data["default_opt"] 

94 else: 

95 default_url = None 

96 

97 if "size" in form.cleaned_data: 

98 size = form.cleaned_data["size"] 

99 if form.cleaned_data["mail"]: 

100 mailurl = libravatar_url( 

101 email=form.cleaned_data["mail"], size=size, default=default_url 

102 ) 

103 mailurl = mailurl.replace(LIBRAVATAR_BASE_URL, BASE_URL) 

104 mailurl_secure = libravatar_url( 

105 email=form.cleaned_data["mail"], 

106 size=size, 

107 https=True, 

108 default=default_url, 

109 ) 

110 mailurl_secure = mailurl_secure.replace( 

111 LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL 

112 ) 

113 mail_hash = parse_user_identity( 

114 email=form.cleaned_data["mail"], openid=None 

115 )[0] 

116 hash_obj = hashlib.new("sha256") 

117 hash_obj.update(form.cleaned_data["mail"].encode("utf-8")) 

118 mail_hash256 = hash_obj.hexdigest() 

119 mailurl_secure_256 = mailurl_secure.replace(mail_hash, mail_hash256) 

120 if form.cleaned_data["openid"]: 

121 if not form.cleaned_data["openid"].startswith( 

122 "http://" 

123 ) and not form.cleaned_data["openid"].startswith("https://"): 

124 form.cleaned_data["openid"] = "http://%s" % form.cleaned_data["openid"] 

125 openidurl = libravatar_url( 

126 openid=form.cleaned_data["openid"], size=size, default=default_url 

127 ) 

128 openidurl = openidurl.replace(LIBRAVATAR_BASE_URL, BASE_URL) 

129 openidurl_secure = libravatar_url( 

130 openid=form.cleaned_data["openid"], 

131 size=size, 

132 https=True, 

133 default=default_url, 

134 ) 

135 openidurl_secure = openidurl_secure.replace( 

136 LIBRAVATAR_SECURE_BASE_URL, SECURE_BASE_URL 

137 ) 

138 openid_hash = parse_user_identity( 

139 openid=form.cleaned_data["openid"], email=None 

140 )[0] 

141 

142 return render( 

143 self.request, 

144 self.template_name, 

145 { 

146 "form": form, 

147 "mailurl": mailurl, 

148 "openidurl": openidurl, 

149 "mailurl_secure": mailurl_secure, 

150 "mailurl_secure_256": mailurl_secure_256, 

151 "openidurl_secure": openidurl_secure, 

152 "mail_hash": mail_hash, 

153 "mail_hash256": mail_hash256, 

154 "openid_hash": openid_hash, 

155 "size": size, 

156 }, 

157 ) 

158 

159 

160def lookup_avatar_server(domain, https): 

161 """ 

162 Extract the avatar server from an SRV record in the DNS zone 

163 

164 The SRV records should look like this: 

165 

166 _avatars._tcp.example.com. IN SRV 0 0 80 avatars.example.com 

167 _avatars-sec._tcp.example.com. IN SRV 0 0 443 avatars.example.com 

168 """ 

169 

170 if domain and len(domain) > 60: 

171 domain = domain[:60] 

172 

173 service_name = None 

174 if https: 

175 service_name = "_avatars-sec._tcp.%s" % domain 

176 else: 

177 service_name = "_avatars._tcp.%s" % domain 

178 

179 DNS.DiscoverNameServers() 

180 try: 

181 dns_request = DNS.Request(name=service_name, qtype="SRV").req() 

182 except DNS.DNSError as message: 

183 print("DNS Error: %s (%s)" % (message, domain)) 

184 return None 

185 

186 if dns_request.header["status"] == "NXDOMAIN": 

187 # Not an error, but no point in going any further 

188 return None 

189 

190 if dns_request.header["status"] != "NOERROR": 

191 print("DNS Error: status=%s (%s)" % (dns_request.header["status"], domain)) 

192 return None 

193 

194 records = [] 

195 for answer in dns_request.answers: 

196 if ( 

197 ("data" not in answer) 

198 or (not answer["data"]) 

199 or (not answer["typename"]) 

200 or (answer["typename"] != "SRV") 

201 ): 

202 continue 

203 

204 record = { 

205 "priority": int(answer["data"][0]), 

206 "weight": int(answer["data"][1]), 

207 "port": int(answer["data"][2]), 

208 "target": answer["data"][3], 

209 } 

210 

211 records.append(record) 

212 

213 target, port = srv_hostname(records) 

214 

215 if target and ((https and port != 443) or (not https and port != 80)): 

216 return "%s:%s" % (target, port) 

217 

218 return target 

219 

220 

221def srv_hostname(records): 

222 """ 

223 Return the right (target, port) pair from a list of SRV records. 

224 """ 

225 

226 if len(records) < 1: 

227 return (None, None) 

228 

229 if len(records) == 1: 

230 ret = records[0] 

231 return (ret["target"], ret["port"]) 

232 

233 # Keep only the servers in the top priority 

234 priority_records = [] 

235 total_weight = 0 

236 top_priority = records[0]["priority"] # highest priority = lowest number 

237 

238 for ret in records: 

239 if ret["priority"] > top_priority: 

240 # ignore the record (ret has lower priority) 

241 continue 

242 

243 # Take care - this if is only a if, if the above if 

244 # uses continue at the end. else it should be an elsif 

245 if ret["priority"] < top_priority: 

246 # reset the aretay (ret has higher priority) 

247 top_priority = ret["priority"] 

248 total_weight = 0 

249 priority_records = [] 

250 

251 total_weight += ret["weight"] 

252 

253 if ret["weight"] > 0: 

254 priority_records.append((total_weight, ret)) 

255 else: 

256 # zero-weigth elements must come first 

257 priority_records.insert(0, (0, ret)) 

258 

259 if len(priority_records) == 1: 

260 unused, ret = priority_records[0] # pylint: disable=unused-variable 

261 return (ret["target"], ret["port"]) 

262 

263 # Select first record according to RFC2782 weight ordering algorithm (page 3) 

264 random_number = random.randint(0, total_weight) 

265 

266 for record in priority_records: 

267 weighted_index, ret = record 

268 

269 if weighted_index >= random_number: 

270 return (ret["target"], ret["port"]) 

271 

272 print("There is something wrong with our SRV weight ordering algorithm") 

273 return (None, None) 

274 

275 

276def lookup_ip_address(hostname, ipv6): 

277 """ 

278 Try to get IPv4 or IPv6 addresses for the given hostname 

279 """ 

280 

281 DNS.DiscoverNameServers() 

282 try: 

283 if ipv6: 

284 dns_request = DNS.Request(name=hostname, qtype=DNS.Type.AAAA).req() 

285 else: 

286 dns_request = DNS.Request(name=hostname, qtype=DNS.Type.A).req() 

287 except DNS.DNSError as message: 

288 print("DNS Error: %s (%s)" % (message, hostname)) 

289 return None 

290 

291 if dns_request.header["status"] != "NOERROR": 

292 print("DNS Error: status=%s (%s)" % (dns_request.header["status"], hostname)) 

293 return None 

294 

295 for answer in dns_request.answers: 

296 if ("data" not in answer) or (not answer["data"]): 

297 continue 

298 if (ipv6 and answer["typename"] != "AAAA") or ( 

299 not ipv6 and answer["typename"] != "A" 

300 ): 

301 continue # skip CNAME records 

302 

303 if ipv6: 

304 return inet_ntop(AF_INET6, answer["data"]) 

305 

306 return answer["data"] 

307 

308 return None