Coverage for ivatar/file_security.py: 91%

151 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-24 23:06 +0000

1""" 

2File upload security utilities for ivatar 

3""" 

4 

import hashlib
import logging
import os
from io import BytesIO
from typing import Any, Dict, Tuple

import magic
from PIL import Image

13 

# Logger shared by all upload-security code in this module
logger = logging.getLogger("ivatar.security")

# --- Security constants -----------------------------------------------------

# MIME types an upload may be detected as
ALLOWED_MIME_TYPES = [
    "image/jpeg",
    "image/png",
    "image/gif",
    "image/webp",
    "image/bmp",
    "image/tiff",
]

# Filename extensions (lower-case, including the dot) an upload may carry
ALLOWED_EXTENSIONS = [
    ".jpg",
    ".jpeg",
    ".png",
    ".gif",
    ".webp",
    ".bmp",
    ".tiff",
]

# Leading byte signatures ("magic bytes") mapped to the MIME type they indicate
IMAGE_SIGNATURES = {
    b"\xff\xd8\xff": "image/jpeg",
    b"\x89PNG\r\n\x1a\n": "image/png",
    b"GIF87a": "image/gif",
    b"GIF89a": "image/gif",
    # WebP files begin with a RIFF header
    b"RIFF": "image/webp",
    b"BM": "image/bmp",
    # TIFF: little-endian ("II") and big-endian ("MM") byte orders
    b"II*\x00": "image/tiff",
    b"MM\x00*": "image/tiff",
}

# Size ceilings, in bytes, for the different operations
MAX_FILE_SIZE_BASIC = 5 * 1024**2  # 5MB for basic validation
MAX_FILE_SIZE_SCAN = 10 * 1024**2  # 10MB for virus scanning
MAX_FILE_SIZE_PROCESS = 50 * 1024**2  # 50MB for processing

45 

46 

class FileUploadSecurityError(Exception):
    """Custom exception signalling a file upload security issue."""

51 

52 

