pipeline

前面介绍的VendingInteractor提供了被动异步数据形式,为了弥补主动调用实现异步的空白,我们提供了Pipeline组件进行支持。

-com.tencent.mm.vending.pipeline,Pipeline的实现采用了函数式中管道的概念,它是将数据经过一系列函数处理后得到最终结果的函数集合。函数处理逻辑的载体我们称为Functional类,每个Functional类接受上一个返回结果,并将处理后的结果传递给下一个Functional类。

new Pipeline().resolve("arguments").$logic(new Functional<Integer, String>() {
        @Override
        public Integer call(String input) {
            // input is "arguments", then do some process here
            return 0; // ret
        }
    }).$ui(new Functional<Void, Integer>() {
        @Override
        public Void call(Integer ret) {
            // get 0 from last functional
            return nil;
        }
    });

Pipeable

-Pipeline继承Pipeable,定义了pipeline执行调用的所有接口。

接口:

Pipeable<_Var> resolve(final Resolve<_Var> resolve);
Pipeable<_Var> resolve(final Object... args);

-resolve()是pipeline执行链的起始函数,传入参数后pipeline立即进入到执行状态

Pipeable<_Ret> next(Functional<_Ret, _Var> functional);

-next()接受Functional对象,并将其接入pipeline中,默认情况下最近指定的Scheduler

Pipeable<_Var> $(String schedulerType);
Pipeable<_Var> $(Scheduler scheduler);

-$()用于指定接下来执行FunctionalScheduler

SchedulerFunctional的执行者,用于调度执行线程,可能是单线程,也可能是多线程轮训,这由具体实现决定。

Pipeable<_Ret> $ui(Functional<_Ret, _Var> functional);
Pipeable<_Ret> $logic(Functional<_Ret, _Var> functional);
Pipeable<_Ret> $heavyWork(Functional<_Ret, _Var> functional);

-$ui 指定Scheduler"Vending.UI"并执行next(functional)

-$logic 指定Scheduler"Vending.LOGIC"并执行next(functional)

-$heavyWork 指定Scheduler"Vending.HEAVY_WORK"并执行next(functional)

Pipeable<_Var> lifeCycle(ILifeCycleKeeper keeper);

-lifeCycle() 实现ILifeCycle接口

Pipeable<_Var> interval(long millisecond);

-interval() 间隔millisecond微秒后再执行下一个Functional

void stop();

-stop() 停止当前pipeline的执行,不产生任何的回调

PipeableTerminal<_Var> terminate();

-terminate() pipeline执行链末端,返回PipeableTerminal<_Var>对象,进行结果处理

Mario mario();

-mario() 返回当前pipeline的Mario对象,用于操控pipeline的执行流程

PipeableTerminal

-Pipeable继承PipeableTerminal,用于标记pipeline执行终点,并注册onProgress()/onInterrupt()/onTerminate()处理函数。为了让pipeline执行链定义明显的边界,调用onProgress()/onInterrupt()/onTerminate()/terminate()函数,返回的均为PipeableTerminal对象。PipeableTerminal的状态标记为已终结,不能再衔接任何的Pipeable接口。

接口:

PipeableTerminal<_Var> onProgress(Progress progress);
PipeableTerminal<_Var> onProgress(String schedType, Progress progress);
PipeableTerminal<_Var> onProgress(Scheduler scheduler, Progress progress);

-onProgress() 接收来自pipeline的progress消息,可以指定执行Scheduler

PipeableTerminal<_Var> onInterrupt(Interrupt interrupt);
PipeableTerminal<_Var> onInterrupt(String schedType, Interrupt interrupt);
PipeableTerminal<_Var> onInterrupt(Scheduler scheduler, Interrupt interrupt);

-onInterrupt() 接收来自pipeline的interrupt消息,可以指定执行Scheduler

PipeableTerminal<_Var> onTerminate(Terminate<_Var> terminate);
PipeableTerminal<_Var> onTerminate(String schedType, Terminate<_Var> terminate);
PipeableTerminal<_Var> onTerminate(Scheduler scheduler, Terminate<_Var> terminate);

-onTerminate() 接收pipeline的执行结果,可以指定执行Scheduler

Mario

-Mario支持一些操纵pipeline执行流的功能,从Pipeable.mario()返回。它支持:

  • 发送progress消息
  • Functional为粒度,发送interrupt消息,终止当前pipeline。
  • Functional为粒度,暂停和恢复pipeline的执行链条。

接口:

void progress(Object object);

-`progress()发送progress信息,可多次调用,如果onProgress定义时间过晚,不保证消息一定送达。

...
.$logic(new Functional<Integer, Void>() {
        @Override
        public Void call(Integer position) {
            int progress = 0;

            while (progress != 100) {
                mario().progress(progress++);
            }

            return nil;
        }
    })
...
void interrupt(Object object);
void retryOrInterrupt(int maxRetry, Object object);

-interrupt() 发送interrupt信息,且不再执行后续Functional,只能调用一次。

-retryOrInterrupt() 尝试重新执行当前Functional,执行超过maxRetry次后,发送interrupt信息,且不再执行后续Functional

void pending();
void resume();
void wormhole(Object ... args);

-pending() 暂停pipeline,当前Functional执行完成后,不调用下一个Functional,直至resume()wormhole()被调用。

-resume() 恢复pipeline执行

-wormhole() 功能同resume(),区别是可以传入Object ... args参数作为当前Functional的返回结果。

需要注意是,Mario接口调用,都应该在该pipeline的内部调用,也就是Functional内调用。resume()稍微特殊,可以在其他地方调用是因为接口用于恢复pipeline执行。

StatelessPipeline

-com.tencent.mm.vending.app.StatelessPipeline同样继承Pipeable。不同于PipelineStatelessPipeline是无状态的,可以通过调用resolve()反复执行。

通常用在响应事件的listener上,方便编码:

private OnItemClickPipeline mOnItemClickListener 
        = new OnItemClickPipeline<Integer>() {
            @Override
            public void onItemClick(AdapterView<?> parent, View view, int position, long id) {
                // call resolve to invoke this StatelessPipeline
                resolve(position - mGameListView.getHeaderViewsCount());
            }
        }
        .$logic(new Functional<Void, Integer>() {
            @Override
            public Void call(Integer position) {
                mGamesVending.hot(position);

                // request Vending to update struct in this position 
                mGamesVending.request(position); 

                return nil;
            }
        })
        .threshold(300)              // 300ms threshold
        .lifeCycle(getInteractor())  // bind lifeCycle on Interactor
        .done();

我们封装的一些常用事件listener:

  • OnCheckedChangedPipeline<_Var>
  • OnClickPipeline<_Var>
  • OnItemClickPipeline<_Var>
  • OnItemSelectedPipeline<_Var>
  • OnLongClickPipeline<_Var>

results matching ""

    No results matching ""