In short, it's just GPT plus a good prompt. There is protection against all kinds of errors, and plenty of handy little features.
Downloader.py:
#!/usr/bin/env python3
"""
Advanced Website Downloader with improved:
  - Error handling
  - Performance
  - Resource management
  - Code organization
  - Type safety
  - Documentation
"""
import argparse
import concurrent.futures
import logging
import os
import re
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from urllib.parse import urljoin, urlparse, urlunparse
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Constants
DEFAULT_USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
DEFAULT_TIMEOUT = 15
MAX_RETRIES = 3
BACKOFF_FACTOR = 1
MAX_WORKERS = 10
CHUNK_SIZE = 8192
VALID_CONTENT_TYPES = {
    'text/html',
    'text/css',
    'application/javascript',
    'image/jpeg',
    'image/png',
    'image/gif',
    'image/webp',
    'image/svg+xml',
    'application/font-woff',
    'application/font-woff2',
    'application/x-font-ttf',
    'application/vnd.ms-fontobject'
}
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('website_downloader.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class WebsiteDownloader:
    """Main class for website downloading functionality."""

    def __init__(self):
        self.session = self._create_session()
        self.visited_urls = set()
        self.assets_downloaded = set()
        self.total_downloaded = 0
        self.start_time = time.time()

    def _create_session(self) -> requests.Session:
        """Create a configured requests session with retry logic."""
        session = requests.Session()
        retry = Retry(
            total=MAX_RETRIES,
            backoff_factor=BACKOFF_FACTOR,
            status_forcelist=[500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        session.headers.update({'User-Agent': DEFAULT_USER_AGENT})
        return session

    @staticmethod
    def validate_url(url: str) -> Optional[str]:
        """Validate and normalize the given URL."""
        try:
            parsed = urlparse(url)
            if not parsed.scheme:
                url = f'https://{url}'
                parsed = urlparse(url)
            if not all([parsed.scheme, parsed.netloc]):
                raise ValueError("Invalid URL - missing scheme or netloc")
            # Normalize URL by removing fragments and query parameters for the main page
            if not parsed.path or parsed.path == '/':
                url = urlunparse(parsed._replace(query='', fragment=''))
            return url
        except Exception as e:
            logger.error(f"URL validation failed: {e}")
            return None

    @staticmethod
    def create_directory(path: Union[str, Path]) -> Path:
        """Create directory structure if it doesn't exist."""
        path = Path(path)
        try:
            path.mkdir(parents=True, exist_ok=True)
            logger.debug(f"Created directory: {path}")
            return path
        except Exception as e:
            logger.error(f"Failed to create directory {path}: {e}")
            raise
    def download_file(self, url: str, save_path: Path) -> bool:
        """Download a file from URL to specified path with proper error handling."""
        if url in self.assets_downloaded:
            logger.debug(f"Skipping already downloaded asset: {url}")
            return True
        try:
            response = self.session.get(url, stream=True, timeout=DEFAULT_TIMEOUT)
            response.raise_for_status()
            # Check content type
            content_type = response.headers.get('Content-Type', '').split(';')[0]
            if content_type not in VALID_CONTENT_TYPES:
                logger.warning(f"Skipping unsupported content type {content_type} from {url}")
                return False
            # Ensure filename is safe and unique
            save_path = self._ensure_unique_filename(save_path)
            with save_path.open('wb') as f, tqdm(
                unit='B',
                unit_scale=True,
                unit_divisor=1024,
                miniters=1,
                desc=save_path.name,
                total=int(response.headers.get('content-length', 0))
            ) as pbar:
                for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                    if chunk:  # filter out keep-alive chunks
                        f.write(chunk)
                        pbar.update(len(chunk))
            self.assets_downloaded.add(url)
            self.total_downloaded += 1
            logger.debug(f"Downloaded: {url} -> {save_path}")
            return True
        except requests.RequestException as e:
            logger.error(f"Failed to download {url}: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error downloading {url}: {e}")
            return False

    @staticmethod
    def _ensure_unique_filename(path: Path) -> Path:
        """Ensure filename is unique by adding suffix if needed."""
        counter = 1
        original_stem = path.stem
        while path.exists():
            path = path.with_name(f"{original_stem}_{counter}{path.suffix}")
            counter += 1
        return path

    @staticmethod
    def get_filename_from_url(url: str) -> str:
        """Extract filename from URL with proper sanitization."""
        parsed = urlparse(url)
        filename = os.path.basename(parsed.path)
        if not filename:
            return 'index.html'
        # Sanitize filename
        filename = re.sub(r'[^\w\-_.]', '_', filename)
        filename = filename[:255]  # Limit filename length
        # Ensure extension based on the Content-Type reported by the server
        if '.' not in filename:
            content_type = requests.head(url, timeout=DEFAULT_TIMEOUT).headers.get('Content-Type', '')
            if 'text/html' in content_type:
                filename += '.html'
            elif 'text/css' in content_type:
                filename += '.css'
            elif 'javascript' in content_type:
                filename += '.js'
        return filename
    def fix_relative_paths(self, content: str, base_url: str, local_dir: str) -> str:
        """Fix relative paths in HTML/CSS content."""
        soup = BeautifulSoup(content, 'html.parser')
        base_url_parsed = urlparse(base_url)
        # Process all elements with src or href attributes
        for tag in soup.find_all(['img', 'script', 'link', 'a', 'iframe']):
            for attr in ['src', 'href']:
                if tag.has_attr(attr):
                    original_url = tag[attr]
                    if not original_url or original_url.startswith(('data:', 'javascript:', 'mailto:', 'tel:', '#')):
                        continue
                    # Make URL absolute
                    absolute_url = urljoin(base_url, original_url)
                    # Get filename for local reference
                    filename = self.get_filename_from_url(absolute_url)
                    tag[attr] = str(Path(local_dir) / filename)
        # Process CSS url() references
        for style in soup.find_all('style'):
            style.string = self._fix_css_urls(style.string, base_url, local_dir)
        for link in soup.find_all('link', rel='stylesheet'):
            if link.has_attr('href'):
                css_url = urljoin(base_url, link['href'])
                try:
                    response = self.session.get(css_url, timeout=DEFAULT_TIMEOUT)
                    if response.status_code == 200:
                        css_content = self._fix_css_urls(response.text, base_url, local_dir)
                        filename = self.get_filename_from_url(css_url)
                        css_path = Path(local_dir) / filename
                        with open(css_path, 'w') as f:
                            f.write(css_content)
                        link['href'] = str(Path(local_dir) / filename)
                except Exception as e:
                    logger.error(f"Failed to process CSS {css_url}: {e}")
        return str(soup)

    def _fix_css_urls(self, css_content: str, base_url: str, local_dir: str) -> str:
        """Fix URLs in CSS content."""
        if not css_content:
            return css_content

        def replacer(match):
            url = match.group(1).strip('\'"')
            if not url or url.startswith(('data:', 'http:', 'https:', '//')):
                return match.group(0)
            absolute_url = urljoin(base_url, url)
            filename = self.get_filename_from_url(absolute_url)
            return f'url("{Path(local_dir) / filename}")'

        return re.sub(r'url\(["\']?(.*?)["\']?\)', replacer, css_content)

    def extract_assets(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """Extract all asset URLs from the page."""
        assets = set()
        # HTML elements with src/href attributes
        elements = [
            ('img', 'src'),
            ('script', 'src'),
            ('link', 'href'),
            ('source', 'src'),
            ('source', 'srcset'),
            ('video', 'poster'),
            ('audio', 'src'),
            ('iframe', 'src'),
            ('embed', 'src'),
            ('object', 'data')
        ]
        for tag, attr in elements:
            for element in soup.find_all(tag, **{attr: True}):
                urls = element[attr].split(',') if attr == 'srcset' else [element[attr]]
                for url in urls:
                    if attr == 'srcset':
                        url = url.strip().split()[0]  # Get just the URL part
                    if not url or url.startswith(('data:', 'javascript:', 'mailto:', 'tel:', '#')):
                        continue
                    absolute_url = urljoin(base_url, url)
                    assets.add(absolute_url)
        # CSS background images
        for element in soup.find_all(style=True):
            for url in re.findall(r'url\(["\']?(.*?)["\']?\)', element['style']):
                if not url or url.startswith(('data:', 'http:', 'https:', '//')):
                    continue
                absolute_url = urljoin(base_url, url)
                assets.add(absolute_url)
        return list(assets)
    def download_website(self, url: str, output_dir: Union[str, Path]) -> Tuple[Path, str]:
        """Main method to download a website."""
        url = self.validate_url(url)
        if not url:
            raise ValueError("Invalid URL provided")
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        domain = parsed_url.netloc.replace('www.', '')
        # Create directory structure
        output_dir = Path(output_dir)
        site_dir = self.create_directory(output_dir / domain)
        assets_dir = self.create_directory(site_dir / 'assets')
        # Download main page
        main_filename = self.get_filename_from_url(url)
        main_filepath = site_dir / main_filename
        logger.info(f"Downloading main page: {url}")
        try:
            response = self.session.get(url, timeout=DEFAULT_TIMEOUT)
            response.raise_for_status()
            content = response.text
        except requests.RequestException as e:
            logger.error(f"Failed to download main page {url}: {e}")
            raise
        # Parse HTML and extract assets
        soup = BeautifulSoup(content, 'html.parser')
        assets = self.extract_assets(soup, base_url)
        # Download all assets in parallel
        logger.info(f"Found {len(assets)} assets to download")
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = []
            for asset_url in assets:
                filename = self.get_filename_from_url(asset_url)
                save_path = assets_dir / filename
                futures.append(executor.submit(self.download_file, asset_url, save_path))
            # Show progress
            for future in tqdm(
                concurrent.futures.as_completed(futures),
                total=len(futures),
                desc="Downloading assets",
                unit="file"
            ):
                try:
                    future.result()
                except Exception as e:
                    logger.error(f"Error downloading asset: {e}")
        # Fix paths in HTML
        content = self.fix_relative_paths(content, base_url, 'assets')
        # Save main HTML
        with open(main_filepath, 'w', encoding='utf-8') as f:
            f.write(content)
        logger.info(f"Saved main page: {main_filepath}")
        # Generate server script
        self._generate_server_script(site_dir)
        elapsed = time.time() - self.start_time
        logger.info(f"\nDownload completed in {elapsed:.2f} seconds")
        logger.info(f"Total files downloaded: {self.total_downloaded}")
        return site_dir, main_filename
    @staticmethod
    def _generate_server_script(site_dir: Path) -> None:
        """Generate a simple HTTP server script."""
        server_script = site_dir / 'run_server.py'
        server_content = f"""\
#!/usr/bin/env python3
import http.server
import socketserver
import os
import sys

PORT = 8000
DIRECTORY = os.path.dirname(os.path.abspath(__file__))


class Handler(http.server.SimpleHTTPRequestHandler):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, directory=DIRECTORY, **kwargs)

    def end_headers(self):
        self.send_header('Cache-Control', 'no-store, no-cache, must-revalidate')
        self.send_header('Pragma', 'no-cache')
        self.send_header('Expires', '0')
        super().end_headers()


def main():
    os.chdir(DIRECTORY)
    with socketserver.TCPServer(("", PORT), Handler) as httpd:
        print(f"Serving at http://localhost:{{PORT}}")
        print("Press Ctrl+C to stop")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\\nServer stopped")
            sys.exit(0)


if __name__ == "__main__":
    main()
"""
        with open(server_script, 'w') as f:
            f.write(server_content)
        server_script.chmod(0o755)  # Make executable
        logger.info(f"Created server script: {server_script}")

def main():
    """Command line interface with interactive input."""
    try:
        print("\n=== Website Downloader ===")
        print("Enter website URL to download (e.g. https://example.com):")
        url = input("URL: ").strip()
        downloader = WebsiteDownloader()
        site_dir, main_filename = downloader.download_website(url, os.getcwd())
        print("\n" + "="*50)
        print(f"Website successfully downloaded to: {site_dir}")
        print("To view the website, run:")
        print(f"  cd {site_dir}")
        print("  python run_server.py")
        print(f"Then open http://localhost:8000/{main_filename} in your browser")
        print("="*50 + "\n")
    except Exception as e:
        logger.error(f"Failed to download website: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
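For reference, a minimal sketch of driving the downloader from another script instead of the interactive prompt. This snippet is illustrative and not part of the original post; it assumes the code above is saved as Downloader.py and that requests, beautifulsoup4 and tqdm are installed.

example_usage.py:
#!/usr/bin/env python3
# Hypothetical driver script for the WebsiteDownloader class above (not from the original post).
from pathlib import Path

from Downloader import WebsiteDownloader  # assumes the source above is saved as Downloader.py

downloader = WebsiteDownloader()
# Mirror https://example.com into ./sites/example.com and generate run_server.py next to it
site_dir, main_page = downloader.download_website("https://example.com", Path("sites"))
print(f"Saved to {site_dir}, entry point: {main_page}")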