整合 Elasticsearch 全文搜索
大约 9 分钟约 2726 字
整合 Elasticsearch 全文搜索
简介
在 ASP.NET Core 项目中整合 Elasticsearch,核心目标通常有两类:一类是为业务提供全文搜索能力,另一类是为后台提供日志检索和数据分析能力。真正落地时,重点不只是会写查询 DSL,而是要先明确索引结构、字段类型、分词策略、更新机制和查询目标。
Elasticsearch 在系统架构中的位置
数据写入链路:
数据库 → CDC/Outbox → Kafka → 消费者 → ES 索引写入
数据查询链路:
客户端 → API → ES 查询服务 → Elasticsearch → 搜索结果
两种典型场景:
1. 业务搜索(商品搜索、内容搜索、用户搜索)
2. 日志分析(ELK Stack:Elasticsearch + Logstash + Kibana)字段类型选择
text — 全文搜索(分词后建立倒排索引)
适用于:标题、正文、描述
注意:不能用于排序和聚合
keyword — 精确匹配(不分词)
适用于:ID、状态、分类、标签、枚举
注意:支持聚合和排序
date — 日期类型
适用于:创建时间、更新时间、生日
格式:yyyy-MM-dd HH:mm:ss
number — 数值类型
适用于:价格、数量、评分、浏览量
支持:范围查询、排序、聚合
boolean — 布尔类型
适用于:是否上架、是否激活
nested — 嵌套对象(保持对象内部关联性)
适用于:商品规格列表、订单明细列表
选择原则:
- 需要搜索的用 text
- 需要过滤/聚合/排序的用 keyword
- text 和 keyword 可以同时使用(multi-fields)特点
实现
客户端注册
// ============================================
// NuGet: NEST, Elasticsearch.Net
// ============================================
using Nest;
using Elasticsearch.Net;
var builder = WebApplication.CreateBuilder(args);
var esUrl = builder.Configuration["Elasticsearch:Url"] ?? "http://localhost:9200";
builder.Services.AddSingleton<IElasticClient>(_ =>
{
var settings = new ConnectionSettings(new Uri(esUrl))
.DefaultIndex("articles")
.DefaultMappingFor<SearchDocument>(m => m
.IndexName("articles")
)
.EnableDebugMode()
.PrettyJson()
.RequestTimeout(TimeSpan.FromSeconds(20))
.OnRequestCompleted(callDetails =>
{
// 记录慢查询
if (callDetails.ElapsedMilliseconds > 1000)
{
Console.WriteLine($"ES 慢查询: {callDetails.HttpMethod} {callDetails.Uri} " +
$"耗时 {callDetails.ElapsedMilliseconds}ms");
}
});
return new ElasticClient(settings);
});
// 健康检查
builder.Services.AddHealthChecks()
.AddElasticsearch(esUrl, name: "elasticsearch", timeout: TimeSpan.FromSeconds(5));索引定义与创建
// ============================================
// 文档模型
// ============================================
public class ArticleDocument
{
[Keyword(Name = "id")]
public string Id { get; set; } = string.Empty;
[Text(Name = "title", Analyzer = "ik_max_word", SearchAnalyzer = "ik_smart")]
public string Title { get; set; } = string.Empty;
[Text(Name = "content", Analyzer = "ik_max_word", SearchAnalyzer = "ik_smart")]
public string Content { get; set; } = string.Empty;
[Text(Name = "summary")]
public string Summary { get; set; } = string.Empty;
[Keyword(Name = "category")]
public string Category { get; set; } = string.Empty;
[Keyword(Name = "author")]
public string Author { get; set; } = string.Empty;
[Keyword(Name = "tags")]
public string[] Tags { get; set; } = Array.Empty<string>();
[Date(Name = "created_at", Format = "yyyy-MM-dd HH:mm:ss")]
public DateTime CreatedAt { get; set; }
[Number(Name = "view_count")]
public int ViewCount { get; set; }
[Number(Name = "like_count")]
public int LikeCount { get; set; }
[Boolean(Name = "is_published")]
public bool IsPublished { get; set; }
}
// ============================================
// 索引创建服务
// ============================================
public class ArticleIndexService
{
private readonly IElasticClient _client;
public ArticleIndexService(IElasticClient client)
{
_client = client;
}
public async Task EnsureIndexAsync()
{
var exists = await _client.Indices.ExistsAsync("articles");
if (exists.Exists) return;
var response = await _client.Indices.CreateAsync("articles", c => c
.Settings(s => s
.NumberOfShards(1) // 主分片数(生产环境建议 3-5)
.NumberOfReplicas(0) // 副本数(生产环境建议 1)
.RefreshInterval("1s") // 刷新间隔
.Analysis(a => a
.TokenFilters(tf => tf
.LowerCase("lowercase_filter"))
.Analyzers(az => az
.Custom("ik_smart_analyzer", ca => ca
.Tokenizer("ik_smart")
.Filter("lowercase_filter"))
.Custom("ik_max_word_analyzer", ca => ca
.Tokenizer("ik_max_word")
.Filter("lowercase_filter")))))
.Map<ArticleDocument>(m => m
.AutoMap()
.Properties(p => p
.Text(t => t.Name(x => x.Title)
.Fields(f => f
.Keyword(k => k.Name("title.keyword"))
.Text(tt => tt.Name("title.raw", analyzer = "standard"))))
.Text(t => t.Name(x => x.Content))
.Keyword(k => k.Name(x => x.Category))
.Keyword(k => k.Name(x => x.Tags))
.Keyword(k => k.Name(x => x.Author))
.Date(d => d.Name(x => x.CreatedAt))
.Number(n => n.Name(x => x.ViewCount))
.Number(n => n.Name(x => x.LikeCount))
.Boolean(b => b.Name(x => x.IsPublished)))));
if (!response.IsValid)
{
throw new InvalidOperationException($"索引创建失败: {response.DebugInformation}");
}
}
}文档写入与同步策略
// ============================================
// 文档写入服务
// ============================================
public class ArticleSearchWriter
{
private readonly IElasticClient _client;
private readonly ILogger<ArticleSearchWriter> _logger;
public ArticleSearchWriter(IElasticClient client, ILogger<ArticleSearchWriter> logger)
{
_client = client;
_logger = logger;
}
/// <summary>
/// 写入单个文档
/// </summary>
public async Task IndexAsync(ArticleDocument doc)
{
var response = await _client.IndexAsync(doc, i => i
.Index("articles")
.Id(doc.Id)
.Refresh(Refresh.WaitFor)); // 等待刷新,写入即可搜索
if (!response.IsValid)
{
_logger.LogError("ES 写入失败: {DebugInfo}", response.DebugInformation);
throw new InvalidOperationException("ES 写入失败");
}
_logger.LogInformation("ES 写入成功: Id={Id}", doc.Id);
}
/// <summary>
/// 批量写入
/// </summary>
public async Task BulkIndexAsync(IEnumerable<ArticleDocument> docs)
{
var bulkResponse = await _client.BulkAsync(b => b
.Index("articles")
.IndexMany(docs)
.Refresh(Refresh.WaitFor));
if (bulkResponse.Errors)
{
var errors = bulkResponse.ItemsWithErrors
.Select(i => $"ID={i.Id}, Error={i.Error?.Reason}")
.ToList();
_logger.LogError("ES 批量写入部分失败: {Errors}", string.Join("; ", errors));
}
_logger.LogInformation("ES 批量写入完成: Success={Success}, Failed={Failed}",
bulkResponse.Items.Count - bulkResponse.ItemsWithErrors.Count,
bulkResponse.ItemsWithErrors.Count);
}
/// <summary>
/// 删除文档
/// </summary>
public async Task DeleteAsync(string id)
{
var response = await _client.DeleteAsync<ArticleDocument>(id, d => d
.Index("articles"));
if (!response.IsValid)
{
_logger.LogError("ES 删除失败: {DebugInfo}", response.DebugInformation);
}
}
/// <summary>
/// 更新文档(部分更新)
/// </summary>
public async Task UpdateAsync(string id, object partialDoc)
{
var response = await _client.UpdateAsync<ArticleDocument, object>(id, u => u
.Index("articles")
.Doc(partialDoc)
.Refresh(Refresh.WaitFor));
if (!response.IsValid)
{
_logger.LogError("ES 更新失败: {DebugInfo}", response.DebugInformation);
}
}
}查询、过滤、高亮与分页
// ============================================
// 搜索服务 — 多字段匹配 + 过滤 + 高亮
// ============================================
public class ArticleSearchService
{
private readonly IElasticClient _client;
public ArticleSearchService(IElasticClient client)
{
_client = client;
}
/// <summary>
/// 多字段搜索 + 分页 + 排序
/// </summary>
public async Task<SearchResult<ArticleDocument>> SearchAsync(
string keyword,
int page = 1,
int pageSize = 20,
string? category = null,
string? sortBy = null)
{
var response = await _client.SearchAsync<ArticleDocument>(s => s
.Index("articles")
.From((page - 1) * pageSize)
.Size(pageSize)
.Query(q => q
.Bool(b =>
{
// 必须匹配:关键词
if (!string.IsNullOrEmpty(keyword))
{
b.Must(m => m
.MultiMatch(mm => mm
.Fields(f => f
.Field(x => x.Title, boost: 3) // 标题权重高
.Field(x => x.Summary, boost: 2) // 摘要权重中
.Field(x => x.Content)) // 内容权重低
.Query(keyword)
.Type(TextQueryType.BestFields)
.Fuzziness(Fuzziness.Auto)));
}
// 过滤条件:分类
if (!string.IsNullOrEmpty(category))
{
b.Filter(f => f.Term(t => t.Field(x => x.Category).Value(category)));
}
// 过滤条件:只搜索已发布
b.Filter(f => f.Term(t => t.Field(x => x.IsPublished).Value(true)));
}))
.Sort(ss =>
{
// 按相关度排序(默认)
if (sortBy == "date")
ss.Descending(x => x.CreatedAt);
else if (sortBy == "views")
ss.Descending(x => x.ViewCount);
else if (sortBy == "likes")
ss.Descending(x => x.LikeCount);
else
ss.Descending(SortSpecialField.Score); // 默认相关度
})
// 高亮显示
.Highlight(h => h
.Fields(
f => f.Field(x => x.Title)
.PreTags("<em>")
.PostTags("</em>")
.FragmentSize(100)
.NumberOfFragments(1),
f => f.Field(x => x.Summary)
.PreTags("<em>")
.PostTags("</em>")
.FragmentSize(200)
.NumberOfFragments(2))
.PreTags("<mark>")
.PostTags("</mark>")));
return new SearchResult<ArticleDocument>
{
Items = response.Documents.ToList(),
Total = (int)(response.Total ?? 0),
Page = page,
PageSize = pageSize,
MaxScore = response.MaxScore,
Took = response.Took,
Highlights = response.Hits.Select(h => new
{
Id = h.Id,
Highlights = h.Highlights?.ToDictionary(
kv => kv.Key,
kv => string.Join("", kv.Value))
}).ToList()
};
}
}
public class SearchResult<T>
{
public List<T> Items { get; set; } = new();
public int Total { get; set; }
public int Page { get; set; }
public int PageSize { get; set; }
public double? MaxScore { get; set; }
public long Took { get; set; }
public List<HitHighlight> Highlights { get; set; } = new();
}
public class HitHighlight
{
public string Id { get; set; } = string.Empty;
public Dictionary<string, string> Highlights { get; set; } = new();
}聚合分析与统计
// ============================================
// 聚合查询 — 分类统计、时间分布
// ============================================
public async Task<IDictionary<string, long>> GetCategoryStatsAsync()
{
var response = await _client.SearchAsync<ArticleDocument>(s => s
.Index("articles")
.Size(0) // 不需要返回文档
.Query(q => q.Term(t => t.Field(x => x.IsPublished).Value(true)))
.Aggregations(a => a
.Terms("by_category", t => t
.Field(x => x.Category)
.Size(20)
.Order(TermsOrder.CountDescending))));
return response.Aggregations
.Terms("by_category")
.Buckets
.ToDictionary(x => x.Key, x => x.DocCount ?? 0);
}
// 时间直方图
public async Task<IReadOnlyList<DateHistogramBucket>> GetPublishTrendAsync(
DateTime start, DateTime end, string interval = "month")
{
var response = await _client.SearchAsync<ArticleDocument>(s => s
.Index("articles")
.Size(0)
.Query(q => q.Bool(b =>
b.Filter(f => f.DateRange(dr => dr
.Field(x => x.CreatedAt)
.GreaterThanOrEquals(start)
.LessThan(end)))))
.Aggregations(a => a
.DateHistogram("by_month", dh => dh
.Field(x => x.CreatedAt)
.CalendarInterval(interval)
.Format("yyyy-MM"))));
return response.Aggregations
.DateHistogram("by_month")
.Buckets
.Select(b => new DateHistogramBucket
{
Date = DateTime.Parse(b.DateAsString ?? ""),
Count = b.DocCount ?? 0
})
.ToList();
}
public class DateHistogramBucket
{
public DateTime Date { get; set; }
public long Count { get; set; }
}数据同步 — CDC 方式
// ============================================
// 数据库变更 → Kafka → ES 同步
// ============================================
public class ArticleSyncConsumer : BackgroundService
{
private readonly IElasticClient _esClient;
private readonly IArticleRepository _repository;
private readonly ILogger<ArticleSyncConsumer> _logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// 监听 Kafka 中的 article-changes Topic
while (!stoppingToken.IsCancellationRequested)
{
try
{
// 消费变更事件
var changeEvent = await ConsumeChangeEventAsync(stoppingToken);
if (changeEvent == null) continue;
switch (changeEvent.Type)
{
case "created":
case "updated":
var article = await _repository.GetByIdAsync(changeEvent.ArticleId);
if (article != null)
{
var doc = MapToDocument(article);
await _esClient.IndexAsync(doc, i => i
.Index("articles")
.Id(doc.Id)
.Refresh(Refresh.WaitFor));
}
break;
case "deleted":
await _esClient.DeleteAsync<ArticleDocument>(
changeEvent.ArticleId, d => d.Index("articles"));
break;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "文章同步失败");
await Task.Delay(5000, stoppingToken);
}
}
}
private ArticleDocument MapToDocument(Article article) => new()
{
Id = article.Id.ToString(),
Title = article.Title,
Content = article.Content,
Summary = article.Summary,
Category = article.Category,
Author = article.Author,
Tags = article.Tags?.Split(',', StringSplitOptions.RemoveEmptyEntries) ?? Array.Empty<string>(),
CreatedAt = article.CreatedAt,
ViewCount = article.ViewCount,
LikeCount = article.LikeCount,
IsPublished = article.IsPublished
};
}优点
缺点
总结
在 ASP.NET Core 中整合 Elasticsearch,真正要做的是"搜索系统设计",而不是"接一个客户端库"。你需要先想清楚数据模型、字段类型、索引更新链路和查询目标,再去写具体的 .SearchAsync() 或 .IndexAsync() 代码,否则后期会在 Mapping、同步和查询性能上反复踩坑。
关键知识点
text和keyword的差别,会直接影响搜索、过滤和聚合能力。- ES 适合搜索和分析,不适合承担主事务数据写入职责。
- 同步策略(同步写、异步写、Outbox)往往决定系统可靠性。
- 高亮、分页、聚合只是功能表象,底层仍然是字段建模问题。
项目落地视角
- 内容管理系统、知识库、商品搜索、后台检索页都很适合 ES。
- 日志平台和业务搜索平台最好分开治理,不要混在一个索引策略里。
- 中小型系统若只有简单筛选,未必需要一上来就用 ES。
- 真正上线前要准备索引重建、数据补偿和慢查询分析手段。
常见误区
- 把数据库字段原样同步到 ES,不做搜索建模。
- 所有字符串字段都用 text,最后过滤和聚合全出问题。
- 只关注"搜得到",不关注同步延迟和一致性。
- 以为接入 ES 就天然比数据库查询更快。
进阶路线
- 深入学习 Analyzer、Tokenizer、IK 分词、同义词词库。
- 学习 Data Stream、ILM、索引模板和冷热分层治理。
- 研究业务搜索、日志搜索、向量检索三类场景的不同设计方式。
- 把 ES 与 Outbox、Kafka、CDC、ELK 链路结合起来理解。
适用场景
- 全文检索。
- 内容平台、商品平台、后台复杂搜索。
- 日志检索与运营分析。
- 需要高维过滤 + 搜索 + 聚合的系统。
落地建议
- 先做索引建模,再做代码接入。
- 为每个索引明确:字段类型、更新来源、查询目标、生命周期策略。
- 高价值索引要能支持全量重建与异常补偿。
- 搜索质量问题优先检查分词、字段类型和查询 DSL,而不是先怀疑代码。
排错清单
- 搜不到:检查索引写入、refresh 时机、字段类型和 analyzer。
- 聚合错:检查字段是不是 keyword。
- 排序异常:检查字段类型和索引值格式是否正确。
- 数据不一致:检查数据库 → ES 同步链路和补偿机制。
