Giter Club home page Giter Club logo

fsharpx.async's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

fsharpx.async's Issues

Consider finding a cooler name?

I suppose the code here is partly from my old FSharp.AsyncExtensions project and partly new stuff that has been added to FSharpX. That said, I don't think that either of these is a good brand for the new stand-alone project.

Although I used the "FSharp.XYZ" naming in the past, I think it is actually better to avoid that (as it incorrectly implies that a project is part of the core) and I feel that "FSharpX" is such a general collection of things that it does not give the project clear identity.

So, I was wondering if we could come up with a cooler name for this? (Especially now that it looks like there is a lot of activity here and this can become pretty useful component.) Unfortunately, I'm completely out of ideas when it comes to finding anything better! Any thoughts?

.NET Standard

Would be nice if this package was .NET Standard compatible

AsyncArrow?

The types:

type AsyncArrow<'a, 'b> = 'a -> Async<'b>

type AsyncFilter<'a, 'b, 'c, 'd> = AsyncArrow<'a, 'b> -> AsyncArrow<'c, 'd>

type AsyncFilter<'a, 'b> = AsyncArrow<'a, 'b> -> AsyncArrow<'a, 'b>

The async arrow type can have many interpretations and is particularly well suited at representing an async request-reply protocol. Data driven services are usually a composition of various request-reply interactions and it can be useful to reify the type.

The supported operations would be all those of arrows as seen here but also many others specific to Async and other types. Consider operations such as:

