Spring Boot + Elasticsearch 快速整合指南
引言
Elasticsearch作為高性能的分布式搜索引擎,在現代應用開發中被廣泛使用。具有如下特點:
- 一個分布式的實時文檔存儲引擎,每個字段都可以被索引與搜索。
- 一個分布式實時分析搜索引擎,支持各種查詢和聚合操作。
- 能勝任上百個服務節點的擴展,并可以支持PB級別的結構化或者非結構化數據。
倒排索引
倒排索引是整個Elasticsearch的核心,正常的搜索以一本書為例,應該是由目錄 -> 章節 -> 頁碼 -> 內容這樣的查找順序,這樣是正排索引的思想。
但是設想一下,如果要在一本書中快速查找elasticsearch這個關鍵字所在的頁面,該怎么辦?
倒排索引的思路是建立單詞到文檔ID的映射關系:先通過單詞找到包含該單詞的文檔ID列表,再根據文檔ID定位到具體文檔。
圖片
本文將詳細介紹通過ElasticsearchRepository和ElasticsearchRestTemplate兩種方式實現整合的方法。
案例
使用 ElasticsearchRepository
ElasticsearchRepository是Spring Data提供的接口,通過繼承該接口,可快速實現基本的CRUD操作,極大地簡化了開發流程。
1. 創建Repository接口:繼承ElasticsearchRepository,并指定實體類和主鍵類型,還可自定義查詢方法。
/**
 * Repository interface for {@code Demo} documents whose id type is {@code String}.
 * It extends {@code ElasticsearchRepository} and declares two additional query methods.
 */
public interface DemoRepository extends ElasticsearchRepository<Demo, String> {
// Custom query method: looks documents up by the given imsi value.
List<Demo> findByImsi(String imsi);
// DSL query declared with @Query: a bool query that must match "imsi" against the
// first argument and filters "costTime" to the inclusive range [second, third] argument.
// Note that the method name says "PriceRange" while the range is applied to "costTime".
@Query("{\"bool\": {\"must\": [{\"match\": {\"imsi\": \"?0\"}}], \"filter\": {\"range\": {\"costTime\": {\"gte\": ?1, \"lte\": ?2}}}}}")
List<Demo> findByImsiAndPriceRange(String imsi, double min, double max);
}
2. 服務層實現:在服務類中注入Repository,調用其方法完成數據操作。
/**
 * Service layer that forwards {@code Demo} persistence and lookup calls to
 * {@code DemoRepository} without adding any logic of its own.
 */
@Service
public class DemoService {

    @Autowired
    private DemoRepository demoRepository;

    /**
     * Stores the given document.
     *
     * @param demo document to store
     * @return the instance returned by the repository's {@code save}
     */
    public Demo save(Demo demo) {
        Demo persisted = this.demoRepository.save(demo);
        return persisted;
    }

    /**
     * Looks a document up by its id.
     *
     * @param id document id
     * @return the repository's lookup result
     */
    public Optional<Demo> findById(String id) {
        Optional<Demo> found = this.demoRepository.findById(id);
        return found;
    }

    /**
     * Looks documents up by imsi; despite the method name, the argument is
     * passed to {@code findByImsi}.
     *
     * @param imsi imsi value to search for
     * @return the documents returned by the repository
     */
    public List<Demo> findByName(String imsi) {
        List<Demo> matches = this.demoRepository.findByImsi(imsi);
        return matches;
    }

    /**
     * Fetches every document through the repository's {@code findAll}.
     *
     * @return all documents returned by the repository
     */
    public Iterable<Demo> findAll() {
        Iterable<Demo> everything = this.demoRepository.findAll();
        return everything;
    }

    /**
     * Deletes the given document through the repository.
     *
     * @param demo document to delete
     */
    public void delete(Demo demo) {
        this.demoRepository.delete(demo);
    }

    /**
     * Runs the repository's imsi-plus-range query.
     *
     * @param imsi imsi value to match
     * @param min  lower bound passed to the query
     * @param max  upper bound passed to the query
     * @return the documents returned by the repository
     */
    public List<Demo> findByImsiAndPriceRange(String imsi, double min, double max) {
        List<Demo> matches = this.demoRepository.findByImsiAndPriceRange(imsi, min, max);
        return matches;
    }
}
使用 ElasticsearchRestTemplate
1. 配置ElasticsearchRestTemplate
/**
 * Elasticsearch client configuration: builds a {@code RestHighLevelClient} with basic
 * authentication from the configured node addresses and exposes an
 * {@code ElasticsearchRestTemplate} on top of it.
 */
