
How To Process Rxjs Stream N Items At A Time And Once An Item Is Done, Autofill Back To N Again?

I have a stream of events and I would like to call a function that returns a promise for each of those events, the problem is that this function is very expensive, so I would like

Solution 1:

You don't actually need backpressure here. There is an operator called flatMapWithMaxConcurrent that does this for you. It is essentially shorthand for calling .map().merge(concurrency): it subscribes to at most `concurrency` inner streams at a time, and each time one of them completes it subscribes to the next one waiting in line.

I updated your jsbin here: http://jsbin.com/weheyuceke/1/edit?js,output

I've annotated the important bit below:

const concurrency = 4;

var done$ = events$
  //Only allows a maximum number of items to be subscribed to at a time
  .flatMapWithMaxConcurrent(concurrency, 
    ({timestamp}) =>
      //This overload of `fromPromise` defers the execution of the lambda
      //until subscription
      Rx.Observable.fromPromise(() => {
        //Notify the ui that this task is in progress
        updatePanelAppend(inProgress, timestamp);
        removeFromPanel(pending, timestamp);
        //return the task
        return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp);
      }));
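
For intuition about what "at most N in flight, refill as each one finishes" means, here is a rough plain-JavaScript sketch of the same idea without Rx. It is not how RxJS implements the operator; `runWithMaxConcurrent` and `slowTask` are hypothetical names made up for this illustration, and the sketch assumes `task` always returns a promise.

```javascript
// Hypothetical helper (not part of RxJS): runs `task` for every item,
// keeping at most `limit` promises in flight at any moment.
function runWithMaxConcurrent(items, limit, task) {
  const queue = items.slice();   // items still waiting for a slot
  const results = [];            // collected in completion order
  let inFlight = 0;
  let maxInFlight = 0;           // recorded only to demonstrate the cap

  return new Promise((resolve, reject) => {
    const fill = () => {
      // Start queued items until the limit is reached again.
      while (inFlight < limit && queue.length > 0) {
        const item = queue.shift();
        inFlight++;
        maxInFlight = Math.max(maxInFlight, inFlight);
        task(item).then((value) => {
          results.push(value);
          inFlight--;
          fill();                // a slot freed up: refill it
        }, reject);
      }
      // Nothing running and nothing waiting: all work is done.
      if (inFlight === 0 && queue.length === 0) {
        resolve({ results, maxInFlight });
      }
    };
    fill();
  });
}

// Stand-in for the expensive promise-returning function.
const slowTask = (n) =>
  new Promise((resolve) => setTimeout(() => resolve(n), 10));

runWithMaxConcurrent([1, 2, 3, 4, 5, 6], 4, slowTask)
  .then(({ maxInFlight }) => console.log('peak concurrency:', maxInFlight));
```

The important detail is the same as in the Rx version: a task is only started when a slot is free, which is why the Rx code wraps the work in a lambda instead of creating the promise up front.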
