RxCpp

Reactive Programming似乎是已经过时了。

初次接触的时候是在快毕业的时候,大概2017年。那时RxJava/RxAndroid配合网络库来做一些很fancy的重试、消除重复点击之类的任务,可以把代码写的异常的优雅。当时实际的编码量和项目经验不足,理解起来稍微困难了点。现在工作中也有了可以抽象成stream的场景,看看能不能把这个库看明白点用在工作中。

这里不会太关注rx的一些使用场景,更多关注一些稍微细节一些的实现,以帮助和梳理使用时对各个组件生命周期的把控。

Overview

Observable and Observer

首先提一点,这是个header only的库。

任何东西都能成为observable,所以observable要关联一个东西。这个东西就是observable的第一个模板参数:

template<class T, class SourceOperator>
class observable
{
    using source_operator_type = rxu::decay_t<SourceOperator>;
    mutable source_operator_type source_operator;
};

observable真正持有的也只是它数据来源的上游:SourceOperator。

observer提供3个方法,它关注的也只有这3个方法: on_next/on_error/on_complete,供一个数据源调用,一个observer只处理一个阶段的一种类型的value。

真正把observer挂到observable上的操作,还是observable->subscribe(Subscriber)完成的。 observable自身持有SourceOperatorsubscribe方法里核心的就是一句话:SourceOperator->on_subscribe(Subscriber)

对于通过lift成为一个observable的,由lift提供on_subscribe方法。

但是能挂到SourceOperator上的,是subscriber;这说明了subscriber应该包含了observer才对。实际上也是这样的,subscriber的完整签名是subscriber<T, class Observer = observer<T>>

template<class T, class Observer = observer<T>>
class subscriber : public subscriber_base<T> {
    using this_type = subscriber<T, Observer>;
    using observer_type = rxu::decay_t<Observer>;

    composite_subscription lifetime;
    observer_type destination;
};

composite_subscription又是个在官方Rx教程里没提及的概念,可能也是c++里特有的一个组件。

那么什么叫挂到SourceOperator?

values.map([](int v) {return v+1;});

这就是把[](int v) {return v+1;}挂到了values上。这里经历了好几次的类型转变

  1. [](int v) {return v+1;}这是一个类型转化的函数,先把这个函数包到一个struct rxcpp::operators::details::map里。
  2. struct detail::map进行lift操作,提升成detail::lift_operatorlift_operator里包含了values自己的SourceOperatordetail::map
  3. 将提升后的lift_operator进一步包装成observable(二代),observable(二代)的SourceOperator就是第二步里的detail::lift_operator

后面就可以再续接各种filter/groupby等操作了,因为这时候我们面对的也是一个observable

真正的传统意义的map/filter/groupby的实际代码并不在这些挂接操作里,而是在最后subscribe中。承载这些逻辑的代码也不是detail::map这个层级的类,而是更深层次的detail::map::map_observer。所谓的on_next/on_error/on_complete只存在于detail::map::map_observer

每个operator的on_next/on_error/on_complete三件套是由operator自己提供的,最后subscribe的时候的三件套就需要自己传入subscribe函数里了。不出意外,三件套也会包入一个subscriber,subscribe完成后返回的是composite_subscription

Rx一开始难于理解之处可能也要归咎于此:最终的subscribe完成了实际的逻辑连接。从最后的subscriber一层一层的回调on_subscribe,承载传统意义理解的三件套是在on_subscribe的过程中一级一级的连接到上层的SourceOperator中。

Scheduler

Scheduler给Rx添加了多线程调度运行的能力。官方文档里描述的是这样的:

Scheduler系统的概念来自于RxJava。RxJava提供了scheduler和worker,RxCpp增添了schedulable,coornation和coordinator。

Scheduler
  • 拥有时间线的概念(timeline),通过now()方法来导出
  • 同时也是这条时间线上worker的factory
Worker
  • 持有schedulable队列,到时间了就从这个队列里取出一个来执行。
  • 相同时间点的schedulable是保持了插入顺序的
Schedulable
  • 持有一个可执行的function,外加worker和lifetime
  • 如果lifetime已经unsubscribe(也就是断开),那么function不会执行

coornation和coordinator引入RxCpp的原因有2点

  1. 简化operator的实现
  2. 减少不必要的性能开销

文档中提到,Rx.Net/RxJava里的operator采用原子操作和一些互斥手段来协调不同的流,即使只有一个线程也是这样(比如UI线程)。identity_之类的调度在RxCpp里是没有性能开销的,也是默认的行为。只有synchronize_observe_on_才用mutex保护了一下。

coordination
  • 可以理解为coordinator的factory
  • 持有scheduler
coordinator
  • 持有worker
  • 是需要进行协调的observables, subscribers and schedulable functions的factory

In Depth Code Reading

还是需要对应到具体的代码上才能理解。