pipeline
前面介绍的Vending
和Interactor
提供了被动异步数据形式,为了弥补主动调用实现异步的空白,我们提供了Pipeline
组件进行支持。
-com.tencent.mm.vending.pipeline,Pipeline
的实现采用了函数式中管道的概念,它是将数据经过一系列函数处理后得到最终结果的函数集合。函数处理逻辑的载体我们称为Functional
类,每个Functional
类接受上一个返回结果,并将处理后的结果传递给下一个Functional
类。
new Pipeline().resolve("arguments").$logic(new Functional<Integer, String>() {
@Override
public Integer call(String input) {
// input is "arguments", then do some process here
return 0; // ret
}
}).$ui(new Functional<Void, Integer>() {
@Override
public Void call(Integer ret) {
// get 0 from last functional
return nil;
}
});
Pipeable
-Pipeline
继承Pipeable
,定义了pipeline执行调用的所有接口。
接口:
Pipeable<_Var> resolve(final Resolve<_Var> resolve);
Pipeable<_Var> resolve(final Object... args);
-resolve()
是pipeline执行链的起始函数,传入参数后pipeline立即进入到执行状态。
Pipeable<_Ret> next(Functional<_Ret, _Var> functional);
-next()
接受Functional对象,并将其接入pipeline中,默认情况下最近指定的Scheduler
。
Pipeable<_Var> $(String schedulerType);
Pipeable<_Var> $(Scheduler scheduler);
-$()
用于指定接下来执行Functional
的Scheduler
。
Scheduler
是Functional
的执行者,用于调度执行线程,可能是单线程,也可能是多线程轮训,这由具体实现决定。
Pipeable<_Ret> $ui(Functional<_Ret, _Var> functional);
Pipeable<_Ret> $logic(Functional<_Ret, _Var> functional);
Pipeable<_Ret> $heavyWork(Functional<_Ret, _Var> functional);
-$ui
指定Scheduler
为"Vending.UI"
并执行next(functional)
-$logic
指定Scheduler
为"Vending.LOGIC"
并执行next(functional)
-$heavyWork
指定Scheduler
为"Vending.HEAVY_WORK"
并执行next(functional)
Pipeable<_Var> lifeCycle(ILifeCycleKeeper keeper);
-lifeCycle()
实现ILifeCycle
接口
Pipeable<_Var> interval(long millisecond);
-interval()
间隔millisecond
微秒后再执行下一个Functional
void stop();
-stop()
停止当前pipeline的执行,不产生任何的回调
PipeableTerminal<_Var> terminate();
-terminate()
pipeline执行链末端,返回PipeableTerminal<_Var>
对象,进行结果处理
Mario mario();
-mario()
返回当前pipeline的Mario
对象,用于操控pipeline的执行流程
PipeableTerminal
-Pipeable
继承PipeableTerminal
,用于标记pipeline执行终点,并注册onProgress()
/onInterrupt()
/onTerminate()
处理函数。为了让pipeline执行链定义明显的边界,调用onProgress()
/onInterrupt()
/onTerminate()
/terminate()
函数,返回的均为PipeableTerminal
对象。PipeableTerminal
的状态标记为已终结,不能再衔接任何的Pipeable
接口。
接口:
PipeableTerminal<_Var> onProgress(Progress progress);
PipeableTerminal<_Var> onProgress(String schedType, Progress progress);
PipeableTerminal<_Var> onProgress(Scheduler scheduler, Progress progress);
-onProgress()
接收来自pipeline的progress消息,可以指定执行Scheduler
。
PipeableTerminal<_Var> onInterrupt(Interrupt interrupt);
PipeableTerminal<_Var> onInterrupt(String schedType, Interrupt interrupt);
PipeableTerminal<_Var> onInterrupt(Scheduler scheduler, Interrupt interrupt);
-onInterrupt()
接收来自pipeline的interrupt消息,可以指定执行Scheduler
。
PipeableTerminal<_Var> onTerminate(Terminate<_Var> terminate);
PipeableTerminal<_Var> onTerminate(String schedType, Terminate<_Var> terminate);
PipeableTerminal<_Var> onTerminate(Scheduler scheduler, Terminate<_Var> terminate);
-onTerminate()
接收pipeline的执行结果,可以指定执行Scheduler
。
Mario
-Mario
支持一些操纵pipeline执行流的功能,从Pipeable.mario()
返回。它支持:
- 发送progress消息
- 以
Functional
为粒度,发送interrupt消息,终止当前pipeline。 - 以
Functional
为粒度,暂停和恢复pipeline的执行链条。
接口:
void progress(Object object);
-`progress()
发送progress信息,可多次调用,如果onProgress定义时间过晚,不保证消息一定送达。
...
.$logic(new Functional<Integer, Void>() {
@Override
public Void call(Integer position) {
int progress = 0;
while (progress != 100) {
mario().progress(progress++);
}
return nil;
}
})
...
void interrupt(Object object);
void retryOrInterrupt(int maxRetry, Object object);
-interrupt()
发送interrupt信息,且不再执行后续Functional
,只能调用一次。
-retryOrInterrupt()
尝试重新执行当前Functional
,执行超过maxRetry
次后,发送interrupt信息,且不再执行后续Functional
。
void pending();
void resume();
void wormhole(Object ... args);
-pending()
暂停pipeline,当前Functional
执行完成后,不调用下一个Functional
,直至resume()
或wormhole()
被调用。
-resume()
恢复pipeline执行
-wormhole()
功能同resume()
,区别是可以传入Object ... args
参数作为当前Functional
的返回结果。
需要注意是,Mario
接口调用,都应该在该pipeline的内部调用,也就是Functional
内调用。resume()
稍微特殊,可以在其他地方调用是因为接口用于恢复pipeline执行。
StatelessPipeline
-com.tencent.mm.vending.app.StatelessPipeline
同样继承Pipeable
。不同于Pipeline
,StatelessPipeline
是无状态的,可以通过调用resolve()
反复执行。
通常用在响应事件的listener上,方便编码:
private OnItemClickPipeline mOnItemClickListener
= new OnItemClickPipeline<Integer>() {
@Override
public void onItemClick(AdapterView<?> parent, View view, int position, long id) {
// call resolve to invoke this StatelessPipeline
resolve(position - mGameListView.getHeaderViewsCount());
}
}
.$logic(new Functional<Void, Integer>() {
@Override
public Void call(Integer position) {
mGamesVending.hot(position);
// request Vending to update struct in this position
mGamesVending.request(position);
return nil;
}
})
.threshold(300) // 300ms threshold
.lifeCycle(getInteractor()) // bind lifeCycle on Interactor
.done();
我们封装的一些常用事件listener:
OnCheckedChangedPipeline<_Var>
OnClickPipeline<_Var>
OnItemClickPipeline<_Var>
OnItemSelectedPipeline<_Var>
OnLongClickPipeline<_Var>