jetlinks
Jetlinks
消息上报流程
设备消息对应事件总线topic
协议包将设备上报的报文解析成平台统一处理的消息后,会将消息转换为对应的topic,并发送到对应的事件总线,可以同过从事件总线订阅消息来处理这些消息。
注意:
- 此topic和mqtt的topic没有任何关系,仅仅作为内部通知方式。
设备接入流程
Reactor中的Mono和Flux
相应流的特点
- 响应流必须是无阻塞的
- 响应流必须是一个数据流
- 他必须可以异步执行
- 他也应该可以处理背压
什么是背压
背压是一个反应流的重要概念,可以理解为生产者可以感受到消费者消费的压力,并根据压力进行动态的调整生产速率。如下图:
Publisher
由于向应流的特点,我们不能在返回一个简单的POJO对象来表式结果了。必须返回一个java中类似Future的概念。在有结果可用时通知消费者进行消费响应。Reactor中这种规范被定义为Publisher
我的理解:A1到A9是发布者,A10-A13是订阅者,当只有发布者时是没有任何意义的,就比如这个例子里面,如果没有订阅者的话,这些数字就是毫无意义的,只有有了订阅者这些数据才是有意义的,这就像廖雪峰说的惰性计算,在前面是不进进行计算的,只有到了订阅这里才会计算。
而Mono和Flux都是publisher在reactor 3 实现。publisher提供了subscribe方法,允许消费者在有结果可用的时候进行消费。如果没有消费者时不做作任何事情。他根据消费者的情况进行响应。publisher可以返回零个或者多个,甚至可能是无限个,为了更清楚的表示返回结果就引入了两个模型Mono和Flux .
Flux
他是一个可以发出0-n个元素组成的异步序列的publisher,可以被onError或者onCompelete信号所终止,在响应流中有三种给下游消费者调用的方法,onNext,onCompelete,onError.下面这张图表示了Flux的抽象模型:
就是说流像一个流水线,他们在传送带上等着被处理,operator就相当于流水线工人,当流水线执行完时,这条流水线就会终止,如果在流水作业中发生异常也会提前终止。只不过一次时正常结束,一个是异常终止。
与之前写法对比
1 |
|
这个list 里面也有很多个item但是呢如果你想获得item你得主动的去get,获取或者不获取取决于自己
流式数据处理
1 |
|
和传统的内容一样但是呢,这个更像是吃旋转小火锅,他是一个流,不需要我们去主动的get,不过我们可以搭配着吃。
反应式数据处理
在reactor中我们可以改写成Flux:
1 |
|
这根旋转小火锅又有区别了,旋转小火锅有的的东西我们不喜欢吃但是他仍然会有, 那么Flux就是需要我们点餐,只点我们喜欢吃的,而且可以根据我们的饭量进行调整,如果没有食客订餐那么厨师就什么都不用做了。当然flux的特性不止这一点。
Mono
Mono是一个可以发出0-1个元素的publisher,可以被onCompelete和onError信号所终止
这里整体和Flux差不多,就是Mono只会发出0-1个元素。 也就是说就是有或者没有。
传统数据处理
1 |
|
Optional 的处理方式
1 |
|
这个Optional
我觉得就有反应式的那种味儿了,当然它并不是反应式。当我们不从返回值Optional
取其中具体的对象时,我们不清楚里面到底有没有,但是Optional
是一定客观存在的, 不会出现 NPE 问题。
反应式数据处理
1 |
|
总结
Mono、Flux是推送给消费者的,而我们之前则是通过拉的方式。这种范式让数据有了新特性,比如基于发布订阅的事件驱动、异步驱动、背压等等。同时我们也可以通过map,flatmap去操作他们。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!