函数式编程思想
大约 12 分钟约 3477 字
函数式编程思想
简介
函数式编程(FP)强调不可变数据和纯函数。Python 虽然不是纯函数式语言,但支持闭包、高阶函数、生成器和不可变数据结构等函数式特性。理解函数式编程思想,有助于编写更简洁、可测试和可组合的代码。
特点
纯函数与不可变性
核心原则
from dataclasses import dataclass, replace
from typing import TypeVar, Callable, Generic
from functools import reduce, partial
import operator
# 纯函数原则:
# 1. 相同输入总是返回相同输出
# 2. 没有副作用(不修改外部状态)
# 3. 引用透明(可被返回值替换)
# ❌ 不纯函数(有副作用)
total = 0
def impure_add(n):
global total
total += n # 修改外部状态
return total
# ✅ 纯函数
def pure_add(a, b):
return a + b
# 不可变数据 — 使用 frozen dataclass
@dataclass(frozen=True)
class User:
id: int
name: str
email: str
age: int
# "修改" 返回新实例
def update_user_age(user: User, new_age: int) -> User:
return replace(user, age=new_age)
user = User(id=1, name="张三", email="test@example.com", age=25)
older_user = update_user_age(user, 26) # user 不变,返回新实例
# 不可变列表操作(返回新列表)
def add_item(items: tuple, item) -> tuple:
return items + (item,)
def remove_item(items: tuple, index: int) -> tuple:
return items[:index] + items[index + 1:]高阶函数
函数组合与管道
from typing import Callable, TypeVar
T = TypeVar("T")
U = TypeVar("U")
# 1. 函数组合
def compose(*functions: Callable) -> Callable:
"""从右向左组合函数"""
return reduce(lambda f, g: lambda x: f(g(x)), functions)
def pipe(*functions: Callable) -> Callable:
"""从左向右管道操作"""
return reduce(lambda f, g: lambda x: g(f(x)), functions)
# 示例:数据处理管道
strip = lambda s: s.strip()
lower = lambda s: s.lower()
capitalize = lambda s: s.title()
add_prefix = lambda s: f"Hello, {s}!"
greet = pipe(strip, lower, capitalize, add_prefix)
print(greet(" wANG rAN ")) # "Hello, Wang Ran!"
# 2. 函数式数据处理
def functional_data_processing(users: list[dict]) -> list[dict]:
"""函数式风格的数据处理"""
return pipe(
# 过滤
lambda data: filter(lambda u: u["age"] >= 18, data),
# 转换
lambda data: map(lambda u: {
**u,
"display_name": f"{u['first_name']} {u['last_name']}",
"age_group": "young" if u["age"] < 30 else "adult"
}, data),
# 排序
lambda data: sorted(data, key=lambda u: u["age"]),
# 收集
list
)(users)
# 3. 柯里化(Currying)
def curry(func: Callable) -> Callable:
"""自动柯里化"""
import inspect
sig = inspect.signature(func)
num_params = len(sig.parameters)
def curried(*args):
if len(args) >= num_params:
return func(*args)
return lambda *more: curried(*(args + more))
return curried
@curry
def add(a, b):
return a + b
add5 = add(5) # 部分应用
print(add5(3)) # 8
# 4. 常用函数式工具
def group_by(key: Callable, items: list) -> dict:
"""分组"""
result = {}
for item in items:
k = key(item)
result.setdefault(k, []).append(item)
return result
def flat_map(func: Callable, items: list) -> list:
"""映射后展平"""
return [item for sublist in map(func, items) for item in sublist]
def partition(predicate: Callable, items: list) -> tuple:
"""分区"""
yes, no = [], []
for item in items:
(yes if predicate(item) else no).append(item)
return yes, no
# 使用
users = [
{"name": "张三", "age": 25, "city": "北京"},
{"name": "李四", "age": 30, "city": "上海"},
{"name": "王五", "age": 22, "city": "北京"},
]
by_city = group_by(lambda u: u["city"], users)
young, old = partition(lambda u: u["age"] < 28, users)函子与单子
Optional/Either 模式
from typing import TypeVar, Generic, Callable
T = TypeVar("T")
U = TypeVar("U")
# 1. Maybe(Optional)函子
class Maybe(Generic[T]):
"""可选值,避免 None 检查"""
def bind(self, func: Callable[[T], "Maybe[U]"]) -> "Maybe[U]":
raise NotImplementedError
def map(self, func: Callable[[T], U]) -> "Maybe[U]":
raise NotImplementedError
def get_or_else(self, default: T) -> T:
raise NotImplementedError
class Just(Maybe[T]):
def __init__(self, value: T):
self.value = value
def bind(self, func):
return func(self.value)
def map(self, func):
return Just(func(self.value))
def get_or_else(self, default):
return self.value
def __repr__(self):
return f"Just({self.value})"
class Nothing(Maybe[T]):
def bind(self, func):
return self
def map(self, func):
return self
def get_or_else(self, default):
return default
def __repr__(self):
return "Nothing"
# 安全的链式操作
def safe_divide(a, b):
if b == 0:
return Nothing()
return Just(a / b)
result = (Just(10)
.bind(lambda x: safe_divide(x, 2)) # Just(5.0)
.map(lambda x: x * 3) # Just(15.0)
.get_or_else(0)) # 15.0
# 2. Either(结果或错误)单子
class Either(Generic[T]):
pass
class Left(Either[T]):
def __init__(self, error):
self.error = error
def map(self, func):
return self
def bind(self, func):
return self
class Right(Either[T]):
def __init__(self, value: T):
self.value = value
def map(self, func):
return Right(func(self.value))
def bind(self, func):
return func(self.value)
# 链式验证
def validate_name(name: str) -> Either:
if not name or len(name) < 2:
return Left("姓名至少2个字符")
return Right(name)
def validate_age(age: int) -> Either:
if age < 0 or age > 150:
return Left("年龄无效")
return Right(age)
def validate_email(email: str) -> Either:
if "@" not in email:
return Left("邮箱格式错误")
return Right(email)
# 链式验证
result = (Right("张三")
.bind(validate_name)
.bind(lambda _: validate_age(25))
.bind(lambda _: validate_email("test@example.com")))
# Right("test@example.com")
error_result = (Right("")
.bind(validate_name)) # Left("姓名至少2个字符")优点
函数式实战:数据处理管道
数据清洗管道
from dataclasses import dataclass, replace
from typing import Callable, List, Tuple
from functools import reduce
import re
@dataclass(frozen=True)
class RawRecord:
id: int
name: str
email: str
age: str
salary: str
@dataclass(frozen=True)
class CleanRecord:
id: int
name: str
email: str
age: int
salary: float
is_valid: bool
# 纯函数 — 每一步都返回新数据
def strip_whitespace(record: RawRecord) -> RawRecord:
"""去除空白字符"""
return replace(record,
name=record.name.strip(),
email=record.email.strip(),
age=record.age.strip(),
salary=record.salary.strip())
def validate_email(record: RawRecord) -> RawRecord:
"""验证邮箱格式(标记而非抛异常)"""
# 这里只是示例,实际可以用更复杂的验证
return record
def parse_fields(record: RawRecord) -> Tuple[CleanRecord, bool]:
"""解析字段类型"""
try:
return CleanRecord(
id=record.id,
name=record.name,
email=record.email,
age=int(record.age),
salary=float(record.salary),
is_valid=True
), True
except (ValueError, TypeError):
return CleanRecord(
id=record.id, name=record.name, email="",
age=0, salary=0.0, is_valid=False
), False
def filter_adults(record: CleanRecord) -> bool:
"""过滤未成年"""
return record.age >= 18
def compute_tax(record: CleanRecord) -> dict:
"""计算税费"""
tax_rate = 0.1 if record.salary < 5000 else 0.2
return {
"name": record.name,
"salary": record.salary,
"tax": round(record.salary * tax_rate, 2)
}
# 构建管道
def clean_pipeline(raw_data: List[RawRecord]) -> List[dict]:
return pipe(
lambda data: map(strip_whitespace, data), # 步骤1:清洗
lambda data: map(validate_email, data), # 步骤2:验证邮箱
lambda data: map(parse_fields, data), # 步骤3:类型转换
lambda data: filter(lambda r: r[1], data), # 步骤4:过滤无效
lambda data: map(lambda r: r[0], data), # 步骤5:解包
lambda data: filter(filter_adults, data), # 步骤6:过滤未成年
lambda data: map(compute_tax, data), # 步骤7:计算税费
list # 步骤8:收集结果
)(raw_data)
# 使用
raw_records = [
RawRecord(1, " 张三 ", "zhang@test.com", "25", "8000"),
RawRecord(2, "李四", "li@test.com", "17", "3000"),
RawRecord(3, "王五", "wang@test.com", "abc", "6000"),
]
results = clean_pipeline(raw_records)函数式状态机
from dataclasses import dataclass
from typing import Tuple, Optional
@dataclass(frozen=True)
class OrderState:
status: str
items: tuple
total: float
payment_id: Optional[str] = None
# 纯函数状态转换
def create_order(items: tuple, total: float) -> OrderState:
return OrderState(status="created", items=items, total=total)
def add_item(state: OrderState, item: str, price: float) -> OrderState:
if state.status != "created":
raise ValueError("只能修改未支付的订单")
return replace(state,
items=state.items + (item,),
total=state.total + price)
def pay_order(state: OrderState, payment_id: str) -> OrderState:
if state.status != "created":
raise ValueError("只能支付未支付的订单")
return replace(state, status="paid", payment_id=payment_id)
def cancel_order(state: OrderState) -> OrderState:
if state.status == "shipped":
raise ValueError("已发货的订单不能取消")
return replace(state, status="cancelled")
# 使用 — 每一步都是纯函数,状态不可变
order = create_order(("商品A",), 100.0)
order = add_item(order, "商品B", 50.0)
order = pay_order(order, "PAY001")
# order = cancel_order(order) # 抛异常:已支付的订单不能取消性能优化技巧
记忆化(Memoization)
from functools import lru_cache
# 使用 lru_cache 自动缓存函数结果
@lru_cache(maxsize=128)
def fibonacci(n: int) -> int:
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
print(fibonacci(100)) # 瞬间完成,因为中间结果被缓存
# 手动实现记忆化
def memoize(func):
cache = {}
def wrapper(*args):
if args not in cache:
cache[args] = func(*args)
return cache[args]
return wrapper
@memoize
def expensive_computation(n: int) -> int:
print(f"计算 {n}")
return n * n
expensive_computation(5) # 输出: 计算 5
expensive_computation(5) # 无输出(命中缓存)惰性求值
# 生成器实现惰性求值 — 按需计算,不预先加载全部数据
def read_large_file(filepath: str):
"""逐行读取大文件,不一次性加载到内存"""
with open(filepath, 'r', encoding='utf-8') as f:
for line in f:
yield line.strip()
def process_lines(lines, filter_fn, transform_fn):
"""惰性过滤和转换"""
for line in lines:
if filter_fn(line):
yield transform_fn(line)
# 使用 — 处理 10GB 的日志文件,内存占用极小
lines = read_large_file("/var/log/app.log")
errors = process_lines(
lines,
filter_fn=lambda l: "ERROR" in l,
transform_fn=lambda l: l.split("|")[3]
)
for error in errors:
print(error) # 每次只处理一行避免不必要的拷贝
from itertools import islice, chain
# 不好的做法:创建多个中间列表
def process_bad(data):
step1 = [x for x in data if x > 0] # 创建列表1
step2 = [x * 2 for x in step1] # 创建列表2
step3 = [x for x in step2 if x < 100] # 创建列表3
return sum(step3)
# 好的做法:使用生成器表达式,零中间列表
def process_good(data):
return sum(
x for x in data
if x > 0
for x in [x * 2]
if x < 100
)
# 更好的做法:使用内置函数链
def process_best(data):
return sum(
islice(
(x * 2 for x in data if x > 0),
0, 1000 # 最多处理1000条
)
)函数式与面向对象的结合
函数式构建器
from dataclasses import dataclass, replace
from typing import Callable
@dataclass(frozen=True)
class QueryConfig:
table: str
conditions: tuple = ()
order_by: str = ""
limit: int = 100
offset: int = 0
# 函数式链式调用构建查询
class QueryBuilder:
def __init__(self, config: QueryConfig = None):
self._config = config or QueryConfig(table="")
def from_table(self, table: str) -> "QueryBuilder":
return QueryBuilder(replace(self._config, table=table))
def where(self, *conditions: str) -> "QueryBuilder":
return QueryBuilder(replace(self._config,
conditions=self._config.conditions + conditions))
def order(self, field: str) -> "QueryBuilder":
return QueryBuilder(replace(self._config, order_by=field))
def limit(self, n: int) -> "QueryBuilder":
return QueryBuilder(replace(self._config, limit=n))
def build(self) -> QueryConfig:
return self._config
# 使用 — 每步返回新对象,原对象不变
base_query = QueryBuilder().from_table("users").where("age > 18")
adult_query = base_query.where("status = 'active'").order("name").limit(50)
admin_query = base_query.where("role = 'admin'")
print(adult_query.build())
print(admin_query.build())函数式事件处理
from typing import List, Callable
class EventBus:
def __init__(self):
self._handlers: dict[str, List[Callable]] = {}
def on(self, event: str, handler: Callable) -> Callable:
"""注册事件处理器,返回取消注册函数"""
if event not in self._handlers:
self._handlers[event] = []
self._handlers[event].append(handler)
def unsubscribe():
self._handlers[event].remove(handler)
return unsubscribe
def emit(self, event: str, *args, **kwargs):
"""触发事件"""
for handler in self._handlers.get(event, []):
handler(*args, **kwargs)
def pipe(self, event: str, *transforms: Callable) -> Callable:
"""创建事件处理管道"""
def handler(data):
result = data
for transform in transforms:
result = transform(result)
return result
return self.on(event, handler)
# 使用
bus = EventBus()
# 函数式管道处理事件
bus.pipe("order_created",
lambda order: {**order, "total": sum(i["price"] for i in order["items"])},
lambda order: {**order, "discount": order["total"] * 0.1},
lambda order: print(f"订单总价: {order['total'] - order['discount']}")
)
bus.emit("order_created", {"items": [{"price": 100}, {"price": 200}]})缺点
总结
函数式编程核心原则:纯函数(无副作用)和不可变数据。Python 通过 frozen=True 的 dataclass 实现不可变对象,replace() 创建修改副本。函数组合使用 pipe()(从左到右)和 compose()(从右到左)。高阶函数 map/filter/reduce 是函数式数据处理的基础。Maybe/Either 模式避免 None 检查和异常处理。建议在数据处理和转换场景使用函数式风格,在 IO 和状态管理场景使用命令式风格。
函数式编程在 C# 中的应用
不可变记录类型
// C# record 天然支持不可变
public record Order(int Id, string Customer, decimal Total);
// with 表达式创建修改副本(类似 Python 的 replace)
var order = new Order(1, "张三", 100m);
var updatedOrder = order with { Total = 90m }; // 新对象
// 不可变集合
var items = ImmutableArray.Create("商品A", "商品B");
var newItems = items.Add("商品C"); // 返回新集合,原集合不变函数式数据处理
// LINQ 本质上就是函数式编程
var results = orders
.Where(o => o.Status == "Active") // filter
.Select(o => new { o.Name, Total = o.Amount * 0.9m }) // map
.OrderByDescending(o => o.Total) // 排序
.GroupBy(o => o.Name.Substring(0, 1)) // groupBy
.Select(g => new { Letter = g.Key, Count = g.Count(), Avg = g.Average(x => x.Total) })
.ToList();
// 函数组合
Func<decimal, decimal> applyTax = x => x * 1.1m;
Func<decimal, decimal> applyDiscount = x => x * 0.9m;
Func<decimal, decimal> round = x => Math.Round(x, 2);
// 管道组合
Func<decimal, decimal> pipeline = Compose(round, applyDiscount, applyTax);
// 从右到左执行: applyTax -> applyDiscount -> round关键知识点
- 模式不是目标,降低耦合和控制变化才是目标。
- 先找变化点、稳定点和协作边界,再决定是否引入模式。
- 同一个模式在不同规模下的收益和代价差异很大。
项目落地视角
- 优先画出参与对象、依赖方向和调用链,再落到代码。
- 把模式放到一个真实场景里,比如支付、规则引擎、工作流或插件扩展。
- 配合单元测试或契约测试,保证重构后的行为没有漂移。
常见误区
- 为了看起来“高级”而套模式。
- 把简单问题拆成过多抽象层,导致阅读和排障都变难。
- 只会背 UML,不会解释为什么这里需要这个模式。
进阶路线
- 继续关注模式之间的组合用法,而不是孤立记忆。
- 从业务建模、演进策略和团队协作角度看模式的适用性。
- 把模式结论沉淀为项目模板、基类或约束文档。
适用场景
- 当你准备把《函数式编程思想》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合在业务规则频繁变化、分支增多或对象协作复杂时引入。
- 当你希望提高扩展性,但又不想把系统拆得过度抽象时,这类主题很有参考价值。
落地建议
- 先识别变化点,再决定是否引入模式,而不是反过来套模板。
- 优先为模式的边界、依赖和调用路径画出简单结构图。
- 把模式落到一个明确场景,例如支付、规则计算、插件扩展或工作流。
排错清单
- 检查抽象层是否过多,导致调用路径和责任不清晰。
- 确认引入模式后是否真的减少了条件分支和重复代码。
- 警惕“为了模式而模式”,尤其是在简单业务里。
复盘问题
- 如果把《函数式编程思想》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《函数式编程思想》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《函数式编程思想》最大的收益和代价分别是什么?
