Coverage for ivatar/file_security.py: 91%
151 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-24 23:06 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-24 23:06 +0000
1"""
2File upload security utilities for ivatar
3"""
import hashlib
import logging
import os
from io import BytesIO
from typing import Any, Dict, Tuple

import magic
from PIL import Image
# Module-level logger used for upload security events
logger = logging.getLogger("ivatar.security")

# MIME types an uploaded image is allowed to have
ALLOWED_MIME_TYPES = [
    "image/jpeg",
    "image/png",
    "image/gif",
    "image/webp",
    "image/bmp",
    "image/tiff",
]

# Filename extensions an uploaded image is allowed to have (lower case)
ALLOWED_EXTENSIONS = [
    ".jpg",
    ".jpeg",
    ".png",
    ".gif",
    ".webp",
    ".bmp",
    ".tiff",
]

# Leading byte sequences (file signatures) and the image MIME type each one
# indicates. Insertion order is the order in which signatures are tried.
IMAGE_SIGNATURES = {
    b"\xff\xd8\xff": "image/jpeg",
    b"\x89PNG\r\n\x1a\n": "image/png",
    b"GIF87a": "image/gif",
    b"GIF89a": "image/gif",
    b"RIFF": "image/webp",  # WebP files begin with a RIFF header
    b"BM": "image/bmp",
    b"II*\x00": "image/tiff",  # TIFF, little-endian byte order
    b"MM\x00*": "image/tiff",  # TIFF, big-endian byte order
}

# Upper bounds on file size, in bytes, for the different operations
MAX_FILE_SIZE_BASIC = 5 * 1024**2  # 5MB for basic validation
MAX_FILE_SIZE_SCAN = 10 * 1024**2  # 10MB for virus scanning
MAX_FILE_SIZE_PROCESS = 50 * 1024**2  # 50MB for processing
class FileUploadSecurityError(Exception):
    """Exception type for file upload security issues."""
