Python Network Programming
Introduction
Python is one of the most popular languages for network programming, with a rich standard library and a third-party ecosystem that covers virtually every protocol and scenario. From low-level socket programming to high-level HTTP clients, from web scrapers to REST API development, Python offers a complete toolchain. This article works through the core techniques systematically: TCP/UDP sockets, HTTP clients, web scraping, REST APIs, WebSocket, asynchronous networking, and SSL/TLS.
Features
Core characteristics of network programming in Python:
- Rich standard library: socket, http, urllib, and friends work out of the box
- High-level wrappers: requests and httpx greatly simplify HTTP operations
- Async support: asyncio with aiohttp or httpx enables high-performance asynchronous networking
- Protocol coverage: TCP, UDP, HTTP, WebSocket, FTP, SMTP, and other mainstream protocols
- Mature ecosystem: frameworks such as Scrapy and FastAPI cover most networked application scenarios
Implementation
1. Socket Programming (TCP/UDP)
1.1 TCP Client and Server
import socket
import threading
import json
import struct
import time

# ---- TCP server ----
class TCPServer:
    """TCP server"""

    def __init__(
        self,
        host: str = "0.0.0.0",
        port: int = 8888,
        max_connections: int = 5,
        buffer_size: int = 4096,
    ):
        self.host = host
        self.port = port
        self.max_connections = max_connections
        self.buffer_size = buffer_size
        self.running = False
        self.client_handlers: dict[str, socket.socket] = {}

    def start(self):
        """Start the server."""
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(self.max_connections)
        self.running = True
        print(f"TCP server listening on {self.host}:{self.port}")
        try:
            while self.running:
                try:
                    self.server_socket.settimeout(1.0)
                    client_socket, address = self.server_socket.accept()
                    print(f"New connection: {address}")
                    client_thread = threading.Thread(
                        target=self._handle_client,
                        args=(client_socket, address),
                        daemon=True,
                    )
                    client_thread.start()
                except socket.timeout:
                    continue
        except KeyboardInterrupt:
            print("\nServer shutting down...")
        finally:
            self.stop()

    def stop(self):
        """Stop the server."""
        self.running = False
        # Iterate over a copy: handler threads mutate the dict as they exit
        for sock in list(self.client_handlers.values()):
            sock.close()
        self.server_socket.close()

    def _handle_client(self, client_socket: socket.socket, address: tuple):
        """Handle one client connection."""
        self.client_handlers[str(address)] = client_socket
        try:
            while self.running:
                data = client_socket.recv(self.buffer_size)
                if not data:
                    break
                # Process the request
                response = self._process_request(data, address)
                client_socket.sendall(response)
        except ConnectionResetError:
            print(f"Client disconnected: {address}")
        finally:
            client_socket.close()
            self.client_handlers.pop(str(address), None)
            print(f"Connection closed: {address}")

    def _process_request(self, data: bytes, address: tuple) -> bytes:
        """Process the request payload."""
        try:
            message = json.loads(data.decode("utf-8"))
            print(f"Message from {address}: {message}")
            response = {"status": "ok", "echo": message}
            return json.dumps(response).encode("utf-8")
        except json.JSONDecodeError:
            return json.dumps({"status": "error", "message": "invalid JSON"}).encode("utf-8")

# ---- TCP client ----
class TCPClient:
    """TCP client"""

    def __init__(self, host: str = "127.0.0.1", port: int = 8888):
        self.host = host
        self.port = port
        self.socket = None

    def connect(self):
        """Connect to the server."""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((self.host, self.port))
        print(f"Connected to {self.host}:{self.port}")

    def send(self, data: dict) -> dict:
        """Send JSON data and receive the response."""
        if not self.socket:
            raise RuntimeError("not connected")
        message = json.dumps(data).encode("utf-8")
        self.socket.sendall(message)
        response = self.socket.recv(4096)
        return json.loads(response.decode("utf-8"))

    def close(self):
        """Close the connection."""
        if self.socket:
            self.socket.close()
            self.socket = None

# ---- Length-prefixed protocol ----
class LengthPrefixedProtocol:
    """Length-prefixed framing - solves TCP's message-boundary (sticky packet) problem"""

    @staticmethod
    def send_message(sock: socket.socket, data: bytes):
        """Send a message with a length prefix."""
        length = len(data)
        header = struct.pack("!I", length)  # 4-byte big-endian unsigned int
        sock.sendall(header + data)

    @staticmethod
    def recv_message(sock: socket.socket) -> bytes | None:
        """Receive a length-prefixed message."""
        # Read the 4-byte length header first
        header = LengthPrefixedProtocol._recv_exact(sock, 4)
        if not header:
            return None
        length = struct.unpack("!I", header)[0]
        # Then read the full message body
        return LengthPrefixedProtocol._recv_exact(sock, length)

    @staticmethod
    def _recv_exact(sock: socket.socket, n: int) -> bytes | None:
        """Receive exactly n bytes."""
        data = bytearray()
        while len(data) < n:
            chunk = sock.recv(n - len(data))
            if not chunk:
                return None
            data.extend(chunk)
        return bytes(data)
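The framing helpers are not exercised by the echo demo below, so here is a minimal sketch of a round trip (hypothetical address, and assuming the peer speaks the same length-prefixed framing):

sock = socket.create_connection(("127.0.0.1", 8888))
LengthPrefixedProtocol.send_message(sock, json.dumps({"op": "ping"}).encode("utf-8"))
reply = LengthPrefixedProtocol.recv_message(sock)  # None means the peer closed mid-message
sock.close()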
# Usage example
if __name__ == "__main__":
    # Server
    server = TCPServer(port=8888)
    server_thread = threading.Thread(target=server.start, daemon=True)
    server_thread.start()
    time.sleep(0.5)  # give the listener a moment to start before connecting
    # Client
    client = TCPClient(port=8888)
    client.connect()
    response = client.send({"action": "hello", "name": "Python"})
    print(f"Response: {response}")
    client.close()
    server.stop()
1.2 UDP Programming
import socket

class UDPServer:
    """UDP server (connectionless)"""

    def __init__(self, host: str = "0.0.0.0", port: int = 9999):
        self.host = host
        self.port = port

    def start(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind((self.host, self.port))
        print(f"UDP server listening on {self.host}:{self.port}")
        while True:
            data, address = sock.recvfrom(4096)
            print(f"From {address}: {data.decode()}")
            response = f"Received: {data.decode()}".encode()
            sock.sendto(response, address)

class UDPClient:
    """UDP client"""

    def __init__(self, server_host: str = "127.0.0.1", server_port: int = 9999):
        self.server_address = (server_host, server_port)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.settimeout(5.0)

    def send(self, message: str) -> str | None:
        self.socket.sendto(message.encode(), self.server_address)
        try:
            data, _ = self.socket.recvfrom(4096)
            return data.decode()
        except socket.timeout:
            print("Timed out waiting for a reply")
            return None
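A minimal round-trip sketch (assuming port 9999 is free locally). Unlike TCP there is no connection: a lost datagram simply surfaces as a timeout on the client side.

import threading, time
threading.Thread(target=UDPServer().start, daemon=True).start()
time.sleep(0.2)  # let the server bind before sending
client = UDPClient()
print(client.send("ping"))  # expected: "Received: ping", or None on timeout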
# ---- UDP broadcast ----
class UDPBroadcast:
    """UDP broadcast"""

    @staticmethod
    def sender(broadcast_port: int = 37020, message: str = "DISCOVER"):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.sendto(message.encode(), ("<broadcast>", broadcast_port))
        sock.close()

    @staticmethod
    def listener(broadcast_port: int = 37020):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("", broadcast_port))
        while True:
            data, address = sock.recvfrom(4096)
            print(f"Broadcast from {address}: {data.decode()}")
2. HTTP Clients
2.1 The requests Library
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class RobustHTTPClient:
    """Robust HTTP client"""

    def __init__(
        self,
        base_url: str = "",
        timeout: int = 30,
        max_retries: int = 3,
        retry_backoff: float = 1.0,
        session_cookies: dict = None,
        headers: dict = None,
    ):
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()
        # Configure the retry policy
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=retry_backoff,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST", "PUT", "DELETE"],
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=10,
            pool_maxsize=20,
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        # Default request headers
        self.session.headers.update({
            "User-Agent": "Python-HTTP-Client/1.0",
            "Accept": "application/json",
        })
        if headers:
            self.session.headers.update(headers)
        if session_cookies:
            self.session.cookies.update(session_cookies)

    def get(self, path: str, params: dict = None, **kwargs) -> requests.Response:
        url = f"{self.base_url}{path}" if self.base_url else path
        response = self.session.get(
            url, params=params, timeout=self.timeout, **kwargs
        )
        response.raise_for_status()
        return response

    def post(self, path: str, json: dict = None, data: dict = None, **kwargs) -> requests.Response:
        url = f"{self.base_url}{path}" if self.base_url else path
        response = self.session.post(
            url, json=json, data=data, timeout=self.timeout, **kwargs
        )
        response.raise_for_status()
        return response

    def put(self, path: str, json: dict = None, **kwargs) -> requests.Response:
        url = f"{self.base_url}{path}" if self.base_url else path
        response = self.session.put(url, json=json, timeout=self.timeout, **kwargs)
        response.raise_for_status()
        return response

    def delete(self, path: str, **kwargs) -> requests.Response:
        url = f"{self.base_url}{path}" if self.base_url else path
        response = self.session.delete(url, timeout=self.timeout, **kwargs)
        response.raise_for_status()
        return response

    def download_file(self, url: str, save_path: str, chunk_size: int = 8192):
        """Download a large file (streaming)."""
        response = self.session.get(url, stream=True, timeout=self.timeout)
        response.raise_for_status()
        total_size = int(response.headers.get("content-length", 0))
        downloaded = 0
        with open(save_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                f.write(chunk)
                downloaded += len(chunk)
                if total_size > 0:
                    progress = downloaded / total_size * 100
                    print(f"\rDownload progress: {progress:.1f}%", end="")
        print(f"\nDownload complete: {save_path}")

    def upload_file(self, path: str, file_path: str, field_name: str = "file"):
        """Upload a file."""
        url = f"{self.base_url}{path}" if self.base_url else path
        with open(file_path, "rb") as f:
            response = self.session.post(
                url,
                files={field_name: f},
                timeout=self.timeout,
            )
        response.raise_for_status()
        return response

    def close(self):
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()
# Usage example
with RobustHTTPClient(base_url="https://httpbin.org") as client:
    # GET request
    response = client.get("/get", params={"key": "value"})
    print(response.json())
    # POST JSON
    response = client.post("/post", json={"name": "Zhang San", "age": 25})
    print(response.json())
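A streaming download through the same client is one call; a sketch against httpbin's /bytes endpoint, with a hypothetical local path:

with RobustHTTPClient() as client:
    client.download_file("https://httpbin.org/bytes/102400", "sample.bin")
2.2 httpx Async Client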
import httpx
import asyncio

class AsyncHTTPClient:
    """Asynchronous HTTP client (httpx)"""

    def __init__(self, base_url: str = "", timeout: int = 30):
        self.base_url = base_url.rstrip("/")
        self.timeout = httpx.Timeout(timeout)

    async def get(self, path: str, params: dict = None) -> httpx.Response:
        url = f"{self.base_url}{path}" if self.base_url else path
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(url, params=params)
            response.raise_for_status()
            return response

    async def post(self, path: str, json: dict = None) -> httpx.Response:
        url = f"{self.base_url}{path}" if self.base_url else path
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(url, json=json)
            response.raise_for_status()
            return response

    async def batch_get(self, urls: list[str], max_concurrency: int = 10) -> list[httpx.Response]:
        """Batch GET requests."""
        sem = asyncio.Semaphore(max_concurrency)

        async def fetch(client: httpx.AsyncClient, url: str):
            async with sem:
                return await client.get(url)

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            tasks = [fetch(client, url) for url in urls]
            responses = await asyncio.gather(*tasks, return_exceptions=True)
        results = []
        for r in responses:
            if isinstance(r, Exception):
                print(f"Request failed: {r}")
                results.append(None)
            else:
                results.append(r)
        return results
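Driving the batch fetcher from synchronous code takes a single asyncio.run call; a sketch with hypothetical URLs:

urls = [f"https://httpbin.org/get?i={i}" for i in range(5)]
responses = asyncio.run(AsyncHTTPClient().batch_get(urls, max_concurrency=3))
print(sum(r is not None for r in responses), "succeeded")
3. Web Scraping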
3.1 Scraping with BeautifulSoup
import re
import time
from dataclasses import dataclass
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import requests

@dataclass
class ScrapedItem:
    """A scraped item"""
    title: str
    url: str
    content: str
    metadata: dict

class WebScraper:
    """Web scraper"""

    def __init__(
        self,
        user_agent: str = "Mozilla/5.0 (compatible; PyScraper/1.0)",
        delay: float = 1.0,
        timeout: int = 30,
    ):
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": user_agent})
        self.delay = delay
        self.timeout = timeout
        self.visited: set[str] = set()

    def fetch_page(self, url: str, encoding: str = None) -> BeautifulSoup:
        """Fetch and parse a page."""
        response = self.session.get(url, timeout=self.timeout)
        response.raise_for_status()
        if encoding:
            response.encoding = encoding
        else:
            response.encoding = response.apparent_encoding
        return BeautifulSoup(response.text, "html.parser")

    def extract_links(self, soup: BeautifulSoup, base_url: str, pattern: str = None) -> list[str]:
        """Extract links from a page."""
        links = []
        for tag in soup.find_all("a", href=True):
            # Resolve relative paths against the page URL
            href = urljoin(base_url, tag["href"])
            if pattern and not re.search(pattern, href):
                continue
            links.append(href)
        return list(set(links))

    def extract_text(self, soup: BeautifulSoup) -> str:
        """Extract the main body text of a page."""
        # Drop irrelevant tags
        for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
            tag.decompose()
        # Prefer the article or main region
        content = soup.find("article") or soup.find("main") or soup.find("body")
        if content:
            return content.get_text(separator="\n", strip=True)
        return soup.get_text(separator="\n", strip=True)

    def extract_images(self, soup: BeautifulSoup, base_url: str) -> list[dict]:
        """Extract images."""
        images = []
        for img in soup.find_all("img"):
            src = img.get("src", "")
            if src:
                images.append({
                    "src": urljoin(base_url, src),
                    "alt": img.get("alt", ""),
                    "title": img.get("title", ""),
                })
        return images

    def crawl(
        self,
        start_url: str,
        max_pages: int = 50,
        url_pattern: str = None,
        same_domain: bool = True,
    ) -> list[ScrapedItem]:
        """Crawl a site breadth-first."""
        base_domain = urlparse(start_url).netloc
        queue = [start_url]
        results = []
        while queue and len(results) < max_pages:
            url = queue.pop(0)
            if url in self.visited:
                continue
            try:
                self.visited.add(url)
                soup = self.fetch_page(url)
                # Extract content
                title_tag = soup.find("title")
                title = title_tag.get_text(strip=True) if title_tag else "Untitled"
                content = self.extract_text(soup)
                results.append(ScrapedItem(
                    title=title,
                    url=url,
                    content=content[:5000],
                    metadata={"content_length": len(content)},
                ))
                # Collect new links
                new_links = self.extract_links(soup, url, url_pattern)
                for link in new_links:
                    if link not in self.visited:
                        if same_domain and urlparse(link).netloc != base_domain:
                            continue
                        queue.append(link)
                print(f"Crawled: {url} ({len(content)} chars)")
                # Politeness delay between requests
                time.sleep(self.delay)
            except Exception as e:
                print(f"Failed to crawl {url}: {e}")
        return results

# ---- Table extraction ----
class TableScraper:
    """Table data extractor"""

    @staticmethod
    def extract_tables(soup: BeautifulSoup) -> list[list[dict]]:
        """Extract every table on the page."""
        tables = []
        for table in soup.find_all("table"):
            rows = []
            # Header row
            headers = []
            thead = table.find("thead")
            if thead:
                headers = [th.get_text(strip=True) for th in thead.find_all(["th", "td"])]
            else:
                first_row = table.find("tr")
                if first_row:
                    headers = [th.get_text(strip=True) for th in first_row.find_all(["th", "td"])]
            # Data rows
            tbody = table.find("tbody") or table
            for tr in tbody.find_all("tr"):
                cells = [td.get_text(strip=True) for td in tr.find_all(["td", "th"])]
                if cells and cells != headers:
                    if headers and len(headers) == len(cells):
                        rows.append(dict(zip(headers, cells)))
                    else:
                        rows.append({"_row": cells})
            if rows:
                tables.append(rows)
        return tables
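A short crawl sketch (hypothetical start URL; in practice, check robots.txt first and scope the crawl with url_pattern):

scraper = WebScraper(delay=1.0)
items = scraper.crawl("https://example.com", max_pages=10, same_domain=True)
for item in items:
    print(item.title, item.url, item.metadata["content_length"])
3.2 The Scrapy Framework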
# scrapy_spider.py - Scrapy spider example
import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.exceptions import DropItem
import json

class ArticleItem(scrapy.Item):
    """Article item"""
    title = scrapy.Field()
    url = scrapy.Field()
    author = scrapy.Field()
    publish_date = scrapy.Field()
    content = scrapy.Field()
    tags = scrapy.Field()

class ArticleSpider(scrapy.Spider):
    """Article spider"""
    name = "article_spider"
    allowed_domains = ["example.com"]
    start_urls = ["https://example.com/articles"]
    # Per-spider settings
    custom_settings = {
        "CONCURRENT_REQUESTS": 8,
        "DOWNLOAD_DELAY": 1.0,
        "AUTOTHROTTLE_ENABLED": True,
        "AUTOTHROTTLE_START_DELAY": 1.0,
        "AUTOTHROTTLE_MAX_DELAY": 10.0,
        "ROBOTSTXT_OBEY": True,
        "USER_AGENT": "Mozilla/5.0 (compatible; ArticleSpider/1.0)",
        "FEEDS": {
            "articles.json": {"format": "json", "encoding": "utf-8"},
        },
    }

    def parse(self, response):
        """Parse the article list page."""
        articles = response.css("article.post")
        for article in articles:
            url = article.css("h2 a::attr(href)").get()
            if url:
                yield scrapy.Request(
                    response.urljoin(url),
                    callback=self.parse_article,
                )
        # Pagination
        next_page = response.css("a.next-page::attr(href)").get()
        if next_page:
            yield scrapy.Request(
                response.urljoin(next_page),
                callback=self.parse,
            )

    def parse_article(self, response):
        """Parse an article detail page."""
        item = ArticleItem()
        item["title"] = response.css("h1.title::text").get("").strip()
        item["url"] = response.url
        item["author"] = response.css("span.author::text").get("").strip()
        item["publish_date"] = response.css("time::attr(datetime)").get("")
        # .getall() already returns strings, so strip them directly
        item["content"] = " ".join(
            p.strip()
            for p in response.css("div.content p::text").getall()
        )
        item["tags"] = response.css("a.tag::text").getall()
        yield item

# ---- Scrapy pipelines ----
class JsonWriterPipeline:
    """Pipeline that writes items as JSON lines"""

    def open_spider(self, spider):
        self.file = open("items.jsonl", "w", encoding="utf-8")

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(line)
        return item

class DuplicatesPipeline:
    """Deduplication pipeline"""

    def __init__(self):
        self.urls_seen = set()

    def process_item(self, item, spider):
        url = item.get("url", "")
        if url in self.urls_seen:
            raise DropItem(f"Duplicate: {url}")
        self.urls_seen.add(url)
        return item

# Run the spider
if __name__ == "__main__":
    process = CrawlerProcess(settings={
        "ITEM_PIPELINES": {
            "__main__.DuplicatesPipeline": 100,
            "__main__.JsonWriterPipeline": 200,
        },
    })
    process.crawl(ArticleSpider)
    process.start()
4. REST API Development
4.1 A FastAPI Implementation
from fastapi import FastAPI, HTTPException, Query, Path
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
import uuid

app = FastAPI(
    title="User Management API",
    description="Example user CRUD API",
    version="1.0.0",
)

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ---- Data models ----
class UserCreate(BaseModel):
    name: str = Field(..., min_length=2, max_length=50, description="User name")
    email: str = Field(..., description="Email address")
    age: Optional[int] = Field(None, ge=0, le=150, description="Age")

class UserUpdate(BaseModel):
    name: Optional[str] = Field(None, min_length=2, max_length=50)
    email: Optional[str] = None
    age: Optional[int] = Field(None, ge=0, le=150)

class UserResponse(BaseModel):
    id: str
    name: str
    email: str
    age: Optional[int]
    created_at: str

# ---- Mock database ----
db: dict[str, dict] = {}

# ---- API routes ----
@app.get("/", tags=["root"])
def root():
    return {"message": "User Management API v1.0", "docs": "/docs"}

@app.get("/users", response_model=list[UserResponse], tags=["users"])
def list_users(
    skip: int = Query(0, ge=0, description="Number of records to skip"),
    limit: int = Query(10, ge=1, le=100, description="Page size"),
    name: Optional[str] = Query(None, description="Filter by name"),
):
    """List users."""
    users = list(db.values())
    if name:
        users = [u for u in users if name.lower() in u["name"].lower()]
    return users[skip: skip + limit]

@app.get("/users/{user_id}", response_model=UserResponse, tags=["users"])
def get_user(user_id: str = Path(..., description="User ID")):
    """Get a single user."""
    if user_id not in db:
        raise HTTPException(status_code=404, detail="User not found")
    return db[user_id]

@app.post("/users", response_model=UserResponse, status_code=201, tags=["users"])
def create_user(user: UserCreate):
    """Create a user."""
    user_id = str(uuid.uuid4())[:8]
    user_dict = {
        "id": user_id,
        "name": user.name,
        "email": user.email,
        "age": user.age,
        "created_at": datetime.now().isoformat(),
    }
    db[user_id] = user_dict
    return user_dict

@app.put("/users/{user_id}", response_model=UserResponse, tags=["users"])
def update_user(user_id: str, user: UserUpdate):
    """Update a user."""
    if user_id not in db:
        raise HTTPException(status_code=404, detail="User not found")
    existing = db[user_id]
    update_data = user.model_dump(exclude_unset=True)
    existing.update(update_data)
    return existing

@app.delete("/users/{user_id}", tags=["users"])
def delete_user(user_id: str):
    """Delete a user."""
    if user_id not in db:
        raise HTTPException(status_code=404, detail="User not found")
    del db[user_id]
    return {"message": "deleted"}

# Run with: uvicorn main:app --reload --port 8000
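With uvicorn running locally on port 8000, the endpoints can be exercised with the RobustHTTPClient from section 2.1; a sketch:

with RobustHTTPClient(base_url="http://127.0.0.1:8000") as client:
    created = client.post("/users", json={"name": "Alice", "email": "a@example.com"}).json()
    print(client.get(f"/users/{created['id']}").json())
    client.delete(f"/users/{created['id']}")
5. WebSocket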
import asyncio
import json
import websockets
from websockets.server import serve

# ---- WebSocket server ----
class WebSocketServer:
    """WebSocket server"""

    def __init__(self, host: str = "0.0.0.0", port: int = 8765):
        self.host = host
        self.port = port
        self.clients: set = set()

    async def handler(self, websocket):
        """Handle one client connection."""
        self.clients.add(websocket)
        remote = websocket.remote_address
        print(f"Client connected: {remote}")
        try:
            async for message in websocket:
                await self._process_message(websocket, message)
        except websockets.ConnectionClosed:
            print(f"Client disconnected: {remote}")
        finally:
            self.clients.discard(websocket)

    async def _process_message(self, websocket, message: str):
        """Process one message."""
        try:
            data = json.loads(message)
            msg_type = data.get("type", "message")
            content = data.get("content", "")
            if msg_type == "broadcast":
                # Broadcast to all connected clients
                await self.broadcast(json.dumps({
                    "type": "broadcast",
                    "from": str(websocket.remote_address),
                    "content": content,
                }))
            elif msg_type == "echo":
                await websocket.send(json.dumps({
                    "type": "echo",
                    "content": content,
                }))
            else:
                await websocket.send(json.dumps({
                    "type": "response",
                    "content": f"Received: {content}",
                }))
        except json.JSONDecodeError:
            await websocket.send(json.dumps({"type": "error", "content": "invalid JSON"}))

    async def broadcast(self, message: str):
        """Broadcast a message."""
        if self.clients:
            await asyncio.gather(
                *[client.send(message) for client in self.clients],
                return_exceptions=True,
            )

    async def start(self):
        """Start the server."""
        print(f"WebSocket server listening on ws://{self.host}:{self.port}")
        async with serve(self.handler, self.host, self.port):
            await asyncio.Future()  # run forever

# ---- WebSocket client ----
class WebSocketClient:
    """WebSocket client"""

    def __init__(self, uri: str = "ws://localhost:8765"):
        self.uri = uri

    async def connect_and_chat(self):
        """Connect and exchange one message."""
        async with websockets.connect(self.uri) as ws:
            print(f"Connected: {self.uri}")
            # Send a message
            await ws.send(json.dumps({
                "type": "message",
                "content": "Hello, WebSocket!",
            }))
            # Receive the response
            response = await ws.recv()
            print(f"Received: {response}")

    async def listen(self):
        """Listen continuously."""
        async with websockets.connect(self.uri) as ws:
            async for message in ws:
                data = json.loads(message)
                print(f"[{data.get('type')}] {data.get('content')}")
import ssl
import socket

class TLSClient:
    """TLS/SSL secure client"""

    @staticmethod
    def create_ssl_context(
        ca_cert: str = None,
        client_cert: str = None,
        client_key: str = None,
        verify: bool = True,
    ) -> ssl.SSLContext:
        """Create an SSL context."""
        context = ssl.create_default_context()
        if ca_cert:
            context.load_verify_locations(ca_cert)
        elif not verify:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        if client_cert and client_key:
            context.load_cert_chain(client_cert, client_key)
        return context

    @staticmethod
    def secure_get(host: str, port: int = 443, path: str = "/"):
        """Send an HTTP GET over TLS."""
        context = ssl.create_default_context()
        with socket.create_connection((host, port)) as sock:
            with context.wrap_socket(sock, server_hostname=host) as secure_sock:
                request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
                secure_sock.sendall(request.encode())
                response = b""
                while True:
                    chunk = secure_sock.recv(4096)
                    if not chunk:
                        break
                    response += chunk
                return response.decode()

    @staticmethod
    def get_cert_info(hostname: str, port: int = 443) -> dict:
        """Fetch the server's certificate details."""
        context = ssl.create_default_context()
        with socket.create_connection((hostname, port)) as sock:
            with context.wrap_socket(sock, server_hostname=hostname) as secure_sock:
                cert = secure_sock.getpeercert()
                return {
                    "subject": dict(x[0] for x in cert.get("subject", ())),
                    "issuer": dict(x[0] for x in cert.get("issuer", ())),
                    "not_before": cert.get("notBefore"),
                    "not_after": cert.get("notAfter"),
                    "san": cert.get("subjectAltName", ()),
                }
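A quick certificate check against a public host (outbound network access assumed):

info = TLSClient.get_cert_info("example.com")
print(info["issuer"], info["not_after"])
7. Network Debugging Tools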
import socket
import time
from dataclasses import dataclass

@dataclass
class PortInfo:
    """Port information"""
    port: int
    status: str  # open, closed, filtered
    service: str = ""
    banner: str = ""

class NetworkTools:
    """Network debugging toolkit"""

    @staticmethod
    def port_scan(host: str, ports: list[int], timeout: float = 1.0) -> list[PortInfo]:
        """Scan ports."""
        results = []
        for port in ports:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            try:
                result = sock.connect_ex((host, port))
                if result == 0:
                    # Try to grab a banner
                    banner = ""
                    try:
                        sock.sendall(b"HEAD / HTTP/1.0\r\n\r\n")
                        banner = sock.recv(1024).decode(errors="ignore")[:100]
                    except Exception:
                        pass
                    results.append(PortInfo(
                        port=port, status="open",
                        service=NetworkTools._guess_service(port),
                        banner=banner,
                    ))
                else:
                    results.append(PortInfo(port=port, status="closed"))
            except socket.timeout:
                results.append(PortInfo(port=port, status="filtered"))
            finally:
                sock.close()
        return results

    @staticmethod
    def _guess_service(port: int) -> str:
        services = {
            21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP",
            53: "DNS", 80: "HTTP", 110: "POP3", 143: "IMAP",
            443: "HTTPS", 3306: "MySQL", 5432: "PostgreSQL",
            6379: "Redis", 8080: "HTTP-Alt", 8443: "HTTPS-Alt",
            27017: "MongoDB",
        }
        return services.get(port, "unknown")

    @staticmethod
    def dns_lookup(domain: str) -> dict:
        """DNS lookup."""
        try:
            ipv4 = socket.getaddrinfo(domain, None, socket.AF_INET)
            ipv6 = socket.getaddrinfo(domain, None, socket.AF_INET6)
            return {
                "domain": domain,
                "ipv4": list(set(addr[4][0] for addr in ipv4)),
                "ipv6": list(set(addr[4][0] for addr in ipv6)),
            }
        except socket.gaierror:
            return {"domain": domain, "error": "DNS resolution failed"}

    @staticmethod
    def check_connectivity(host: str, port: int, timeout: float = 5.0) -> dict:
        """Check basic connectivity."""
        start = time.time()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        try:
            sock.connect((host, port))
            latency = (time.time() - start) * 1000
            return {"host": host, "port": port, "reachable": True, "latency_ms": round(latency, 2)}
        except socket.timeout:
            return {"host": host, "port": port, "reachable": False, "error": "timeout"}
        except ConnectionRefusedError:
            return {"host": host, "port": port, "reachable": False, "error": "refused"}
        except Exception as e:
            return {"host": host, "port": port, "reachable": False, "error": str(e)}
        finally:
            sock.close()
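Only scan hosts you own or are authorized to test. A local sketch:

for info in NetworkTools.port_scan("127.0.0.1", [22, 80, 443, 8080]):
    print(info)
print(NetworkTools.dns_lookup("example.com"))
8. Proxies and Tunneling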
import socket
import threading
import select
from urllib.parse import urlparse

class HTTPProxy:
    """Minimal HTTP proxy server"""

    def __init__(self, host: str = "0.0.0.0", port: int = 8080):
        self.host = host
        self.port = port
        self.running = False

    def start(self):
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((self.host, self.port))
        server.listen(50)
        self.running = True
        print(f"HTTP proxy listening on {self.host}:{self.port}")
        try:
            while self.running:
                server.settimeout(1.0)
                try:
                    client_socket, addr = server.accept()
                    threading.Thread(
                        target=self._handle,
                        args=(client_socket,),
                        daemon=True,
                    ).start()
                except socket.timeout:
                    continue
        finally:
            server.close()

    def _handle(self, client: socket.socket):
        """Handle one proxied request."""
        try:
            request = client.recv(4096).decode()
            if not request:
                return
            first_line = request.split("\n")[0]
            method, url, _ = first_line.split()
            if method == "CONNECT":
                # HTTPS tunnel
                self._handle_https_tunnel(client, url)
            else:
                # Plain HTTP forwarding
                self._handle_http(client, method, url, request)
        except Exception as e:
            print(f"Proxy error: {e}")
        finally:
            client.close()

    def _handle_http(self, client: socket.socket, method: str, url: str, request: str):
        """Forward a plain HTTP request."""
        parsed = urlparse(url)
        host = parsed.hostname
        port = parsed.port or 80
        path = parsed.path or "/"
        if parsed.query:
            path += f"?{parsed.query}"
        # Rewrite the request line from absolute form to origin form
        headers = request.split("\n")
        headers[0] = f"{method} {path} HTTP/1.1\r"
        # Forward to the origin server
        remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        remote.connect((host, port))
        remote.sendall("\n".join(headers).encode())
        # Relay the response back
        while True:
            data = remote.recv(4096)
            if not data:
                break
            client.sendall(data)
        remote.close()

    def _handle_https_tunnel(self, client: socket.socket, url: str):
        """Handle an HTTPS tunnel (CONNECT method)."""
        host, port = url.split(":")
        port = int(port)
        remote = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            remote.connect((host, port))
            client.sendall(b"HTTP/1.1 200 Connection Established\r\n\r\n")
            # Relay bytes in both directions
            sockets = [client, remote]
            while True:
                readable, _, _ = select.select(sockets, [], [], 10)
                if not readable:
                    return  # idle timeout, drop the tunnel
                for sock in readable:
                    data = sock.recv(4096)
                    if not data:
                        return
                    if sock is client:
                        remote.sendall(data)
                    else:
                        client.sendall(data)
        except Exception:
            pass
        finally:
            remote.close()
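A quick self-test routes a requests call through the proxy; a sketch assuming port 8080 is free and httpbin.org is reachable:

import requests, threading, time
proxy = HTTPProxy(port=8080)
threading.Thread(target=proxy.start, daemon=True).start()
time.sleep(0.5)
proxies = {"http": "http://127.0.0.1:8080"}
print(requests.get("http://httpbin.org/ip", proxies=proxies, timeout=10).status_code)
Pros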
- Easy to use: requests/httpx greatly simplify HTTP operations
- Rich library ecosystem: coverage for most protocols and application scenarios
- Async support: asyncio with aiohttp/httpx enables high concurrency
- Scraping framework: Scrapy offers an industrial-strength crawling solution
- API development: FastAPI is fast and generates documentation automatically
Cons
- Performance ceiling: Python's GIL limits single-process network throughput
- Socket complexity: low-level socket code must handle framing, encoding, and partial reads
- Anti-scraping arms race: crawlers must contend with CAPTCHAs, JS rendering, and IP bans
- Concurrency bottlenecks: high-concurrency workloads require async I/O or multiple processes
Performance Notes
- Connection reuse: use a Session / connection pool instead of reconnecting per request (see the sketch below)
- Async concurrency: use asyncio to raise throughput in I/O-bound scenarios
- DNS caching: cache resolution results to cut lookup overhead
- Sensible timeouts: set separate connect and read timeouts to avoid waiting forever
- Compression: enable gzip/deflate to shrink transfer sizes
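A minimal sketch (hypothetical URL) combining pooling, explicit connect/read timeouts, and compression; note that requests already negotiates gzip via Accept-Encoding by default:

import requests
session = requests.Session()  # reuses TCP connections across requests
session.headers["Accept-Encoding"] = "gzip, deflate"
for i in range(10):
    r = session.get("https://httpbin.org/get", timeout=(3.05, 10))  # (connect, read)
    r.raise_for_status()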
Summary
Python network programming spans the full stack, from low-level transport to application frameworks:
- Socket fundamentals: understand the TCP/UDP communication model and message framing
- HTTP clients: requests/httpx cover virtually every HTTP scenario
- Web scraping: BeautifulSoup for lightweight jobs, Scrapy for production crawls
- API development: high-performance REST APIs with FastAPI
- WebSocket: real-time bidirectional communication
- Secure transport: SSL/TLS encryption and certificate handling
Key Concepts
| Concept | Description |
|---|---|
| Socket | The OS-provided network communication interface |
| TCP | Connection-oriented, reliable transport protocol |
| UDP | Connectionless, best-effort transport protocol |
| HTTP | Hypertext Transfer Protocol, the foundation of the Web |
| WebSocket | Full-duplex protocol supporting real-time push |
| SSL/TLS | Transport-layer security; encrypts network traffic |
| REST | Representational State Transfer, an API design style |
Common Misconceptions
Misconception: TCP never loses data
- TCP guarantees in-order delivery on the wire, but the application layer can still lose messages through buffer overruns or handling bugs
- Fix: use length-prefixed framing plus application-level acknowledgements
Misconception: requests is suitable for high concurrency
- requests is synchronous; concurrency requires a thread pool (see the sketch below) or httpx with asyncio
- Fix: for high-concurrency workloads, use httpx + asyncio
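For moderate fan-out, the synchronous workaround is a thread pool; a sketch with hypothetical URLs:

import requests
from concurrent.futures import ThreadPoolExecutor
urls = [f"https://httpbin.org/get?i={i}" for i in range(20)]
with ThreadPoolExecutor(max_workers=8) as pool:
    statuses = list(pool.map(lambda u: requests.get(u, timeout=10).status_code, urls))

Beyond a few hundred concurrent requests, switch to the AsyncHTTPClient.batch_get pattern from section 2.2.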
Misconception: HTTPS means absolute safety
- HTTPS only encrypts the transport; it says nothing about the security of server-side logic
- Fix: combine it with additional measures such as certificate pinning and request signing
Learning Path
- Beginner: master socket basics and the requests library
- Intermediate: build HTTP services and scraping systems
- Advanced: async networking, WebSocket, SSL/TLS
- Expert: protocol implementation and high-performance proxy servers
Use Cases
- Web API clients and integrations
- Web scraping and data collection
- REST APIs and microservices
- Real-time communication (IM, push notifications)
- Network security tooling and proxies
- IoT device communication
Practical Recommendations
- Prefer high-level libraries: use requests/httpx for business code; avoid raw sockets unless necessary
- Standardize the client: share one HTTP client wrapper per project, with retries and logging built in
- Scrape responsibly: honor robots.txt, throttle request rates, and respect site policies
- Document APIs: let FastAPI generate the API documentation automatically
- Harden for production: always use HTTPS and keep certificate verification enabled
Troubleshooting Checklist
| Problem | Likely Cause | Fix |
|---|---|---|
| ConnectionRefused | Target service down or wrong port | Check service status and port |
| Connection timeout | No route or firewall blocking | Check network and firewall rules |
| SSL certificate error | Self-signed or expired certificate | Import the CA cert, or disable verification (testing only) |
| Sticky/partial packets | TCP is a byte stream | Use length-prefixed framing |
| DNS resolution failure | DNS server issue or bad domain | Verify the domain; try another DNS server |
| Requests rejected | Blocked User-Agent or rate limiting | Set a sensible UA; throttle the request rate |
Review Questions
- What are the average and P99 latencies of your HTTP requests?
- Is connection-pool utilization reasonable? Are there connection leaks?
- What are the scraper's success rate and block rate?
- Do API error rates and response times meet the SLA?
- What are the average WebSocket connection lifetime and disconnect rate?
