java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > elasticsearch查询超10000

elasticsearch集群查询超10000的解决方案

作者:苍煜

ES为了避免用户的过大分页请求造成ES服务所在机器内存溢出,默认对深度分页的条数进行了限制,默认的最大条数是10000条,这篇文章主要给大家介绍了关于elasticsearch集群查询超10000的解决方案,需要的朋友可以参考下

前言

默认情况下,Elasticsearch集群中每个分片的搜索结果数量限制为10000。这是为了避免潜在的性能问题。

但是我们 在实际工作过程中时常会遇到 需要深度分页,以及查询批量数据更新的情况

问题:当请求form + size >10000 时,请求直接报错

1:修改max_result_window 参数(不推荐)

在此方案中,我们建议仅限于测试用,生产禁用,毕竟当数据量大的时候,过大的数据量可能导致es的内存溢出,直接崩掉,一年绩效白干。

PUT wkl_test/_settings
{
   "index":{
        "max_result_window":2147483647
    }
}

查看索引的 settings

重新查数据:

2:使用游标 scroll API

使用scroll API:scroll API可以帮助我们在不加载所有数据的情况下获取所有结果。它会在后台执行查询以获取滚动ID,并将其用于进行后续查询。这样就可以一次性获取所有结果,而不必担心限制

ES语句查询

在游标方案中,我们只需要在第一次拿到游标id,之后通过游标就能唯一确定查询,在这个查询中通过我们指定的 size 移动游标,具体操作看看下面实操。

GET wkl_test/_search?scroll=5m
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "seq": {
        "order": "asc"
      }
    }
  ],
  "size": 200
}

java实现

@Test
    public void testScroll(){
        RestHighLevelClient restHighLevelClient ;
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        boolQueryBuilder.mustNot(QueryBuilders.existsQuery("seq"));

        try {
            //滚动查询的Scroll,设置请求滚动时间窗口时间
            Scroll scroll = new Scroll(TimeValue.timeValueMillis(180000));

            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            //加入query语句
            sourceBuilder.query(boolQueryBuilder);
            //每次滚动的长度
            sourceBuilder.size(SIZE);
            //加入排序字段
            sourceBuilder.sort("id", SortOrder.DESC);
            //构建searchRequest
            //加入scroll和构造器
            SearchRequest searchRequest = new SearchRequest()
                    .indices("wkl_test")
                    .source(sourceBuilder)
                    .scroll(scroll);
            //存储scroll的list
            List<String> scrollIdList = new ArrayList<>();
            //执行首次检索
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            //首次检索返回scrollId,用于下一次的滚动查询
            String scrollId = searchResponse.getScrollId();
            //拿到hits结果
            SearchHit[] hits = searchResponse.getHits().getHits();
            long value = searchResponse.getHits().getTotalHits().value;
            //保存返回结果List大小
            Long resultSize = 0L;
            scrollIdList.add(scrollId);
            try {
                //滚动查询将SearchHit封装到result中
                while (ArrayUtils.isNotEmpty(hits) && hits.length > 0) {
                    BulkRequest bulkRequest = new BulkRequest();
                    JSONArray esArray = new JSONArray();
                    for (SearchHit hit : hits) {
                        String sourceAsString = hit.getSourceAsString();
                        String index = hit.getIndex();
                        JSONObject jsonObject = JSONObject.parseObject(sourceAsString);
                        String seq = jsonObject.getString("seq");
                        if(StringUtils.isBlank(seq) ){
                            esArray.add(jsonObject);
                            String uuid = jsonObject.getString("id");
                            jsonObject.put("is_del",1);
                            bulkRequest.add(new UpdateRequest(index, uuid).doc(jsonObject));
                        }
                    }
                    resultSize = resultSize+hits.length;

                    //发送请求
                    //实时更新
                    bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
                    BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                    System.out.println(bulk.getTook()+"-------"+bulk.getItems().length);

                    //说明滚动完了,返回结果即可
                    if (resultSize > 20000) {
                        break;
                    }
                    //继续滚动,根据上一个游标,得到这次开始查询位置
                    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                    searchScrollRequest.scroll(scroll);
                    //得到结果
                    SearchResponse searchScrollResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                    //定位游标
                    scrollId = searchScrollResponse.getScrollId();
                    hits = searchScrollResponse.getHits().getHits();
                    scrollIdList.add(scrollId);
                }
                System.out.println("----彻底结束了-----");
            } finally {
                //清理scroll,释放资源
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.setScrollIds(scrollIdList);
                restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

scroll API 的优缺点和总结

优缺点:

适用场景

3: search_after + PIT 深度查询

不带PIT

ES语句实现

检索第一页的查询如下所示:

GET wkl_test/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "seq": {
        "order": "asc"
      }
    }
  ],
  "size": 200
}

