Coverage for ivatar/utils.py: 65%

172 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-20 23:06 +0000

1# -*- coding: utf-8 -*- 

2""" 

3Simple module providing reusable random_string function 

4""" 

5 

6import contextlib 

7import random 

8import string 

9import logging 

10from io import BytesIO 

11from PIL import Image, ImageDraw, ImageSequence 

12from urllib.parse import urlparse 

13import requests 

14from ivatar.settings import DEBUG, URL_TIMEOUT 

15from urllib.request import urlopen as urlopen_orig 

16 

17# Initialize logger 

18logger = logging.getLogger("ivatar") 

19 

20BLUESKY_IDENTIFIER = None 

21BLUESKY_APP_PASSWORD = None 

22with contextlib.suppress(Exception): 

23 from ivatar.settings import BLUESKY_IDENTIFIER, BLUESKY_APP_PASSWORD 

24 

25 

26def urlopen(url, timeout=URL_TIMEOUT): 

27 ctx = None 

28 if DEBUG: 

29 import ssl 

30 

31 ctx = ssl.create_default_context() 

32 ctx.check_hostname = False 

33 ctx.verify_mode = ssl.CERT_NONE 

34 return urlopen_orig(url, timeout=timeout, context=ctx) 

35 

36 

37class Bluesky: 

38 """ 

39 Handle Bluesky client access with persistent session management 

40 """ 

41 

42 identifier = "" 

43 app_password = "" 

44 service = "https://bsky.social" 

45 session = None 

46 _shared_session = None # Class-level shared session 

47 _session_expires_at = None # Track session expiration 

48 

49 def __init__( 

50 self, 

51 identifier: str = BLUESKY_IDENTIFIER, 

52 app_password: str = BLUESKY_APP_PASSWORD, 

53 service: str = "https://bsky.social", 

54 ): 

55 self.identifier = identifier 

56 self.app_password = app_password 

57 self.service = service 

58 

59 def _is_session_valid(self) -> bool: 

60 """ 

61 Check if the current session is still valid 

62 """ 

63 if not self._shared_session or not self._session_expires_at: 

64 return False 

65 

66 import time 

67 

68 # Add 5 minute buffer before actual expiration 

69 return time.time() < (self._session_expires_at - 300) 

70 

71 def login(self): 

72 """ 

73 Login to Bluesky with session persistence 

74 """ 

75 # Use shared session if available and valid 

76 if self._is_session_valid(): 

77 self.session = self._shared_session 

78 logger.debug("Reusing existing Bluesky session") 

79 return 

80 

81 logger.debug("Creating new Bluesky session") 

82 auth_response = requests.post( 

83 f"{self.service}/xrpc/com.atproto.server.createSession", 

84 json={"identifier": self.identifier, "password": self.app_password}, 

85 ) 

86 auth_response.raise_for_status() 

87 self.session = auth_response.json() 

88 

89 # Store session data for reuse 

90 self._shared_session = self.session 

91 import time 

92 

93 # Sessions typically expire in 24 hours, but we'll refresh every 12 hours 

94 self._session_expires_at = time.time() + (12 * 60 * 60) 

95 

96 logger.debug( 

97 "Created new Bluesky session, expires at: %s", 

98 time.strftime( 

99 "%Y-%m-%d %H:%M:%S", time.localtime(self._session_expires_at) 

100 ), 

101 ) 

102 

103 @classmethod 

104 def clear_shared_session(cls): 

105 """ 

106 Clear the shared session (useful for testing) 

107 """ 

108 cls._shared_session = None 

109 cls._session_expires_at = None 

110 logger.debug("Cleared shared Bluesky session") 

111 

112 def normalize_handle(self, handle: str) -> str: 

113 """ 

114 Return the normalized handle for given handle 

115 """ 

116 # Normalize Bluesky handle in case someone enters an '@' at the beginning 

117 while handle.startswith("@"): 

