pipeline's People

Contributors

marksalpeter, solokirrik

pipeline's Issues

โญ๏ธ How to make sure that all the processors finished their job?

Is your feature request related to a problem? Please describe.
At the moment, when we run a pipeline, there is no way to check whether the spawned goroutines have finished their work.

Describe the solution you'd like
It would be great if the pipeline had something like a wait group inside it, so that callers could wait until all the processors are done.

Additional context

I wanted to use the library to implement the following pipeline.

func RunPipeline() ([]txinput.TxInput, error) {
	// Create stakeDepositsNum stakeDepositMessagesRequests.
	stakeDepositsNum := dp.Amount / uint64(s.maxStakedBalance)
	stakeDepositMessageRequest := s.createStakeDepositMessageRequest(authInfo, stake.ID, dp)
	stakeDepositMessagesRequests := s.createStakeDepositMessageRequests(stakeDepositMessageRequest, stakeDepositsNum)

	// 1. Create a pipeline that yields req.StakeDepositMessage stakeDepositsNum times.
	// 2. Process req.StakeDepositMessages and turn them into DepositMessageResult.
	// 3. Process DepositMessageResult and turn them into DepositTxInputResult.
	p := pipeline.Emit(stakeDepositMessagesRequests...)
	p = pipeline.ProcessConcurrently(ctx, s.numCPU, s.depositMessageProcessor, p)
	p = pipeline.ProcessConcurrently(ctx, s.numCPU, s.depositTxInputProcessor, p)

	// Allocate results.
	txInputs := make([]txinput.TxInput, stakeDepositsNum)

	// If I range over p at this point, I have no guarantee that the pipeline goroutines have finished their jobs.
	// There should be a way to wait for the pipeline to finish.
	for v := range p {
		// collect results
	}

	return txInputs, nil
}
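
One note on the concern above: a `range` over a channel ends only when that channel is closed, so if each stage closes its output after all of its workers have returned, draining the last channel already works as the wait. The sketch below shows that convention with the standard library only. The names `stage`, `emit` and `runPipeline` are hypothetical, and whether the library's own stages behave this way is an assumption to confirm against its documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// emit sends the given values on a channel and then closes it.
func emit(vals ...int) <-chan int {
	out := make(chan int, len(vals))
	for _, v := range vals {
		out <- v
	}
	close(out)
	return out
}

// stage starts `workers` goroutines that apply fn to every value read from in.
// The returned channel is closed only after every worker has returned.
func stage(workers int, in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait() // all workers of this stage are done
		close(out)
	}()
	return out
}

// runPipeline wires two stages together and drains the final channel.
// The range loop ends only once the last stage has closed its output.
func runPipeline(vals ...int) int {
	p := emit(vals...)
	p = stage(4, p, func(v int) int { return v * 2 })
	p = stage(4, p, func(v int) int { return v + 1 })

	sum := 0
	for v := range p {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(runPipeline(1, 2, 3)) // prints 15
}
```

With this shape no extra wait group is needed at the call site: when the loop in `runPipeline` exits, every worker in every stage has already returned.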

In the meantime I have to use errgroup to do the synchronisation.

func RunPipeline() ([]txinput.TxInput, error) {
	// allocate results
	txInputs := make([]txinput.TxInput, stakeDepositsNum)

	// init channels and an error group to sync goroutines
	depositMessageResultCh := make(chan processor.DepositMessageResult)
	depositTxInputResultCh := make(chan processor.DepositTxInputResult)

	g, gCtx := errgroup.WithContext(ctx)

	// 1. Create a pipeline that yields req.StakeDepositMessage stakeDepositsNum times.
	pipe := s.repeatFnTimes(gCtx, stakeDepositsNum, s.createStakeDepositMessageRequestFn(authInfo, stake.ID, dp))

	// 2. Process req.StakeDepositMessages and turn them into DepositMessageResult.
	g.Go(s.processDepositMessageRequests(gCtx, pipe, depositMessageResultCh))

	// 3. Process DepositMessageResult and turn them into DepositTxInputResult.
	g.Go(s.processDepositMessageResult(gCtx, depositMessageResultCh, depositTxInputResultCh))

	// 4. Collect TxInputs.
	g.Go(s.collect(gCtx, depositTxInputResultCh, txInputs))

	// wait for goroutines to finish.
	if err := g.Wait(); err != nil {
		return nil, merry.Wrap(ErrCannotRunStakingPipeline, merry.WithCause(err))
	}

	return txInputs, nil
}

Any advice is really appreciated.

Error Handling Example

Is your feature request related to a problem? Please describe.
I'm trying to use this wonderful library in one of my projects, but I'm not sure how to handle processor errors correctly.
At the moment, if an error occurs, the processor.Cancel method is called, but how does the library intend the caller to
handle these errors?

Describe the solution you'd like
There should be a simple way to return an error channel from the processor.

Describe alternatives you've considered
Alternatively, we have to pass an error channel to every processor we use in the pipeline and send the error to that channel in the processor's Cancel method.
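
To make the error-channel idea concrete, here is a minimal sketch using only the standard library. It is not the library's API: `process`, `collect` and `errNegative` are hypothetical names, and the doubling step stands in for real work. Both channels are buffered to the input length so the caller can drain them one after the other without blocking the worker.

```go
package main

import (
	"errors"
	"fmt"
)

// errNegative is a hypothetical sentinel error for this sketch.
var errNegative = errors.New("negative input")

// process handles each input in a goroutine, sending successes on the first
// channel and failures on the second. Both channels are closed when the
// input is exhausted.
func process(in []int) (<-chan int, <-chan error) {
	out := make(chan int, len(in))
	errs := make(chan error, len(in))
	go func() {
		defer close(out)
		defer close(errs)
		for _, v := range in {
			if v < 0 {
				errs <- fmt.Errorf("value %d: %w", v, errNegative)
				continue
			}
			out <- v * 2
		}
	}()
	return out, errs
}

// collect drains both channels and returns the values and the errors.
func collect(in []int) ([]int, []error) {
	out, errs := process(in)

	var vals []int
	for v := range out {
		vals = append(vals, v)
	}
	var failures []error
	for err := range errs {
		failures = append(failures, err)
	}
	return vals, failures
}

func main() {
	vals, failures := collect([]int{1, -2, 3})
	fmt.Println(vals, failures)
}
```

The trade-off is that every caller has to drain two channels per stage, which is part of why a single result type carrying both value and error is a common alternative.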

Additional context
At the moment I've created a special Result type to be used by all the processors. It has Err and Value fields.
But the code is very verbose:

type Result struct {
    Err error
    Value interface{}
}

func (p *SomeProcessor) Process(ctx context.Context, ins interface{}) (interface{}, error) {
	insSlice, err := toSliceOfInterfaces(ins)
	if err != nil {
		return nil, merry.Wrap(ErrCannotProcessDepositTxInput, merry.WithCause(err))
	}

	result := make([]interface{}, 0, len(insSlice))

	for _, in := range insSlice {
		res, err := ToResult(in)
		if err != nil {
			result = append(result, Result{
				Err:   merry.Wrap(ErrCannotConvertInterfaceToResult, merry.WithCause(err)),
				Value: nil,
			})

			continue
		}

		foo, err := foo(res.Value)
		if err != nil {
			result = append(result, Result{
				Err:   merry.Wrap(ErrCannotRunFoo, merry.WithCause(err)),
				Value: nil,
			})

			continue
		}

		bar, err := bar(ctx, foo)
		if err != nil {
			result = append(result, Result{
				Err:   merry.Wrap(ErrCannotRunBar, merry.WithCause(err)),
				Value: nil,
			})

			continue
		}

		result = append(result, Result{Err: nil, Value: bar})
	}

	return result, nil
}
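
One way to cut the repetition above, sketched under the assumption that Go 1.18+ generics are available: give Result a type parameter and add a small helper that skips a step when an earlier one has already failed. `Then`, `parse`, `checkPositive`, `processAll` and `errNotPositive` are hypothetical names, with `parse` and `checkPositive` standing in for foo and bar. None of this is part of the library.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// Result carries either a value or the error that stopped its processing.
type Result[T any] struct {
	Value T
	Err   error
}

// Then runs fn on r.Value unless r already holds an error, in which case
// the error is passed through unchanged.
func Then[A, B any](r Result[A], fn func(A) (B, error)) Result[B] {
	if r.Err != nil {
		return Result[B]{Err: r.Err}
	}
	v, err := fn(r.Value)
	if err != nil {
		return Result[B]{Err: err}
	}
	return Result[B]{Value: v}
}

// errNotPositive is a hypothetical sentinel error for this sketch.
var errNotPositive = errors.New("value must be positive")

// parse stands in for the first step (foo in the snippet above).
func parse(s string) (int, error) {
	return strconv.Atoi(s)
}

// checkPositive stands in for the second step (bar in the snippet above).
func checkPositive(n int) (int, error) {
	if n <= 0 {
		return 0, fmt.Errorf("%d: %w", n, errNotPositive)
	}
	return n, nil
}

// processAll runs both steps on every input and keeps one Result per input.
func processAll(ins []string) []Result[int] {
	results := make([]Result[int], 0, len(ins))
	for _, in := range ins {
		r := Then(Result[string]{Value: in}, parse)
		results = append(results, Then(r, checkPositive))
	}
	return results
}

func main() {
	for _, r := range processAll([]string{"4", "x", "-1"}) {
		fmt.Println(r.Value, r.Err)
	}
}
```

Each step is then a single line, and the per-step `if err != nil { append; continue }` blocks collapse into the helper.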

Any help or advice is appreciated.

๐Ÿž Latest versions unpublished?

Describe the bug
It seems that the last published version for this project is v1.0.0.

To Reproduce
Steps to reproduce the behavior:

  1. Run GOPROXY= go list -m -versions github.com/deliveryhero/pipeline
  2. Observe that v1.0.0 is the most recent.

Expected behavior
The list should include every release up to the latest version, v2.1.1.

Console output

$ GOPROXY= go list -m -versions github.com/deliveryhero/pipeline
github.com/deliveryhero/pipeline v0.1.0 v0.2.0 v0.2.1 v0.2.2 v0.2.3 v0.3.0 v0.3.1 v0.4.0 v1.0.0

Client info (please complete the following information):

  • Device: MacOS
  • OS: Ventura
  • Version: 13.4.1 (c)
  • go version go1.20.5 darwin/amd64

Additional context
I am very new to Go and may very well be doing something wrong!
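
A likely explanation, offered as an assumption rather than a confirmed diagnosis: under Go's semantic import versioning, releases at v2 and above live under a module path that ends in /vN, so a query against the unsuffixed path lists only v0 and v1 tags. If the v2 releases declare a /v2 path in their go.mod, the same `go list` command against that path should show them. The hypothetical helper `modulePath` below sketches the naming rule for ordinary (non-gopkg.in) module paths.

```go
package main

import (
	"fmt"
	"strconv"
)

// modulePath returns the module path used for a given major version under
// semantic import versioning: v0 and v1 use the base path unchanged, while
// v2 and later append a "/vN" suffix.
func modulePath(base string, major int) string {
	if major < 2 {
		return base
	}
	return base + "/v" + strconv.Itoa(major)
}

func main() {
	// The path to query for v2.x.y releases, assuming the module follows the rule.
	fmt.Println(modulePath("github.com/deliveryhero/pipeline", 2))
}
```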