@Configuration
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {

    /**
     * Node addresses. The default must not contain blanks: everything after the first
     * ':' of the placeholder is taken as the default value.
     */
    @Value("${spring.elasticsearch.uris:localhost:9200}")
    private String[] uris;

    /** Basic-auth user name; the default keeps the previously hard-coded value. */
    @Value("${spring.elasticsearch.username:username}")
    private String username;

    /** Basic-auth password; the default keeps the previously hard-coded value. */
    @Value("${spring.elasticsearch.password:password}")
    private String password;

    /**
     * Exposes the template under the names {@code elasticsearchOperations} and
     * {@code elasticsearchRestTemplate}.
     *
     * @return a template backed by {@link #elasticsearchClient()}
     */
    @Bean(name = { "elasticsearchOperations", "elasticsearchRestTemplate" })
    public ElasticsearchRestTemplate elasticsearchTemplate() {
        return new ElasticsearchRestTemplate(elasticsearchClient());
    }

    /**
     * Creates the REST client for all configured nodes, sending the configured
     * credentials for any authentication scope.
     *
     * @return the high-level REST client
     */
    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

        // Trim each entry so values such as "host1:9200, host2:9200" do not reach
        // HttpHost.create with surrounding whitespace.
        HttpHost[] httpHosts = Arrays.stream(uris)
                .map(String::trim)
                .map(HttpHost::create)
                .toArray(HttpHost[]::new);

        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts)
                .setHttpClientConfigCallback(httpClientBuilder ->
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        return new RestHighLevelClient(restClientBuilder);
    }
}
2. 服務層實現:在服務類中注入ElasticsearchRestTemplate,通過構建查詢條件實現各種數據操作。
@Service
public class DslQueryService {
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;
// 1. 基本Match查詢
public List<Demo> searchByKeyword(String keyword) {
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.matchQuery("imsi", keyword))
.build();
SearchHits<Demo> searchHits = elasticsearchRestTemplate.search(query, Demo.class);
return searchHits.getSearchHits().stream()
.map(SearchHit::getContent)
.collect(Collectors.toList());
}
// 2. 組合Bool查詢
public List<Demo> complexSearch(String imsi, Double min, Double max, String desc) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
if (imsi != null &&!imsi.isEmpty()) {
boolQuery.must(QueryBuilders.matchQuery("imsi", imsi));
}
if (min != null && max != null) {
boolQuery.filter(QueryBuilders.rangeQuery("costTime").gte(min).lte(max));
}
if (desc != null &&!desc.isEmpty()) {
boolQuery.filter(QueryBuilders.termQuery("desc", desc));
}
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(boolQuery)
.withPageable(PageRequest.of(0, 20))
.build();
SearchHits<Demo> searchHits = elasticsearchRestTemplate.search(query, Demo.class);
return searchHits.getSearchHits().stream()
.map(SearchHit::getContent)
.collect(Collectors.toList());
}
// 3. 聚合查詢示例
public void getCategoryCounts() {
SearchRequest searchRequest = new SearchRequest("demo");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termsQuery("imsi", "test","000","1"));
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.size(0);
searchSourceBuilder.trackTotalHits(true);
Script scriptGroup = new Script("doc['imsi'].value");
TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("by_imsi").script(scriptGroup).size(10);
termsAggregationBuilder.subAggregation(AggregationBuilders.sum("sumTime").field("costTime"));
// Map<String, String> bucketsPathsMap = new HashMap<>();
// bucketsPathsMap.put("sumTime", "sumTime");
// BucketSelectorPipelineAggregationBuilder selectorPipelineAggregationBuilder = PipelineAggregatorBuilders
// .bucketSelector("having_count", bucketsPathsMap, new Script("params.sumTime<10000"));
// termsAggregationBuilder.subAggregation(selectorPipelineAggregationBuilder);
TopHitsAggregationBuilder topHit = new TopHitsAggregationBuilder("top_result").size(10);
termsAggregationBuilder.subAggregation(topHit);
searchSourceBuilder.aggregation(termsAggregationBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = elasticsearchRestTemplate.execute(client -> {
return client.search(searchRequest, RequestOptions.DEFAULT);
});
Terms terms = (Terms) searchResponse.getAggregations().get("by_imsi");
for(Terms.Bucket bucket : terms.getBuckets()) {
Aggregations aggregations = bucket.getAggregations();
Sum sum = aggregations.get("sumTime");
System.out.println(bucket.getKeyAsString()+":"+bucket.getDocCount()+":"+sum.getValueAsString());
}
}
// 4. 滾動查詢示例
public List<Demo> scrollSearch(String scrollId, int pageSize) {
SearchScrollHits<Demo> searchScrollHits;
if (scrollId == null) {
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.matchAllQuery())
.withPageable(PageRequest.of(0, pageSize))
.build();
searchScrollHits = elasticsearchRestTemplate.searchScrollStart(30000L, searchQuery, Demo.class, IndexCoordinates.of("demo"));
} else {
searchScrollHits = elasticsearchRestTemplate.searchScrollContinue(scrollId, 30000L, Demo.class, IndexCoordinates.of("demo"));
}
elasticsearchRestTemplate.searchScrollClear(Collections.singletonList(searchScrollHits.getScrollId()));
return searchScrollHits.getSearchHits().stream()
.map(SearchHit::getContent)
.collect(Collectors.toList());
}
}
測試方法
@Slf4j
@SpringBootTest
public class TestDemo {
@Autowired
private DemoService demoService;
@Autowired
private DslQueryService dslQueryService;
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;
@Test
public void test1(){
demoService.findById("vkwztJMBXiMbcxs-8Npt").ifPresent(demo -> log.info(demo.toString()));
}
@Test
public void test2(){
demoService.findByImsiAndPriceRange("test", 0.0, 50.0).forEach(demo -> log.info(demo.toString()));
}
@Test
public void test3(){
dslQueryService.searchByKeyword("test").forEach(demo -> log.info(demo.toString()));
}
@Test
public void test4(){
dslQueryService.getCategoryCounts();
}
@Test
public void test5(){
dslQueryService.scrollSearch(null, 10).forEach(demo -> log.info(demo.toString()));
}
@Test
public void test6(){
Boolean flag = elasticsearchRestTemplate.indexOps(Demo.class).exists();
if (flag == false) {
log.info(" createIndex.......");
elasticsearchRestTemplate.indexOps(Demo.class).create();
elasticsearchRestTemplate.indexOps(Demo.class).putMapping(Demo.class);
} else {
String indexName = elasticsearchRestTemplate.getIndexCoordinatesFor(Demo.class).getIndexName();
log.info(" refreshIndex......");
refreshAsync(indexName);
}
}
@Test
public void test7(){
List list = new ArrayList();
Demo bean = new Demo("test", "test", "test", "test", "test", 1L);
IndexQuery indexQuery = new IndexQueryBuilder().withSource(JSONObject.toJSONString(bean)).build();
list.add(indexQuery);
elasticsearchRestTemplate.bulkIndex(list, Demo.class);
}
public void refreshAsync(String index) {
try {
elasticsearchRestTemplate.execute(client -> client.indices().refreshAsync(refreshRequest(index), RequestOptions.DEFAULT, new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
}
@Override
public void onFailure(Exception e) {
log.info("failed callback to refresh index={},exception--->{}" + index, e);
}
}));
} catch (Exception e) {
log.info("failed to refresh index={},exception--->{}" + index, e);
}
}
}
復雜聚合場景
- 使用date_histogram聚合嵌套兩層terms聚合實現三級分組:時間(按分鐘)、域和 IMSI
- 對每個分組添加計數聚合,計算總數和失敗數
- 使用filter聚合篩選失敗記錄(resulCode 不為 "0000")
// Build the base query conditions.
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

