Coverage for ivatar/utils.py: 66%
136 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-12 23:12 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-12 23:12 +0000
1# -*- coding: utf-8 -*-
2"""
3Simple module providing reusable random_string function
4"""
6import contextlib
7import random
8import string
9from io import BytesIO
10from PIL import Image, ImageDraw, ImageSequence
11from urllib.parse import urlparse
12import requests
13from ivatar.settings import DEBUG, URL_TIMEOUT
14from urllib.request import urlopen as urlopen_orig
# Optional Bluesky credentials. They default to None and are overridden only
# when the project settings actually define them. ivatar.settings has already
# been imported above, so the only expected failure here is an ImportError for
# missing names; anything else should surface instead of being swallowed.
BLUESKY_IDENTIFIER = None
BLUESKY_APP_PASSWORD = None
with contextlib.suppress(ImportError):
    from ivatar.settings import BLUESKY_IDENTIFIER, BLUESKY_APP_PASSWORD
def urlopen(url, timeout=URL_TIMEOUT):
    """
    Open url via urllib's urlopen, applying the given timeout.

    When DEBUG is enabled, hostname checking and certificate verification
    are switched off on the SSL context that is passed along; otherwise no
    explicit context is passed (context=None).
    """
    if not DEBUG:
        return urlopen_orig(url, timeout=timeout, context=None)

    import ssl

    insecure_ctx = ssl.create_default_context()
    # check_hostname must be disabled before verify_mode can be CERT_NONE
    insecure_ctx.check_hostname = False
    insecure_ctx.verify_mode = ssl.CERT_NONE
    return urlopen_orig(url, timeout=timeout, context=insecure_ctx)
class Bluesky:
    """
    Handle Bluesky client access

    Logs in against the configured service with an identifier and app
    password, keeps the returned session and uses its access token to
    fetch profiles.
    """

    # Class-level defaults; instances overwrite the first three in __init__
    identifier = ""
    app_password = ""
    service = "https://bsky.social"
    # JSON body of the createSession response, set by login()
    session = None

    def __init__(
        self,
        identifier: str = BLUESKY_IDENTIFIER,
        app_password: str = BLUESKY_APP_PASSWORD,
        service: str = "https://bsky.social",
    ):
        """
        Store the credentials and service base URL; no network access
        happens until login() or get_profile() is called.
        """
        self.identifier = identifier
        self.app_password = app_password
        self.service = service

    def login(self):
        """
        Login to Bluesky

        Stores the decoded JSON response in self.session.
        Raises requests.HTTPError if the service answers with an error
        status, and other requests exceptions on connection problems
        or timeout.
        """
        auth_response = requests.post(
            f"{self.service}/xrpc/com.atproto.server.createSession",
            json={"identifier": self.identifier, "password": self.app_password},
            # Without a timeout requests may block forever on a stalled server
            timeout=URL_TIMEOUT,
        )
        auth_response.raise_for_status()
        self.session = auth_response.json()

    def normalize_handle(self, handle: str) -> str:
        """
        Return the normalized handle for given handle

        Strips any mix of leading '@' characters and spaces as well as
        trailing spaces, so that ' @name ' and '@ name' both yield 'name'.
        """
        # Leading '@' and spaces are stripped together so that an '@' that
        # follows leading spaces is removed as well.
        return handle.lstrip("@ ").rstrip(" ")

    def get_profile(self, handle: str) -> "dict | None":
        """
        Fetch the profile for the given handle

        Logs in first if there is no session yet (a failing login
        propagates its exception). Returns the decoded JSON profile, or
        None if the profile request itself fails.
        """
        if not self.session:
            self.login()
        profile_response = None

        try:
            profile_response = requests.get(
                f"{self.service}/xrpc/app.bsky.actor.getProfile",
                headers={"Authorization": f'Bearer {self.session["accessJwt"]}'},
                params={"actor": handle},
                # Without a timeout requests may block forever on a stalled server
                timeout=URL_TIMEOUT,
            )
            profile_response.raise_for_status()
        except Exception as exc:
            # Best effort: report the problem and signal failure via None
            print(f"Bluesky profile fetch failed with HTTP error: {exc}")
            return None

        return profile_response.json()

    def get_avatar(self, handle: str):
        """
        Get avatar URL for a handle

        Returns None if the profile could not be fetched or carries no
        'avatar' entry (a profile without an avatar may omit the key).
        """
        profile = self.get_profile(handle)
        return profile.get("avatar") if profile else None
def random_string(length=10):
    """
    Build a random string of lowercase ASCII letters and digits.

    The characters are drawn from the operating system's randomness source
    via random.SystemRandom. length defaults to 10.
    """
    alphabet = string.ascii_lowercase + string.digits
    rng = random.SystemRandom()
    picked = [rng.choice(alphabet) for _ in range(length)]
    return "".join(picked)