上述请求的结果包括每个文档的 sort 值数组。

这些 sort 值可以与 search_after 参数一起使用,以开始返回在这个结果列表之后的任何文档。例如,我们可以使用上一个文档的 sort 值并将其传递给 search_after 以检索下一页结果:

Java 实现

@Test
    public void testSearchAfter() throws IOException {
        RestHighLevelClient restHighLevelClient = es7UtilApi.getRestHighLevelClient();

        MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(matchAllQueryBuilder);
        searchSourceBuilder.from(0);
        searchSourceBuilder.size(200);
        searchSourceBuilder.sort("seq", SortOrder.ASC);
        searchSourceBuilder.trackTotalHits(true);

        SearchRequest searchRequest = new SearchRequest()
                .indices("wkl_test")
                .source(searchSourceBuilder);

        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        SearchHits hits = searchResponse.getHits();
        long value = hits.getTotalHits().value;
        System.out.println("查询到记录数=" + value);

        List<JSONObject> list = new ArrayList<>();
        SearchHit[] searchHists = hits.getHits();
        Object[] sortValues = searchHists[searchHists.length - 1].getSortValues();
        if (searchHists.length > 0) {
            for (SearchHit hit : searchHists) {
                String sourceAsString = hit.getSourceAsString();
                JSONObject jsonObject = JSON.parseObject(sourceAsString);
                jsonObject.put("_id", hit.getId());
                list.add(jsonObject);
            }
        }

        //往后的每次请求都携带上一次的sort_id进行访问。
        while (ArrayUtils.isNotEmpty(searchHists) && searchHists.length > 0){
            searchSourceBuilder.searchAfter(sortValues);
            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponseAfter = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            hits = searchResponseAfter.getHits();
            searchHists = hits.getHits();
            sortValues = searchHists[searchHists.length - 1].getSortValues();
            if (searchHists.length > 0) {
                for (SearchHit hit : searchHists) {
                    String sourceAsString = hit.getSourceAsString();
                    JSONObject jsonObject = JSON.parseObject(sourceAsString);
                    jsonObject.put("_id", hit.getId());
                    list.add(jsonObject);
                }
            }
            if(list.size()>20000){
                break;
            }
            System.out.println("-----彻底结束了-------");
        }

    }

问题

「优点:」

「缺点:」

带PIT

关于PIT

ES语句实现

1:生成pit

#keep_alive必须要加上,它表示这个pit能存在多久,这里设置的是1分钟
POST wkl_test/_pit?keep_alive=1m

在这里插入图片描述

2:在搜索请求中指定PIT:

在每个搜索请求中添加 keep_alive 参数来延长 PIT 的保留期,相当于是重置了一下时间

GET _search
{
  "query": {
    "match_all": {}
  },
  "pit":{
    "id":"t_yxAwEId2tsX3Rlc3QWU0hzbEJkYWNTVEd0ZGRoN0xsQVVNdwAWUGQtaXJpT0xTa2VUN0RGLXZfTlBvZwAAAAAACHG1fxY1UWNKX1RHOFMybXBaV20zbWx3enp3ARZTSHNsQmRhY1NUR3RkZGg3TGxBVU13AAA=",
    "keep_alive":"5m"
  },
  "sort": [
    {
      "seq": {
        "order": "asc"
      }
    }
  ],
  "size": 200
}

3:删除PIT

DELETE _pit
{
 "id":"t_yxAwEId2tsX3Rlc3QWU0hzbEJkYWNTVEd0ZGRoN0xsQVVNdwAWUGQtaXJpT0xTa2VUN0RGLXZfTlBvZwAAAAAACHG1fxY1UWNKX1RHOFMybXBaV20zbWx3enp3ARZTSHNsQmRhY1NUR3RkZGg3TGxBVU13AAA="
}

总结

到此这篇关于elasticsearch集群查询超10000解决方案的文章就介绍到这了,更多相关elasticsearch查询超10000内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文