118 handle = handle[1:] 

119 # Remove trailing spaces or spaces at the beginning 

120 while handle.startswith(" "): 

121 handle = handle[1:] 

122 while handle.endswith(" "): 

123 handle = handle[:-1] 

124 return handle 

125 

126 def _make_profile_request(self, handle: str): 

127 """ 

128 Make a profile request to Bluesky API with automatic retry on session expiration 

129 """ 

130 try: 

131 profile_response = requests.get( 

132 f"{self.service}/xrpc/app.bsky.actor.getProfile", 

133 headers={"Authorization": f'Bearer {self.session["accessJwt"]}'}, 

134 params={"actor": handle}, 

135 ) 

136 profile_response.raise_for_status() 

137 return profile_response.json() 

138 except requests.exceptions.HTTPError as exc: 

139 if exc.response.status_code == 401: 

140 # Session expired, try to login again 

141 logger.warning("Bluesky session expired, re-authenticating") 

142 self.clear_shared_session() 

143 self.login() 

144 # Retry the request 

145 profile_response = requests.get( 

146 f"{self.service}/xrpc/app.bsky.actor.getProfile", 

147 headers={"Authorization": f'Bearer {self.session["accessJwt"]}'}, 

148 params={"actor": handle}, 

149 ) 

150 profile_response.raise_for_status() 

151 return profile_response.json() 

152 else: 

153 logger.warning(f"Bluesky profile fetch failed with HTTP error: {exc}") 

154 return None 

155 except Exception as exc: 

156 logger.warning(f"Bluesky profile fetch failed with error: {exc}") 

157 return None 

158 

159 def get_profile(self, handle: str) -> str: 

160 if not self.session or not self._is_session_valid(): 

161 self.login() 

162 return self._make_profile_request(handle) 

163 

164 def get_avatar(self, handle: str): 

165 """ 

166 Get avatar URL for a handle 

167 """ 

168 profile = self.get_profile(handle) 

169 return profile["avatar"] if profile else None 

170 

171 

172def random_string(length=10): 

173 """ 

174 Return some random string with default length 10 

175 """ 

176 return "".join( 

177 random.SystemRandom().choice(string.ascii_lowercase + string.digits) 

178 for _ in range(length) 

179 ) 

180 

181 

182def random_ip_address(): 

183 """ 

184 Return a random IP address (IPv4) 

185 """ 

186 return f"{random.randint(1, 254)}.{random.randint(1, 254)}.{random.randint(1, 254)}.{random.randint(1, 254)}" 

187 

188 

189def openid_variations(openid): 

190 """ 

191 Return the various OpenID variations, ALWAYS in the same order: 

192 - http w/ trailing slash 

193 - http w/o trailing slash 

194 - https w/ trailing slash 

195 - https w/o trailing slash 

196 """ 

197 

198 # Make the 'base' version: http w/ trailing slash 

199 if openid.startswith("https://"): 

200 openid = openid.replace("https://", "http://") 

201 if openid[-1] != "/": 

202 openid = f"{openid}/" 

203 

204 # http w/o trailing slash 

205 var1 = openid[:-1] 

206 var2 = openid.replace("http://", "https://") 

207 var3 = var2[:-1] 

208 return (openid, var1, var2, var3) 

209 

210 

211def mm_ng( 

212 idhash, size=80, add_red=0, add_green=0, add_blue=0 

213): # pylint: disable=too-many-locals 

214 """ 

215 Return an MM (mystery man) image, based on a given hash 

216 add some red, green or blue, if specified 

217 """ 

218 

219 # Make sure the lightest bg color we paint is e0, else 

220 # we do not see the MM any more 

221 if idhash[0] == "f": 

222 idhash = "e0" 

223 

224 # How large is the circle? 

225 circle_size = size * 0.6 

226 

227 # Coordinates for the circle 

228 start_x = int(size * 0.2) 

229 end_x = start_x + circle_size 

