Files
qoder-config/skills/opencli-websearch/scripts/websearch.py
aszerW c3ea38c045 feat(repo): 整理 Qoder Skills 和 MCP 配置到仓库
- 添加 5 个用户级别 Skills:
  - auto-commit: 自动 Git 提交
  - karpathy-guidelines: 编码规范指南
  - opencli-websearch: 多源网络搜索
  - pdf-reader: PDF 内容提取
  - repo-analyzer: 项目深度分析

- 添加 Playwright MCP 配置 (21 个浏览器自动化工具)
- 创建完整的 README.md 文档说明
2026-04-18 11:17:41 +08:00

323 lines
9.6 KiB
Python

#!/usr/bin/env python3
"""
OpenCLI 多源 Web 搜索脚本
支持 Qoder WebSearch 和 OpenCLI 并行搜索
"""
import os
import sys
import json
import hashlib
import subprocess
import argparse
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
# 数据源配置
SOURCES = {
"academic": {
"arxiv": {"type": "public", "limit": 5},
},
"technical": {
"stackoverflow": {"type": "public", "limit": 5},
"hackernews": {"type": "public", "limit": 5},
"gitee": {"type": "public", "limit": 5},
},
"chinese": {
"zhihu": {"type": "browser", "limit": 5},
"xiaohongshu": {"type": "browser", "limit": 5},
"36kr": {"type": "public", "limit": 5},
},
"news": {
"bbc": {"type": "public", "limit": 5},
"reuters": {"type": "public", "limit": 5},
},
"general": {
"google": {"type": "browser", "limit": 5},
}
}
def create_storage_dir(query: str) -> str:
"""创建临时存储目录"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
query_hash = hashlib.md5(query.encode()).hexdigest()[:8]
dir_name = f"{timestamp}_{query_hash}"
storage_path = os.path.expanduser(f"~/Downloads/opencli-websearch-data/{dir_name}")
os.makedirs(storage_path, exist_ok=True)
os.makedirs(os.path.join(storage_path, "content"), exist_ok=True)
return storage_path
def run_opencli_search(source: str, query: str, limit: int = 5) -> Dict:
"""执行 OpenCLI 搜索"""
cmd = ["opencli", source, "search", query, "--limit", str(limit)]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0:
return {
"source": source,
"success": True,
"output": result.stdout,
"error": None
}
else:
return {
"source": source,
"success": False,
"output": None,
"error": result.stderr
}
except subprocess.TimeoutExpired:
return {
"source": source,
"success": False,
"output": None,
"error": "Timeout"
}
except Exception as e:
return {
"source": source,
"success": False,
"output": None,
"error": str(e)
}
def run_opencli_hackernews(limit: int = 5) -> Dict:
"""获取 HackerNews 热门内容"""
cmd = ["opencli", "hackernews", "top", "--limit", str(limit)]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
return {
"source": "hackernews",
"success": result.returncode == 0,
"output": result.stdout if result.returncode == 0 else None,
"error": result.stderr if result.returncode != 0 else None
}
except Exception as e:
return {
"source": "hackernews",
"success": False,
"output": None,
"error": str(e)
}
def download_content(url: str, output_dir: str) -> Optional[str]:
"""使用 OpenCLI web read 下载文档内容"""
url_hash = hashlib.md5(url.encode()).hexdigest()[:12]
filename = f"web_{url_hash}.md"
output_path = os.path.join(output_dir, filename)
cmd = ["opencli", "web", "read", "--url", url, "--output", output_path]
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0 and os.path.exists(output_path):
return output_path
return None
except Exception:
return None
def select_sources(query: str, intent: Optional[str] = None) -> List[str]:
"""根据查询意图选择数据源"""
sources = []
if intent:
if intent == "academic":
sources.extend(SOURCES["academic"].keys())
elif intent == "technical":
sources.extend(SOURCES["technical"].keys())
elif intent == "chinese":
sources.extend(SOURCES["chinese"].keys())
elif intent == "news":
sources.extend(SOURCES["news"].keys())
else:
# 通用搜索 - 选择所有公开源
for category in ["academic", "technical", "chinese", "news"]:
for source, config in SOURCES[category].items():
if config["type"] == "public":
sources.append(source)
else:
# 自动判断
query_lower = query.lower()
# 学术关键词
if any(kw in query_lower for kw in ["paper", "论文", "arxiv", "research", "study"]):
sources.extend(SOURCES["academic"].keys())
# 技术关键词
if any(kw in query_lower for kw in ["python", "javascript", "code", "programming", "bug", "error"]):
sources.extend(["stackoverflow", "hackernews"])
# 中文关键词
if any('\u4e00' <= char <= '\u9fff' for char in query):
sources.extend(["36kr"]) # 优先公开源
# 默认添加通用源
if not sources:
sources = ["arxiv", "stackoverflow", "36kr"]
return list(set(sources))
def parallel_search(query: str, sources: List[str]) -> Dict[str, Dict]:
"""并行执行多源搜索"""
results = {}
with ThreadPoolExecutor(max_workers=5) as executor:
future_to_source = {}
for source in sources:
if source == "hackernews":
future = executor.submit(run_opencli_hackernews)
else:
limit = 5
for category in SOURCES.values():
if source in category:
limit = category[source].get("limit", 5)
break
future = executor.submit(run_opencli_search, source, query, limit)
future_to_source[future] = source
for future in as_completed(future_to_source):
source = future_to_source[future]
try:
results[source] = future.result()
except Exception as e:
results[source] = {
"source": source,
"success": False,
"output": None,
"error": str(e)
}
return results
def save_results(storage_path: str, query: str, results: Dict) -> str:
"""保存搜索结果到本地"""
# 保存元数据
metadata = {
"query": query,
"timestamp": datetime.now().isoformat(),
"sources": list(results.keys()),
"success_count": sum(1 for r in results.values() if r["success"])
}
metadata_path = os.path.join(storage_path, "metadata.json")
with open(metadata_path, "w", encoding="utf-8") as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
# 保存完整结果
results_path = os.path.join(storage_path, "results.json")
with open(results_path, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
return storage_path
def print_report(query: str, results: Dict, storage_path: str):
"""打印搜索结果报告"""
print(f"\n{'='*60}")
print(f"搜索报告: {query}")
print(f"{'='*60}")
print(f"\n存储位置: {storage_path}")
print(f"数据源: {', '.join(results.keys())}")
success_count = sum(1 for r in results.values() if r["success"])
print(f"成功: {success_count}/{len(results)}")
print("\n" + "-"*60)
print("各源结果:")
print("-"*60)
for source, result in results.items():
status = "" if result["success"] else ""
print(f"\n[{status}] {source}")
if result["success"] and result["output"]:
# 截断输出,避免过长
output = result["output"][:500]
if len(result["output"]) > 500:
output += "..."
print(output)
elif result["error"]:
print(f" 错误: {result['error'][:100]}")
print(f"\n{'='*60}\n")
def main():
parser = argparse.ArgumentParser(description="OpenCLI 多源 Web 搜索")
parser.add_argument("query", help="搜索查询")
parser.add_argument("--intent", choices=["academic", "technical", "chinese", "news", "general"],
help="搜索意图类型")
parser.add_argument("--sources", nargs="+", help="指定数据源")
parser.add_argument("--download", action="store_true", help="下载高相关性文档")
parser.add_argument("--output", help="输出目录")
args = parser.parse_args()
# 创建存储目录
if args.output:
storage_path = args.output
os.makedirs(storage_path, exist_ok=True)
else:
storage_path = create_storage_dir(args.query)
print(f"存储路径: {storage_path}")
# 选择数据源
if args.sources:
sources = args.sources
else:
sources = select_sources(args.query, args.intent)
print(f"搜索源: {', '.join(sources)}")
print("正在并行搜索...")
# 执行并行搜索
results = parallel_search(args.query, sources)
# 保存结果
save_results(storage_path, args.query, results)
# 打印报告
print_report(args.query, results, storage_path)
return 0
if __name__ == "__main__":
sys.exit(main())