Giter Club home page Giter Club logo

asyncextensions's Introduction

AsyncExtensions

Build Status AsyncExtensions supports Swift Package Manager (SPM)

AsyncExtensions provides a collection of operators that intends to ease the creation and combination of AsyncSequences.

AsyncExtensions can be seen as a companion to Apple swift-async-algorithms. For now there is an overlap between both libraries, but when swift-async-algorithms becomes stable the overlapping operators while be deprecated in AsyncExtensions. Nevertheless AsyncExtensions will continue to provide the operators that the community needs and are not provided by Apple.

Adding AsyncExtensions as a Dependency

To use the AsyncExtensions library in a SwiftPM project, add the following line to the dependencies in your Package.swift file:

.package(url: "https://github.com/sideeffect-io/AsyncExtensions"),

Include "AsyncExtensions" as a dependency for your executable target:

.target(name: "<target>", dependencies: ["AsyncExtensions"]),

Finally, add import AsyncExtensions to your source code.

Features

Channels

Subjects

Combiners

  • zip(_:_:): Zips two AsyncSequence into an AsyncSequence of tuple of elements
  • zip(_:_:_:): Zips three AsyncSequence into an AsyncSequence of tuple of elements
  • zip(_:): Zips any async sequences into an array of elements
  • merge(_:_:): Merges two AsyncSequence into an AsyncSequence of elements
  • merge(_:_:_:): Merges three AsyncSequence into an AsyncSequence of elements
  • merge(_:): Merges any AsyncSequence into an AsyncSequence of elements
  • withLatest(_:): Combines elements from self with the last known element from an other AsyncSequence
  • withLatest(_:_:): Combines elements from self with the last known elements from two other async sequences

Creators

Operators

  • handleEvents(): Executes closures during the lifecycle of the self
  • mapToResult(): Maps elements and failure from self to a Result type
  • prepend(_:): Prepends an element to self
  • scan(_:_:): Transforms elements from self by providing the current element to a closure along with the last value returned by the closure
  • assign(_:): Assigns elements from self to a property
  • collect(_:): Iterate over elements from self and execute a closure
  • eraseToAnyAsyncSequence(): Erases to AnyAsyncSequence
  • flatMapLatest(_:): Transforms elements from self into a AsyncSequence and republishes elements sent by the most recently received AsyncSequence when self is an AsyncSequence of AsyncSequence
  • multicast(_:): Shares values from self to several consumers thanks to a provided Subject
  • share(): Shares values from self to several consumers
  • switchToLatest(): Republishes elements sent by the most recently received AsyncSequence when self is an AsyncSequence of AsyncSequence

More operators and extensions are to come. Pull requests are of course welcome.

asyncextensions's People

Contributors

jordanekay avatar prince2k3 avatar swhitty avatar twittemb avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

asyncextensions's Issues

[BUG] attempt to await next() on more than one task

Describe the bug
When using .multicast I got unexpected error: attempt to await next() on more than one task. It is my - maybe incorrect? - understanding that I could use several tasks consuming an Async Multicasted sequence?

To Reproduce
I have these variables:

actor Connection {
    private let asyncStream: AsyncThrowingStream<MySendableStruct, Swift.Error>
    private let asyncContinuation: AsyncThrowingStream<MySendableStruct, Swift.Error>.Continuation
    private let multicastSubject = AsyncThrowingPassthroughSubject<MySendableStruct, Swift.Error>()

    ...
    // using this static func: https://github.com/pointfreeco/swift-composable-architecture/blob/53ddc5904c065190d05c035ca0e4589cb6d45d61/Sources/ComposableArchitecture/Effects/ConcurrencySupport.swift#L222-L228
    (asyncStream, asyncContinuation) = AsyncThrowingStream.streamWithContinuation()
    ...

func autoconnectingMulticastedAsync() -> AsyncMulticastSequence<AsyncThrowingStream<MySendableStruct, Swift.Error>, AsyncThrowingPassthroughSubject<MySendableStruct, Swift.Error>>  {
        asyncStream
            .multicast(multicastSubject)
            .autoconnect()
    }
}

and callsite

