Reactive Programming似乎是已经过时了。
初次接触的时候是在快毕业的时候,大概2017年。那时RxJava/RxAndroid配合网络库来做一些很fancy的重试、消除重复点击之类的任务,可以把代码写的异常的优雅。当时实际的编码量和项目经验不足,理解起来稍微困难了点。现在工作中也有了可以抽象成stream的场景,看看能不能把这个库看明白点用在工作中。
这里不会太关注rx的一些使用场景,更多关注一些稍微细节一些的实现,以帮助和梳理使用时对各个组件生命周期的把控。
Overview
Observable and Observer
首先提一点,这是个header only的库。
任何东西都能成为observable,所以observable要关联一个东西。这个东西就是observable的第一个模板参数:
template<class T, class SourceOperator>
class observable
{
using source_operator_type = rxu::decay_t<SourceOperator>;
mutable source_operator_type source_operator;
};
observable真正持有的也只是它数据来源的上游:SourceOperator。
observer提供3个方法,它关注的也只有这3个方法: on_next/on_error/on_complete,供一个数据源调用,一个observer只处理一个阶段的一种类型的value。
真正把observer挂到observable上的操作,还是observable->subscribe(Subscriber)完成的。
observable自身持有SourceOperator,subscribe方法里核心的就是一句话:SourceOperator->on_subscribe(Subscriber)
对于通过lift成为一个observable的,由lift提供on_subscribe方法。
但是能挂到SourceOperator上的,是subscriber;这说明了subscriber应该包含了observer才对。实际上也是这样的,subscriber的完整签名是subscriber<T, class Observer = observer<T>>。
template<class T, class Observer = observer<T>>
class subscriber : public subscriber_base<T> {
using this_type = subscriber<T, Observer>;
using observer_type = rxu::decay_t<Observer>;
composite_subscription lifetime;
observer_type destination;
};
composite_subscription又是个在官方Rx教程里没提及的概念,可能也是c++里特有的一个组件。
那么什么叫挂到SourceOperator?
values.map([](int v) {return v+1;});
这就是把[](int v) {return v+1;}挂到了values上。这里经历了好几次的类型转变
[](int v) {return v+1;}这是一个类型转化的函数,先把这个函数包到一个struct rxcpp::operators::details::map里。- 将
struct detail::map进行lift操作,提升成detail::lift_operator。lift_operator里包含了values自己的SourceOperator和detail::map。 - 将提升后的
lift_operator进一步包装成observable(二代),observable(二代)的SourceOperator就是第二步里的detail::lift_operator。
后面就可以再续接各种filter/groupby等操作了,因为这时候我们面对的也是一个observable。
真正的传统意义的map/filter/groupby的实际代码并不在这些挂接操作里,而是在最后subscribe中。承载这些逻辑的代码也不是detail::map这个层级的类,而是更深层次的detail::map::map_observer。所谓的on_next/on_error/on_complete只存在于detail::map::map_observer。
每个operator的on_next/on_error/on_complete三件套是由operator自己提供的,最后subscribe的时候的三件套就需要自己传入subscribe函数里了。不出意外,三件套也会包入一个subscriber,subscribe完成后返回的是composite_subscription。
Rx一开始难于理解之处可能也要归咎于此:最终的subscribe完成了实际的逻辑连接。从最后的subscriber一层一层的回调on_subscribe,承载传统意义理解的三件套是在on_subscribe的过程中一级一级的连接到上层的SourceOperator中。
Scheduler
Scheduler给Rx添加了多线程调度运行的能力。官方文档里描述的是这样的:
Scheduler系统的概念来自于RxJava。RxJava提供了scheduler和worker,RxCpp增添了schedulable,coornation和coordinator。
Scheduler- 拥有时间线的概念(timeline),通过
now()方法来导出 - 同时也是这条时间线上worker的factory
- 拥有时间线的概念(timeline),通过
Worker- 持有
schedulable队列,到时间了就从这个队列里取出一个来执行。 - 相同时间点的
schedulable是保持了插入顺序的
- 持有
Schedulable- 持有一个可执行的function,外加worker和lifetime
- 如果lifetime已经unsubscribe(也就是断开),那么function不会执行
coornation和coordinator引入RxCpp的原因有2点
- 简化operator的实现
- 减少不必要的性能开销
文档中提到,Rx.Net/RxJava里的operator采用原子操作和一些互斥手段来协调不同的流,即使只有一个线程也是这样(比如UI线程)。
identity_之类的调度在RxCpp里是没有性能开销的,也是默认的行为。只有synchronize_和observe_on_才用mutex保护了一下。
coordination- 可以理解为
coordinator的factory
- 可以理解为
- 持有
scheduler
- 持有
coordinator- 持有worker
- 是需要进行协调的observables, subscribers and schedulable functions的factory
In Depth Code Reading
还是需要对应到具体的代码上才能理解。