jetlinks

Jetlinks

消息上报流程

image-20210830092842457

设备消息对应事件总线topic

协议包将设备上报的报文解析成平台统一处理的消息后,会将消息转换为对应的topic,并发送到对应的事件总线,可以同过从事件总线订阅消息来处理这些消息。

注意

  1. 此topic和mqtt的topic没有任何关系,仅仅作为内部通知方式。

设备接入流程

flow

Reactor中的Mono和Flux

相应流的特点

  1. 响应流必须是无阻塞的
  2. 响应流必须是一个数据流
  3. 他必须可以异步执行
  4. 他也应该可以处理背压

什么是背压

背压是一个反应流的重要概念,可以理解为生产者可以感受到消费者消费的压力,并根据压力进行动态的调整生产速率。如下图:

image-20210831230354966

Publisher

由于向应流的特点,我们不能在返回一个简单的POJO对象来表式结果了。必须返回一个java中类似Future的概念。在有结果可用时通知消费者进行消费响应。Reactor中这种规范被定义为Publisher ,Publisher是一个可以提供0-N个序列元素的提供者,并根据其订阅者Subscriber的需求进行推送。 一个publisher支持多个订阅者,并可以根据订阅者的逻辑进行判断推送元素序列,如下面的Exel表格。

image-20210831231128802

我的理解:A1到A9是发布者,A10-A13是订阅者,当只有发布者时是没有任何意义的,就比如这个例子里面,如果没有订阅者的话,这些数字就是毫无意义的,只有有了订阅者这些数据才是有意义的,这就像廖雪峰说的惰性计算,在前面是不进进行计算的,只有到了订阅这里才会计算。

而Mono和Flux都是publisher在reactor 3 实现。publisher提供了subscribe方法,允许消费者在有结果可用的时候进行消费。如果没有消费者时不做作任何事情。他根据消费者的情况进行响应。publisher可以返回零个或者多个,甚至可能是无限个,为了更清楚的表示返回结果就引入了两个模型Mono和Flux .

Flux

他是一个可以发出0-n个元素组成的异步序列的publisher,可以被onError或者onCompelete信号所终止,在响应流中有三种给下游消费者调用的方法,onNext,onCompelete,onError.下面这张图表示了Flux的抽象模型:

img

就是说流像一个流水线,他们在传送带上等着被处理,operator就相当于流水线工人,当流水线执行完时,这条流水线就会终止,如果在流水作业中发生异常也会提前终止。只不过一次时正常结束,一个是异常终止。

与之前写法对比

1
2
3
4
public List<ClientUser> allUsers() {
return Arrays.asList(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}

这个list 里面也有很多个item但是呢如果你想获得item你得主动的去get,获取或者不获取取决于自己

流式数据处理

1
2
3
4
public Stream<ClientUser> allUsers() {
return Stream.of(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}

和传统的内容一样但是呢,这个更像是吃旋转小火锅,他是一个流,不需要我们去主动的get,不过我们可以搭配着吃。

反应式数据处理

在reactor中我们可以改写成Flux:

1
2
3
4
public Flux<ClientUser> allUsers(){
return Flux.just(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}

这根旋转小火锅又有区别了,旋转小火锅有的的东西我们不喜欢吃但是他仍然会有, 那么Flux就是需要我们点餐,只点我们喜欢吃的,而且可以根据我们的饭量进行调整,如果没有食客订餐那么厨师就什么都不用做了。当然flux的特性不止这一点。

Mono

Mono是一个可以发出0-1个元素的publisher,可以被onCompelete和onError信号所终止

img

这里整体和Flux差不多,就是Mono只会发出0-1个元素。 也就是说就是有或者没有。

传统数据处理

1
2
3
public ClientUser currentUser () {
return isAuthenticated ? new ClientUser("felord.cn", "reactive") : null;
}

Optional 的处理方式

1
2
3
4
public Optional<ClientUser> currentUser () {
return isAuthenticated ? Optional.of(new ClientUser("felord.cn", "reactive"))
: Optional.empty();
}

这个Optional我觉得就有反应式的那种味儿了,当然它并不是反应式。当我们不从返回值Optional取其中具体的对象时,我们不清楚里面到底有没有,但是Optional是一定客观存在的, 不会出现 NPE 问题。

反应式数据处理

1
2
3
4
public Mono<ClientUser>	currentUser{
return isAuthenticated ?Mono.just(new ClientUser("felord.cn","reactive"))
:Mono.empty();
}

总结

​ Mono、Flux是推送给消费者的,而我们之前则是通过拉的方式。这种范式让数据有了新特性,比如基于发布订阅的事件驱动、异步驱动、背压等等。同时我们也可以通过map,flatmap去操作他们。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!