...
func connect() async throws {
    connectTask?.cancel()
    connectTask = Task {
    for try await value in autoconnectingMulticastedAsync() {
        guard !Task.isCancelled else { return }
        // do stuff with `value` that in some case might trigger a new nested `Task`
       ...
    }
}
..

Provide code snippets, or links to gist or repos if possible.

Expected behavior
I would expect to be able to await values in multiple tasks from a multicasted autoconnected Async Seqyuence.

Screenshots
If applicable, add screenshots to help explain your problem.

Environment:
Mac Studio (M1), running macOS Ventura: 13.0.1 (22A400)
Xcode Version 14.1 (14B47b)
AsyncExtension version 0.5.1

Additional context
Add any other context about the problem here.

Swift Concurrency warning flags

Q: Is there a reason you're not yet using the warnings to help ensure accuracy?

.target(
  ...,
  swiftSettings: [
    .unsafeFlags([
      "-Xfrontend", "-warn-concurrency",
      "-Xfrontend", "-enable-actor-data-race-checks",
    ])
  ]
)

Perhaps it shouldn't be shipped that way, perhaps adding a new target just for development?

(Fantastic library, btw)

[BUG] AsyncCurrentValueStream `element` is not public

Unable to use property element. It isn't public

To Reproduce
Steps to reproduce the behavior:

  1. use AsyncStreams.CurrentValue
  2. attempt to print(currentValue.element)
  3. See build error
let currentValue = AsyncStreams.CurrentValue<Int>(1)

Task {
    for try await element in currentValue {
        print(element) 
    }
}

print(currentValue.element)  // error will occur

Expected behavior
Expect that element is accessible

Environment:

  • 0.1.0

[BUG] Memory Leak issue with flatMapLatest having inner asyncSequence

Describe the bug

Here is the sample code that we are using to reproduce the memory leak. We are expecting BatteryFeature to be de-allocated once we remove it from the features array

protocol Feature {}

class BatteryFeature: Feature {
    private let sub = AsyncCurrentValueSubject<Double>(50)
    var batteryLabel: AnyAsyncSequence<Double>
    init() {
        self.batteryLabel = sub.eraseToAnyAsyncSequence()
    }

    deinit {
        print("BatteryFeature Deinit")
    }
}

class Device {
    var features = [AnyAsyncSequence<Feature>]()
        init() {
        let feature: AnyAsyncSequence<Feature> = AsyncJustSequence(BatteryFeature()).eraseToAnyAsyncSequence()
        features.append(feature)
    }

}

class Provider {
    private let device = Device()
    func getDevice() -> AnyAsyncSequence<Device> {
        AsyncJustSequence(device).eraseToAnyAsyncSequence()
    }
    
    func remove() {
        device.features.removeFirst()
    }
}

class BatteryVM: ObservableObject {
    private let provider = Provider()
    
    @Published var batteryLbl: Double = 0
    
    init() {
        let result: AnyAsyncSequence<Double> = provider.getDevice().flatMapLatest { device -> AnyAsyncSequence<Double>  in
            let featureSeq: AnyAsyncSequence<Feature> = device.features.first!
            let v: AnyAsyncSequence<Double> = featureSeq.flatMap { feature -> AnyAsyncSequence<Double> in
                let bat = feature as! BatteryFeature
                return bat.batteryLabel
            }.eraseToAnyAsyncSequence()
            return v
        }.eraseToAnyAsyncSequence()
        
        
        Task { [weak self, result] in
            for try await value in result {
                print("Bat: \(value)")
                await MainActor.run { [weak self] in
                    self?.batteryLbl = value
                }
            }
        }
    }
    
    func remove() {
        provider.remove()
    }
    deinit {
        print("BatteryVM Deinit")
    }
}

Environment:

  • Version of the library: 0.5.2

Youre missing async sequence builder like kotlin's

I'm a kotlin developer familiar with kotlin coroutines and Flow, which is direct mapping to async/await + AsyncSequence.

The most powerful thing that kotlin flow has is the builder, which allows you to emit values arbitrarily and use all the async stuff since the closure is async, like so (I'll write it in async/await syntax so you understand better)

flow { emitter in
   await emitter.emit(1)
   await Task.sleep(1_000)
   await emitter.emit(10)
   await Task.sleep(1_000)
   for 0...10 {
      await emitter.emit(100)
   }
}

when the closure returns, the flow terminates, if you throw inside the closure, then error is propagated as usual

It allow you to then create arbitrary custom operators, or simply way to pipe async function into AsyncSequence like

asyncSequence {
   await someFunction()
}
.flatMapLatest { ... }
.collect { ... }

this is very needed

[BUG] Latest Main branch commit breaks AsyncStreams (CurrentValue, Passthrough etc)

Describe the bug
I noticed a code behavior change since adding the actor code.
For example, I use Passthrough as a way to pass events to an onReceive method. That stopped as it supposed the moment I updated to the updated main. The best I can describe the behavior is that fires once and then stops working after that.

To Reproduce
Steps to reproduce the behavior:

var events = AsyncStreams.Passthrough<Event>()

// I used 
events.nonBlockingSend(.myEvent)

then

// I have a modifier that wraps this in SwiftUI (onReceive)
task {
 do {
         for try await event in viewModel.events {
               switch event {
                   ...
               }
         }
     } catch {}
}

Expected behavior
AsyncStreams works like the commit e4aeb0f and before.

Environment:

  • Main

** Additional Info **

I'm not sure how to really explain what is going on here. This is the best I can do.

flatMapLatest + Just issues

I don't know if I'm writing an issue or a feature request or just question. I have the following scenario, very simplified:

let root = CurrentValue(false)
let results = root.flatMapLatest {
    $0 ? Just(...).eraseToAnyAsyncSequence() : AsyncStream { ... }.eraseToAnyAsyncSequence()
}

// later...
for await result in results { ... }
fatalError()

I put the fatalError after the loop because as I see it, root never completes and so we'd never reach that point. However, whenever the condition becomes true and we use Just, collection of results also exits. Now I have a few questions:

[Question] How to resolve an issue I ran into from the current changes.

First off let me start of saying that I like the changes you've made recently it helps clean things up and make the code more maintainable.

I have a property wrapper that wraps UserDefaults. And before I had Combine as a way to listen to changes, via CurrentValueSubject, on UserDefaults. Since I'm in the process of removing Combine and using strictly AsyncSequence I started using CurrentValue version.

@propertyWrapper
public struct Defaults<T: Codable> {
    private let key: String
    
    public let currentValue: AsyncStreams.CurrentValue<T>

    public var projectedValue: Defaults<T> { self }

    public var wrappedValue: T {
        get { currentValue.element }  <-- Here is my problem
        set {
            currentValue.send(newValue) <-- Here is my problem
            userDefaults.set(object: newValue, forKey: key)
        }
    }

    public init(wrappedValue defaultValue: T, _ key: String) {
        self.key = key
        self.currentValue = AsyncStreams.CurrentValue<T>(defaultValue)
        self.wrappedValue = userDefaults.object(T.self, with: key) ?? defaultValue
    }
}

element property now requires I use async/await here. My question is ...
Do you know how I can resolve this and still meet the property wrappers need for wrappedValue?

Thanks!

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.