Coverage for ivatar/utils.py: 66%

136 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-12 23:12 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Simple module providing reusable random_string function 

4""" 

5 

6import contextlib 

7import random 

8import string 

9from io import BytesIO 

10from PIL import Image, ImageDraw, ImageSequence 

11from urllib.parse import urlparse 

12import requests 

13from ivatar.settings import DEBUG, URL_TIMEOUT 

14from urllib.request import urlopen as urlopen_orig 

15 

16BLUESKY_IDENTIFIER = None 

17BLUESKY_APP_PASSWORD = None 

18with contextlib.suppress(Exception): 

19 from ivatar.settings import BLUESKY_IDENTIFIER, BLUESKY_APP_PASSWORD 

20 

21 

22def urlopen(url, timeout=URL_TIMEOUT): 

23 ctx = None 

24 if DEBUG: 

25 import ssl 

26 

27 ctx = ssl.create_default_context() 

28 ctx.check_hostname = False 

29 ctx.verify_mode = ssl.CERT_NONE 

30 return urlopen_orig(url, timeout=timeout, context=ctx) 

31 

32 

33class Bluesky: 

34 """ 

35 Handle Bluesky client access 

36 """ 

37 

38 identifier = "" 

39 app_password = "" 

40 service = "https://bsky.social" 

41 session = None 

42 

43 def __init__( 

44 self, 

45 identifier: str = BLUESKY_IDENTIFIER, 

46 app_password: str = BLUESKY_APP_PASSWORD, 

47 service: str = "https://bsky.social", 

48 ): 

49 self.identifier = identifier 

50 self.app_password = app_password 

51 self.service = service 

52 

53 def login(self): 

54 """ 

55 Login to Bluesky 

56 """ 

57 auth_response = requests.post( 

58 f"{self.service}/xrpc/com.atproto.server.createSession", 

59 json={"identifier": self.identifier, "password": self.app_password}, 

60 ) 

61 auth_response.raise_for_status() 

62 self.session = auth_response.json() 

63 

64 def normalize_handle(self, handle: str) -> str: 

65 """ 

66 Return the normalized handle for given handle 

67 """ 

68 # Normalize Bluesky handle in case someone enters an '@' at the beginning 

69 while handle.startswith("@"): 

70 handle = handle[1:] 

71 # Remove trailing spaces or spaces at the beginning 

72 while handle.startswith(" "): 

73 handle = handle[1:] 

74 while handle.endswith(" "): 

75 handle = handle[:-1] 

76 return handle 

77 

78 def get_profile(self, handle: str) -> str: 

79 if not self.session: 

80 self.login() 

81 profile_response = None 

82 

83 try: 

84 profile_response = requests.get( 

85 f"{self.service}/xrpc/app.bsky.actor.getProfile", 

86 headers={"Authorization": f'Bearer {self.session["accessJwt"]}'}, 

87 params={"actor": handle}, 

88 ) 

89 profile_response.raise_for_status() 

90 except Exception as exc: 

91 print(f"Bluesky profile fetch failed with HTTP error: {exc}") 

92 return None 

93 

94 return profile_response.json() 

95 

96 def get_avatar(self, handle: str): 

97 """ 

98 Get avatar URL for a handle 

99 """ 

100 profile = self.get_profile(handle) 

101 return profile["avatar"] if profile else None 

102 

103 

104def random_string(length=10): 

105 """ 

106 Return some random string with default length 10 

107 """ 

108 return "".join( 

109 random.SystemRandom().choice(string.ascii_lowercase + string.digits) 

110 for _ in range(length) 

111 ) 

112 

113 

114def openid_variations(openid): 

115 """ 

116 Return the various OpenID variations, ALWAYS in the same order: 

117 - http w/ trailing slash 

118 - http w/o trailing slash 

119 - https w/ trailing slash 

120 - https w/o trailing slash 

121 """ 

122 

123 # Make the 'base' version: http w/ trailing slash 

124 if openid.startswith("https://"): 

125 openid = openid.replace("https://", "http://") 

126 if openid[-1] != "/": 

127 openid = f"{openid}/" 

128 

129 # http w/o trailing slash 

130 var1 = openid[:-1] 

131 var2 = openid.replace("http://", "https://") 

132 var3 = var2[:-1] 

133 return (openid, var1, var2, var3) 

134 

135 

136def mm_ng( 

137 idhash, size=80, add_red=0, add_green=0, add_blue=0 

138): # pylint: disable=too-many-locals 

139 """ 