class FileValidator:
    """Comprehensive file validation for uploads.

    Wraps the raw bytes and the client-supplied filename of one upload and
    offers several independent checks (size/name, magic bytes, MIME type via
    python-magic, decoding via PIL, byte-pattern scan).
    ``comprehensive_validation`` runs all of them and merges the outcome
    into a single result dictionary.
    """

    def __init__(self, file_data: bytes, filename: str):
        """Store the upload and pre-compute its size and SHA-256 hash.

        Args:
            file_data: Complete file content as bytes.
            filename: Name the file was uploaded under.
        """
        self.file_data = file_data
        self.filename = filename
        self.file_size = len(file_data)
        self.file_hash = hashlib.sha256(file_data).hexdigest()

    def validate_basic(self) -> Dict[str, Any]:
        """Check file size, filename and filename extension.

        Returns:
            Dictionary with ``valid`` (bool), ``errors`` (list of str),
            ``warnings`` (list, always empty here) and ``file_info``
            (size, hash and filename of the upload).
        """
        results: Dict[str, Any] = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "file_info": {
                "size": self.file_size,
                "hash": self.file_hash,
                "filename": self.filename,
            },
        }

        # Check file size
        if self.file_size > MAX_FILE_SIZE_BASIC:
            results["valid"] = False
            results["errors"].append(f"File too large: {self.file_size} bytes")

        # Check filename
        if not self.filename or len(self.filename) > 255:
            results["valid"] = False
            results["errors"].append("Invalid filename")

        # Check file extension. A missing filename has already been reported
        # above; fall back to "" so splitext() does not raise on None.
        ext = os.path.splitext(self.filename or "")[1].lower()
        if ext not in ALLOWED_EXTENSIONS:
            results["valid"] = False
            results["errors"].append(f"File extension not allowed: {ext}")

        return results

    def validate_magic_bytes(self) -> Dict[str, Any]:
        """Validate the file by its leading byte signature.

        Returns:
            Dictionary with ``valid`` (bool), ``detected_type`` (MIME type
            string, or None when no signature matched) and ``errors``.
        """
        results: Dict[str, Any] = {"valid": True, "detected_type": None, "errors": []}

        data = self.file_data
        detected_type = None

        if data.startswith(b"RIFF"):
            # RIFF is a generic container (also used by WAV, AVI, ...). The
            # file is only WebP when the form type at bytes 8-11 says so.
            if data[8:12] == b"WEBP":
                detected_type = "image/webp"
        else:
            for signature, mime_type in IMAGE_SIGNATURES.items():
                if data.startswith(signature):
                    detected_type = mime_type
                    break

        if not detected_type:
            results["valid"] = False
            results["errors"].append(
                "File signature does not match any supported image format"
            )
        else:
            results["detected_type"] = detected_type

        return results

    def validate_mime_type(self) -> Dict[str, Any]:
        """Validate the MIME type reported by python-magic.

        A failure of the detection itself is logged and recorded as a
        warning; it does not mark the file invalid.

        Returns:
            Dictionary with ``valid`` (bool), ``detected_mime`` (str or
            None), ``errors`` and ``warnings``.
        """
        # "warnings" must exist up front: the except branch appends to it.
        results: Dict[str, Any] = {
            "valid": True,
            "detected_mime": None,
            "errors": [],
            "warnings": [],
        }

        try:
            # Use python-magic to detect MIME type
            detected_mime = magic.from_buffer(self.file_data, mime=True)
            results["detected_mime"] = detected_mime

            if detected_mime not in ALLOWED_MIME_TYPES:
                results["valid"] = False
                results["errors"].append(f"MIME type not allowed: {detected_mime}")

        except Exception as e:
            # Best effort: detection problems are reported, not fatal.
            logger.warning("MIME type detection failed: %s", e)
            results["warnings"].append("Could not detect MIME type")

        return results

    def validate_pil_image(self) -> Dict[str, Any]:
        """Validate that PIL can open and fully load the data as an image.

        Returns:
            Dictionary with ``valid`` (bool), ``image_info`` (format, mode,
            size, width, height, has_transparency; empty if the image could
            not be opened), ``errors`` and ``warnings``.
        """
        # "warnings" must exist up front: the large-dimension check appends
        # to it, and a missing key would be swallowed by the except below
        # and misreported as an invalid image.
        results: Dict[str, Any] = {
            "valid": True,
            "image_info": {},
            "errors": [],
            "warnings": [],
        }

        try:
            # Open image with PIL
            image = Image.open(BytesIO(self.file_data))

            # Get image information
            results["image_info"] = {
                "format": image.format,
                "mode": image.mode,
                "size": image.size,
                "width": image.width,
                "height": image.height,
                "has_transparency": image.mode in ("RGBA", "LA", "P"),
            }

            # Verify image can be loaded
            image.load()

            # Check for suspicious characteristics
            if image.width > 10000 or image.height > 10000:
                results["warnings"].append("Image dimensions are very large")

            if image.width < 1 or image.height < 1:
                results["valid"] = False
                results["errors"].append("Invalid image dimensions")

        except Exception as e:
            results["valid"] = False
            results["errors"].append(f"Invalid image format: {str(e)}")

        return results

    def sanitize_exif_data(self) -> bytes:
        """Re-encode the image onto a fresh canvas to drop EXIF metadata.

        Returns:
            The re-encoded image bytes, or the original ``file_data``
            unchanged if anything goes wrong during re-encoding.
        """
        try:
            image = Image.open(BytesIO(self.file_data))

            # Create new image without EXIF data
            # NOTE(review): the output is a single new canvas, so extra
            # animation frames and palette ("P") transparency are presumably
            # not carried over - confirm this is acceptable.
            if image.mode in ("RGBA", "LA"):
                # Preserve transparency by pasting through the alpha band
                new_image = Image.new("RGBA", image.size, (255, 255, 255, 0))
                new_image.paste(image, mask=image.split()[-1])
            else:
                new_image = Image.new("RGB", image.size, (255, 255, 255))
                new_image.paste(image)

            # Save without EXIF data, keeping the source format when known
            output = BytesIO()
            new_image.save(output, format=image.format or "JPEG", quality=95)
            return output.getvalue()

        except Exception as e:
            logger.warning("EXIF sanitization failed: %s", e)
            return self.file_data  # Return original if sanitization fails

    def scan_for_malicious_content(self) -> Dict[str, Any]:
        """Scan the raw bytes for potentially malicious content patterns.

        Returns:
            Dictionary with ``suspicious`` (bool), ``threats`` (one message
            per matched pattern, plus a polyglot message where applicable)
            and ``warnings`` (always empty here).
        """
        results: Dict[str, Any] = {"suspicious": False, "threats": [], "warnings": []}

        # Check for embedded scripts or executable content
        # NOTE(review): every pattern is matched anywhere in the payload.
        # Short ones such as b"MZ" and b"<%" can occur by chance in
        # compressed image data - consider anchoring the executable headers
        # to offset 0 if false positives show up.
        suspicious_patterns = [
            b"<script",
            b"javascript:",
            b"vbscript:",
            b"data:text/html",
            b"<?php",
            b"<%",
            b"#!/bin/",
            b"MZ",  # PE executable header
            b"\x7fELF",  # ELF executable header
        ]

        for pattern in suspicious_patterns:
            if pattern in self.file_data:
                results["suspicious"] = True
                # !r renders the same text as implicit str() on bytes, but
                # without triggering BytesWarning under "python -b".
                results["threats"].append(f"Suspicious pattern detected: {pattern!r}")

        # Check for polyglot files (valid in multiple formats)
        if self.file_data.startswith(b"GIF89a") and b"<script" in self.file_data:
            results["suspicious"] = True
            results["threats"].append("Potential polyglot attack detected")

        return results

    def comprehensive_validation(self) -> Dict[str, Any]:
        """Run every check and merge the outcomes into one report.

        ``security_score`` starts at 100 and is reduced per failed stage:
        basic 30, magic bytes 10, MIME type 10, PIL 10, pattern scan 50.
        Any failed stage also sets ``valid`` to False. The outcome is logged
        when the file is invalid or the score drops below 80.

        Returns:
            Dictionary with ``valid``, ``errors``, ``warnings``,
            ``file_info`` (basic info plus ``detected_type``,
            ``detected_mime`` and ``image_info``) and ``security_score``.
        """
        results: Dict[str, Any] = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "file_info": {},
            "security_score": 100,
        }

        # Basic validation
        basic_results = self.validate_basic()
        if not basic_results["valid"]:
            results["valid"] = False
            results["errors"].extend(basic_results["errors"])
            results["security_score"] -= 30

        results["file_info"].update(basic_results["file_info"])
        results["warnings"].extend(basic_results["warnings"])

        # Magic bytes validation
        magic_results = self.validate_magic_bytes()
        if not magic_results["valid"]:
            results["valid"] = False
            results["errors"].extend(magic_results["errors"])
            # Reduced from 25 - basic format issue, not security threat
            results["security_score"] -= 10

        results["file_info"]["detected_type"] = magic_results["detected_type"]

        # MIME type validation
        mime_results = self.validate_mime_type()
        if not mime_results["valid"]:
            results["valid"] = False
            results["errors"].extend(mime_results["errors"])
            # Reduced from 20 - basic format issue, not security threat
            results["security_score"] -= 10

        results["file_info"]["detected_mime"] = mime_results["detected_mime"]
        results["warnings"].extend(mime_results.get("warnings", []))

        # PIL image validation
        pil_results = self.validate_pil_image()
        if not pil_results["valid"]:
            results["valid"] = False
            results["errors"].extend(pil_results["errors"])
            # Reduced from 15 - basic format issue, not security threat
            results["security_score"] -= 10

        results["file_info"]["image_info"] = pil_results["image_info"]
        results["warnings"].extend(pil_results.get("warnings", []))

        # Security scan
        security_results = self.scan_for_malicious_content()
        if security_results["suspicious"]:
            results["valid"] = False
            results["errors"].extend(security_results["threats"])
            results["security_score"] -= 50

        results["warnings"].extend(security_results.get("warnings", []))

        # Log security events
        if not results["valid"]:
            logger.warning(f"File upload validation failed: {results['errors']}")
        elif results["security_score"] < 80:
            logger.info(
                f"File upload with low security score: {results['security_score']}"
            )

        return results

312 

313 

def validate_uploaded_file(
    file_data: bytes, filename: str
) -> Tuple[bool, Dict[str, Any], bytes]:
    """
    Main function to validate uploaded files

    Runs the comprehensive validation and, only when the file is valid,
    re-encodes it to strip EXIF data.

    Args:
        file_data: Complete content of the uploaded file.
        filename: Name the file was uploaded under.

    Returns:
        (is_valid, validation_results, sanitized_data). For an invalid
        file, sanitized_data is the untouched input ``file_data``.
    """
    validator = FileValidator(file_data, filename)

    # Perform comprehensive validation
    results = validator.comprehensive_validation()

    if not results["valid"]:
        # Rejected uploads are handed back unchanged
        return False, results, file_data

    # Sanitize EXIF data
    sanitized_data = validator.sanitize_exif_data()

    return True, results, sanitized_data

335 

336 

def get_file_security_report(file_data: bytes, filename: str) -> Dict[str, Any]:
    """
    Generate a security report for a file without modifying it

    Args:
        file_data: Complete content of the file to inspect.
        filename: Name the file was uploaded under.

    Returns:
        The result dictionary of ``FileValidator.comprehensive_validation``.
    """
    validator = FileValidator(file_data, filename)
    return validator.comprehensive_validation()