class FileValidator:
    """Run layered security checks on the raw bytes of an uploaded file.

    Each ``validate_*`` / ``scan_*`` method performs one independent check and
    returns its own result dictionary. ``comprehensive_validation`` runs all
    of them and merges the outcomes into a single report that also carries a
    ``security_score``.
    """

    def __init__(self, file_data: bytes, filename: str) -> None:
        """Store the upload and precompute its size and SHA-256 digest.

        Args:
            file_data: Complete content of the uploaded file.
            filename: Name of the uploaded file.
        """
        self.file_data = file_data
        self.filename = filename
        self.file_size = len(file_data)
        self.file_hash = hashlib.sha256(file_data).hexdigest()

    def validate_basic(self) -> Dict[str, Any]:
        """Check file size, filename and filename extension.

        Returns:
            A dictionary with ``valid`` (bool), ``errors`` and ``warnings``
            (lists of messages) and ``file_info`` (size, hash, filename).
        """
        results: Dict[str, Any] = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "file_info": {
                "size": self.file_size,
                "hash": self.file_hash,
                "filename": self.filename,
            },
        }

        # Check file size
        if self.file_size > MAX_FILE_SIZE_BASIC:
            results["valid"] = False
            results["errors"].append(f"File too large: {self.file_size} bytes")

        # Check filename
        if not self.filename or len(self.filename) > 255:
            results["valid"] = False
            results["errors"].append("Invalid filename")

        # Check file extension. A missing filename has already been reported
        # above; fall back to "" so splitext() does not raise on None.
        ext = os.path.splitext(self.filename or "")[1].lower()
        if ext not in ALLOWED_EXTENSIONS:
            results["valid"] = False
            results["errors"].append(f"File extension not allowed: {ext}")

        return results

    def validate_magic_bytes(self) -> Dict[str, Any]:
        """Identify the image type from the leading bytes (file signature).

        Returns:
            A dictionary with ``valid`` (bool), ``detected_type`` (MIME type
            string, or ``None`` when no signature matched) and ``errors``.
        """
        results: Dict[str, Any] = {"valid": True, "detected_type": None, "errors": []}

        detected_type = None
        if self.file_data.startswith(b"RIFF"):
            # RIFF is a generic container also used by WAV, AVI, etc. The
            # file is WebP only when the form type at bytes 8..11 is "WEBP".
            if self.file_data[8:12] == b"WEBP":
                detected_type = "image/webp"
        else:
            for signature, mime_type in IMAGE_SIGNATURES.items():
                if self.file_data.startswith(signature):
                    detected_type = mime_type
                    break

        if not detected_type:
            results["valid"] = False
            results["errors"].append(
                "File signature does not match any supported image format"
            )
        else:
            results["detected_type"] = detected_type

        return results

    def validate_mime_type(self) -> Dict[str, Any]:
        """Detect the MIME type with python-magic and check it is allowed.

        Detection failures are best-effort: they are logged and reported as a
        warning, and do not mark the file invalid.

        Returns:
            A dictionary with ``valid`` (bool), ``detected_mime`` (string or
            ``None``), ``errors`` and ``warnings``.
        """
        results: Dict[str, Any] = {
            "valid": True,
            "detected_mime": None,
            "errors": [],
            "warnings": [],
        }

        try:
            # Use python-magic to detect MIME type
            detected_mime = magic.from_buffer(self.file_data, mime=True)
        except Exception as e:
            logger.warning("MIME type detection failed: %s", e)
            results["warnings"].append("Could not detect MIME type")
            return results

        results["detected_mime"] = detected_mime
        if detected_mime not in ALLOWED_MIME_TYPES:
            results["valid"] = False
            results["errors"].append(f"MIME type not allowed: {detected_mime}")

        return results

    def validate_pil_image(self) -> Dict[str, Any]:
        """Check that PIL can open and fully load the data as an image.

        Returns:
            A dictionary with ``valid`` (bool), ``image_info`` (format, mode,
            size, width, height, has_transparency), ``errors`` and
            ``warnings``.
        """
        results: Dict[str, Any] = {
            "valid": True,
            "image_info": {},
            "errors": [],
            "warnings": [],
        }

        try:
            with Image.open(BytesIO(self.file_data)) as image:
                # Get image information
                results["image_info"] = {
                    "format": image.format,
                    "mode": image.mode,
                    "size": image.size,
                    "width": image.width,
                    "height": image.height,
                    "has_transparency": image.mode in ("RGBA", "LA", "P"),
                }

                # Verify image can be loaded
                image.load()

                # Very large dimensions are only a warning, not a failure
                if image.width > 10000 or image.height > 10000:
                    results["warnings"].append("Image dimensions are very large")

                if image.width < 1 or image.height < 1:
                    results["valid"] = False
                    results["errors"].append("Invalid image dimensions")

        except Exception as e:
            results["valid"] = False
            results["errors"].append(f"Invalid image format: {str(e)}")

        return results

    def sanitize_exif_data(self) -> bytes:
        """Re-encode the image onto a fresh canvas to drop EXIF metadata.

        Returns:
            The re-encoded image bytes, or the original ``file_data``
            unchanged if sanitization fails (best-effort; the failure is
            logged).
        """
        try:
            with Image.open(BytesIO(self.file_data)) as image:
                # Create new image without EXIF data
                if image.mode in ("RGBA", "LA"):
                    # Preserve transparency. Paste without a mask so all four
                    # bands are copied verbatim; using the alpha band as mask
                    # would blend semi-transparent pixels with the canvas.
                    new_image = Image.new("RGBA", image.size, (255, 255, 255, 0))
                    new_image.paste(image.convert("RGBA"))
                else:
                    new_image = Image.new("RGB", image.size, (255, 255, 255))
                    new_image.paste(image)

                # Save without EXIF data
                output = BytesIO()
                new_image.save(output, format=image.format or "JPEG", quality=95)
                return output.getvalue()

        except Exception as e:
            logger.warning("EXIF sanitization failed: %s", e)
            return self.file_data  # Return original if sanitization fails

    def scan_for_malicious_content(self) -> Dict[str, Any]:
        """Scan the raw bytes for patterns that suggest embedded code.

        Script/markup patterns are searched anywhere in the data. Executable
        file headers are only matched at the very start of the data, because
        a two-byte sequence such as ``MZ`` occurs by chance in a large share
        of ordinary compressed images.

        Returns:
            A dictionary with ``suspicious`` (bool), ``threats`` (list of
            messages) and ``warnings``.
        """
        results: Dict[str, Any] = {"suspicious": False, "threats": [], "warnings": []}

        # Embedded scripts or server-side code, searched anywhere in the file
        embedded_patterns = [
            b"<script",
            b"javascript:",
            b"vbscript:",
            b"data:text/html",
            b"<?php",
            # NOTE(review): this two-byte pattern can also occur by chance in
            # binary image data - confirm whether it should stay this broad.
            b"<%",
            b"#!/bin/",
        ]

        # Executable file headers, only meaningful at offset 0
        executable_headers = [
            b"MZ",  # PE executable header
            b"\x7fELF",  # ELF executable header
        ]

        for pattern in embedded_patterns:
            if pattern in self.file_data:
                results["suspicious"] = True
                results["threats"].append(f"Suspicious pattern detected: {pattern!r}")

        for header in executable_headers:
            if self.file_data.startswith(header):
                results["suspicious"] = True
                results["threats"].append(f"Suspicious pattern detected: {header!r}")

        # Check for polyglot files (valid in multiple formats)
        if self.file_data.startswith(b"GIF89a") and b"<script" in self.file_data:
            results["suspicious"] = True
            results["threats"].append("Potential polyglot attack detected")

        return results

    def comprehensive_validation(self) -> Dict[str, Any]:
        """Run every check and merge the outcomes into one report.

        ``security_score`` starts at 100 and is reduced for each failed
        check. Any failed check also sets ``valid`` to ``False``.

        Returns:
            A dictionary with ``valid`` (bool), ``errors``, ``warnings``,
            ``file_info`` and ``security_score`` (int).
        """
        results: Dict[str, Any] = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "file_info": {},
            "security_score": 100,
        }

        # Basic validation
        basic_results = self.validate_basic()
        if not basic_results["valid"]:
            results["valid"] = False
            results["errors"].extend(basic_results["errors"])
            results["security_score"] -= 30

        results["file_info"].update(basic_results["file_info"])
        results["warnings"].extend(basic_results["warnings"])

        # Magic bytes validation
        magic_results = self.validate_magic_bytes()
        if not magic_results["valid"]:
            results["valid"] = False
            results["errors"].extend(magic_results["errors"])
            # Reduced from 25 - basic format issue, not security threat
            results["security_score"] -= 10

        results["file_info"]["detected_type"] = magic_results["detected_type"]

        # MIME type validation
        mime_results = self.validate_mime_type()
        if not mime_results["valid"]:
            results["valid"] = False
            results["errors"].extend(mime_results["errors"])
            # Reduced from 20 - basic format issue, not security threat
            results["security_score"] -= 10

        results["file_info"]["detected_mime"] = mime_results["detected_mime"]
        results["warnings"].extend(mime_results.get("warnings", []))

        # PIL image validation
        pil_results = self.validate_pil_image()
        if not pil_results["valid"]:
            results["valid"] = False
            results["errors"].extend(pil_results["errors"])
            # Reduced from 15 - basic format issue, not security threat
            results["security_score"] -= 10

        results["file_info"]["image_info"] = pil_results["image_info"]
        results["warnings"].extend(pil_results.get("warnings", []))

        # Security scan
        security_results = self.scan_for_malicious_content()
        if security_results["suspicious"]:
            results["valid"] = False
            results["errors"].extend(security_results["threats"])
            results["security_score"] -= 50

        results["warnings"].extend(security_results.get("warnings", []))

        # Log security events
        if not results["valid"]:
            logger.warning("File upload validation failed: %s", results["errors"])
        elif results["security_score"] < 80:
            logger.info(
                "File upload with low security score: %s", results["security_score"]
            )

        return results
def validate_uploaded_file(
    file_data: bytes, filename: str
) -> Tuple[bool, Dict[str, Any], bytes]:
    """Validate an uploaded file and sanitize it when validation passes.

    Args:
        file_data: Complete content of the uploaded file.
        filename: Name of the uploaded file.

    Returns:
        ``(is_valid, validation_results, sanitized_data)``. When validation
        fails, ``sanitized_data`` is the unmodified ``file_data``; otherwise
        it is the output of ``FileValidator.sanitize_exif_data``.
    """
    validator = FileValidator(file_data, filename)

    # Perform comprehensive validation
    results = validator.comprehensive_validation()
    if not results["valid"]:
        return False, results, file_data

    # Only files that passed every check are sanitized
    return True, results, validator.sanitize_exif_data()
def get_file_security_report(file_data: bytes, filename: str) -> Dict[str, Any]:
    """Generate a security report for a file without modifying it.

    Args:
        file_data: Complete content of the file to inspect.
        filename: Name of the file.

    Returns:
        The report dictionary produced by
        ``FileValidator.comprehensive_validation``.
    """
    return FileValidator(file_data, filename).comprehensive_validation()