// Time-range condition: the inputs are extended with ":00" / ":59" before being
// used as the inclusive bounds on "rtime".
String startTime = jsonParam.getString("startTime");
String endTime = jsonParam.getString("endTime");
if (!StringUtils.isEmpty(startTime) && !StringUtils.isEmpty(endTime)) {
    startTime = startTime + ":00";
    endTime = endTime + ":59";
    boolQueryBuilder.must(QueryBuilders.rangeQuery("rtime").gte(startTime).lte(endTime));
}

// Data-domain permission condition.
List<String> vpndomains = CommonTools.strList(perms);
if (CollectionUtils.isNotEmpty(vpndomains)) {
    boolQueryBuilder.must(QueryBuilders.termsQuery("vpdndomain", vpndomains));
}

// Data-source type condition.
if (!StringUtils.isEmpty(publicPerms) && publicPerms.contains("oldora")) {
    boolQueryBuilder.must(QueryBuilders.matchQuery("sourceType", 4));
} else {
    boolQueryBuilder.must(QueryBuilders.matchQuery("sourceType", 1));
}

// Build the aggregation query: per-minute date_histogram -> terms on domain -> terms on imsi.
// NOTE(review): no explicit size is set on the two terms aggregations, so the
// Elasticsearch default bucket limit applies - confirm this is intended.
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
        .withQuery(boolQueryBuilder)
        // Group by minute with a date_histogram aggregation.
        .addAggregation(
                AggregationBuilders.dateHistogram("by_minute")
                        .field("rtime")
                        .fixedInterval(DateHistogramInterval.MINUTE)
                        .format("yyyy-MM-dd HH:mm")
                        .subAggregation(
                                AggregationBuilders.terms("by_domain")
                                        .field("vpdndomain")
                                        .subAggregation(
                                                AggregationBuilders.terms("by_imsi")
                                                        .field("imsi")
                                                        .subAggregation(
                                                                // Total count per imsi bucket.
                                                                AggregationBuilders.count("total_count").field("_index")
                                                        )
                                                        .subAggregation(
                                                                // Failure count: documents whose resulCode is not "0000".
                                                                AggregationBuilders.filter("fail_count", QueryBuilders.boolQuery()
                                                                        .mustNot(QueryBuilders.termQuery("resulCode", "0000")))
                                                        )
                                        )
                        )
        )
        .build();