230 start_y = int(size * 0.05) 

231 end_y = start_y + circle_size 

232 

233 # All are the same, based on the input hash 

234 # this should always result in a "gray-ish" background 

235 red = idhash[:2] 

236 green = idhash[:2] 

237 blue = idhash[:2] 

238 

239 # Add some red (i/a) and make sure it's not over 255 

240 red = hex(int(red, 16) + add_red).replace("0x", "") 

241 if int(red, 16) > 255: 

242 red = "ff" 

243 if len(red) == 1: 

244 red = f"0{red}" 

245 

246 # Add some green (i/a) and make sure it's not over 255 

247 green = hex(int(green, 16) + add_green).replace("0x", "") 

248 if int(green, 16) > 255: 

249 green = "ff" 

250 if len(green) == 1: 

251 green = f"0{green}" 

252 

253 # Add some blue (i/a) and make sure it's not over 255 

254 blue = hex(int(blue, 16) + add_blue).replace("0x", "") 

255 if int(blue, 16) > 255: 

256 blue = "ff" 

257 if len(blue) == 1: 

258 blue = f"0{blue}" 

259 

260 # Assemble the bg color "string" in web notation. Eg. '#d3d3d3' 

261 bg_color = f"#{red}{green}{blue}" 

262 

263 # Image 

264 image = Image.new("RGB", (size, size)) 

265 draw = ImageDraw.Draw(image) 

266 

267 # Draw background 

268 draw.rectangle(((0, 0), (size, size)), fill=bg_color) 

269 

270 # Draw MMs head 

271 draw.ellipse((start_x, start_y, end_x, end_y), fill="white") 

272 

273 # Draw MMs 'body' 

274 draw.polygon( 

275 ( 

276 (start_x + circle_size / 2, size / 2.5), 

277 (size * 0.15, size), 

278 (size - size * 0.15, size), 

279 ), 

280 fill="white", 

281 ) 

282 

283 return image 

284 

285 

286def is_trusted_url(url, url_filters): 

287 """ 

288 Check if a URL is valid and considered a trusted URL. 

289 If the URL is malformed, returns False. 

290 

291 Based on: https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/events/UrlFilter 

292 """ 

293 (scheme, netloc, path, params, query, fragment) = urlparse(url) 

294 

295 for ufilter in url_filters: 

296 if "schemes" in ufilter: 

297 schemes = ufilter["schemes"] 

298 

299 if scheme not in schemes: 

300 continue 

301 

302 if "host_equals" in ufilter: 

303 host_equals = ufilter["host_equals"] 

304 

305 if netloc != host_equals: 

306 continue 

307 

308 if "host_suffix" in ufilter: 

309 host_suffix = ufilter["host_suffix"] 

310 

311 if not netloc.endswith(host_suffix): 

312 continue 

313 

314 if "path_prefix" in ufilter: 

315 path_prefix = ufilter["path_prefix"] 

316 

317 if not path.startswith(path_prefix): 

318 continue 

319 

320 if "url_prefix" in ufilter: 

321 url_prefix = ufilter["url_prefix"] 

322 

323 if not url.startswith(url_prefix): 

324 continue 

325 

326 return True 

327 

328 return False 

329 

330 

331def resize_animated_gif(input_pil: Image, size: list) -> BytesIO: 

332 def _thumbnail_frames(image): 

333 for frame in ImageSequence.Iterator(image): 

334 new_frame = frame.copy() 

335 new_frame.thumbnail(size) 

336 yield new_frame 

337 

338 frames = list(_thumbnail_frames(input_pil)) 

339 output = BytesIO() 

340 output_image = frames[0] 

341 output_image.save( 

342 output, 

343 format="gif", 

344 save_all=True, 

345 optimize=False, 

346 append_images=frames[1:], 

347 disposal=input_pil.disposal_method, 

348 **input_pil.info, 

349 ) 

350 return output