def openid_variations(openid):
    """
    Return the various OpenID variations, ALWAYS in the same order:
    - http w/ trailing slash
    - http w/o trailing slash
    - https w/ trailing slash
    - https w/o trailing slash

    Only the leading scheme is swapped; 'http://' or 'https://' occurring
    later in the identifier (e.g. inside a query string) is left untouched.
    An identifier without an http(s) scheme keeps its prefix in all four
    variations. An empty string yields ("/", "", "/", "").
    """
    http_prefix = "http://"
    https_prefix = "https://"

    # Make the 'base' version: http w/ trailing slash
    if openid.startswith(https_prefix):
        openid = http_prefix + openid[len(https_prefix):]
    # endswith() instead of openid[-1] so an empty string does not raise
    if not openid.endswith("/"):
        openid = f"{openid}/"

    # http w/o trailing slash
    var1 = openid[:-1]
    # https w/ trailing slash: swap the scheme prefix only
    if openid.startswith(http_prefix):
        var2 = https_prefix + openid[len(http_prefix):]
    else:
        var2 = openid
    # https w/o trailing slash
    var3 = var2[:-1]
    return (openid, var1, var2, var3)
def _mm_channel(base_hex, extra):
    """
    Return base_hex plus extra as a two-digit lowercase hex string,
    clamped to the valid colour channel range 00..ff.
    """
    value = int(base_hex, 16) + extra
    return f"{min(max(value, 0), 255):02x}"


def mm_ng(
    idhash, size=80, add_red=0, add_green=0, add_blue=0
):  # pylint: disable=too-many-locals
    """
    Return an MM (mystery man) image, based on a given hash
    add some red, green or blue, if specified

    The first two hex digits of idhash give the gray level of the
    background; add_red / add_green / add_blue are added to the respective
    channel and the result is clamped to 0..255 (so negative values cannot
    produce an invalid colour string). Returns a size x size RGB PIL image.
    """

    # Make sure the lightest bg color we paint is e0, else
    # we do not see the MM any more
    if idhash[0] == "f":
        idhash = "e0"

    # How large is the circle?
    circle_size = size * 0.6

    # Coordinates for the circle
    start_x = int(size * 0.2)
    end_x = start_x + circle_size
    start_y = int(size * 0.05)
    end_y = start_y + circle_size

    # All channels start from the same value, based on the input hash;
    # this should always result in a "gray-ish" background
    base = idhash[:2]
    red = _mm_channel(base, add_red)
    green = _mm_channel(base, add_green)
    blue = _mm_channel(base, add_blue)

    # Assemble the bg color "string" in web notation. Eg. '#d3d3d3'
    bg_color = f"#{red}{green}{blue}"

    # Image
    image = Image.new("RGB", (size, size))
    draw = ImageDraw.Draw(image)

    # Draw background
    draw.rectangle(((0, 0), (size, size)), fill=bg_color)

    # Draw MMs head
    draw.ellipse((start_x, start_y, end_x, end_y), fill="white")

    # Draw MMs 'body'
    draw.polygon(
        (
            (start_x + circle_size / 2, size / 2.5),
            (size * 0.15, size),
            (size - size * 0.15, size),
        ),
        fill="white",
    )

    return image
def is_trusted_url(url, url_filters):
    """
    Check if a URL is valid and considered a trusted URL.
    If the URL is malformed, returns False.

    url_filters is an iterable of dicts. A filter matches when every
    criterion it contains matches; the URL is trusted as soon as one filter
    matches. Supported criteria:
    - schemes: collection the URL scheme must be in
    - host_equals: exact match against the network location
    - host_suffix: suffix match against the network location
    - path_prefix: prefix match against the path
    - url_prefix: prefix match against the whole URL
    A filter without any of these keys matches every URL. The host criteria
    are compared against the netloc as returned by urlparse.

    Based on: https://developer.mozilla.org/en-US/docs/Mozilla/Add-ons/WebExtensions/API/events/UrlFilter
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed URLs (e.g. unbalanced IPv6
        # brackets) by raising instead of returning a result
        return False

    scheme, netloc, path = parsed.scheme, parsed.netloc, parsed.path

    for url_filter in url_filters:
        if "schemes" in url_filter and scheme not in url_filter["schemes"]:
            continue

        if "host_equals" in url_filter and netloc != url_filter["host_equals"]:
            continue

        if "host_suffix" in url_filter and not netloc.endswith(
            url_filter["host_suffix"]
        ):
            continue

        if "path_prefix" in url_filter and not path.startswith(
            url_filter["path_prefix"]
        ):
            continue

        if "url_prefix" in url_filter and not url.startswith(
            url_filter["url_prefix"]
        ):
            continue

        # Every criterion present in this filter matched
        return True

    return False
def resize_animated_gif(input_pil: Image, size: list) -> BytesIO:
    """
    Shrink every frame of an animated image and re-encode it as a GIF.

    Each frame of input_pil is copied and reduced with thumbnail(size);
    the copies are then written as one multi-frame GIF into a new BytesIO,
    passing on the input's disposal_method and its info entries.
    The buffer is returned as left by save(), i.e. it is not rewound.
    """
    resized = []
    for source_frame in ImageSequence.Iterator(input_pil):
        # thumbnail() works in place, so operate on a copy of the frame
        shrunk = source_frame.copy()
        shrunk.thumbnail(size)
        resized.append(shrunk)

    buffer = BytesIO()
    first_frame = resized[0]
    remaining_frames = resized[1:]
    first_frame.save(
        buffer,
        format="gif",
        save_all=True,
        optimize=False,
        append_images=remaining_frames,
        disposal=input_pil.disposal_method,
        **input_pil.info,
    )
    return buffer