Python 安全编码实践
大约 15 分钟约 4564 字
Python 安全编码实践
简介
Python 安全编码是指在 Python 应用开发中遵循安全最佳实践,防止 SQL 注入、XSS、CSRF、命令注入、反序列化攻击等常见安全漏洞。安全问题往往不是刻意为之,而是开发者在不知不觉中引入的——一个未过滤的用户输入、一个不安全的反序列化调用、一个暴露的调试接口,都可能成为攻击者的突破口。
安全编码的核心原则是不信任任何外部输入(Zero Trust Input)。所有来自用户的输入、HTTP 请求头、数据库读取的数据、第三方 API 的返回值,在验证和清理之前都应视为不可信。这一原则贯穿所有安全防御策略。
OWASP Top 10 是 Web 应用安全的风险清单,包括注入攻击、失效的身份认证、敏感数据泄露、XML 外部实体注入(XXE)、失效的访问控制、安全配置错误、跨站脚本攻击(XSS)、不安全的反序列化、使用含有已知漏洞的组件、日志与监控不足。每个风险在 Python 应用中都有对应的防御方案。
特点
SQL 注入防御
# ============================================
# SQL 注入: 最常见的 Web 安全漏洞
# 攻击者通过构造恶意输入篡改 SQL 语句
# ============================================
import pymysql
from typing import Optional
# === 错误示范(永远不要这样做!)===
def unsafe_query(username: str):
"""危险的字符串拼接 SQL"""
conn = pymysql.connect(host='localhost', user='root', password='pass', db='app')
cursor = conn.cursor()
# 攻击: username = "admin' OR '1'='1' --"
# 实际执行: SELECT * FROM users WHERE username='admin' OR '1'='1' --'
# 结果: 返回所有用户数据!
cursor.execute(f"SELECT * FROM users WHERE username='{username}'")
# 更危险的攻击: username = "'; DROP TABLE users; --"
# 实际执行: SELECT * FROM users WHERE username=''; DROP TABLE users; --'
return cursor.fetchall()
# === 正确做法 ===
def safe_query_parameterized(username: str):
"""使用参数化查询(推荐)"""
conn = pymysql.connect(host='localhost', user='root', password='pass', db='app')
cursor = conn.cursor()
# 参数化查询:数据库驱动自动处理转义
cursor.execute(
"SELECT * FROM users WHERE username = %s",
(username,)
)
return cursor.fetchall()
def safe_query_with_sqlalchemy(username: str):
"""使用 ORM(更安全,推荐)"""
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
engine = create_engine("mysql+pymysql://root:pass@localhost/app")
with Session(engine) as session:
# 使用 text() + 命名参数
result = session.execute(
text("SELECT * FROM users WHERE username = :username"),
{"username": username}
)
return result.fetchall()
def safe_like_query(search_term: str):
"""安全的 LIKE 查询"""
conn = pymysql.connect(host='localhost', user='root', password='pass', db='app')
cursor = conn.cursor()
# LIKE 查询也需要参数化
cursor.execute(
"SELECT * FROM products WHERE name LIKE %s",
(f"%{search_term}%",)
)
return cursor.fetchall()
def safe_in_query(ids: list):
"""安全的 IN 查询(动态参数数量)"""
conn = pymysql.connect(host='localhost', user='root', password='pass', db='app')
cursor = conn.cursor()
# 动态生成占位符
placeholders = ", ".join(["%s"] * len(ids))
cursor.execute(
f"SELECT * FROM products WHERE id IN ({placeholders})",
tuple(ids)
)
return cursor.fetchall()
# === 输入验证(额外的防御层)===
import re
def validate_username(username: str) -> bool:
"""验证用户名格式"""
if not username or len(username) > 50:
return False
# 只允许字母、数字、下划线
return bool(re.match(r'^[a-zA-Z0-9_]+$', username))
def validate_id(id_str: str) -> Optional[int]:
"""验证 ID 参数"""
try:
id_val = int(id_str)
if id_val > 0:
return id_val
except (ValueError, TypeError):
pass
return NoneXSS 防御
# ============================================
# XSS (Cross-Site Scripting): 跨站脚本攻击
# 攻击者注入恶意 JavaScript 到网页中
# ============================================
import html
import re
from markupsafe import escape, Markup
# === 1. HTML 转义 ===
def safe_html_output(user_input: str) -> str:
"""安全输出 HTML — 转义特殊字符"""
# 攻击: user_input = "<script>alert('xss')</script>"
# 转义后: <script>alert('xss')</script>
# 方式 A: html.escape()
safe = html.escape(user_input, quote=True)
# 方式 B: markupsafe.escape()(推荐,Jinja2 内部使用)
safe = str(escape(user_input))
return safe
# === 2. Jinja2 自动转义(推荐)===
"""
from jinja2 import Environment, FileSystemLoader
# 自动转义 HTML 文件
env = Environment(
loader=FileSystemLoader('templates'),
autoescape=True # 关键: 自动转义所有变量
)
template = env.get_template('page.html')
# {{ user_input }} 自动转义
# {{ user_input | safe }} 标记为安全(仅在确认安全时使用)
"""
# === 3. 富文本过滤 ===
import bleach
def sanitize_html(user_html: str) -> str:
"""清理用户提交的 HTML(富文本场景)"""
allowed_tags = ['b', 'i', 'u', 'em', 'strong', 'a', 'p', 'br', 'ul', 'ol', 'li']
allowed_attributes = {
'a': ['href', 'title'],
}
cleaned = bleach.clean(
user_html,
tags=allowed_tags,
attributes=allowed_attributes,
strip=True,
)
return cleaned
# === 4. Content Security Policy (CSP) ===
def add_csp_headers(response):
"""添加 CSP 响应头"""
response.headers['Content-Security-Policy'] = (
"default-src 'self'; "
"script-src 'self' 'nonce-{random_nonce}'; "
"style-src 'self'; "
"img-src 'self' data:; "
"connect-src 'self'; "
"frame-ancestors 'none'; "
"base-uri 'self'; "
"form-action 'self'"
)
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
return response
# === 5. URL 跳转安全 ===
from urllib.parse import urlparse
def safe_redirect(url: str, allowed_hosts: list = None) -> str:
"""安全的 URL 重定向(防止开放重定向攻击)"""
if not url:
return '/'
parsed = urlparse(url)
# 只允许相对路径或白名单域名
if parsed.netloc:
if allowed_hosts and parsed.netloc in allowed_hosts:
return url
return '/'
# 检查是否以 / 开头(防止 //evil.com)
if not url.startswith('/') or url.startswith('//'):
return '/'
return urlCSRF 防护
# ============================================
# CSRF (Cross-Site Request Forgery): 跨站请求伪造
# 攻击者诱导用户在已认证的网站上执行非预期操作
# ============================================
import secrets
import hmac
import hashlib
from functools import wraps
# === 1. CSRF Token 生成与验证 ===
class CSRFProtection:
"""CSRF 防护"""
def __init__(self, secret_key: str):
self.secret_key = secret_key.encode('utf-8')
def generate_token(self, session_id: str) -> str:
"""生成 CSRF Token"""
# 使用 HMAC 绑定 session_id
message = session_id.encode('utf-8')
token = hmac.new(self.secret_key, message, hashlib.sha256).hexdigest()
return token
def validate_token(self, token: str, session_id: str) -> bool:
"""验证 CSRF Token"""
expected = self.generate_token(session_id)
# 使用恒定时间比较防止时序攻击
return hmac.compare_digest(token, expected)
# === 2. FastAPI CSRF 中间件 ===
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import HTMLResponse
app = FastAPI()
csrf = CSRFProtection(secret_key="your-secret-key-min-32-chars!!")
def get_session_id(request: Request) -> str:
return request.cookies.get("session_id", "")
async def verify_csrf(request: Request, session_id: str = Depends(get_session_id)):
"""CSRF 验证依赖"""
# GET/HEAD/OPTIONS 不需要 CSRF 验证
if request.method in ("GET", "HEAD", "OPTIONS"):
return True
token = request.headers.get("X-CSRF-Token") or \
(await request.form()).get("csrf_token")
if not token or not csrf.validate_token(token, session_id):
raise HTTPException(status_code=403, detail="CSRF token 验证失败")
return True
@app.post("/api/transfer")
async def transfer_money(csrf_ok: bool = Depends(verify_csrf)):
return {"status": "success"}
# === 3. SameSite Cookie ===
def set_secure_cookies(response):
"""设置安全的 Cookie"""
response.set_cookie(
key="session_id",
value="session-token-here",
httponly=True, # JavaScript 不可访问
secure=True, # 仅 HTTPS
samesite="Strict", # 或 "Lax",防止 CSRF
max_age=3600,
path="/",
)
return response输入验证
# ============================================
# 输入验证: 安全的第一道防线
# ============================================
from pydantic import BaseModel, EmailStr, validator, Field, constr
import re
# === 1. Pydantic 模型验证(推荐)===
class UserRegistration(BaseModel):
"""用户注册模型 — 内置输入验证"""
username: constr(min_length=3, max_length=50, regex=r'^[a-zA-Z0-9_]+$')
email: EmailStr
password: constr(min_length=8, max_length=128)
phone: constr(regex=r'^1[3-9]\d{9}$')
age: int = Field(ge=0, le=150)
@validator('password')
def validate_password_strength(cls, v):
"""密码强度验证"""
if not re.search(r'[A-Z]', v):
raise ValueError('密码必须包含大写字母')
if not re.search(r'[a-z]', v):
raise ValueError('密码必须包含小写字母')
if not re.search(r'\d', v):
raise ValueError('密码必须包含数字')
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
raise ValueError('密码必须包含特殊字符')
return v
# === 2. 文件上传验证 ===
import os
from pathlib import Path
ALLOWED_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.pdf'}
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
def validate_file_upload(filename: str, content: bytes) -> dict:
"""验证上传文件的安全性"""
errors = []
# 检查文件扩展名
ext = Path(filename).suffix.lower()
if ext not in ALLOWED_EXTENSIONS:
errors.append(f"不支持的文件类型: {ext}")
# 检查文件大小
if len(content) > MAX_FILE_SIZE:
errors.append(f"文件过大: {len(content) / 1024 / 1024:.1f}MB > 10MB")
# 检查文件名(防止路径遍历)
if '..' in filename or '/' in filename or '\\' in filename:
errors.append("文件名包含非法字符")
# 检查 Magic Number(真正的文件类型)
magic_numbers = {
b'\xff\xd8\xff': 'jpeg',
b'\x89PNG': 'png',
b'GIF8': 'gif',
b'%PDF': 'pdf',
}
detected_type = None
for magic, file_type in magic_numbers.items():
if content.startswith(magic):
detected_type = file_type
break
if detected_type is None:
errors.append("无法识别文件类型(Magic Number 检查失败)")
return {"valid": len(errors) == 0, "errors": errors}
def safe_save_file(filename: str, upload_dir: str, content: bytes) -> str:
"""安全保存上传文件"""
# 生成安全的文件名(防止路径遍历)
safe_name = os.path.basename(filename)
unique_name = f"{secrets.token_hex(8)}_{safe_name}"
save_path = os.path.join(upload_dir, unique_name)
# 确保保存路径在 upload_dir 内
real_path = os.path.realpath(save_path)
real_dir = os.path.realpath(upload_dir)
if not real_path.startswith(real_dir):
raise ValueError("路径遍历攻击检测")
with open(real_path, 'wb') as f:
f.write(content)
return unique_name
# === 3. 命令注入防御 ===
import subprocess
def unsafe_command(user_input: str):
"""危险!命令注入漏洞"""
# 攻击: user_input = "test; rm -rf /"
os.system(f"ping {user_input}") # 绝对不要这样做!
def safe_command(host: str):
"""安全: 使用参数列表"""
# 验证输入
if not re.match(r'^[a-zA-Z0-9.\-]+$', host):
raise ValueError("无效的主机名")
result = subprocess.run(
["ping", "-c", "1", host], # 参数列表,不经过 shell
capture_output=True,
text=True,
timeout=5,
)
return result.stdout安全 HTTP 请求
# ============================================
# 安全的 HTTP 请求实践
# ============================================
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class SecureHTTPClient:
"""安全的 HTTP 客户端"""
def __init__(self, timeout: int = 30, verify_ssl: bool = True):
self.session = requests.Session()
# 配置重试策略
retry = Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry)
self.session.mount("https://", adapter)
# 安全配置
self.timeout = timeout
self.verify_ssl = verify_ssl
# 安全头
self.session.headers.update({
'User-Agent': 'MyApp/1.0',
'Accept': 'application/json',
})
def get(self, url: str, params: dict = None) -> dict:
"""安全的 GET 请求"""
# URL 验证
self._validate_url(url)
response = self.session.get(
url,
params=params,
timeout=self.timeout,
verify=self.verify_ssl, # SSL 证书验证
)
response.raise_for_status()
return response.json()
def post(self, url: str, data: dict = None) -> dict:
"""安全的 POST 请求"""
self._validate_url(url)
response = self.session.post(
url,
json=data, # 使用 json 参数自动设置 Content-Type
timeout=self.timeout,
verify=self.verify_ssl,
)
response.raise_for_status()
return response.json()
def _validate_url(self, url: str):
"""URL 安全验证"""
from urllib.parse import urlparse
parsed = urlparse(url)
# 只允许 HTTPS
if parsed.scheme not in ('https', 'http'):
raise ValueError(f"不支持的协议: {parsed.scheme}")
# 防止 SSRF: 禁止内网地址
hostname = parsed.hostname
if hostname in ('localhost', '127.0.0.1', '0.0.0.0',
'169.254.169.254'): # AWS 元数据
raise ValueError(f"禁止访问内部地址: {hostname}")
if hostname and hostname.startswith(('10.', '172.', '192.168.')):
raise ValueError(f"禁止访问私有网络: {hostname}")
def close(self):
self.session.close()
# === 密码安全 ===
import bcrypt
import secrets
def hash_password(password: str) -> str:
"""安全密码哈希(使用 bcrypt)"""
salt = bcrypt.gensalt(rounds=12) # 计算轮数,越高越安全但越慢
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
"""验证密码"""
return bcrypt.checkpw(
password.encode('utf-8'),
hashed.encode('utf-8')
)
def generate_api_key() -> str:
"""生成 API Key"""
return secrets.token_urlsafe(32)
def generate_secret() -> str:
"""生成安全密钥"""
return secrets.token_hex(32)依赖安全扫描
# ============================================
# 依赖安全扫描
# ============================================
"""
# 1. safety — 检查已知漏洞
pip install safety
safety check --requirements requirements.txt
safety check --json # JSON 格式输出
# 2. pip-audit — PyPA 官方审计工具
pip install pip-audit
pip-audit # 扫描当前环境
pip-audit -r requirements.txt # 扫描指定文件
# 3. bandit — Python 代码安全扫描
pip install bandit
bandit -r src/ # 扫描 src 目录
bandit -f json -o report.json src/ # JSON 报告
# 4. 在 CI/CD 中集成
# .github/workflows/security.yml
name: Security Scan
on: [push, pull_request]
jobs:
security:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: 依赖漏洞扫描
run: |
pip install safety pip-audit
safety check --full-report
pip-audit
- name: 代码安全扫描
run: |
pip install bandit
bandit -r src/ -ll
"""
# requirements.txt 安全模板
"""
# 固定版本号,避免不兼容更新
Flask==3.0.0
SQLAlchemy==2.0.23
PyJWT==2.8.0
bcrypt==4.1.2
requests==2.31.0
pydantic==2.5.3
# 定期运行: pip-audit -r requirements.txt
"""Secrets 管理
# ============================================
# Secrets 管理最佳实践
# ============================================
import os
from pathlib import Path
# === 1. 环境变量(最基本的方式)===
class Config:
"""安全配置管理"""
# 从环境变量读取,不硬编码
DATABASE_URL = os.environ.get('DATABASE_URL', '')
SECRET_KEY = os.environ.get('SECRET_KEY', '')
API_KEY = os.environ.get('API_KEY', '')
@classmethod
def validate(cls):
"""验证必要配置是否存在"""
required = ['DATABASE_URL', 'SECRET_KEY', 'API_KEY']
missing = [key for key in required if not getattr(cls, key)]
if missing:
raise EnvironmentError(
f"缺少必要的环境变量: {', '.join(missing)}"
)
# === 2. .env 文件(开发环境)===
from dotenv import load_dotenv
# 加载 .env 文件(不要提交到 Git!)
load_dotenv()
# .gitignore 中添加:
# .env
# .env.local
# .env.production
# === 3. 使用 HashiCorp Vault(生产环境)===
"""
import hvac
client = hvac.Client(url='https://vault.company.com:8200')
client.auth.approle.login(
role_id=os.environ['VAULT_ROLE_ID'],
secret_id=os.environ['VAULT_SECRET_ID'],
)
# 读取密钥
secret = client.secrets.kv.v2.read_secret_version(
path='database/credentials',
)
db_password = secret['data']['data']['password']
"""
# === 4. 绝对不要做的事 ===
# 错误 1: 硬编码密钥
# API_KEY = "sk-1234567890abcdef" # 绝对禁止!
# 错误 2: 在代码中提交密码
# DB_PASSWORD = "MyP@ssw0rd" # 绝对禁止!
# 错误 3: 在日志中打印敏感信息
# print(f"连接数据库: {db_password}") # 绝对禁止!
# === 5. 日志脱敏 ===
import re
def sanitize_log(message: str) -> str:
"""日志脱敏"""
# 脱敏密码
message = re.sub(
r'(password|passwd|pwd|secret|token|key)=\S+',
r'\1=***REDACTED***',
message,
flags=re.IGNORECASE
)
# 脱敏手机号
message = re.sub(r'1[3-9]\d{9}', lambda m: m.group()[:3] + '****' + m.group()[-4:], message)
# 脱敏邮箱
message = re.sub(
r'[\w.-]+@[\w.-]+\.\w+',
lambda m: m.group()[0] + '***@' + m.group().split('@')[1],
message
)
return message反序列化安全
# ============================================
# 反序列化安全: pickle 是最危险的标准库之一
# ============================================
import pickle # 危险!
import json # 安全
import yaml # 需要注意
# === pickle 反序列化攻击 ===
# 恶意 pickle 数据可以执行任意代码!
# 永远不要对不受信任的数据使用 pickle.loads()
class UnsafeDeserialization:
"""错误示范"""
@staticmethod
def dangerous_pickle_load(data: bytes):
"""极其危险!攻击者可以执行任意系统命令"""
# 攻击 payload:
# cos\nsystem\n(S'rm -rf /'\ntR.
return pickle.loads(data) # 绝对不要这样做!
# === 安全的替代方案 ===
class SafeDeserialization:
"""安全反序列化"""
@staticmethod
def use_json(data: str):
"""JSON 反序列化(安全)"""
return json.loads(data)
@staticmethod
def safe_yaml_load(data: str):
"""安全的 YAML 加载"""
# yaml.load(data) 不安全!必须使用 safe_load
return yaml.safe_load(data)
@staticmethod
def safe_pickle_if_needed(data: bytes):
"""如果必须使用 pickle,限制反序列化能力"""
import io
class RestrictedUnpickler(pickle.Unpickler):
ALLOWED_CLASSES = {
'builtins': {'dict', 'list', 'set', 'tuple', 'str', 'int', 'float', 'bool'},
}
def find_class(self, module, name):
if module in self.ALLOWED_CLASSES:
if name in self.ALLOWED_CLASSES[module]:
return getattr(__import__(module), name)
raise pickle.UnpicklingError(
f"不允许反序列化: {module}.{name}"
)
return RestrictedUnpickler(io.BytesIO(data)).load()安全测试
# ============================================
# 安全测试实践
# ============================================
import pytest
from fastapi.testclient import TestClient
class TestSecurity:
"""安全测试用例"""
def test_sql_injection_prevention(self, client: TestClient):
"""测试 SQL 注入防御"""
malicious_inputs = [
"admin' OR '1'='1",
"'; DROP TABLE users; --",
"1 UNION SELECT * FROM passwords --",
"admin'/*",
]
for payload in malicious_inputs:
response = client.post("/api/login", json={
"username": payload,
"password": "test123"
})
assert response.status_code in (400, 401)
# 确保没有返回数据库错误信息
assert "SQL" not in response.text
assert "syntax" not in response.text
def test_xss_prevention(self, client: TestClient):
"""测试 XSS 防御"""
xss_payloads = [
"<script>alert('xss')</script>",
"<img src=x onerror=alert(1)>",
"javascript:alert(1)",
"<svg onload=alert(1)>",
]
for payload in xss_payloads:
response = client.post("/api/comments", json={
"content": payload
})
if response.status_code == 200:
data = response.json()
# 确保输出被转义
assert "<script>" not in str(data)
assert "onerror" not in str(data)
def test_csrf_protection(self, client: TestClient):
"""测试 CSRF 防护"""
# 没有 CSRF token 的 POST 应该被拒绝
response = client.post("/api/transfer", json={
"amount": 100,
"to": "attacker"
})
assert response.status_code == 403
def test_authentication_required(self, client: TestClient):
"""测试认证保护"""
protected_endpoints = [
("/api/user/profile", "GET"),
("/api/user/update", "PUT"),
("/api/admin/users", "GET"),
]
for endpoint, method in protected_endpoints:
response = getattr(client, method.lower())(endpoint)
assert response.status_code == 401
def test_rate_limiting(self, client: TestClient):
"""测试限流"""
for i in range(100):
response = client.post("/api/login", json={
"username": "test", "password": "wrong"
})
# 第 101 次应该被限流
response = client.post("/api/login", json={
"username": "test", "password": "wrong"
})
assert response.status_code == 429
def test_file_upload_security(self, client: TestClient):
"""测试文件上传安全"""
# 上传 .php 文件
response = client.post("/api/upload", files={
"file": ("shell.php", b"<?php system($_GET['cmd']); ?>", "application/php")
})
assert response.status_code == 400
# 路径遍历
response = client.post("/api/upload", files={
"file": ("../../../etc/passwd", b"root:x:0:0", "text/plain")
})
assert response.status_code == 400优点
缺点
性能注意事项
- bcrypt 轮数:rounds=12 是安全与性能的平衡点,每增加 1 轮耗时翻倍
- CSRF 验证:HMAC 计算非常快,对请求延迟影响可忽略
- 输入验证:Pydantic 验证的开销通常小于 1ms
- 安全扫描:safety/pip-audit 在 CI 中运行耗时约 10-30 秒
- 日志脱敏:正则替换对高吞吐日志可能有性能影响,考虑异步处理
- SSL 验证:HTTPS 比 HTTP 多一次 TLS 握手,约增加 50-100ms
总结
Python 安全编码的核心是不信任外部输入、使用参数化查询、验证所有输入、管理好密钥、定期扫描依赖。安全不是一个功能,而是一种贯穿开发全流程的意识。通过 Pydantic 输入验证、参数化查询、bcrypt 密码哈希、CSRF Token、安全 HTTP 客户端等手段,可以防御绝大多数 Web 安全威胁。
关键知识点
- SQL 注入 — 使用参数化查询,永远不拼接 SQL
- XSS — HTML 转义、CSP 头、Jinja2 自动转义
- CSRF — Token 验证、SameSite Cookie
- 输入验证 — Pydantic 模型验证、文件类型检查
- 密码安全 — bcrypt 哈希、恒定时间比较
- Secrets 管理 — 环境变量、Vault、禁止硬编码
- 反序列化 — 避免 pickle,使用 JSON/yaml.safe_load
- 依赖安全 — safety、pip-audit、bandit 定期扫描
常见误区
- "我的代码不需要安全":任何面向网络的应用都需要安全措施
- "框架会帮我处理安全":框架提供工具,但不保证正确使用
- "HTTPS 就够了":HTTPS 只保护传输,不防止注入和 XSS
- "小项目不需要安全":攻击者是自动化扫描,不区分项目大小
- "只关注代码安全":忽视了部署配置、日志、监控等环节
- "一次审计够了":安全是持续过程,新漏洞每天出现
进阶路线
- 入门:参数化查询、输入验证、密码哈希
- 进阶:CSRF 防护、XSS 防御、安全 HTTP 客户端
- 高级:依赖扫描、Secrets 管理、安全测试自动化
- 专家:安全架构设计、威胁建模、渗透测试
适用场景
- 所有面向互联网的 Web 应用
- 处理用户敏感数据的系统
- 金融、医疗等高安全要求行业
- 微服务之间的安全通信
- 开源项目的安全最佳实践
落地建议
- 第一步:对所有数据库查询改用参数化查询
- 第二步:引入 Pydantic 做输入验证
- 第三步:配置 CSRF 和 XSS 防护
- 第四步:密码使用 bcrypt 哈希存储
- 第五步:集成 safety/bandit 到 CI/CD
- 持续:定期更新依赖,关注安全公告
排错清单
复盘问题
- 上次安全审计发现了哪些问题?是否都已修复?
- 当前依赖是否有已知漏洞?运行 pip-audit 检查
- 最近是否有安全事件或告警?
- 团队成员的安全意识如何?是否需要培训?
- 密钥轮换上次是什么时候?
- 安全测试覆盖率是多少?
