deliveryhero / pipeline Goto Github PK
View Code? Open in Web Editor NEWA library to help you create pipelines in Golang
License: MIT License
A library to help you create pipelines in Golang
License: MIT License
Is your feature request related to a problem? Please describe.
At the moment when we run a pipeline there is no way to check if the spawned goroutines have done their job or not.
Describe the solution you'd like
It would be great if had something like a wait group inside the pipeline to be able to wait until all the processors are done.
Additional context`
I wanted to use the library to implement the following pipeline.
func RunPipeline() (txinput.TxInput, error) {
// create stakeDepositsNum of stakeDepositMessagesRequests.
stakeDepositsNum := dp.Amount / uint64(s.maxStakedBalance)
stakeDepositMessageRequest := s.createStakeDepositMessageRequest(authInfo, stake.ID, dp)
stakeDepositMessagesRequests := s.createStakeDepositMessageRequests(stakeDepositMessageRequest, stakeDepositsNum)
// 1. Create a pipeline that yields req.StakeDepositMessage stakeDepositsNum times.
// 2. Process req.StakeDepositMessages and turn them into DepositMessageResult.
// 3. Process DepositMessageResult and turn them into DepositTxInputResult.
p := pipeline.Emit(stakeDepositMessagesRequests...)
p = pipeline.ProcessConcurrently(ctx, s.numCPU, s.depositMessageProcessor, p)
p = pipeline.ProcessConcurrently(ctx, s.numCPU, s.depositTxInputProcessor, p)
// allocate results
txInputs := make([]txinput.TxInput, stakeDepositsNum)
// if range over p at this time I have no guarantee, that the pipeline goroutines have done their jobs.
// there should be a way to wait for the pipeline to finish.
for v := range p {
// collect results
}
return txInputs, nil
}
In the meantime I have to use errgroup to do the synchronisation.
func RunPipeline() (txinput.TxInput, error) {
// allocate results
txInputs := make([]txinput.TxInput, stakeDepositsNum)
// init channels and an error group to sync goroutines
depositMessageResultCh := make(chan processor.DepositMessageResult)
depositTxInputResultCh := make(chan processor.DepositTxInputResult)
g, gCtx := errgroup.WithContext(ctx)
// 1. Create a pipeline that yields req.StakeDepositMessage stakeDepositsNum times.
pipe := s.repeatFnTimes(gCtx, stakeDepositsNum, s.createStakeDepositMessageRequestFn(authInfo, stake.ID, dp))
// 2. Process req.StakeDepositMessages and turn them into DepositMessageResult.
g.Go(s.processDepositMessageRequests(gCtx, pipe, depositMessageResultCh))
// 3. Process DepositMessageResult and turn them into DepositTxInputResult.
g.Go(s.processDepositMessageResult(gCtx, depositMessageResultCh, depositTxInputResultCh))
// 4. Collect TxInputs.
g.Go(s.collect(gCtx, depositTxInputResultCh, txInputs))
// wait for goroutines to finish.
if err := g.Wait(); err != nil {
return nil, merry.Wrap(ErrCannotRunStakingPipeline, merry.WithCause(err))
}
return txInputs, nil
}
Any advice is really appreciated.
Is your feature request related to a problem? Please describe.
I'm trying to use this wonderful library in one of my project, but I'm not sure how to handle processor errors correctly.
At the moment if an errors occurs the processor.Cancel method is called, but how is it intended by the library to actually
handle these errors by the caller?
Describe the solution you'd like
There should be a simple way to return an error channel from the processor.
Describe alternatives you've considered
Alternatively we have to pass an error channel to all the processor we use in the pipeline and send an error to that channel in the process.Cancel method.
Additional context
At the moment I've created a special Result type to be used by all the processors. It has an Err and a Value fields.
But the code is very verbose:
type Result struct {
Err error
Value interface{}
}
func (p *SomeProcessor) Process(ctx context.Context, ins interface{}) (interface{}, error) {
insSlice, err := toSliceOfInterfaces(ins)
if err != nil {
return nil, merry.Wrap(ErrCannotProcessDepositTxInput, merry.WithCause(err))
}
result := make([]interface{}, 0, len(insSlice))
for _, in := range insSlice {
res, err := ToResult(in)
if err != nil {
result = append(result, Result{
Err: errors.Wrap(ErrCannotConvertInterfaceToResult),
Value: nil,
})
continue
}
foo, err := foo(res.Value)
if err != nil {
result = append(result, Result{
Err: errors.Wrap(ErrCannotRunFoo),
Value: nil,
})
continue
}
bar, err := bar(ctx, foo)
if err != nil {
result = append(result, Result{
Err: errors.Wrap(ErrCannotRunBar),
Value: nil,
})
continue
}
result = append(result, Result{Err: nil, Value: bar})
}
return result, nil
}
Any help or advice is appreciated.
Describe the bug
It seems that the last published version for this project is v1.0.0
.
To Reproduce
Steps to reproduce the behavior:
GOPROXY= go list -m -versions github.com/deliveryhero/pipeline
v1.0.0
is the most recent.Expected behavior
It would reflect up to the latest version, v2.1.1
.
Console output
$ GOPROXY= go list -m -versions github.com/deliveryhero/pipeline
github.com/deliveryhero/pipeline v0.1.0 v0.2.0 v0.2.1 v0.2.2 v0.2.3 v0.3.0 v0.3.1 v0.4.0 v1.0.0
Client info (please complete the following information):
Additional context
I am very new to Go and may very well be doing something wrong!
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.