#!/usr/bin/env python3
"""
Download document content with the OpenCLI ``web read`` command.

The script accepts URLs directly on the command line and/or harvests them
from a previous search run's ``results.json``, downloads each one through
``opencli web read`` in a small thread pool, and writes a
``download_record.json`` mapping every URL to its local file (or ``null``
when the download failed).
"""

import os
import re
import sys
import json
import hashlib
import subprocess
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional, List
from urllib.parse import urlparse

# Matches an http(s) URL up to the first whitespace, quote or angle bracket.
# Compiled once at import time instead of once per scanned line.
_URL_PATTERN = re.compile(r'https?://[^\s\'"<>]+')


def download_with_opencli(url: str, output_dir: str, timeout: int = 60) -> Optional[str]:
    """
    Download a single URL to a Markdown file via ``opencli web read``.

    The output file name is derived from the URL: the network location with
    dots replaced by underscores, followed by the first 12 hex digits of the
    URL's MD5 digest, with a ``.md`` suffix. The same URL therefore always
    maps to the same file.

    Args:
        url: The URL to download.
        output_dir: Directory the file is written into (must already exist).
        timeout: Maximum number of seconds to wait for the command.

    Returns:
        The local path of the downloaded file, or None on any failure
        (non-zero exit status, missing output file, timeout, or an error
        launching the command). Failures are reported on stdout, never raised.
    """
    # Build a stable, collision-resistant file name from the URL.
    url_hash = hashlib.md5(url.encode()).hexdigest()[:12]
    domain = urlparse(url).netloc.replace(".", "_")
    output_path = os.path.join(output_dir, f"{domain}_{url_hash}.md")

    # Argument list (no shell), so the URL is never interpreted by a shell.
    cmd = ["opencli", "web", "read", "--url", url, "--output", output_path]

    print(f"下载: {url}")
    print(f"输出: {output_path}")

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # The captured output is only used for diagnostics; undecodable
            # bytes must not turn a successful download into an exception.
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        print("✗ 下载超时")
        return None
    except Exception as e:
        # Top-level boundary for one download: report and carry on so a
        # single bad URL (or a missing `opencli` binary) cannot abort a batch.
        print(f"✗ 错误: {e}")
        return None

    if result.returncode != 0:
        print(f"✗ 下载失败: {result.stderr[:200]}")
        return None

    # A zero exit status alone is not trusted; the file must actually exist.
    if not os.path.exists(output_path):
        print("✗ 文件未生成")
        return None

    file_size = os.path.getsize(output_path)
    print(f"✓ 成功下载 ({file_size} bytes)")
    return output_path


def batch_download(
    urls: List[str],
    output_dir: str,
    max_workers: int = 3,
    timeout: int = 60,
) -> dict:
    """
    Download several URLs concurrently.

    Args:
        urls: URLs to download.
        output_dir: Directory the files are written into.
        max_workers: Maximum number of parallel downloads; values below 1
            are treated as 1.
        timeout: Per-download timeout in seconds, forwarded to
            ``download_with_opencli``.

    Returns:
        A dict ``{url: local_path or None}`` whose keys follow the order in
        which the URLs were first given, independent of completion order.
    """
    if not urls:
        return {}

    # ThreadPoolExecutor raises ValueError for max_workers <= 0; None keeps
    # the executor's own default.
    worker_count = None if max_workers is None else max(1, max_workers)

    finished = {}
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        future_to_url = {
            executor.submit(download_with_opencli, url, output_dir, timeout): url
            for url in urls
        }

        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                finished[url] = future.result()
            except Exception as e:
                print(f"✗ {url} 异常: {e}")
                finished[url] = None

    # Re-key in input order so the returned mapping (and any record written
    # from it) is deterministic.
    return {url: finished[url] for url in dict.fromkeys(urls)}


def load_results_from_search(search_dir: str) -> List[str]:
    """
    Load the list of URLs found in a previous search run.

    Reads ``results.json`` from ``search_dir``. The file is expected to hold
    a JSON object whose values are objects with a truthy ``success`` flag and
    a text ``output`` field; the first http(s) URL on each line of ``output``
    is collected.

    Args:
        search_dir: Directory containing ``results.json``.

    Returns:
        The unique URLs in first-seen order. An empty list is returned when
        the file is missing, unreadable, or not a JSON object.
    """
    results_file = os.path.join(search_dir, "results.json")

    if not os.path.exists(results_file):
        print(f"未找到结果文件: {results_file}")
        return []

    try:
        with open(results_file, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"无法解析结果文件: {results_file} ({e})")
        return []

    if not isinstance(data, dict):
        print(f"无法解析结果文件: {results_file} (顶层不是 JSON 对象)")
        return []

    urls = []
    for result in data.values():
        if not isinstance(result, dict):
            continue
        output = result.get("output")
        if not result.get("success") or not isinstance(output, str):
            continue

        # Simple line-based scan. No separate "contains http" pre-check is
        # needed: the pattern can only match a line that contains "http".
        for line in output.split("\n"):
            url_match = _URL_PATTERN.search(line)
            if url_match:
                urls.append(url_match.group())

    # De-duplicate while keeping first-seen order.
    return list(dict.fromkeys(urls))


def main():
    """
    Command-line entry point.

    Collects URLs from ``--url``, ``--urls`` and ``--from-search``, downloads
    them into ``--output-dir`` and writes ``download_record.json`` there.

    Returns:
        Process exit status: 1 when no URL was supplied, otherwise 0.
    """
    parser = argparse.ArgumentParser(description="使用 OpenCLI 下载文档内容")
    parser.add_argument("--url", help="单个 URL 下载")
    parser.add_argument("--urls", nargs="+", help="多个 URL 下载")
    parser.add_argument("--from-search", help="从搜索结果目录加载 URL")
    parser.add_argument("--output-dir", required=True, help="输出目录")
    parser.add_argument("--max-workers", type=int, default=3, help="最大并行数")
    parser.add_argument("--timeout", type=int, default=60, help="单个下载的超时时间(秒)")

    args = parser.parse_args()

    # Make sure the output directory exists.
    os.makedirs(args.output_dir, exist_ok=True)

    # Gather URLs from every source that was given.
    urls = []

    if args.url:
        urls.append(args.url)

    if args.urls:
        urls.extend(args.urls)

    if args.from_search:
        search_urls = load_results_from_search(args.from_search)
        urls.extend(search_urls)
        print(f"从搜索结果加载 {len(search_urls)} 个 URL")

    if not urls:
        print("错误: 未提供 URL")
        return 1

    # De-duplicate while keeping the order the URLs were supplied in.
    urls = list(dict.fromkeys(urls))
    print(f"\n共 {len(urls)} 个唯一 URL 待下载\n")

    # Download everything.
    results = batch_download(urls, args.output_dir, args.max_workers, args.timeout)

    # Summary.
    success_count = sum(1 for v in results.values() if v is not None)
    print(f"\n{'='*60}")
    print(f"下载完成: {success_count}/{len(urls)} 成功")
    print(f"{'='*60}")

    # Persist the URL -> local path mapping.
    record_file = os.path.join(args.output_dir, "download_record.json")
    with open(record_file, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print(f"下载记录: {record_file}")

    return 0


if __name__ == "__main__":
    sys.exit(main())