D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
cl_website_collector
/
Filename :
docroot_processor.py
back
Copy
# -*- coding: utf-8 -*-
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2024 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import logging
import os
import time
from pathlib import Path
from typing import Dict, List, Optional, Any

from cl_website_collector.constants import DOCROOT_EXCLUDE_DIRS, DOCROOT_MAX_DEPTH


class DocrootProcessor:
    """
    Processes individual docroot to collect .htaccess files and metadata.
    """

    # Seconds of the overall budget kept back for per-file processing
    # after the directory walk has finished.
    _WALK_TIMEOUT_RESERVE = 5

    def __init__(self, logger: logging.Logger):
        """
        Args:
            logger: Logger used for all debug and error messages.
        """
        self.logger = logger

    def collect_htaccess_paths(
        self,
        docroot: str,
        domains: list,
        username: str,
        timeout: int = 30,
    ) -> Optional[Dict[str, Any]]:
        """
        Collect .htaccess file paths from a docroot without reading file contents.

        Args:
            docroot: Document root path
            domains: Domain names
            username: Owner username
            timeout: Processing timeout in seconds

        Returns:
            Dictionary with the collected file paths and metadata. The
            dictionary is returned even when processing fails or times out;
            in that case it holds whatever was collected up to that point.
        """
        start_time = time.time()
        result = {
            'docroot': docroot,
            'domains': domains,
            'username': username,
            'htaccess_file_paths': [],
            'symlinks': [],
            'timeout_reached': False,
            'processing_time_seconds': 0,
            'htaccess_files_found': 0,
        }

        try:
            # Keep a few seconds for the per-file loop below, but only when the
            # budget is large enough. Otherwise the walk would receive a zero or
            # negative budget and abort on its very first iteration.
            if timeout > self._WALK_TIMEOUT_RESERVE:
                walk_timeout = timeout - self._WALK_TIMEOUT_RESERVE
            else:
                walk_timeout = timeout

            self.logger.debug("Finding .htaccess files in %s", docroot)
            htaccess_files = self._find_htaccess_files(
                docroot, max_depth=DOCROOT_MAX_DEPTH, timeout=walk_timeout)
            self.logger.debug("Found %d .htaccess files in %s", len(htaccess_files), docroot)
            for file_path in htaccess_files:
                self.logger.debug(" - %s", file_path)

            if not htaccess_files:
                self.logger.debug("No .htaccess files found in %s", docroot)
            else:
                # Process each found file path (no content reading)
                for file_path in htaccess_files:
                    if time.time() - start_time > timeout:
                        result['timeout_reached'] = True
                        self.logger.error(
                            "[WEBSITE-COLLECTOR] Timeout reached while collecting paths in %s", docroot)
                        break

                    try:
                        self.logger.debug("Collecting .htaccess path: %s", file_path)

                        # Handle symlinks: record the link and work with its target
                        p = Path(file_path)
                        is_symlink = p.is_symlink()
                        real_path = str(p.resolve(strict=False)) if is_symlink else file_path

                        if is_symlink:
                            result['symlinks'].append({
                                'link': self._normalize_path(file_path, docroot),
                                'target': real_path,
                            })

                        # Check if file is readable (a dangling symlink fails here)
                        if Path(real_path).exists() and os.access(real_path, os.R_OK):
                            # Store file path info for on-demand reading
                            location = self._normalize_path(file_path, docroot)
                            result['htaccess_file_paths'].append({
                                'location': location,
                                'file_path': file_path,
                                'real_path': real_path,
                                'is_symlink': is_symlink,
                            })
                        else:
                            self.logger.debug("Cannot read file: %s", file_path)

                    except Exception as e:
                        # One bad path must not abort the rest of the docroot
                        self.logger.error("[WEBSITE-COLLECTOR] Error collecting path %s: %s", file_path, e)

            result['htaccess_files_found'] = len(result['htaccess_file_paths'])
            result['processing_time_seconds'] = time.time() - start_time

            self.logger.debug("Collected %d .htaccess file paths from %s in %.2fs",
                              result['htaccess_files_found'], docroot, result['processing_time_seconds'])

        except Exception as e:
            self.logger.error("[WEBSITE-COLLECTOR] Error processing docroot %s: %s", docroot, e)

        return result

    def _find_htaccess_files(self, docroot: str, max_depth: int = DOCROOT_MAX_DEPTH, timeout: int = 25) -> List[str]:
        """
        Find .htaccess files.

        Walks the docroot top-down. Directories nested more than ``max_depth``
        levels below the docroot are not entered, and excluded directories
        are pruned.

        Args:
            docroot: Directory to start the walk from
            max_depth: Deepest directory level (docroot itself is level 0)
                whose files are still inspected
            timeout: Walk budget in seconds; the walk stops once it is exceeded

        Returns:
            Paths of readable .htaccess files found before the walk ended.
        """
        start_time = time.time()
        htaccess_files = []

        try:
            for root, dirs, files in os.walk(docroot):
                # Check timeout
                if time.time() - start_time > timeout:
                    self.logger.error("[WEBSITE-COLLECTOR] os.walk timeout for %s", docroot)
                    break

                # Depth is the number of path components below docroot. relpath
                # normalizes both paths, so a trailing separator on docroot does
                # not matter. A direct child ("a") is depth 1, "a/b" is depth 2.
                if root == docroot:
                    depth = 0
                else:
                    rel_root = os.path.relpath(root, docroot)
                    depth = 0 if rel_root == os.curdir else rel_root.count(os.sep) + 1

                if depth >= max_depth:
                    # Don't go deeper, but still process files at this level
                    dirs[:] = []
                else:
                    # Apply exclusion filters for directories (in place, so
                    # os.walk skips the pruned entries)
                    dirs[:] = [d for d in dirs if not self._should_exclude_directory(root, d)]

                # Look for .htaccess files
                if '.htaccess' in files:
                    file_path = Path(root) / '.htaccess'
                    # Consider empty .htaccess files as valid as well
                    if file_path.is_file() and os.access(str(file_path), os.R_OK):
                        htaccess_files.append(str(file_path))

        except Exception as e:
            self.logger.error("[WEBSITE-COLLECTOR] Error walking %s: %s", docroot, e)

        return htaccess_files

    def _should_exclude_directory(self, parent_path: str, dirname: str) -> bool:
        """
        Check if directory should be excluded based on DOCROOT_EXCLUDE_DIRS.

        Supports both plain directory names (e.g. "node_modules") and nested paths
        (e.g. "wp-content/cache"). A plain name excludes every directory with that
        name. A nested path excludes only directories whose full path, composed
        from parent_path and dirname, ends with that nested path.

        Args:
            parent_path: Directory that contains the candidate
            dirname: Name of the candidate directory

        Returns:
            True if the directory must be skipped, False otherwise (including
            when the check itself fails).
        """
        try:
            candidate = Path(parent_path) / dirname
            # Check the path as composed and with symlinks resolved, so a
            # nested pattern matches either form.
            candidate_paths = (str(candidate), str(candidate.resolve(strict=False)))

            for exclude_dir in DOCROOT_EXCLUDE_DIRS:
                pattern = Path(exclude_dir)

                # Plain directory name: match by name anywhere in the tree.
                # Nested patterns are excluded from this check on purpose,
                # otherwise "wp-content/cache" would exclude every "cache".
                if len(pattern.parts) == 1 and candidate.name == pattern.name:
                    return True

                # Plain or nested pattern: match as a path suffix on a
                # component boundary.
                suffix = os.sep + str(pattern)
                if any(path.endswith(suffix) for path in candidate_paths):
                    return True
        except Exception:
            # Be conservative on errors and do not exclude
            return False

        return False

    def _normalize_path(self, file_path: str, docroot: str) -> str:
        """
        Normalize file path relative to docroot.

        Args:
            file_path: Path of the file to normalize
            docroot: Document root the path should be made relative to

        Returns:
            The path relative to docroot, or only the file name when the file
            does not lie under docroot.
        """
        try:
            return str(Path(file_path).relative_to(Path(docroot)))
        except ValueError:
            # If relative path calculation fails, return filename only
            return Path(file_path).name