lwt-pipe's Introduction

Lwt Pipe

An alternative to Lwt_stream with interfaces for producers and consumers and a bounded internal buffer.

Online Documentation

Build

opam install lwt-pipe

or:

opam pin https://github.com/c-cube/lwt-pipe.git

License

permissive free software (BSD-2)

Use

A pipe can be used as a regular iterator:

# #require "lwt";;
# #require "lwt-pipe";;

# open Lwt.Infix;;

# let l = [1;2;3;4];;
val l : int list = [1; 2; 3; 4]

# Lwt_pipe.of_list l
  |> Lwt_pipe.Reader.map ~f:(fun x->x+1)
  |> Lwt_pipe.to_list;;
- : int list = [2; 3; 4; 5]

But also as a streaming queue (here with two producers push_ints that will put 1, 2, … 5 into the pipe, and one reader that consumes the whole pipe):

# let rec push_ints p i : unit Lwt.t =
  if i <= 0 then Lwt.return ()
  else Lwt_pipe.write_exn p i >>= fun () -> push_ints p (i-1) ;;
val push_ints : (int, [< `r | `w > `w ]) Lwt_pipe.t -> int -> unit Lwt.t =
  <fun>

# let reader =
    let p = Lwt_pipe.create ~max_size:3 () in
    let t1 = push_ints p 5
    and t2 = push_ints p 5
    and t_read = Lwt_pipe.to_list p in
    Lwt.join [t1;t2] >>= fun () ->
    Lwt_pipe.close p >>= fun () ->
    t_read
  in
  List.sort compare @@ Lwt_main.run reader
  ;;
- : int list = [1; 1; 2; 2; 3; 3; 4; 4; 5; 5]

This can be expressed with higher level constructs:

# let rec list_range i = if i<=0 then [] else i :: list_range (i-1);;
val list_range : int -> int list = <fun>
# let int_range n = Lwt_pipe.of_list @@ list_range n ;;
val int_range : int -> int Lwt_pipe.Reader.t = <fun>

# Lwt_main.run @@ Lwt_pipe.to_list (int_range 5);;
- : int list = [5; 4; 3; 2; 1]

# let reader =
    let p1 = int_range 6
    and p2 = int_range 6
    and p3 = int_range 6 in
    Lwt_pipe.to_list (Lwt_pipe.Reader.merge_all [p1;p2;p3])
  in
  List.sort compare @@ Lwt_main.run reader
  ;;
- : int list = [1; 1; 1; 2; 2; 2; 3; 3; 3; 4; 4; 4; 5; 5; 5; 6; 6; 6]

lwt-pipe's People

Contributors

c-cube, fardalem, hcarty, kydos, ozanmakes, pdonadeo, rgrinberg

lwt-pipe's Issues

[FEATURE REQUEST] Publishing lwt-pipe in OPAM

I'm opening this "issue" only to ask... why aren't you publishing lwt-pipe? In my opinion it's a very useful library, and it is even mentioned in the official Lwt documentation.

It definitely deserves to be a first-class citizen in the OCaml ecosystem.

Passing rw pipe into reader combinator methods

I'm trying to use this library to create a communication channel between Lwt "threads", as recommended in the Lwt documentation, but I can't figure it out (disclaimer: OCaml newbie here).

I'm trying to create an [r|w] pipe and use it in both a producer and a consumer. According to the docs this should be possible, but Lwt_pipe.Reader.iter expects [r] permissions, and when I pass an [r|w] pipe to Lwt_pipe.Reader.iter I get the following error:

# Error: This expression has type (item, [ `r | `w ]) Lwt_pipe.t
#        but an expression was expected of type
#          item Lwt_pipe.Reader.t = (item, [ `r ]) Lwt_pipe.t
#        The second variant type does not allow tag(s) `w

I tried fiddling with coercion, but no luck. The usage example in the README creates a Lwt_pipe.Reader.t using of_list, which is not exactly what I'm trying to do. Any help is greatly appreciated!
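To illustrate why the compiler rejects this, here is a minimal sketch of permission-tagged pipes using only the standard library. Toy_pipe, as_reader and sum_strict are hypothetical names made up for this illustration; they are not lwt-pipe's definitions, and the sketch does not claim to show how the library resolves the question. It only shows that a function asking for exactly [ `r ] refuses a value typed [ `r | `w ], while functions with an open bound such as [> `r ] accept it.

```ocaml
(* Toy model only: NOT lwt-pipe's real types or API. *)
module Toy_pipe : sig
  type ('a, 'perm) t
  val create : unit -> ('a, [ `r | `w ]) t
  val write : ('a, [> `w ]) t -> 'a -> unit        (* open bound: needs at least `w *)
  val to_list : ('a, [> `r ]) t -> 'a list          (* open bound: needs at least `r *)
  val as_reader : ('a, [> `r ]) t -> ('a, [ `r ]) t (* hypothetical helper *)
end = struct
  (* The permission parameter is a phantom: it never appears in the data. *)
  type ('a, 'perm) t = 'a Queue.t
  let create () = Queue.create ()
  let write q x = Queue.push x q
  let to_list q = List.of_seq (Queue.to_seq q)
  let as_reader q = q
end

(* A consumer that, like the Reader.iter in the error message, demands exactly [ `r ]. *)
let sum_strict (p : (int, [ `r ]) Toy_pipe.t) : int =
  List.fold_left ( + ) 0 (Toy_pipe.to_list p)

let p : (int, [ `r | `w ]) Toy_pipe.t = Toy_pipe.create ()

let () = List.iter (Toy_pipe.write p) [ 1; 2; 3 ]

(* [sum_strict p] would be rejected here: the closed type [ `r ] does not
   allow the tag `w carried by [p]. *)

(* Narrowing the phantom permission first makes the call type-check. *)
let total = sum_strict (Toy_pipe.as_reader p)
```

In this toy model the narrowing is safe because the permission is only a compile-time tag; the reader view shares the same underlying queue as the read-write value.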

[DISCUSSION] Read from a pipe with timeout

This is the result of my experiments.

TL;DR; I WIN! I IMPLEMENTED A SUPER USEFUL FUNCTION! 😄

Longer and more serious version.

First I stole (got inspired by) the values_available function present in Async. What I need is a way to read from a pipe, blocking if nothing is available like a normal read, but not forever. If you think about it, it's more or less a "high level" form of Unix select.

The code was convoluted and @c-cube asked me for a revision, and he was right.

So I re-implemented it in another branch using a mutable list of "waiters". This somewhat works, but there are at least two problems:

  1. waiters and readers are in different lists, so... when a thread writes, who is supposed to be signaled? The readers, the waiters or... both? I decided to wake up all the waiters, clear the list and continue with the normal write, waking up a reader
  2. this works with one reader/waiter and many writers, but when multiple threads are reading/waiting the expected result is... undefined?

Not good at all.

So I decided to give up on this funny function and implement a read_with_timeout in the read-with-timeout branch. The patch is this one: pdonadeo/lwt-pipe@071e12e

The idea is to store a bool ref alongside each Lwt.u in the readers Queue. This flag becomes true when the timeout is reached. In a normal read it is stored as false and never changed. In a "read with timeout" it starts as false, but another thread runs concurrently and sets it to true when the timeout expires.

On write, when a reader is available in the Queue, it's popped as usual. If its timeout has expired, that reader is thrown away and write_step is called again recursively.

This function has clear semantics, I believe, with no ambiguity and only one queue of readers.

What do you think?

I want to discuss here before pushing a request.
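As a rough illustration of the single-queue idea described above, here is a standard-library-only sketch. It is not the code from the patch: plain callbacks stand in for Lwt.u wakeners, the timeout is simulated by setting the flag by hand, and the names reader, pipe, add_reader and the buffer field are assumptions made for this example (write_step is borrowed from the description).

```ocaml
(* Toy model: callbacks replace Lwt wakeners; no real timers are involved. *)
type 'a reader = {
  expired : bool ref;   (* set to true once this reader's timeout has fired *)
  deliver : 'a -> unit; (* stands in for waking up the blocked reader *)
}

type 'a pipe = {
  readers : 'a reader Queue.t; (* the single queue of pending readers *)
  buffer : 'a Queue.t;         (* values written while no live reader waits *)
}

let create () = { readers = Queue.create (); buffer = Queue.create () }

(* Register a pending reader and return its flag, so that a timeout
   (simulated below) can mark it as expired later. *)
let add_reader p deliver =
  let expired = ref false in
  Queue.push { expired; deliver } p.readers;
  expired

(* Pop readers until a live one is found; expired readers are thrown away
   and the step is retried, as in the description above. *)
let rec write_step p x =
  match Queue.take_opt p.readers with
  | None -> Queue.push x p.buffer
  | Some r when !(r.expired) -> write_step p x
  | Some r -> r.deliver x

let received : (string * int) list ref = ref []
let p : int pipe = create ()
let flag_a = add_reader p (fun x -> received := ("a", x) :: !received)
let _flag_b = add_reader p (fun x -> received := ("b", x) :: !received)

let () =
  flag_a := true;  (* simulate reader "a" timing out *)
  write_step p 42; (* "a" is discarded, "b" receives 42 *)
  write_step p 7   (* no readers left, so 7 is buffered *)
```

Because an expired reader is simply skipped at write time, there is only one queue to reason about and no question of which list to signal.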

Intended behavior when creating pipes from Lwt_io's channels

This is more of a question about the intended behavior when creating pipes from Lwt's channels.
One use case I had in mind was to use connect to shuttle content between an input and an output channel. A simplified version can be seen in the following snippet for an echo server.

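module Pipe = Lwt_pipe

(* Echo server: forward everything read from the client back to it. *)
let server =
  Lwt_io.establish_server_with_client_address listen_address (fun _ (ic, oc) ->
    let reader = Pipe.IO.read ic in
    let writer = Pipe.IO.write oc in
    Pipe.connect ~ownership:`OutOwnsIn reader writer;
    (* The handler finishes only once the writer pipe is closed. *)
    Pipe.wait writer)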

When using connect, it is clear that ownership controls which pipe owns the other, and that when the owner is closed, the ownee is closed too. But when creating pipes from a channel, it's not clear to me when to close the owner pipe. connect returns unit, so I can't wait on a promise that resolves when all the content has been transferred from owner to ownee. And if I can't close the reader pipe, the writer pipe also remains open, so in my example above the promise in the server handler will never be resolved.

I'm not very familiar with Lwt's channels yet, but should the reader pipe created from an input_channel close automatically when all content from the channel has been consumed (that is, whenever the channel encounters EOF)?
