文件操作与异常处理
大约 10 分钟 · 约 2981 字
文件操作与异常处理
简介
文件 I/O 和异常处理是 Python 编程的基础能力。Python 提供了简洁的文件操作 API 和强大的异常处理机制,确保程序健壮地处理文件读写和运行时错误。
特点
文件读写
基本操作
# Write to a file (mode "w" truncates any existing content)
with open("data.txt", "w", encoding="utf-8") as f:
    f.write("第一行\n")
    f.write("第二行\n")
    f.writelines(["第三行\n", "第四行\n"])

# Read the whole file at once
with open("data.txt", "r", encoding="utf-8") as f:
    content = f.read()  # entire content as one string
    print(content)

# Read line by line (memory-efficient: the file object is an iterator)
with open("data.txt", "r", encoding="utf-8") as f:
    for line in f:
        print(line.strip())

# Read all lines into a list
with open("data.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()  # list of lines, each keeping its trailing "\n"
# Append to the end of an existing file (mode "a")
with open("data.txt", "a", encoding="utf-8") as f:
    f.write("追加的内容\n")

二进制文件
# Copy a file in binary mode, 8 KB at a time
with open("image.png", "rb") as src, open("copy.png", "wb") as dst:
    while chunk := src.read(8192):  # walrus loop: stops when read() returns b""
        dst.write(chunk)
# Read a large text file lazily, in fixed-size chunks
def read_large_file(filepath: str, chunk_size: int = 8192):
    """Yield the contents of *filepath* in chunks of at most *chunk_size*
    characters, so the whole file is never held in memory at once."""
    with open(filepath, "r", encoding="utf-8") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:  # empty string signals EOF
                break
            yield chunk
# Inspect file metadata via os.stat
import os

stat = os.stat("data.txt")
print(f"文件大小: {stat.st_size} bytes")
print(f"修改时间: {stat.st_mtime}")

pathlib 路径操作
现代路径处理
from pathlib import Path

# Build a Path object
p = Path("data/reports/2024/report.txt")

# Path components
print(p.parent)  # data/reports/2024
print(p.name)    # report.txt
print(p.stem)    # report
print(p.suffix)  # .txt
print(p.parts)   # ('data', 'reports', '2024', 'report.txt')

# Join paths with the / operator
base = Path("data")
file_path = base / "reports" / "2024" / "report.txt"

# Create directories (including parents; no error if already present)
output_dir = Path("output/2024")
output_dir.mkdir(parents=True, exist_ok=True)

# Iterate over matching files
for f in Path("data").glob("*.txt"):
    print(f)
for f in Path("data").rglob("*.csv"):  # recursive glob
    print(f)

# Read/write through the Path object itself
p = Path("test.txt")
p.write_text("Hello, Path!", encoding="utf-8")
content = p.read_text(encoding="utf-8")
print(p.exists())  # True
print(p.is_file())  # True

JSON 处理
JSON 读写
import json

# Python object to be serialized to JSON
data = {
    "name": "张三",
    "age": 30,
    "skills": ["Python", "C#", "SQL"],
    "address": {"city": "深圳", "zip": "518000"}
}

# Write a JSON file (ensure_ascii=False keeps non-ASCII text readable)
with open("data.json", "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# Read a JSON file
with open("data.json", "r", encoding="utf-8") as f:
    loaded = json.load(f)

# JSON strings: dumps / loads
json_str = json.dumps(data, ensure_ascii=False, indent=2)
parsed = json.loads(json_str)
# Serializing custom types via json.dumps(default=...)
from datetime import datetime


class User:
    """Simple value object used to demonstrate custom JSON encoding."""

    def __init__(self, name, created_at):
        self.name = name
        self.created_at = created_at


def user_encoder(obj):
    """default= hook for json.dumps: converts User/datetime to JSON-safe values.

    Raises TypeError for anything it cannot encode, which is the contract
    json.dumps expects from a default= callable.
    """
    if isinstance(obj, User):
        return {"name": obj.name, "created_at": obj.created_at.isoformat()}
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"不支持的类型: {type(obj)}")


user = User("张三", datetime.now())
json_str = json.dumps(user, default=user_encoder, ensure_ascii=False)

CSV 处理
CSV 读写
import csv

# Write CSV with DictWriter ("utf-8-sig" adds a BOM so Excel detects UTF-8;
# newline="" lets the csv module control line endings itself)
users = [
    {"name": "张三", "age": 30, "city": "深圳"},
    {"name": "李四", "age": 25, "city": "北京"},
    {"name": "王五", "age": 35, "city": "上海"},
]
with open("users.csv", "w", newline="", encoding="utf-8-sig") as f:
    writer = csv.DictWriter(f, fieldnames=["name", "age", "city"])
    writer.writeheader()
    writer.writerows(users)

# Read CSV back as dictionaries (all values come back as strings)
with open("users.csv", "r", encoding="utf-8-sig") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(f"{row['name']}, {row['age']}岁, {row['city']}")
# Row-oriented writing with csv.writer
with open("data.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["姓名", "年龄"])
    writer.writerow(["张三", 30])

异常处理
try/except
# Basic try/except/else/finally
def divide(a: float, b: float) -> float:
    """Divide a by b; return 0.0 on zero division or incompatible types."""
    try:
        result = a / b
    except ZeroDivisionError:
        print("错误:除数不能为零")
        return 0.0
    except TypeError as e:
        print(f"类型错误: {e}")
        return 0.0
    else:
        # Runs only when no exception was raised
        print(f"计算成功: {result}")
        return result
    finally:
        # Always runs, with or without an exception
        print("计算结束")
# Custom exception hierarchy rooted at one business base class
class BusinessException(Exception):
    """Base class for business errors, carrying a numeric code and a message."""

    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message
        super().__init__(f"[{code}] {message}")


class ValidationError(BusinessException):
    """Raised when input fails validation."""


class NotFoundError(BusinessException):
    """Raised when a requested resource does not exist."""


def get_user(user_id: int):
    """Return a user dict for *user_id*; raise ValidationError if not positive."""
    if user_id <= 0:
        raise ValidationError(400, "用户ID必须为正整数")
    # user = db.find(user_id)
    # if not user:
    #     raise NotFoundError(404, f"用户 {user_id} 不存在")
    return {"id": user_id, "name": "张三"}
# Catch order matters: the more specific subclass must come before its base
try:
    get_user(-1)
except ValidationError as e:
    print(f"验证失败: {e}")
except BusinessException as e:
    print(f"业务异常: {e.code} - {e.message}")

上下文管理器异常处理
# Best practice for writes: write a temp file, then atomically swap it in
import os


def safe_write_file(filepath: str, content: str):
    """Write *content* to *filepath* without leaving a half-written file.

    Writes to a ".tmp" sibling first, then renames over the target
    (os.replace is atomic on the same filesystem). On failure the temp
    file is removed and the original exception is re-raised.

    Note: the original placed ``import os`` inside the try block AFTER the
    write, so a failure in open()/write() crashed the except branch with
    NameError; the import now happens up front.
    """
    tmp_path = filepath + ".tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(content)
        # Rename only after a successful write (atomic operation)
        os.replace(tmp_path, filepath)
    except Exception:
        # Clean up the temp file, then propagate the error
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
# 重试机制
import time
def retry(func, max_retries=3, delay=1.0):
for attempt in range(max_retries):
try:
return func()
except Exception as e:
if attempt == max_retries - 1:
raise
print(f"第 {attempt+1} 次失败,{delay}s 后重试: {e}")
time.sleep(delay)
delay *= 2 # 指数退避Excel 与 YAML 处理
Excel 读写(openpyxl)
# pip install openpyxl
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Font, Alignment, PatternFill
# 写入 Excel
wb = Workbook()
ws = wb.active
ws.title = "用户数据"
# 表头样式
header_font = Font(bold=True, size=12, color="FFFFFF")
header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
headers = ["姓名", "年龄", "城市", "薪资"]
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = Alignment(horizontal="center")
# 数据行
data = [
("张三", 30, "深圳", 25000),
("李四", 25, "北京", 18000),
("王五", 35, "上海", 32000),
]
for row_idx, row_data in enumerate(data, 2):
for col_idx, value in enumerate(row_data, 1):
ws.cell(row=row_idx, column=col_idx, value=value)
wb.save("users.xlsx")
# 读取 Excel
wb = load_workbook("users.xlsx")
ws = wb["用户数据"]
for row in ws.iter_rows(min_row=2, values_only=True):
name, age, city, salary = row
print(f"{name}, {age}岁, {city}, 薪资 {salary}")YAML 配置读写
# pip install pyyaml
import yaml
from pathlib import Path

# Nested configuration structure
config = {
    "database": {
        "host": "localhost",
        "port": 5432,
        "name": "mydb",
    },
    "cache": {
        "backend": "redis",
        "url": "redis://localhost:6379/0",
        "ttl": 3600,
    },
    "features": {
        "dark_mode": False,
        "beta_features": True,
    },
}

# Write YAML (allow_unicode keeps non-ASCII readable,
# sort_keys=False preserves insertion order)
with open("config.yaml", "w", encoding="utf-8") as f:
    yaml.dump(config, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

# Read YAML — always use safe_load for anything not fully trusted
with open("config.yaml", "r", encoding="utf-8") as f:
    loaded_config = yaml.safe_load(f)
print(f"数据库: {loaded_config['database']['host']}")
print(f"缓存: {loaded_config['cache']['backend']}")

文件系统操作
目录遍历与文件搜索
from pathlib import Path
import os

# 1. List every Python file under the current directory (recursive)
py_files = list(Path(".").rglob("*.py"))
print(f"找到 {len(py_files)} 个 Python 文件")

# 2. Filter by size (> 1 MiB)
large_files = [f for f in Path("./logs").glob("*.log") if f.stat().st_size > 1024 * 1024]
for f in large_files:
    print(f"大文件: {f} ({f.stat().st_size / 1024 / 1024:.1f} MB)")

# 3. Filter by modification time (changed within the last 7 days)
from datetime import datetime, timedelta

cutoff = datetime.now() - timedelta(days=7)
recent_files = [
    f for f in Path("./data").rglob("*")
    if f.is_file() and datetime.fromtimestamp(f.stat().st_mtime) > cutoff
]
print(f"最近 7 天修改的文件: {len(recent_files)} 个")
# 4. Directory statistics
def dir_stats(directory: str) -> dict:
    """Aggregate file statistics under *directory*, recursively.

    Returns a dict with total_size (bytes), total_files, and extensions:
    a {suffix: count} mapping sorted by descending count; files without a
    suffix are grouped under "(无后缀)".
    """
    total_size = 0
    file_count = 0
    extensions = {}
    for f in Path(directory).rglob("*"):
        if f.is_file():
            file_count += 1
            size = f.stat().st_size
            total_size += size
            ext = f.suffix.lower() or "(无后缀)"
            extensions[ext] = extensions.get(ext, 0) + 1
    return {
        "total_size": total_size,
        "total_files": file_count,
        "extensions": dict(sorted(extensions.items(), key=lambda x: -x[1])),
    }
stats = dir_stats(".")
print(f"总大小: {stats['total_size'] / 1024 / 1024:.1f} MB")
print(f"文件数: {stats['total_files']}")
print(f"类型分布: {stats['extensions']}")

安全文件操作
import os
import shutil
import hashlib
from pathlib import Path


def safe_copy(src: str, dst: str):
    """Copy *src* to *dst*, preserving metadata (copy2 keeps mtime etc.)."""
    shutil.copy2(src, dst)


def safe_move(src: str, dst: str):
    """Move *src* to *dst*; shutil.move also works across filesystems."""
    shutil.move(src, dst)


def atomic_write(filepath: str, content: str):
    """Atomically write *content* to *filepath*.

    Writes to a pid-suffixed temp file, fsyncs it, then renames it over the
    target (os.replace is atomic on the same filesystem). On any failure the
    temp file is removed and the exception is re-raised.
    """
    tmp_path = filepath + f".tmp.{os.getpid()}"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(content)
            f.flush()
            os.fsync(f.fileno())  # force the data to disk before the rename
        os.replace(tmp_path, filepath)  # atomic swap
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def file_hash(filepath: str, algorithm: str = "sha256") -> str:
    """Return the hex digest of *filepath*, read in 8 KB chunks."""
    hasher = hashlib.new(algorithm)
    with open(filepath, "rb") as f:
        while chunk := f.read(8192):
            hasher.update(chunk)
    return hasher.hexdigest()


def ensure_directory(path: str):
    """Create *path* (and any parents) if missing; no error if it exists."""
    Path(path).mkdir(parents=True, exist_ok=True)
def cleanup_old_files(directory: str, max_age_days: int = 30,
                      pattern: str = "*"):
    """Delete files under *directory* matching glob *pattern* whose mtime is
    older than *max_age_days* days; return the number of files removed."""
    from datetime import datetime, timedelta

    cutoff = datetime.now() - timedelta(days=max_age_days)
    removed = 0
    for f in Path(directory).glob(pattern):
        if f.is_file():
            mtime = datetime.fromtimestamp(f.stat().st_mtime)
            if mtime < cutoff:
                f.unlink()
                removed += 1
    return removed
# 使用
# atomic_write("config.json", json.dumps(config))
# print(file_hash("config.json")) # 文件完整性校验
# removed = cleanup_old_files("logs", max_age_days=7, pattern="*.log")

临时文件与内存文件
import tempfile
import io

# 1. Temporary file (removed automatically when the with block exits)
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=True) as tmp:
    tmp.write("临时内容")
    tmp_path = tmp.name
    print(f"临时文件路径: {tmp_path}")
# The file is deleted once the with block ends

# 2. Temporary directory (recursively removed on exit)
with tempfile.TemporaryDirectory() as tmp_dir:
    print(f"临时目录: {tmp_dir}")
    Path(tmp_dir, "test.txt").write_text("测试")
# The directory is deleted once the with block ends

# 3. In-memory files (StringIO / BytesIO)
output = io.StringIO()
output.write("第一行\n")
output.write("第二行\n")
output.seek(0)  # rewind before reading back
content = output.read()
print(content)
# Simulate handling an uploaded file held in memory
def process_upload(file_content: bytes, filename: str):
    """Wrap raw upload bytes in a BytesIO stream and process them.

    Note: the log line originally printed a garbled "(unknown)" placeholder;
    it now interpolates the *filename* parameter, which was otherwise unused.
    """
    stream = io.BytesIO(file_content)
    # The stream supports the usual file API (read/seek/close)
    data = stream.read()
    print(f"处理文件 {filename},大小: {len(data)} bytes")
    stream.close()

并发文件操作
并行文件处理
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import os


def count_lines(filepath: str) -> dict:
    """Return {"file": filepath, "lines": n} for a text file."""
    with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
        lines = sum(1 for _ in f)
    return {"file": filepath, "lines": lines}


def parallel_line_count(directory: str, pattern: str = "*.py",
                        max_workers: int = 4) -> dict:
    """Count lines of all files matching *pattern* under *directory*.

    Fans the per-file work out to a thread pool (line counting is I/O-bound,
    so threads overlap the waits) and sums the results as they complete.
    """
    files = [str(f) for f in Path(directory).rglob(pattern)]
    total_lines = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(count_lines, f): f for f in files}
        for future in as_completed(futures):
            result = future.result()
            total_lines += result["lines"]
    return {"total_files": len(files), "total_lines": total_lines}
# Batch rename
def batch_rename(directory: str, old_pattern: str, new_pattern: str):
    """Rename files matching glob *old_pattern*, replacing the pattern's
    literal tail with *new_pattern*; return how many files were renamed.

    NOTE(review): old_pattern.lstrip("*") strips leading "*" characters only,
    so this assumes patterns of the form "*suffix" (e.g. "*.log") — verify
    before reusing with other pattern shapes.
    """
    renamed = 0
    for filepath in Path(directory).glob(old_pattern):
        new_name = filepath.name.replace(old_pattern.lstrip("*"), new_pattern)
        new_path = filepath.parent / new_name
        filepath.rename(new_path)
        renamed += 1
    return renamed

优点
缺点
总结
文件操作核心:with open() 确保资源释放,pathlib 替代 os.path 做路径处理。JSON 用 json.dump/load 处理文件,json.dumps/loads 处理字符串。异常处理用 try/except/else/finally,自定义异常继承 Exception。重试机制用指数退避。大文件用分块读取或生成器避免内存溢出。
关键知识点
- 先区分这篇内容属于语法能力、工程能力,还是生态工具能力。
- Python 的开发效率来自生态,但可维护性来自结构、测试和规范。
- 脚本一旦进入长期维护,就必须按项目来治理。
- 框架与语言特性类主题要同时理解运行方式和工程组织方式。
项目落地视角
- 统一虚拟环境、依赖锁定、格式化和日志方案。
- 把入口、配置、业务逻辑和工具函数拆开,避免单文件膨胀。
- 对网络请求、文件读写和数据处理结果做异常与样本校验。
- 明确项目入口、配置管理、依赖管理、日志和测试策略。
常见误区
- 把临时脚本直接当生产代码使用。
- 忽略依赖版本、编码、路径和时区差异。
- 只会写 happy path,没有补超时、重试和资源释放。
- 把 notebook 或脚本风格直接带入长期维护项目。
进阶路线
- 把类型注解、测试、打包和部署纳入统一工程流程。
- 继续向异步、性能、数据管线和框架源码层深入。
- 把常用脚本抽成可复用库或 CLI 工具,而不是复制粘贴。
- 继续补齐部署、打包、监控和性能调优能力。
适用场景
- 当你准备把《文件操作与异常处理》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合脚本自动化、数据处理、Web 开发和测试工具建设。
- 当需求强调快速迭代和丰富生态时,Python 往往能快速起步。
落地建议
- 统一使用虚拟环境与依赖锁定,避免环境漂移。
- 对核心函数补类型注解、异常处理和日志,减少“脚本黑盒”。
- 一旦脚本进入生产链路,及时补测试和监控。
排错清单
- 先确认当前解释器、虚拟环境和依赖版本是否正确。
- 检查编码、路径、时区和第三方库行为差异。
- 排查同步阻塞、数据库连接未释放或网络请求无超时。
复盘问题
- 如果把《文件操作与异常处理》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《文件操作与异常处理》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《文件操作与异常处理》最大的收益和代价分别是什么?
