Giter Club home page Giter Club logo

Comments (17)

nordfjord avatar nordfjord commented on June 30, 2024 1

Thanks for the repro! I looked into it a bit and I think I can tell you what's going on.

The once function is being called while in a pseudo transaction, which means that the value put into the self stream will be updated but its dependents are queued for later. After that the end is triggered which immediately removes all listeners, so by the time the value is propagated there are no listeners.

I believe this behaviour was originally introduced so we could preserve the atomicity of updates, see #179 and #180, but looks like you've found an edge case where the behaviour isn't desired.

To clarify, I think the case you're hitting has to do with the re-use of the same stream here. See this flems

I'm curious, what is your actual use case for this data flow? (Not saying this isn't a bug, but I've never seen this data flow before so I'm curious)

In any case, I think I can get a PR out to fix this soon but am dependent on @paldepind to release to NPM

from flyd.

paldepind avatar paldepind commented on June 30, 2024 1

@NoelAbrahams How did you conclude that lift does not satisfy 3 and 4? Perhaps I am misremembering or misunderstanding, but I think that lift works as you describe in 3 and 4?

Before a stream is updated there is a check that all of its dependencies are "met" meaning that they have a value. This should satisfy 3.

In the implementation of combine (which lift uses) the created end stream listens to the end stream on all its dependencies. This should satisfy 4.

Again, I might misunderstand something so please excuse me if that is the case.

from flyd.

NoelAbrahams avatar NoelAbrahams commented on June 30, 2024

I should add that I also looked at withLatestFrom, which doesn't seem to match on requirements 1 & 4.

WithLatestFrom seems to work best when you have a source stream with which you want to combine another stream. CombineLatest is subtly different in that all source streams are considered equal.

from flyd.

NoelAbrahams avatar NoelAbrahams commented on June 30, 2024

I have some further information.

The problem happens under the following conditions:

  • There is a stream nested within another.
  • The nested stream needs to use an aggregator, such as the combineLastest above but also reproduces with withLatestFrom.
  • The nested stream uses flyd.once.

Under these circumstances, the nested stream's map callback never gets called.

Eg

myouterStream.map(() => { 

  flyd.withLatestFrom(
    getAccessStream(),
    getRolesStream(),
   )
    .pipe(flyd.once())        // Commenting this out makes this work as expected
    .map(([roles, access]) => {
         console.log(roles, access); // never gets called
    });

})

Moving the nested stream out of the parent also gets things working as normal.

Implementation of once

 flyd.once = flyd.curryN(1, function (stream$) {
        return flyd.combine(
            function (s$, self) {
                self(s$());
                self.end(true);
            },
            [stream$],
        );
    });

Does the implementation of once need some changing? I picked it up from here: https://github.com/bortexz/flyd-once and that repo looks like it's no longer being maintained.

from flyd.

nordfjord avatar nordfjord commented on June 30, 2024

Happy to elaborate on this, but really would appreciate if anyone can cast an eye over my implementation and spot anything that is obviously wrong.

Would be great if you could elaborate on the problem, maybe with a code snippet on flems?

from flyd.

nordfjord avatar nordfjord commented on June 30, 2024

Even without the code snippet it's worth calling out that flyd's "end" logic is far from perfect and relies on every operator doing the "right" thing as discussed here #216 (comment)

from flyd.

StreetStrider avatar StreetStrider commented on June 30, 2024

flyd-zip is very similar, I think the code can be adapted to your needs.

It also sounds very similar to what I'd done with join + when_data_all in my flyd-inspired lib.

I think there are two basic strategies, join and merge. This is one of them. The term combine is ambiguous in my opinion.

from flyd.

NoelAbrahams avatar NoelAbrahams commented on June 30, 2024

Hi, @nordfjord

Thanks for your quick response. I have a minimal(ish) repro on flems which I have also posted below.

I think the problem is to do with the number of operators rather than with the implementation of one particular operator. I have added comments in a couple of places on how removing operators fixes the problem. I also noticed that adding a setTimeout around the codeblock starting at Line A causes the success message to print out.

It's possible that I'm doing something stupid here with my implementation of the State class, but hopefully I don't have to change things too much because the rest of the framework that depends on this bit of flyd is working brilliantly for us.

Any help here would be much appreciated! (Thanks, StreetStrider for pointing me to zip)

/*******************************
        Required flyd modules
*****************************/

// Once
flyd.once = flyd.curryN(1, function (stream$) {
    return flyd.combine(
        function (s$, self) {
            self(s$());
            self.end(true);
        },
        [stream$],
    );
});

// With latest from
flyd.withLatestFrom = flyd.curryN(2, function (streams, source$) {
    if (streams.constructor !== Array) streams = [streams];
    return flyd.combine(
        function (source, self) {
            var res = [source()];
            for (var i = 0; i < streams.length; ++i) {
                if (!streams[i].hasVal) return;
                res.push(streams[i]());
            }
            self(res);
        },
        [source$],
    );
});

// Filter
flyd.filter = flyd.curryN(2, function (fn, s) {
    return flyd.combine(
        function (s, self) {
            if (fn(s())) self(s.val);
        },
        [s],
    );
});

/*******************************
        App Code
*****************************/

class State {
  stateStream = flyd.stream({ foo: 'initial-state'});
  count = 0;
  constructor() {}
  getNext() {
    const stream = this.stateStream.map(state => {
     
      return {
         ...state,
         count: ++this.count
      };
     
    })
    .pipe(flyd.filter(value => !!value)); // Removing this also makes it work
    
    return stream;
  }
}

