Python Web Scraping in Practice
Introduction
Web scraping is the technique of automatically collecting data from web pages. Understanding HTTP requests, HTML parsing, anti-scraping countermeasures, and distributed crawler architecture helps you build efficient and reliable data-collection systems.
Basic Scraper
requests + BeautifulSoup
import requests
from bs4 import BeautifulSoup
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urljoin
import time
import random

@dataclass
class Article:
    title: str
    url: str
    author: str
    content: str
    publish_date: str

class WebScraper:
    """Basic web scraper"""
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        })
        self.request_delay = (1, 3)  # random delay range in seconds

    def fetch(self, url: str, params: Optional[dict] = None, **kwargs) -> Optional[requests.Response]:
        """Send an HTTP request"""
        try:
            response = self.session.get(url, params=params, timeout=10, **kwargs)
            response.raise_for_status()
            response.encoding = response.apparent_encoding  # auto-detect encoding
            return response
        except requests.RequestException as e:
            print(f"Request failed: {url} - {e}")
            return None
        finally:
            delay = random.uniform(*self.request_delay)
            time.sleep(delay)

    def parse_html(self, html: str) -> BeautifulSoup:
        """Parse HTML"""
        return BeautifulSoup(html, "lxml")  # lxml is faster than html.parser

    def scrape_articles(self, base_url: str, max_pages: int = 5) -> list[Article]:
        """Scrape an article listing"""
        articles = []
        for page in range(1, max_pages + 1):
            url = f"{base_url}?page={page}"
            response = self.fetch(url)
            if not response:
                continue
            soup = self.parse_html(response.text)
            # Extract data with CSS selectors
            items = soup.select(".article-item")
            for item in items:
                title_el = item.select_one(".title a")
                if not title_el:
                    continue
                author_el = item.select_one(".author")
                date_el = item.select_one(".date")
                article = Article(
                    title=title_el.get_text(strip=True),
                    url=urljoin(base_url, title_el["href"]),  # resolve relative links
                    author=author_el.get_text(strip=True) if author_el else "",
                    content="",  # filled in from the detail page
                    publish_date=date_el.get_text(strip=True) if date_el else ""
                )
                articles.append(article)
            print(f"Page {page}: {len(items)} articles")
        return articles

    def scrape_detail(self, article: Article) -> Article:
        """Scrape an article's detail page"""
        response = self.fetch(article.url)
        if not response:
            return article
        soup = self.parse_html(response.text)
        content_el = soup.select_one(".article-content")
        if content_el:
            # Strip HTML tags, keep text only
            article.content = content_el.get_text(separator="\n", strip=True)
        return article
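A minimal usage sketch; the listing URL and the CSS selectors baked into the class are assumptions about the target site:

scraper = WebScraper()
articles = scraper.scrape_articles("https://example.com/articles", max_pages=2)
for article in articles[:5]:
    article = scraper.scrape_detail(article)  # fill in the content field
    print(article.title, article.publish_date)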
# JSON API scraper
class ApiScraper:
    """JSON API scraper"""
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.session = requests.Session()

    def fetch_api(self, endpoint: str, params: Optional[dict] = None) -> dict:
        """Fetch API data"""
        url = f"{self.base_url}{endpoint}"
        response = self.session.get(url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()

    def scrape_all(self, endpoint: str, page_size: int = 20, max_items: int = 1000):
        """Page through all available data"""
        all_data = []
        page = 1
        while len(all_data) < max_items:
            data = self.fetch_api(endpoint, params={
                "page": page,
                "size": page_size
            })
            items = data.get("items", [])
            if not items:
                break
            all_data.extend(items)
            print(f"Page {page}: {len(items)} items (total: {len(all_data)})")
            if len(items) < page_size:
                break
            page += 1
            time.sleep(0.5)
        return all_data[:max_items]
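A usage sketch; the endpoint path and the "items" response key are assumptions about the target API's shape:

api = ApiScraper("https://api.example.com")
posts = api.scrape_all("/v1/posts", page_size=50, max_items=200)
print(f"Fetched {len(posts)} records")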
Countering Anti-Scraping Measures
Proxy and Request Header Management
class AntiDetectionScraper:
    """Scraper with basic anti-detection measures"""
    # User-Agent pool
    USER_AGENTS = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Firefox/121.0",
    ]

    def __init__(self, proxy_list: Optional[list[str]] = None):
        self.session = requests.Session()
        self.proxy_list = proxy_list or []
        self.proxy_index = 0

    def get_random_headers(self) -> dict:
        """Build randomized request headers"""
        return {
            "User-Agent": random.choice(self.USER_AGENTS),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Referer": "https://www.google.com/",
        }

    def get_next_proxy(self) -> dict:
        """Rotate through the proxy pool"""
        if not self.proxy_list:
            return {}
        proxy = self.proxy_list[self.proxy_index % len(self.proxy_list)]
        self.proxy_index += 1
        return {"http": proxy, "https": proxy}

    def fetch_with_retry(self, url: str, max_retries: int = 3) -> Optional[requests.Response]:
        """Request with retries and proxy rotation"""
        for attempt in range(max_retries):
            try:
                headers = self.get_random_headers()
                proxies = self.get_next_proxy()
                response = self.session.get(
                    url,
                    headers=headers,
                    proxies=proxies,
                    timeout=10
                )
                response.raise_for_status()
                return response
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 429:  # rate limited
                    wait = 2 ** attempt + random.random()
                    print(f"Rate limited, waiting {wait:.1f}s")
                    time.sleep(wait)
                elif e.response.status_code == 403:  # blocked
                    print("Proxy blocked, rotating to the next one")
                    continue
                else:
                    raise
            except requests.exceptions.RequestException:
                time.sleep(2 ** attempt)  # exponential backoff
        return None
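A usage sketch; the proxy addresses are placeholders for a real proxy pool:

scraper = AntiDetectionScraper(proxy_list=[
    "http://127.0.0.1:8080",
    "http://127.0.0.1:8081",
])
response = scraper.fetch_with_retry("https://example.com/data")
if response:
    print(response.status_code, len(response.text))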
Scraping Dynamic Pages
Playwright Headless Browser
# pip install playwright
# playwright install chromium
from playwright.async_api import async_playwright

class DynamicScraper:
    """Dynamic page scraper (Playwright)"""
    async def scrape_dynamic_page(self, url: str, wait_selector: str = None):
        """Scrape a dynamically rendered page"""
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            page = await browser.new_page()
            # Set viewport and headers
            await page.set_viewport_size({"width": 1920, "height": 1080})
            await page.set_extra_http_headers({
                "Accept-Language": "zh-CN,zh;q=0.9"
            })
            await page.goto(url, wait_until="networkidle")
            # Wait for a specific element
            if wait_selector:
                await page.wait_for_selector(wait_selector, timeout=10000)
            # Scroll to trigger lazy loading
            await self._scroll_to_bottom(page)
            # Extract data
            content = await page.content()  # full rendered HTML, if needed
            data = await page.evaluate("""
                () => {
                    const items = document.querySelectorAll('.item');
                    return Array.from(items).map(item => ({
                        title: item.querySelector('.title')?.textContent?.trim(),
                        price: item.querySelector('.price')?.textContent?.trim(),
                        url: item.querySelector('a')?.href
                    }));
                }
            """)
            await browser.close()
            return data

    async def _scroll_to_bottom(self, page, max_scrolls=10):
        """Scroll to the bottom to load more content"""
        for _ in range(max_scrolls):
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await page.wait_for_timeout(1000)

    async def handle_login(self, url: str, username: str, password: str):
        """Handle a login flow"""
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=False)  # show the browser
            page = await browser.new_page()
            await page.goto(url)
            # Fill in the login form
            await page.fill('input[name="username"]', username)
            await page.fill('input[name="password"]', password)
            await page.click('button[type="submit"]')
            # Wait for login to complete
            await page.wait_for_url("**/dashboard")
            # Grab cookies for reuse
            cookies = await page.context.cookies()
            return cookies
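A usage sketch; the product URL and the '.item' selector mirror the assumptions inside scrape_dynamic_page:

import asyncio

scraper = DynamicScraper()
items = asyncio.run(scraper.scrape_dynamic_page(
    "https://example.com/products", wait_selector=".item"))
print(items[:3])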
The Scrapy Framework
Distributed Scraping
# pip install scrapy scrapy-redis
# items.py
import scrapy
from scrapy.exceptions import DropItem

class ArticleItem(scrapy.Item):
    title = scrapy.Field()
    url = scrapy.Field()
    author = scrapy.Field()
    content = scrapy.Field()
    publish_date = scrapy.Field()
    tags = scrapy.Field()

# spiders/article_spider.py
class ArticleSpider(scrapy.Spider):
    name = "articles"
    allowed_domains = ["example.com"]
    start_urls = ["https://example.com/articles"]
    custom_settings = {
        "CONCURRENT_REQUESTS": 16,
        "DOWNLOAD_DELAY": 1,
        "ROBOTSTXT_OBEY": True,
        "FEEDS": {
            "output.json": {"format": "json"},
        },
        "DOWNLOADER_MIDDLEWARES": {
            "scrapy.downloadermiddlewares.retry.RetryMiddleware": 550,
        },
        "ITEM_PIPELINES": {
            "myproject.pipelines.CleanPipeline": 100,
            "myproject.pipelines.ValidationPipeline": 200,
            "myproject.pipelines.DatabasePipeline": 300,
        }
    }

    def parse(self, response):
        """Parse the listing page"""
        articles = response.css(".article-item")
        for article in articles:
            url = article.css(".title a::attr(href)").get()
            yield response.follow(url, callback=self.parse_detail)
        # Pagination
        next_page = response.css(".next-page::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

    def parse_detail(self, response):
        """Parse the detail page"""
        item = ArticleItem()
        item["title"] = (response.css("h1::text").get() or "").strip()
        item["url"] = response.url
        item["author"] = response.css(".author::text").get(default="").strip()
        item["content"] = " ".join(response.css(".content p::text").getall())
        item["publish_date"] = response.css(".date::text").get(default="").strip()
        item["tags"] = response.css(".tag::text").getall()
        yield item

# pipelines.py
class CleanPipeline:
    """Data cleaning"""
    def process_item(self, item, spider):
        for field in item.fields:
            if field in item and isinstance(item[field], str):
                item[field] = item[field].strip()
        return item

class ValidationPipeline:
    """Data validation"""
    def process_item(self, item, spider):
        if not item.get("title"):
            raise DropItem("Missing title")
        return item

class DatabasePipeline:
    """Database storage"""
    def open_spider(self, spider):
        import sqlite3
        self.conn = sqlite3.connect("articles.db")
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS articles (
                title TEXT, url TEXT UNIQUE, author TEXT,
                content TEXT, publish_date TEXT, tags TEXT
            )
        """)

    def process_item(self, item, spider):
        self.conn.execute(
            "INSERT OR IGNORE INTO articles VALUES (?,?,?,?,?,?)",
            (item["title"], item["url"], item["author"],
             item["content"], item["publish_date"], str(item.get("tags", "")))
        )
        self.conn.commit()
        return item

    def close_spider(self, spider):
        self.conn.close()

# Run: scrapy crawl articles
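The install line above pulls in scrapy-redis, but the spider as written runs standalone. A minimal sketch of the extra wiring that makes it distributed; the Redis URL and key name are placeholders:

# settings.py (scrapy-redis)
SCHEDULER = "scrapy_redis.scheduler.Scheduler"               # request queue shared via Redis
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"   # dedup across all workers
SCHEDULER_PERSIST = True                                     # keep the queue between runs
REDIS_URL = "redis://localhost:6379/0"

# spiders/article_spider.py — read start URLs from Redis instead of start_urls
from scrapy_redis.spiders import RedisSpider

class DistributedArticleSpider(RedisSpider):
    name = "articles_distributed"
    redis_key = "articles:start_urls"  # seed with: redis-cli lpush articles:start_urls <url>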
Summary
Python scrapers use requests + BeautifulSoup for static pages and Playwright for dynamically rendered pages. Anti-scraping countermeasures include User-Agent rotation, proxy pools, request throttling, and retry logic. The Scrapy framework provides a complete scraping solution with middleware, pipelines, and distributed crawling (scrapy-redis). Respect robots.txt, set reasonable delays, and never scrape personal data, so your scraper stays legal and compliant.
Key Takeaways
- First decide whether this topic is about language skills, engineering skills, or ecosystem tooling.
- Python's development speed comes from its ecosystem, but maintainability comes from structure, tests, and conventions.
- Once a script enters long-term maintenance, it must be managed as a project.
- For framework and language-feature topics, understand both how the code runs and how the project is organized.
Project Perspective
- Standardize virtual environments, dependency locking, formatting, and logging.
- Separate entry points, configuration, business logic, and utility functions to avoid single-file bloat.
- Add exception handling and sample validation for network requests, file I/O, and data-processing results.
- Define the project entry point, configuration management, dependency management, logging, and testing strategy.
Common Pitfalls
- Shipping a throwaway script straight into production.
- Ignoring differences in dependency versions, encodings, paths, and time zones.
- Writing only the happy path, with no timeouts, retries, or resource cleanup.
- Carrying notebook or script habits into long-lived projects.
Next Steps
- Fold type annotations, testing, packaging, and deployment into one engineering workflow.
- Go deeper into async, performance, data pipelines, and framework source code.
- Turn recurring scripts into reusable libraries or CLI tools instead of copy-pasting.
- Keep building out deployment, packaging, monitoring, and performance-tuning skills.
When to Use
- When you're ready to put this guide to work in a project, start by validating the critical path in an isolated module or minimal example.
- Suited to scripting and automation, data processing, web development, and test tooling.
- When requirements call for fast iteration and a rich ecosystem, Python usually gets you started quickly.
Practical Advice
- Standardize on virtual environments and dependency locking to avoid environment drift.
- Add type annotations, exception handling, and logging to core functions so scripts don't become black boxes.
- Once a script enters a production pipeline, add tests and monitoring promptly.
Troubleshooting Checklist
- First confirm the interpreter, virtual environment, and dependency versions are correct.
- Check for differences in encodings, paths, time zones, and third-party library behavior.
- Look for synchronous blocking, unreleased database connections, or network requests without timeouts.
Review Questions
- If you dropped this guide's techniques into your current project, which inputs, outputs, and failure paths would you verify first?
- At what scale and under which edge cases is this approach most likely to break? Which metrics or logs would confirm it?
- Compared with the default implementation or the alternatives, what are the biggest gains and costs of adopting it?
Async Scraping (aiohttp + asyncio)
For high-concurrency crawling, async I/O can significantly improve throughput.
import asyncio
import aiohttp
from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class CrawlResult:
    url: str
    status: int
    html: Optional[str] = None
    error: Optional[str] = None
    elapsed: float = 0.0

class AsyncWebScraper:
    """Async concurrent scraper"""
    def __init__(self, max_concurrent=10, request_delay=0.5):
        self.max_concurrent = max_concurrent
        self.request_delay = request_delay
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results: list[CrawlResult] = []

    async def fetch(self, session: aiohttp.ClientSession, url: str) -> CrawlResult:
        """Async request, throttled by a semaphore"""
        async with self.semaphore:
            start = time.perf_counter()
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as resp:
                    html = await resp.text()
                    elapsed = time.perf_counter() - start
                    return CrawlResult(url=url, status=resp.status, html=html, elapsed=elapsed)
            except Exception as e:
                elapsed = time.perf_counter() - start
                return CrawlResult(url=url, status=0, error=str(e), elapsed=elapsed)
            finally:
                await asyncio.sleep(self.request_delay)

    async def crawl_batch(self, urls: list[str]) -> list[CrawlResult]:
        """Crawl a batch of URLs"""
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120.0.0.0"
        }
        async with aiohttp.ClientSession(headers=headers) as session:
            tasks = [self.fetch(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            self.results = [r for r in results if isinstance(r, CrawlResult)]
            return self.results

    def print_summary(self):
        """Print a crawl summary"""
        success = sum(1 for r in self.results if r.status == 200)
        failed = sum(1 for r in self.results if r.status != 200)
        avg_time = sum(r.elapsed for r in self.results) / max(len(self.results), 1)
        print("\nCrawl summary:")
        print(f"  Total: {len(self.results)}")
        print(f"  Succeeded: {success}, failed: {failed}")
        print(f"  Average time: {avg_time:.2f}s")

# Usage:
# async def main():
#     scraper = AsyncWebScraper(max_concurrent=5, request_delay=0.3)
#     urls = [f"https://example.com/page/{i}" for i in range(1, 21)]
#     await scraper.crawl_batch(urls)
#     scraper.print_summary()
#
# asyncio.run(main())

Data Storage Strategies
import json
import csv
import sqlite3
from pathlib import Path
from datetime import datetime

class CrawlStorage:
    """Multiple storage backends"""

    # 1. JSON Lines (recommended: one record per line)
    @staticmethod
    def save_jsonl(data: list[dict], filepath: str):
        """Save as JSONL (append mode)"""
        with open(filepath, "a", encoding="utf-8") as f:
            for item in data:
                item["_crawl_time"] = datetime.now().isoformat()
                f.write(json.dumps(item, ensure_ascii=False) + "\n")

    @staticmethod
    def load_jsonl(filepath: str) -> list[dict]:
        """Read a JSONL file"""
        results = []
        with open(filepath, "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    results.append(json.loads(line))
        return results

    # 2. CSV
    @staticmethod
    def save_csv(data: list[dict], filepath: str):
        """Save as CSV"""
        if not data:
            return
        with open(filepath, "w", newline="", encoding="utf-8-sig") as f:
            writer = csv.DictWriter(f, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)

    # 3. SQLite
    @staticmethod
    def save_to_sqlite(data: list[dict], db_path: str, table_name: str):
        """Save to SQLite (assumes all rows share the same keys)"""
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        # Create the table automatically from the first row's keys
        columns = data[0].keys()
        col_defs = ", ".join(f'"{c}" TEXT' for c in columns)
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({col_defs})")
        # Insert rows (duplicates ignored where a UNIQUE constraint exists)
        placeholders = ", ".join("?" for _ in columns)
        insert_sql = f"INSERT OR IGNORE INTO {table_name} VALUES ({placeholders})"
        for item in data:
            cursor.execute(insert_sql, tuple(item.values()))
        conn.commit()
        conn.close()

    # 4. File downloads
    @staticmethod
    def download_file(url: str, save_dir: str, filename: str = None):
        """Download a file to disk"""
        import requests
        save_dir_path = Path(save_dir)
        save_dir_path.mkdir(parents=True, exist_ok=True)
        if not filename:
            filename = url.split("/")[-1] or "download"
        filepath = save_dir_path / filename
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()
        with open(filepath, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return str(filepath)
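A quick sketch tying the storage helpers together; file and table names are arbitrary:

rows = [{"title": "Hello", "url": "https://example.com/1"}]
CrawlStorage.save_jsonl(rows, "articles.jsonl")
CrawlStorage.save_csv(rows, "articles.csv")
CrawlStorage.save_to_sqlite(rows, "articles.db", "articles")
print(CrawlStorage.load_jsonl("articles.jsonl"))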
Scraper Compliance and Best Practices

def crawler_best_practices():
    """Compliance guidelines and best practices for scrapers"""
    practices = {
        "Obey robots.txt": """
        # Check robots.txt
        import urllib.robotparser
        rp = urllib.robotparser.RobotFileParser()
        rp.set_url("https://example.com/robots.txt")
        rp.read()
        can_fetch = rp.can_fetch("MyBot", "https://example.com/page")
        """,
        "Set reasonable delays": "Wait 1-3 seconds between requests; slow down further at peak hours",
        "Cap concurrency": "Keep concurrency at 5-10 or below to avoid stressing the target server",
        "Use a meaningful User-Agent": "Include contact details so site owners can reach you",
        "Respect copyright": "Do not scrape or redistribute copyrighted content",
        "Avoid personal data": "Never scrape ID numbers, phone numbers, or other sensitive information",
        "Cap retries": "Retry a page at most 3 times to avoid infinite loops",
        "Support resuming": "Record crawled URLs so an interrupted crawl can pick up where it left off",
    }
    print("Scraper best practices:")
    for practice, desc in practices.items():
        print(f"  {practice}: {desc}")
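The robots.txt check above lives inside a string; as standalone runnable code it looks like this (the URL and bot name are placeholders):

import urllib.robotparser

rp = urllib.robotparser.RobotFileParser()
rp.set_url("https://example.com/robots.txt")
rp.read()  # fetch and parse the file
print(rp.can_fetch("MyBot", "https://example.com/page"))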
# Resumable crawling example
class ResumableScraper:
    """Scraper that can resume after an interruption"""
    def __init__(self, progress_file="crawl_progress.json"):
        self.progress_file = progress_file
        self.visited_urls: set[str] = set()
        self._load_progress()

    def _load_progress(self):
        """Load previously crawled URLs"""
        if Path(self.progress_file).exists():
            with open(self.progress_file, "r") as f:
                self.visited_urls = set(json.load(f))

    def _save_progress(self):
        """Persist crawled URLs"""
        with open(self.progress_file, "w") as f:
            json.dump(list(self.visited_urls), f)

    def is_visited(self, url: str) -> bool:
        return url in self.visited_urls

    def mark_visited(self, url: str):
        self.visited_urls.add(url)
        self._save_progress()

crawler_best_practices()
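A sketch of how the resume logic slots into a crawl loop; fetch_page stands in for any of the fetch helpers above:

tracker = ResumableScraper()
for url in ["https://example.com/a", "https://example.com/b"]:
    if tracker.is_visited(url):
        continue  # already crawled in a previous run
    # fetch_page(url)  # hypothetical fetch call
    tracker.mark_visited(url)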