springboot整合ES

springboot整合ES

项目使用ES存储设备数据,故记录一下springboot与ES的整合,下次用的时候就不用到处找博客了,hhh

spring data elasticsearch

我在项目中用的是这个东西,刚开始也不懂,后面捉摸着用感觉还挺方便的。用它的方式有两种一种是

Repository一种是ElasticsearchRestTemplate。下面在详细的说,这里先提一下。

pom

1
2
3
4
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
</dependency>

网上很多导入spring-boot-starter-data-elasticsearch的,个人感觉不对,我们点开一看就知道spring-boot-starter-data-elasticsearch里面有spring-boot-starter和spring-data-elasticsearch的pom,然而spring-boot-starter是用来启动一个springboot服务的,然而我们肯定已经是一个spring boot服务并且也已经导入他的依赖了,所以我们是不需要的,我尝试将spring-boot-starter-data-elasticsearch删掉只留一个spring-data-elasticsearch也是可以的

yml

1
2
3
4
5
6
7
8
spring:
elasticsearch:
rest:
connection-timeout: 1s # 连接超时时间
username: # 连接用户名
password: # 连接密码
read-timeout: 30s # 读取超时时间
uris: ${ES_HOST:localhost}:${ES_PORT:9200} # es rest 接口地址,多个用逗号隔开

这么配应该就可以了

实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.ruoyi.aiot.domain.message;

import lombok.*;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.io.Serializable;

/**
* 设备日志 三个分片 一个副本
* @author yecao
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Document(indexName = "devicelog", shards = 3, replicas = 1)
public class DeviceLog implements Serializable {

@Id
private String id;

/**
* deviceId存为keyword防止分词
*/
@Field(type = FieldType.Keyword)
private String deviceId;

/**
* 日志信息
*/
@Field(type = FieldType.Text)
private String messageContent;

/**
* 日志类型 propertyReport eventLog functionInvoke onlineLog offlineLog
*/
@Field(type = FieldType.Keyword)
private String logType;

/**
* 时间戳
*/
@Field(name = "timestamp", type = FieldType.Text, fielddata = true)
private String timestamp;

}

通过注解来设置ES文档的mapping结构,一些我们不希望被分词的可以设置为keyword,fieldata默认为false,这时候不能用这个字段排序的,设置为true才行

Repository

1
2
3
4
5
6
7
@Repository
public interface DeviceLogRepo extends ElasticsearchRepository<DeviceLog, String> {

public Page<DeviceLog> findAllByDeviceIdAndTimestampIsBetween(String deviceId, String startTime, String endTime, Pageable pageable);

public Page<DeviceLog> searchAllByDeviceId(NativeSearchQuery nativeSearchQuery, Pageable pageable);
}

我们自定义一个接口继承自ElasticsearchRepository,即可使用给我们提供的一些默认的方法,我们也可以自定义一些接口,接口名称和参数的定义是要符合语义的,这样spring data es会自动给我们去实现这个接口。刚开始我以为这会很方便,实际操作起来认为,这只适合最做一些简单的crud稍微复杂的不太行,比如save find这些的用起来还是很爽的。我们save后es就会出现跟我们尸体了对应的文档结构了。

ElasticsearchRestTemplate

这个我们写一些复杂的查询可以用这个,不过使用这个就稍微有一点门槛了,要求我们知道构建es查询的json的一个结构,如果不是很熟的话,我们可以先借助kibana的devtools来写,写完以后再通过代码把请求体构建出来就行了。

看一下下面的两个例子吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Sort.Order timestamp = new Sort.Order(Sort.Direction.DESC, "timestamp");
PageRequest of = PageRequest.of(deviceDataDto.getPageNum(), deviceDataDto.getPageSize(), Sort.by(timestamp));

BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery("deviceId", deviceDataDto.getDeviceId()));

String logType = deviceDataDto.getLogType();
if (StringUtils.isNotEmpty(logType)){
boolQueryBuilder.must(QueryBuilders.termQuery("logType", logType));
}

String startTime = deviceDataDto.getStartTime();
String endTime = deviceDataDto.getEndTime();
if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)){
boolQueryBuilder.filter(QueryBuilders.rangeQuery("timestamp").gte(startTime).lte(endTime));
}
NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
NativeSearchQuery build = nativeSearchQueryBuilder.withQuery(boolQueryBuilder).withPageable(of).build();
SearchHits<DeviceLog> search = elasticsearchRestTemplate.search(build, DeviceLog.class);

上面构建了一个根据deviceID和时间范围的一个倒叙查找,大概分为三个步骤

  1. 通过QueryBuilders创建请求的体,也就是那个大json
  2. NativeSearchQueryBuilder将构建出来的json构建成一个可以执行的语句,相当于在QueryBuilders外面包了一层
  3. 通过ElasticsearchRestTemplate执行构建的语句,它相当于就是一个客户端,它负责将我们的请求发出去

比较适合做一些复杂的查询比较灵活,我们可以看到它具体的实现,也方便我们调试。

1
2
3
4
5
6
NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termsQuery("deviceId", list));
boolQueryBuilder.filter(QueryBuilders.rangeQuery("timestamp").gte(from).lte(to));
NativeSearchQuery build = nativeSearchQueryBuilder.withQuery(boolQueryBuilder).build();
return elasticsearchRestTemplate.count(build, ReportPropertyMessage.class);

这个例子是用来查询一组设备在某个时间段的消息数量。

优化后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 先查询
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termsQuery("deviceId", deviceIds));

// 后聚合
RangeAggregationBuilder field = AggregationBuilders.range("time_range")
.field("timestamp");
for (int i = 0; i < limit; i++) {
if (i == 0){
field.addRange(fromTemp , toTemp);
}else {
fromTemp = fromTemp + step;
toTemp = toTemp + step;
field.addRange(fromTemp , toTemp);
}
}

NativeSearchQuery build = new NativeSearchQueryBuilder()
// 不查询任何返回结果,这一点很重要,
// 我们要的是聚合结果,不需要他的返回结果,这里直接影响查询速度
.withSourceFilter(new FetchSourceFilter(new String[]{""}, null))
.withQuery(boolQueryBuilder)
.addAggregation(field)
.build();

SearchHits<TestPojo> search = elasticsearchRestTemplate.search(build, TestPojo.class);

问题解决

一段时间不操作过后再次试用报错:connection rest by peer其实就是tcp被断开了然后你又使用了这个tcp。需要通过@Bean手动将我们配置的Bean注入进去。手动设置keepAlive并开启,keepAlive的时间应该小于服务器的keepAlive时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Configuration
public class ElasticserachRestClientConfig {

@Value("${spring.elasticsearch.rest.uris}")
private String uris;

@Value("${spring.elasticsearch.rest.username}")
private String userName;

@Value("${spring.elasticsearch.rest.password}")
private String password;

@Bean
public RestHighLevelClient restHighLevelClient() {
String[] split = uris.split(":");
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(
RestClient.builder(
new HttpHost(split[1].replace("//", ""), Integer.parseInt(split[2]))
).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder.setConnectTimeout(5000)//25hours
.setSocketTimeout(60000);
}
}).setHttpClientConfigCallback(httpClientBuilder -> {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
.setKeepAliveStrategy((httpResponse, httpContext) -> TimeUnit.SECONDS.toMillis(3))
.setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build());
return httpClientBuilder;
}
)

);
return restHighLevelClient;
}
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!