pipeline
前面介绍的Vending和Interactor提供了被动异步数据形式,为了弥补主动调用实现异步的空白,我们提供了Pipeline组件进行支持。
-com.tencent.mm.vending.pipeline,Pipeline的实现采用了函数式中管道的概念,它是将数据经过一系列函数处理后得到最终结果的函数集合。函数处理逻辑的载体我们称为Functional类,每个Functional类接受上一个返回结果,并将处理后的结果传递给下一个Functional类。
new Pipeline().resolve("arguments").$logic(new Functional<Integer, String>() {
@Override
public Integer call(String input) {
// input is "arguments", then do some process here
return 0; // ret
}
}).$ui(new Functional<Void, Integer>() {
@Override
public Void call(Integer ret) {
// get 0 from last functional
return nil;
}
});
Pipeable
-Pipeline继承Pipeable,定义了pipeline执行调用的所有接口。
接口:
Pipeable<_Var> resolve(final Resolve<_Var> resolve);
Pipeable<_Var> resolve(final Object... args);
-resolve()是pipeline执行链的起始函数,传入参数后pipeline立即进入到执行状态。
Pipeable<_Ret> next(Functional<_Ret, _Var> functional);
-next()接受Functional对象,并将其接入pipeline中,默认情况下最近指定的Scheduler。
Pipeable<_Var> $(String schedulerType);
Pipeable<_Var> $(Scheduler scheduler);
-$()用于指定接下来执行Functional的Scheduler。
Scheduler是Functional的执行者,用于调度执行线程,可能是单线程,也可能是多线程轮训,这由具体实现决定。
Pipeable<_Ret> $ui(Functional<_Ret, _Var> functional);
Pipeable<_Ret> $logic(Functional<_Ret, _Var> functional);
Pipeable<_Ret> $heavyWork(Functional<_Ret, _Var> functional);
-$ui 指定Scheduler为"Vending.UI"并执行next(functional)
-$logic 指定Scheduler为"Vending.LOGIC"并执行next(functional)
-$heavyWork 指定Scheduler为"Vending.HEAVY_WORK"并执行next(functional)
Pipeable<_Var> lifeCycle(ILifeCycleKeeper keeper);
-lifeCycle() 实现ILifeCycle接口
Pipeable<_Var> interval(long millisecond);
-interval() 间隔millisecond微秒后再执行下一个Functional
void stop();
-stop() 停止当前pipeline的执行,不产生任何的回调
PipeableTerminal<_Var> terminate();
-terminate() pipeline执行链末端,返回PipeableTerminal<_Var>对象,进行结果处理
Mario mario();
-mario() 返回当前pipeline的Mario对象,用于操控pipeline的执行流程
PipeableTerminal
-Pipeable继承PipeableTerminal,用于标记pipeline执行终点,并注册onProgress()/onInterrupt()/onTerminate()处理函数。为了让pipeline执行链定义明显的边界,调用onProgress()/onInterrupt()/onTerminate()/terminate()函数,返回的均为PipeableTerminal对象。PipeableTerminal的状态标记为已终结,不能再衔接任何的Pipeable接口。
接口:
PipeableTerminal<_Var> onProgress(Progress progress);
PipeableTerminal<_Var> onProgress(String schedType, Progress progress);
PipeableTerminal<_Var> onProgress(Scheduler scheduler, Progress progress);
-onProgress() 接收来自pipeline的progress消息,可以指定执行Scheduler。
PipeableTerminal<_Var> onInterrupt(Interrupt interrupt);
PipeableTerminal<_Var> onInterrupt(String schedType, Interrupt interrupt);
PipeableTerminal<_Var> onInterrupt(Scheduler scheduler, Interrupt interrupt);
-onInterrupt() 接收来自pipeline的interrupt消息,可以指定执行Scheduler。
PipeableTerminal<_Var> onTerminate(Terminate<_Var> terminate);
PipeableTerminal<_Var> onTerminate(String schedType, Terminate<_Var> terminate);
PipeableTerminal<_Var> onTerminate(Scheduler scheduler, Terminate<_Var> terminate);
-onTerminate() 接收pipeline的执行结果,可以指定执行Scheduler。
Mario
-Mario支持一些操纵pipeline执行流的功能,从Pipeable.mario()返回。它支持:
- 发送progress消息
- 以
Functional为粒度,发送interrupt消息,终止当前pipeline。 - 以
Functional为粒度,暂停和恢复pipeline的执行链条。
接口:
void progress(Object object);
-`progress()发送progress信息,可多次调用,如果onProgress定义时间过晚,不保证消息一定送达。
...
.$logic(new Functional<Integer, Void>() {
@Override
public Void call(Integer position) {
int progress = 0;
while (progress != 100) {
mario().progress(progress++);
}
return nil;
}
})
...
void interrupt(Object object);
void retryOrInterrupt(int maxRetry, Object object);
-interrupt() 发送interrupt信息,且不再执行后续Functional,只能调用一次。
-retryOrInterrupt() 尝试重新执行当前Functional,执行超过maxRetry次后,发送interrupt信息,且不再执行后续Functional。
void pending();
void resume();
void wormhole(Object ... args);
-pending() 暂停pipeline,当前Functional执行完成后,不调用下一个Functional,直至resume()或wormhole()被调用。
-resume() 恢复pipeline执行
-wormhole() 功能同resume(),区别是可以传入Object ... args参数作为当前Functional的返回结果。
需要注意是,Mario接口调用,都应该在该pipeline的内部调用,也就是Functional内调用。resume()稍微特殊,可以在其他地方调用是因为接口用于恢复pipeline执行。
StatelessPipeline
-com.tencent.mm.vending.app.StatelessPipeline同样继承Pipeable。不同于Pipeline,StatelessPipeline是无状态的,可以通过调用resolve()反复执行。
通常用在响应事件的listener上,方便编码:
private OnItemClickPipeline mOnItemClickListener
= new OnItemClickPipeline<Integer>() {
@Override
public void onItemClick(AdapterView<?> parent, View view, int position, long id) {
// call resolve to invoke this StatelessPipeline
resolve(position - mGameListView.getHeaderViewsCount());
}
}
.$logic(new Functional<Void, Integer>() {
@Override
public Void call(Integer position) {
mGamesVending.hot(position);
// request Vending to update struct in this position
mGamesVending.request(position);
return nil;
}
})
.threshold(300) // 300ms threshold
.lifeCycle(getInteractor()) // bind lifeCycle on Interactor
.done();
我们封装的一些常用事件listener:
OnCheckedChangedPipeline<_Var>OnClickPipeline<_Var>OnItemClickPipeline<_Var>OnItemSelectedPipeline<_Var>OnLongClickPipeline<_Var>