/// Invokes arrow g and then invokes f if successful (Choice1Of2).
val tryBefore (g:AsyncArrow<'a, Choice<'a, 'c>>) (f:AsyncArrow<'a, 'b>) : AsyncArrow<'a, Choice<'b, 'c>>

/// Creates an arrow which propagates its input into the output.
val strength (f:AsyncArrow<'a, 'b>) : AsyncArrow<'a, 'a * 'b>

/// Creates an arrow which filters inputs to the specified arrow.
val filterAsync (p:AsyncArrow<'a, bool>) (df:Async<'b>) (f:AsyncArrow<'a, 'b>) : AsyncArrow<'a, 'b>

Async filters represent mappings between arrows. A filter can be use to inject various cross-cutting concerns. For example, a timing filter:

  let timing (log:NLog.Logger) : AsyncFilter<_,_,_,_> =
    fun (f:AsyncArrow<'a, 'b>) a -> async { 
      let sw = System.Diagnostics.Stopwatch.StartNew()
      let! res = f a
      sw.Stop()
      log.Trace("Time Elapsed={0}", sw.ElapsedMilliseconds)
      return res 
    }

A filter can also change the input/output type of an arrow. This can be used for layering a JSON codec onto a request/reply protocol, for instance.

type AsyncSink<'a> = AsyncArrow<'a, unit>

A sink can be used to represent functions which are run solely for the side-effects. An example operation:

  let mergeAllPar (ss:seq<AsyncSink<'a>>) : AsyncSink<'a> =
    fun a -> ss |> Seq.map ((|>) a) |> Async.Parallel |> Async.Ignore

Copy samples & tutorials

There is a couple of samples, demos & tutorials that I wrote for the original FSharp.AsyncExtensions library that could be copied here - and with a few lines of documentation, they would make good additions to the docs.

I'd be happy to contribute this, but it looks like I won't have much time in the next two weeks, so if someone wants to do this, feel free to use the samples above! (Perhaps send a quick comment here, to avoid any race conditions :-)).

I also wrote a blog post about AsyncSeq and I'd be more than happy to add it to the documentation here (a link back to the blog would be nice though).

This was written before I had F# Formatting, but I found some source HTML + F# code for this and uploaded it here.

API thoughts/notes

It would be good to trim down the public API to exclude things that perhaps don't quite fit, to allow other libraries to more readily take a dependency on this library.

For example,

  • I can see quite a few libraries wanting AsyncSeq in particular. Perhaps that should even be a separate nuget package.
  • Some items such as Buffer are mutable concurrent collections. Do they have alternatives in the .NET Parallel collections? Could they be internalized?
  • The Observable module has overlap with FSharp.Control.Reactive. Perhaps it should be trimmed out here.
  • "Utils" should be hidden.
  • I'm curious if anyone ever uses AsyncWorker inn practice.
  • The agents and "ISubject" make up much of the rest of the API. I'm interested to know how much these get used, and if they augment or conflict with other in-memory actor frameworks. Do any of these directly belong alongside AsyncSeq, or is there no strong relation.

I'd be particularly interested in getting the Nessos MBrace/Streams guys to take a look at this component, to see if/how it fits with those components, and how they would go about arranging the functionality in mutually supportive packages.

AsyncStream?

An async stream would like much like AsyncSeq but inherently unbounded. This can be used to represent data stream subscriptions. Many of the operations would be the same as AsyncSeq. Moreover, certain operations are easier to implement in terms of AsyncStream and then trimmed to an AsyncSeq.

type AsyncStreamCons<'a> = AsyncStreamCons of 'a * Async<AsyncStreamCons<'a>>

type AsyncStream<'a> = Async<AsyncStreamCons<'a>>

Much like AsyncSeq is a specialization of ListT to Async, this would be the same for StreamT

Please release nuget package

Description

Please update the nuget package with fixed ParallelWithThrottle (#35)

Known workarounds

Use paket with single file github reference

github fsprojects/FSharpx.Async src/FSharpx.Async/Agent.fs
github fsprojects/FSharpx.Async src/FSharpx.Async/Async.fs

AsyncEvent?

This would be an async version of IEvent or IObservable. The issue with IObservable alone is that consumers have to be functions of type 'a -> unit. As a result, certain operations can't be implemented. Suppose for example you wished to run an async computation as part of the observer. Calling Async.RunSynchronously defeats the purpose and Async.Start can break composition because one can no longer rely on the consuming operation to be complete after it returns.

The API would be like:

type AsyncObserver<'a> = 'a option -> Async<unit>

type AsyncEvent<'a> = AsyncObserver<'a> -> Async<unit>

module  AsyncObserver =

  val contramapAsync (f:'b -> Async<'a>) (o:AsyncObserver<'a>) : AsyncObserver<'b>
  val filterAsync (f:'a -> Async<bool>) (o:AsyncObserver<'a>) : AsyncObserver<'a>
  val skipWhile (f:'a -> Async<bool>) (o:AsyncObserver<'a>) : AsyncObserver<'a>
  val contrachooseAsync (f:'b -> Async<'a option>) (o:AsyncObserver<'a>) : AsyncObserver<'b>

module AsyncEvent =

  val mapAsync (f:'a -> Async<'b>) (o:AsyncEvent<'a>) : AsyncEvent<'b>
  val bind (f:'a -> AsyncEvent<'b>) (o:AsyncEvent<'a>) : AsyncEvent<'b>
  val filterAsync (f:'a -> Async<bool>) (o:AsyncEvent<'a>) : AsyncEvent<'a>
  val merge (o1:AsyncEvent<'a>) (o2:AsyncEvent<'a>) : AsyncEvent<'a>
  val ofAsyncSeq (s:AsyncSeq<'a>) : AsyncEvent<'a>
  val unfoldAsync (f:'State -> Async<('a * 'State) option>) (s:'State) : AsyncEvent<'a>

Synchronous Channel?

A channel akin to CML / Hopac with the following operations:

type Ch<'a>

val create : unit -> Ch<'a>

val createFull : 'a -> Ch<'a>

/// Creates an async computation which produces a result when a value is available.
val take : Ch<'a> -> Async<'a>

/// Creates an async computation which completes when a matching take operation is available.
val fill : 'a -> Ch<'a> -> Async<unit>

What should it be named? AsyncCh could be misleading because the semantics are synchronous, but Async is used for the notifications.

AsyncSeq.groupBy?

There are use cases for a groupBy operation on AsyncSeq, however the behavior is different from Seq.groupBy and perhaps a different name is in order. The use case is as follows - there is an async sequence that represents a stream of data. Each data item is associated with an entity using an identifier. Data items corresponding to the same entity must be processed in order, however data items belonging to different entities can be processed in parallel. Using AsyncSeq.iterAsync would process the data items in order, however it does not exploit the parallelism across entities. The groupBy operation would result in a sequence AsyncSeq<'k * AsyncSeq<'a>> and it would be iterated by iterating the inner sequences using iterAsync. The difference from the usual groupBy is that keys can come and go multiple times in the resulting sequence. This would be to allow processing unbounded sequences. The lifecycle of the sub-sequences can be implemented in a few ways. One would be time based, such that a sub-sequence is alive for a certain time period after which it expires and then when another item with the same key appears, a new pair is returned by the outer sequence. Another would be for the sub-sequences to be expired after each item is processed, unless another item is in progress.

`InvalidOperationException` with hot observable calling `AwaitObservable` from within `Async`

I just got an InvalidOperationException with the message Async.FromContinuations was invoked multiple time" when using the AwaitObservable that takes only a single IObservable in FSharpx.Async 1.12.0

I found this commit which might be related, but I don't know if it is already in the current nuget package of FSharpx.Async 1.12.0.

I am calling AwaitObservable from within an Async returned by AwaitObservable that is still running. Might this be causing the problem? Are your tests covering that case?

AsyncPipe?

Wanted to see if anybody had input on another async-based control abstraction. Source code here. An async pipe is similar to an AsyncSeq but more general. In addition to emitting values, it can also await input. Finally, it can complete with some value, such as an exception. The base types are:

/// An async pipeline which consumes values of type 'i, produces
/// values of type 'o and completes with a result 'a or an error.
type AsyncPipe<'i, 'o, 'a> = Async<AsyncPipeStep<'i, 'o, 'a>>

/// An individual step in an async pipeline.
and AsyncPipeStep<'i, 'o, 'a> =

  /// The pipeline completed with result 'a.
  | Done of 'a

  /// The pipeline is emitting a value of type 'o.
  | Emit of 'o * AsyncPipe<'i, 'o, 'a>

  /// The pipeline is consuming a value of type 'i.
  | Await of ('i option -> AsyncPipe<'i, 'o, 'a>)

An interesting operation on pipes is composition:

val compose (p1:AsyncPipe<'i, 'o, 'a>) (p2:AsyncPipe<'o, 'o2, 'a>) : AsyncPipe<'i, 'o2, 'a>

This operation fuses two pipes together.This can be useful for declaring certain types of async workflows. For example, if you're iterating an AsyncSeq using iterAsync, there is no way to signal a stop to the iteration, but it can be done with pipes using a consuming pipe which stops at some point, causing the entire workflow to stop.

An AsyncSeq is a special case of AsyncPipe in the following way:

type AsyncSeq<'a> = AsyncPipe<Void, 'a, 'unit>

Async.Parallel swallows exceptions

Description

When running multiple asyncs using FSharpx.Async.Parallel, an exception thrown in one doesn't cancel the running of others. This isn't the behavior when running using F#'s built-in Async.

Repro steps

Run this simplified snippet and observe that async a still keeps printing.

let rec a: Async<unit> = async {
    do! Async.Sleep 1000
    printfn "Running"
    return! a
}

let b = async {
    failwith "Failure"
}

Async.Parallel(a, b) |> Async.RunSynchronously

Expected behavior

Just like in the built-in F# Async.Parallel, an exception thrown in one Async should cancel the entire Async, without waiting for that Async to complete (I believe it'll bubble down the CancellationToken to the other running Asyncs).

Actual behavior

Async a in the example will keep running and printing. If Async a is made to complete, then once it completes the exception will be propagated up to the caller as expected.

Known workarounds

Use F# built-in Async.Parallel.

Related information

  • Operating system - Windows
  • Branch - Master

CircularQueueAgent blocks all readers when request to dequeue is larger than maxLength

The following code reproduces the issues:

#r "./bin/Debug/FSharpx.Async.dll"

//  Let's use a curcular queue agent to distribute work
let queue = FSharpx.Control.CircularQueueAgent<string>(5)

//  Let's create a simple reader process
let reader(name, taking,delay) = 
  async {
    printfn "Starting Reader"
    try
      while true do
        let! values = queue.AsyncDequeue(taking)
        for value in values do
          printfn "%s: Reading %s" name value
        do! Async.Sleep(delay)
    finally
      printfn "Stopped Reader"
  } |> Async.Start

//  And a function to continually add items
let addItems(x) = 
  async {
    for x in [1..x] do
      do! queue.AsyncEnqueue([|sprintf "Hi %i" x |])
  } |> Async.Start

//  This works fine
addItems(500)
System.Threading.Thread.Sleep(2000)
reader("Hao", 1, 100)
System.Threading.Thread.Sleep(2000)
reader("Bob", 5, 1000)
System.Threading.Thread.Sleep(2000)
//  This is where everything stops
reader("Xen", 10, 1000)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.