Reactive Programming似乎是已经过时了。
初次接触的时候是在快毕业的时候,大概2017年。那时RxJava/RxAndroid配合网络库来做一些很fancy的重试、消除重复点击之类的任务,可以把代码写的异常的优雅。当时实际的编码量和项目经验不足,理解起来稍微困难了点。现在工作中也有了可以抽象成stream
的场景,看看能不能把这个库看明白点用在工作中。
这里不会太关注rx的一些使用场景,更多关注一些稍微细节一些的实现,以帮助和梳理使用时对各个组件生命周期的把控。
Overview
Observable and Observer
首先提一点,这是个header only的库。
任何东西都能成为observable
,所以observable
要关联一个东西。这个东西就是observable
的第一个模板参数:
template<class T, class SourceOperator>
class observable
{
using source_operator_type = rxu::decay_t<SourceOperator>;
mutable source_operator_type source_operator;
};
observable
真正持有的也只是它数据来源的上游:SourceOperator。
observer
提供3个方法,它关注的也只有这3个方法: on_next/on_error/on_complete,供一个数据源调用,一个observer
只处理一个阶段的一种类型的value。
真正把observer
挂到observable
上的操作,还是observable->subscribe(Subscriber)
完成的。
observable
自身持有SourceOperator
,subscribe
方法里核心的就是一句话:SourceOperator->on_subscribe(Subscriber)
对于通过lift
成为一个observable
的,由lift提供on_subscribe
方法。
但是能挂到SourceOperator
上的,是subscriber
;这说明了subscriber
应该包含了observer
才对。实际上也是这样的,subscriber
的完整签名是subscriber<T, class Observer = observer<T>>
。
template<class T, class Observer = observer<T>>
class subscriber : public subscriber_base<T> {
using this_type = subscriber<T, Observer>;
using observer_type = rxu::decay_t<Observer>;
composite_subscription lifetime;
observer_type destination;
};
composite_subscription
又是个在官方Rx
教程里没提及的概念,可能也是c++
里特有的一个组件。
那么什么叫挂到SourceOperator
?
values.map([](int v) {return v+1;});
这就是把[](int v) {return v+1;}
挂到了values上。这里经历了好几次的类型转变
[](int v) {return v+1;}
这是一个类型转化的函数,先把这个函数包到一个struct rxcpp::operators::details::map
里。- 将
struct detail::map
进行lift操作,提升成detail::lift_operator
。lift_operator
里包含了values自己的SourceOperator
和detail::map
。 - 将提升后的
lift_operator
进一步包装成observable
(二代),observable
(二代)的SourceOperator
就是第二步里的detail::lift_operator
。
后面就可以再续接各种filter/groupby等操作了,因为这时候我们面对的也是一个observable
。
真正的传统意义的map/filter/groupby的实际代码并不在这些挂接操作里,而是在最后subscribe
中。承载这些逻辑的代码也不是detail::map
这个层级的类,而是更深层次的detail::map::map_observer
。所谓的on_next/on_error/on_complete只存在于detail::map::map_observer
。
每个operator
的on_next/on_error/on_complete三件套是由operator自己提供的,最后subscribe的时候的三件套就需要自己传入subscribe
函数里了。不出意外,三件套也会包入一个subscriber,subscribe完成后返回的是composite_subscription
。
Rx
一开始难于理解之处可能也要归咎于此:最终的subscribe
完成了实际的逻辑连接。从最后的subscriber
一层一层的回调on_subscribe
,承载传统意义理解的三件套是在on_subscribe
的过程中一级一级的连接到上层的SourceOperator
中。
Scheduler
Scheduler
给Rx添加了多线程调度运行的能力。官方文档里描述的是这样的:
Scheduler系统的概念来自于RxJava。RxJava提供了scheduler和worker,RxCpp增添了schedulable,coornation和coordinator。
Scheduler
- 拥有时间线的概念(timeline),通过
now()
方法来导出 - 同时也是这条时间线上worker的factory
- 拥有时间线的概念(timeline),通过
Worker
- 持有
schedulable
队列,到时间了就从这个队列里取出一个来执行。 - 相同时间点的
schedulable
是保持了插入顺序的
- 持有
Schedulable
- 持有一个可执行的function,外加worker和lifetime
- 如果lifetime已经unsubscribe(也就是断开),那么function不会执行
coornation和coordinator引入RxCpp的原因有2点
- 简化operator的实现
- 减少不必要的性能开销
文档中提到,Rx.Net/RxJava里的operator采用原子操作和一些互斥手段来协调不同的流,即使只有一个线程也是这样(比如UI线程)。
identity_
之类的调度在RxCpp里是没有性能开销的,也是默认的行为。只有synchronize_
和observe_on_
才用mutex保护了一下。
coordination
- 可以理解为
coordinator
的factory
- 可以理解为
- 持有
scheduler
- 持有
coordinator
- 持有worker
- 是需要进行协调的observables, subscribers and schedulable functions的factory
In Depth Code Reading
还是需要对应到具体的代码上才能理解。