springboot整合ES 项目使用ES存储设备数据,故记录一下springboot与ES的整合,下次用的时候就不用到处找博客了,hhh
spring data elasticsearch 我在项目中用的是这个东西,刚开始也不懂,后面捉摸着用感觉还挺方便的。用它的方式有两种一种是
Repository
一种是ElasticsearchRestTemplate
。下面在详细的说,这里先提一下。
pom <dependency > <groupId > org.springframework.data</groupId > <artifactId > spring-data-elasticsearch</artifactId > </dependency >
网上很多导入spring-boot-starter-data-elasticsearch
的,个人感觉不对,我们点开一看就知道spring-boot-starter-data-elasticsearch里面有spring-boot-starter和spring-data-elasticsearch的pom,然而spring-boot-starter是用来启动一个springboot服务的,然而我们肯定已经是一个spring boot服务并且也已经导入他的依赖了,所以我们是不需要的,我尝试将spring-boot-starter-data-elasticsearch删掉只留一个spring-data-elasticsearch也是可以的
yml 1 2 3 4 5 6 7 8 spring: elasticsearch: rest: connection-timeout: 1s username: password: read-timeout: 30s uris: ${ES_HOST:localhost}:${ES_PORT:9200}
这么配应该就可以了
实体类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package com.ruoyi.aiot.domain.message;import lombok.*;import org.springframework.data.annotation.Id;import org.springframework.data.elasticsearch.annotations.Document;import org.springframework.data.elasticsearch.annotations.Field;import org.springframework.data.elasticsearch.annotations.FieldType;import java.io.Serializable;@Data @AllArgsConstructor @NoArgsConstructor @Document(indexName = "devicelog", shards = 3, replicas = 1) public class DeviceLog implements Serializable { @Id private String id; @Field(type = FieldType.Keyword) private String deviceId; @Field(type = FieldType.Text) private String messageContent; @Field(type = FieldType.Keyword) private String logType; @Field(name = "timestamp", type = FieldType.Text, fielddata = true) private String timestamp; }
通过注解来设置ES文档的mapping结构,一些我们不希望被分词的可以设置为keyword,fieldata默认为false,这时候不能用这个字段排序的,设置为true才行
Repository 1 2 3 4 5 6 7 @Repository public interface DeviceLogRepo extends ElasticsearchRepository <DeviceLog, String> { public Page<DeviceLog> findAllByDeviceIdAndTimestampIsBetween (String deviceId, String startTime, String endTime, Pageable pageable) ; public Page<DeviceLog> searchAllByDeviceId (NativeSearchQuery nativeSearchQuery, Pageable pageable) ; }
我们自定义一个接口继承自ElasticsearchRepository,即可使用给我们提供的一些默认的方法,我们也可以自定义一些接口,接口名称和参数的定义是要符合语义的,这样spring data es会自动给我们去实现这个接口。刚开始我以为这会很方便,实际操作起来认为,这只适合最做一些简单的crud稍微复杂的不太行,比如save find这些的用起来还是很爽的。我们save后es就会出现跟我们尸体了对应的文档结构了。
ElasticsearchRestTemplate 这个我们写一些复杂的查询可以用这个,不过使用这个就稍微有一点门槛了,要求我们知道构建es查询的json的一个结构,如果不是很熟的话,我们可以先借助kibana的devtools来写,写完以后再通过代码把请求体构建出来就行了。
看一下下面的两个例子吧
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Sort.Order timestamp = new Sort .Order(Sort.Direction.DESC, "timestamp" ); PageRequest of = PageRequest.of(deviceDataDto.getPageNum(), deviceDataDto.getPageSize(), Sort.by(timestamp)); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must(QueryBuilders.termQuery("deviceId" , deviceDataDto.getDeviceId())); String logType = deviceDataDto.getLogType(); if (StringUtils.isNotEmpty(logType)){ boolQueryBuilder.must(QueryBuilders.termQuery("logType" , logType)); } String startTime = deviceDataDto.getStartTime(); String endTime = deviceDataDto.getEndTime(); if (StringUtils.isNotEmpty(startTime) && StringUtils.isNotEmpty(endTime)){ boolQueryBuilder.filter(QueryBuilders.rangeQuery("timestamp" ).gte(startTime).lte(endTime)); } NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder (); NativeSearchQuery build = nativeSearchQueryBuilder.withQuery(boolQueryBuilder).withPageable(of).build(); SearchHits<DeviceLog> search = elasticsearchRestTemplate.search(build, DeviceLog.class);
上面构建了一个根据deviceID和时间范围的一个倒叙查找,大概分为三个步骤
通过QueryBuilders创建请求的体,也就是那个大json
NativeSearchQueryBuilder将构建出来的json构建成一个可以执行的语句,相当于在QueryBuilders外面包了一层
通过ElasticsearchRestTemplate执行构建的语句,它相当于就是一个客户端,它负责将我们的请求发出去
比较适合做一些复杂的查询比较灵活,我们可以看到它具体的实现,也方便我们调试。
1 2 3 4 5 6 NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder (); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must(QueryBuilders.termsQuery("deviceId" , list)); boolQueryBuilder.filter(QueryBuilders.rangeQuery("timestamp" ).gte(from).lte(to)); NativeSearchQuery build = nativeSearchQueryBuilder.withQuery(boolQueryBuilder).build(); return elasticsearchRestTemplate.count(build, ReportPropertyMessage.class);
这个例子是用来查询一组设备在某个时间段的消息数量。
优化后
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must(QueryBuilders.termsQuery("deviceId" , deviceIds));RangeAggregationBuilder field = AggregationBuilders.range("time_range" ) .field("timestamp" );for (int i = 0 ; i < limit; i++) { if (i == 0 ){ field.addRange(fromTemp , toTemp); }else { fromTemp = fromTemp + step; toTemp = toTemp + step; field.addRange(fromTemp , toTemp); } }NativeSearchQuery build = new NativeSearchQueryBuilder () .withSourceFilter(new FetchSourceFilter (new String []{"" }, null )) .withQuery(boolQueryBuilder) .addAggregation(field) .build(); SearchHits<TestPojo> search = elasticsearchRestTemplate.search(build, TestPojo.class);
问题解决 一段时间不操作过后再次试用报错:connection rest by peer其实就是tcp被断开了然后你又使用了这个tcp。需要通过@Bean手动将我们配置的Bean注入进去。手动设置keepAlive并开启,keepAlive的时间应该小于服务器的keepAlive时间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Configuration public class ElasticserachRestClientConfig { @Value("${spring.elasticsearch.rest.uris}") private String uris; @Value("${spring.elasticsearch.rest.username}") private String userName; @Value("${spring.elasticsearch.rest.password}") private String password; @Bean public RestHighLevelClient restHighLevelClient () { String[] split = uris.split(":" ); RestHighLevelClient restHighLevelClient = new RestHighLevelClient ( RestClient.builder( new HttpHost (split[1 ].replace("//" , "" ), Integer.parseInt(split[2 ])) ).setRequestConfigCallback(new RestClientBuilder .RequestConfigCallback() { @Override public RequestConfig.Builder customizeRequestConfig (RequestConfig.Builder requestConfigBuilder) { return requestConfigBuilder.setConnectTimeout(5000 ) .setSocketTimeout(60000 ); } }).setHttpClientConfigCallback(httpClientBuilder -> { CredentialsProvider credentialsProvider = new BasicCredentialsProvider (); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials (userName, password)); httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider) .setKeepAliveStrategy((httpResponse, httpContext) -> TimeUnit.SECONDS.toMillis(3 )) .setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true ).build()); return httpClientBuilder; } ) ); return restHighLevelClient; } }