140 Return an MM (mystery man) image, based on a given hash 

141 add some red, green or blue, if specified 

142 """ 

143 

144 # Make sure the lightest bg color we paint is e0, else 

145 # we do not see the MM any more 

146 if idhash[0] == "f": 

147 idhash = "e0" 

148 

149 # How large is the circle? 

150 circle_size = size * 0.6 

151 

152 # Coordinates for the circle 

153 start_x = int(size * 0.2) 

154 end_x = start_x + circle_size 

155 start_y = int(size * 0.05) 

156 end_y = start_y + circle_size 

157 

158 # All are the same, based on the input hash 

159 # this should always result in a "gray-ish" background 

160 red = idhash[:2] 

161 green = idhash[:2] 

162 blue = idhash[:2] 

163 

164 # Add some red (i/a) and make sure it's not over 255 

165 red = hex(int(red, 16) + add_red).replace("0x", "") 

166 if int(red, 16) > 255: 

167 red = "ff" 

168 if len(red) == 1: 

169 red = f"0{red}" 

170 

171 # Add some green (i/a) and make sure it's not over 255 

172 green = hex(int(green, 16) + add_green).replace("0x", "") 

173 if int(green, 16) > 255: 

174 green = "ff" 

175 if len(green) == 1: 

176 green = f"0{green}" 

177 

178 # Add some blue (i/a) and make sure it's not over 255 

179 blue = hex(int(blue, 16) + add_blue).replace("0x", "") 

180 if int(blue, 16) > 255: 

181 blue = "ff" 

182 if len(blue) == 1: 

183 blue = f"0{blue}" 

184 

185 # Assemble the bg color "string" in web notation. Eg. '#d3d3d3' 

186 bg_color = f"#{red}{green}{blue}" 

187 

188 # Image 

189 image = Image.new("RGB", (size, size)) 

190 draw = ImageDraw.Draw(image) 

191 

192 # Draw background 

193 draw.rectangle(((0, 0), (size, size)), fill=bg_color) 

194 

195 # Draw MMs head 

196 draw.ellipse((start_x, start_y, end_x, end_y), fill="white") 

197 

198 # Draw MMs 'body' 

199 draw.polygon( 

200 ( 

201 (start_x + circle_size / 2, size / 2.5), 

202 (size * 0.15, size), 

203 (size - size * 0.15, size), 

204 ), 

205 fill="white", 

206 ) 

207 

208 return image 

209 

210 

211def is_trusted_url(url, url_filters): 

212 """ 

213 Check if a URL is valid and considered a trusted URL. 

214 If the URL is malformed, returns False. 

215 

216 Based on: https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/events/UrlFilter 

217 """ 

218 (scheme, netloc, path, params, query, fragment) = urlparse(url) 

219 

220 for filter in url_filters: 

221 if "schemes" in filter: 

222 schemes = filter["schemes"] 

223 

224 if scheme not in schemes: 

225 continue 

226 

227 if "host_equals" in filter: 

228 host_equals = filter["host_equals"] 

229 

230 if netloc != host_equals: 

231 continue 

232 

233 if "host_suffix" in filter: 

234 host_suffix = filter["host_suffix"] 

235 

236 if not netloc.endswith(host_suffix): 

237 continue 

238 

239 if "path_prefix" in filter: 

240 path_prefix = filter["path_prefix"] 

241 

242 if not path.startswith(path_prefix): 

243 continue 

244 

245 if "url_prefix" in filter: 

246 url_prefix = filter["url_prefix"] 

247 

248 if not url.startswith(url_prefix): 

249 continue 

250 

251 return True 

252 

253 return False 

254 

255 

256def resize_animated_gif(input_pil: Image, size: list) -> BytesIO: 

257 def _thumbnail_frames(image): 

258 for frame in ImageSequence.Iterator(image): 

259 new_frame = frame.copy() 

260 new_frame.thumbnail(size) 

261 yield new_frame 

262 

263 frames = list(_thumbnail_frames(input_pil)) 

264 output = BytesIO() 

265 output_image = frames[0] 

266 output_image.save( 

267 output, 

268 format="gif", 

269 save_all=True, 

270 optimize=False, 

271 append_images=frames[1:], 

272 disposal=input_pil.disposal_method, 

273 **input_pil.info, 

274 ) 

275 return output