Coverage for ivatar/utils.py: 65%
172 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 23:06 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-20 23:06 +0000
1# -*- coding: utf-8 -*-
2"""
3Simple module providing reusable random_string function
4"""
6import contextlib
7import random
8import string
9import logging
10from io import BytesIO
11from PIL import Image, ImageDraw, ImageSequence
12from urllib.parse import urlparse
13import requests
14from ivatar.settings import DEBUG, URL_TIMEOUT
15from urllib.request import urlopen as urlopen_orig
17# Initialize logger
18logger = logging.getLogger("ivatar")
20BLUESKY_IDENTIFIER = None
21BLUESKY_APP_PASSWORD = None
22with contextlib.suppress(Exception):
23 from ivatar.settings import BLUESKY_IDENTIFIER, BLUESKY_APP_PASSWORD
26def urlopen(url, timeout=URL_TIMEOUT):
27 ctx = None
28 if DEBUG:
29 import ssl
31 ctx = ssl.create_default_context()
32 ctx.check_hostname = False
33 ctx.verify_mode = ssl.CERT_NONE
34 return urlopen_orig(url, timeout=timeout, context=ctx)
37class Bluesky:
38 """
39 Handle Bluesky client access with persistent session management
40 """
42 identifier = ""
43 app_password = ""
44 service = "https://bsky.social"
45 session = None
46 _shared_session = None # Class-level shared session
47 _session_expires_at = None # Track session expiration
49 def __init__(
50 self,
51 identifier: str = BLUESKY_IDENTIFIER,
52 app_password: str = BLUESKY_APP_PASSWORD,
53 service: str = "https://bsky.social",
54 ):
55 self.identifier = identifier
56 self.app_password = app_password
57 self.service = service
59 def _is_session_valid(self) -> bool:
60 """
61 Check if the current session is still valid
62 """
63 if not self._shared_session or not self._session_expires_at:
64 return False
66 import time
68 # Add 5 minute buffer before actual expiration
69 return time.time() < (self._session_expires_at - 300)
71 def login(self):
72 """
73 Login to Bluesky with session persistence
74 """
75 # Use shared session if available and valid
76 if self._is_session_valid():
77 self.session = self._shared_session
78 logger.debug("Reusing existing Bluesky session")
79 return
81 logger.debug("Creating new Bluesky session")
82 auth_response = requests.post(
83 f"{self.service}/xrpc/com.atproto.server.createSession",
84 json={"identifier": self.identifier, "password": self.app_password},
85 )
86 auth_response.raise_for_status()
87 self.session = auth_response.json()
89 # Store session data for reuse
90 self._shared_session = self.session
91 import time
93 # Sessions typically expire in 24 hours, but we'll refresh every 12 hours
94 self._session_expires_at = time.time() + (12 * 60 * 60)
96 logger.debug(
97 "Created new Bluesky session, expires at: %s",
98 time.strftime(
99 "%Y-%m-%d %H:%M:%S", time.localtime(self._session_expires_at)
100 ),
101 )
103 @classmethod
104 def clear_shared_session(cls):
105 """
106 Clear the shared session (useful for testing)
107 """
108 cls._shared_session = None
109 cls._session_expires_at = None
110 logger.debug("Cleared shared Bluesky session")
112 def normalize_handle(self, handle: str) -> str:
113 """
114 Return the normalized handle for given handle
115 """
116 # Normalize Bluesky handle in case someone enters an '@' at the beginning
117 while handle.startswith("@"):
118 handle = handle[1:]
119 # Remove trailing spaces or spaces at the beginning
120 while handle.startswith(" "):
121 handle = handle[1:]
122 while handle.endswith(" "):
123 handle = handle[:-1]
124 return handle
126 def _make_profile_request(self, handle: str):
127 """
128 Make a profile request to Bluesky API with automatic retry on session expiration
129 """
130 try:
131 profile_response = requests.get(
132 f"{self.service}/xrpc/app.bsky.actor.getProfile",
133 headers={"Authorization": f'Bearer {self.session["accessJwt"]}'},
134 params={"actor": handle},
135 )
136 profile_response.raise_for_status()
137 return profile_response.json()
138 except requests.exceptions.HTTPError as exc:
139 if exc.response.status_code == 401:
140 # Session expired, try to login again
141 logger.warning("Bluesky session expired, re-authenticating")
142 self.clear_shared_session()
143 self.login()
144 # Retry the request
145 profile_response = requests.get(
146 f"{self.service}/xrpc/app.bsky.actor.getProfile",
147 headers={"Authorization": f'Bearer {self.session["accessJwt"]}'},
148 params={"actor": handle},
149 )
150 profile_response.raise_for_status()
151 return profile_response.json()
152 else:
153 logger.warning(f"Bluesky profile fetch failed with HTTP error: {exc}")
154 return None
155 except Exception as exc:
156 logger.warning(f"Bluesky profile fetch failed with error: {exc}")
157 return None
159 def get_profile(self, handle: str) -> str:
160 if not self.session or not self._is_session_valid():
161 self.login()
162 return self._make_profile_request(handle)
164 def get_avatar(self, handle: str):
165 """
166 Get avatar URL for a handle
167 """
168 profile = self.get_profile(handle)
169 return profile["avatar"] if profile else None
172def random_string(length=10):
173 """
174 Return some random string with default length 10
175 """
176 return "".join(
177 random.SystemRandom().choice(string.ascii_lowercase + string.digits)
178 for _ in range(length)
179 )
182def random_ip_address():
183 """
184 Return a random IP address (IPv4)
185 """
186 return f"{random.randint(1, 254)}.{random.randint(1, 254)}.{random.randint(1, 254)}.{random.randint(1, 254)}"
189def openid_variations(openid):
190 """
191 Return the various OpenID variations, ALWAYS in the same order:
192 - http w/ trailing slash
193 - http w/o trailing slash
194 - https w/ trailing slash
195 - https w/o trailing slash
196 """
198 # Make the 'base' version: http w/ trailing slash
199 if openid.startswith("https://"):
200 openid = openid.replace("https://", "http://")
201 if openid[-1] != "/":
202 openid = f"{openid}/"
204 # http w/o trailing slash
205 var1 = openid[:-1]
206 var2 = openid.replace("http://", "https://")
207 var3 = var2[:-1]
208 return (openid, var1, var2, var3)
211def mm_ng(
212 idhash, size=80, add_red=0, add_green=0, add_blue=0
213): # pylint: disable=too-many-locals
214 """
215 Return an MM (mystery man) image, based on a given hash
216 add some red, green or blue, if specified
217 """
219 # Make sure the lightest bg color we paint is e0, else
220 # we do not see the MM any more
221 if idhash[0] == "f":
222 idhash = "e0"
224 # How large is the circle?
225 circle_size = size * 0.6
227 # Coordinates for the circle
228 start_x = int(size * 0.2)
229 end_x = start_x + circle_size
230 start_y = int(size * 0.05)
231 end_y = start_y + circle_size
233 # All are the same, based on the input hash
234 # this should always result in a "gray-ish" background
235 red = idhash[:2]
236 green = idhash[:2]
237 blue = idhash[:2]
239 # Add some red (i/a) and make sure it's not over 255
240 red = hex(int(red, 16) + add_red).replace("0x", "")
241 if int(red, 16) > 255:
242 red = "ff"
243 if len(red) == 1:
244 red = f"0{red}"
246 # Add some green (i/a) and make sure it's not over 255
247 green = hex(int(green, 16) + add_green).replace("0x", "")
248 if int(green, 16) > 255:
249 green = "ff"
250 if len(green) == 1:
251 green = f"0{green}"
253 # Add some blue (i/a) and make sure it's not over 255
254 blue = hex(int(blue, 16) + add_blue).replace("0x", "")
255 if int(blue, 16) > 255:
256 blue = "ff"
257 if len(blue) == 1:
258 blue = f"0{blue}"
260 # Assemble the bg color "string" in web notation. Eg. '#d3d3d3'
261 bg_color = f"#{red}{green}{blue}"
263 # Image
264 image = Image.new("RGB", (size, size))
265 draw = ImageDraw.Draw(image)
267 # Draw background
268 draw.rectangle(((0, 0), (size, size)), fill=bg_color)
270 # Draw MMs head
271 draw.ellipse((start_x, start_y, end_x, end_y), fill="white")
273 # Draw MMs 'body'
274 draw.polygon(
275 (
276 (start_x + circle_size / 2, size / 2.5),
277 (size * 0.15, size),
278 (size - size * 0.15, size),
279 ),
280 fill="white",
281 )
283 return image
286def is_trusted_url(url, url_filters):
287 """
288 Check if a URL is valid and considered a trusted URL.
289 If the URL is malformed, returns False.
291 Based on: https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/events/UrlFilter
292 """
293 (scheme, netloc, path, params, query, fragment) = urlparse(url)
295 for ufilter in url_filters:
296 if "schemes" in ufilter:
297 schemes = ufilter["schemes"]
299 if scheme not in schemes:
300 continue
302 if "host_equals" in ufilter:
303 host_equals = ufilter["host_equals"]
305 if netloc != host_equals:
306 continue
308 if "host_suffix" in ufilter:
309 host_suffix = ufilter["host_suffix"]
311 if not netloc.endswith(host_suffix):
312 continue
314 if "path_prefix" in ufilter:
315 path_prefix = ufilter["path_prefix"]
317 if not path.startswith(path_prefix):
318 continue
320 if "url_prefix" in ufilter:
321 url_prefix = ufilter["url_prefix"]
323 if not url.startswith(url_prefix):
324 continue
326 return True
328 return False
331def resize_animated_gif(input_pil: Image, size: list) -> BytesIO:
332 def _thumbnail_frames(image):
333 for frame in ImageSequence.Iterator(image):
334 new_frame = frame.copy()
335 new_frame.thumbnail(size)
336 yield new_frame
338 frames = list(_thumbnail_frames(input_pil))
339 output = BytesIO()
340 output_image = frames[0]
341 output_image.save(
342 output,
343 format="gif",
344 save_all=True,
345 optimize=False,
346 append_images=frames[1:],
347 disposal=input_pil.disposal_method,
348 **input_pil.info,
349 )
350 return output