// Execute the query.
SearchHits<Authlog> searchHits = elasticsearchRestTemplate.search(searchQuery, Authlog.class, IndexCoordinates.of(authlog_index_name));

// Process the aggregation results.
List<Map> statsList = new ArrayList<>();
Integer overLimitCount = jsonParam.getInteger("overLimitCount");
// A missing "overLimitCount" is treated as a threshold of 0 (keep every bucket)
// instead of failing with a NullPointerException when the Integer is unboxed.
long minTotal = overLimitCount == null ? 0L : overLimitCount.longValue();
Histogram timeTerms = searchHits.getAggregations().get("by_minute");
if (timeTerms != null) {
    for (Histogram.Bucket timeBucket : timeTerms.getBuckets()) {
        String rtime = timeBucket.getKeyAsString();
        Terms domainTerms = timeBucket.getAggregations().get("by_domain");
        for (Terms.Bucket domainBucket : domainTerms.getBuckets()) {
            String vpdndomain = domainBucket.getKeyAsString();
            Terms imsiTerms = domainBucket.getAggregations().get("by_imsi");
            for (Terms.Bucket imsiBucket : imsiTerms.getBuckets()) {
                String imsi = imsiBucket.getKeyAsString();
                // Total count.
                ValueCount totalCount = imsiBucket.getAggregations().get("total_count");
                long total = totalCount.getValue();
                // Skip buckets below the threshold.
                if (total < minTotal) {
                    continue;
                }
                // Failure count.
                Filter failCount = imsiBucket.getAggregations().get("fail_count");
                long fail = failCount.getDocCount();
                // Build the result row.
                Map<Object, Object> result = MapUtil.builder()
                        .put("rtime", rtime)
                        .put("vpdndomain", vpdndomain)
                        .put("imsi", imsi)
                        .put("total", total)
                        .put("fail", fail)
                        .map();
                statsList.add(result);
            }
        }
    }
}
在實際項目中,可根據需求靈活選擇:
- 對于簡單的CRUD操作和基礎查詢,優先選擇ElasticsearchRepository,其簡潔的代碼結構能快速完成開發。
- 若涉及復雜的查詢邏輯、聚合分析或自定義操作,ElasticsearchRestTemplate更能滿足需求,開發者可通過構建DSL實現強大的搜索功能。