const state = new State();

state.getNext()
.map((value) => {
  console.log('Top level', value);

 // Line A
  flyd.withLatestFrom(state.getNext(), state.getNext())
  .pipe(flyd.once()) // Commenting this out makes it work
  .map(value => {
    console.log('Success', value); // The goal is to get to here
  });


});

from flyd.

NoelAbrahams avatar NoelAbrahams commented on June 30, 2024

Hi, @nordfjord

It's good news that the problem has been identified (and quickly too) as I was fearing something fundamentally was amiss.

Regarding my use case, this is a simple Redux-stye state manager. (I would say this is more like NgRx but without much of the drama.) Basically, a copy of the state object is pushed onto the stream whenever there is an update to the application state (eg user clicks a button). Listeners, which are typically other UI components then react to the state update.

I saw your flems and TBH at one point I was also thinking that I need to be creating a new stream whenever the state got an update. But I assumed that map was already doing that? I probably need to take a closer look at your flems as my brain is a bit scrambled atm!

Many thanks for the quick response.

from flyd.

NoelAbrahams avatar NoelAbrahams commented on June 30, 2024

Hi, @nordfjord, I've applied your fix locally and things are working as expected. Many thanks for this!

from flyd.

NoelAbrahams avatar NoelAbrahams commented on June 30, 2024

Hi, guys

I wonder if someone could help me with this problem? I'm trying to create a static method on flyd that has exactly the same behaviour as RxJs's CombineLatest. Needs to have the following:

  1. Should allow two or more streams to be passed in as parameters eg flyd.combineLatest(stream1, stream2).
  2. Resulting stream emits an array of values, eg flyd.combineLatest(stream1, stream2).map(([v1, v2]) => {...});.
  3. The resulting stream should only emit a value when all source streams have emitted at least one value.
  4. The resulting stream should end when any of the source streams end.

I've looked at the existing lift module, but this does not seem to handle 3 & 4.

Here's my attempt so far:

flyd.combineLatest = curryN_1(2, function (...streams) {
    const result$ = flyd.immediate(
        combine(function (...finalStreams) {
            const res = [];
            const changed = finalStreams[finalStreams.length - 1];
            const self = finalStreams[finalStreams.length - 2];

            for (let i = 0; i < streams.length; ++i) {
                if (!streams[i].hasVal) {
                    return;
                } else {
                    res.push(streams[i]());
                }
            }

            self(res);
        }, streams),
    );
    flyd.endsOn(
        combine(
            function () {
                return true;
            },
            streams.map(item => item.end),
        ),
        result$,
    );
    return result$;
});

This seems to work, except for a very odd problem when combineLatest is used inside another stream.

Happy to elaborate on this, but really would appreciate if anyone can cast an eye over my implementation and spot anything that is obviously wrong.

I should add that my attempt is a bit of guesswork at the moment copied blatantly from the merge implemenation — in case you couldn't tell that already!

Thanks Noel

Going back to the original problem, my implementation of combineLastest seems to be working fine. Any chance of adding this to the list of supported modules? (I'm assuming that will mean no breaking changes.) As I outlined in the original post, there is no similar operator available at present.

from flyd.

NoelAbrahams avatar NoelAbrahams commented on June 30, 2024

Hi, @paldepind, I've gone back and had a look at the example for lift and it doesn't look like anything that I remember seeing! Haven't checked it out, but I agree that on paper lift seems to be doing the same thing as combineLatest.

from flyd.

NoelAbrahams avatar NoelAbrahams commented on June 30, 2024

Hi, @paldepind the problem with lift (vs combineLatest) is really one of style. The fact that the source streams are added after the callback makes it less readable. In other words, when the function body is large, one needs to go looking down the code to see the source streams.

Once could define the function separately of course (as in the example on the lift page) but that creates an unnecessary variable.

This definitely falls within the realm of nitpicking — I admit that. But the point of going functional is to make the code more readable too. So, these things do tend to matter.

from flyd.

nordfjord avatar nordfjord commented on June 30, 2024

But the point of going functional

Depending on your definition of "functional" one might consider lift to be just as valid a functional construct. Flyd streams are Monads. lift takes a function operating on values and lifts it into the context of the Monad. In Haskell the type signature for lift2 is Applicative m => lift2 :: (a -> b -> r) -> m a -> m b -> m r.

Note, you can always derive your comineLatest from lift

const combineLatest = (...streams) => lift((...args) => args, ...streams)

from flyd.

NoelAbrahams avatar NoelAbrahams commented on June 30, 2024

const combineLatest = (...streams) => lift((...args) => args, ...streams)

Doesn't seem to have the same behaviour as my version of combineLatest. I picked up lift from here.

Need to look into this further to see what I'm doing wrong. But thanks — that would certainly be a nice solution!

from flyd.

nordfjord avatar nordfjord commented on June 30, 2024

What is the behaviour you're expecting? here's an example flems

The main difference is that it ends when ALL of the deps end instead of when ANY of the deps end. I would consider that desirable personally.

from flyd.

NoelAbrahams avatar NoelAbrahams commented on June 30, 2024

@nordfjord I think I'm fine with that too (just rolling out flyd on something and will get back if I have further thoughts).

Tracing my steps back to why I mistakenly opened this issue, I remember looking into the implementation of lift and not seeing a check for hasVal, which made me conclude that lift was emitting its value before all dependent streams had emitted. But I see that check is being performed elsewhere.

So, in summary, lift + your reformulation seems to have solved my problem.

Thanks

from flyd.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.