This proposal introduces two new components into Elixir:
GenStage and Broker.
Stages are computation steps that send and/or receive data
from other stages. When a stage sends data, it acts as
a producer. When it receives data, it acts as a consumer.
A stage may take both the producer and consumer roles at once.
From now on, when we mention "producer" and "consumer", we
mean a stage acting in its producer or consumer role.
Data is sent between stages through a message protocol
that provides back-pressure. The exchange starts with the
consumer stage subscribing to the producer stage and
asking for events. A consumer stage will never receive
more data than it has asked for from its producer stage.
By default, a stage may connect to only a single producer
and/or a single consumer. A broker lifts this limitation
by allowing M producers to connect to N consumers according
to a given strategy.
This document describes the messages received in both the
producer and consumer roles. It also specifies both the stage
and broker behaviours.
Message protocol
This section specifies the message protocol for both producers
and consumers. Most developers won't implement these messages
directly but will rely on the GenStage and Broker behaviours
defined in later sections.
Producer
The producer is responsible for sending events to consumers
based on demand.
A producer MUST manage at least one subscription by receiving a
subscription message from a consumer stage or from a consumer
broker. Once a subscription is established, new connections
MAY be established and demand MAY be received.
Except for the initial subscription message, the producer makes
no distinction between its consumers. The messages it must
handle are defined below:
- `{:"$gen_producer", from :: {consumer_pid, subscription_ref}, {:stage, options}}` - sent by the consumer to the producer to start a new subscription.
  Once sent, the consumer MAY immediately send demand to the producer.
  The `subscription_ref` uniquely identifies the subscription. The
  consumer MUST monitor the producer for clean-up purposes in case of
  crashes. The consumer MUST NOT establish new connections over this
  subscription.
  Once received, the producer MUST monitor the consumer. If the producer
  already has a subscription, it MAY ignore future subscriptions by
  sending a disconnect reply (defined in the Consumer section), except
  when the new subscription reuses an existing `subscription_ref`.
  In such cases, the producer MUST crash.
- `{:"$gen_producer", from :: {consumer_pid, subscription_ref}, {:broker, strategy, options}}` - sent by the consumer to the producer to start a new subscription.
  The consumer MAY establish new connections by sending `:connect`
  messages, defined below. The `subscription_ref` uniquely identifies
  the subscription. The consumer MUST monitor the producer for clean-up
  purposes in case of crashes.
  Once received, the producer MUST monitor the consumer. The producer
  MUST initialize the strategy by calling `strategy.init(from, options)`.
  If the producer already has a subscription, it MAY ignore future
  subscriptions by sending a disconnect reply (defined in the Consumer
  section), except when the new subscription reuses an existing
  `subscription_ref`. In such cases, the producer MUST crash.
- `{:"$gen_producer", from :: {pid, subscription_ref}, {:connect, consumers :: [pid]}}` - sent by the consumer to producers to start new connections.
  Once sent, the consumer MAY immediately send demand to the producer.
  The `subscription_ref` uniquely identifies the subscription.
  Once received, the producer MUST call `strategy.connect(consumers, from, state)`
  if a strategy is available. If the `subscription_ref` is unknown, the
  producer MUST send an appropriate disconnect reply to each consumer.
- `{:"$gen_producer", from :: {consumer_pid, subscription_ref}, {:disconnect, reason}}` - sent by the consumer to disconnect a given consumer-subscription pair.
  Once received, the producer MAY call `strategy.disconnect(reason, from, state)`
  if a strategy is available. The strategy MUST send a disconnect message to
  `consumer_pid`. If `consumer_pid` refers to the process that started
  the subscription, all connections MUST be disconnected. If the
  consumer-subscription pair is unknown, a disconnect MUST still be sent
  with the proper reason. In all cases, however, there is no guarantee the
  message will be delivered (for example, the producer may crash just before
  sending the confirmation).
- `{:"$gen_producer", from :: {consumer_pid, subscription_ref}, {:ask, count}}` - sent by consumers to ask a producer for data on a given consumer-subscription pair.
  Once received, the producer MUST call `strategy.ask(count, from, state)`
  if a strategy is available. The producer MUST send data only up to the
  requested demand. If the pair is unknown, the producer MUST send an
  appropriate disconnect reply.
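To make the producer-side rules concrete, here is a minimal, process-free sketch of a producer serving `:stage` subscriptions: it records subscriptions, crashes on a reused `subscription_ref`, accumulates `:ask` demand, never sends more events than requested, and answers unknown pairs with an `:unknown_subscription` disconnect. The module name `ProducerSketch`, its `handle_msg/2` and `push/2` functions, and the event buffer are hypothetical; monitoring and broker strategies are left out.

```elixir
defmodule ProducerSketch do
  # Hypothetical, process-free model of a producer handling :stage
  # subscriptions. Monitoring is omitted. Functions return
  # {messages_to_send, state}, where each message is {destination_pid, msg}.
  defstruct subs: %{}, buffer: []

  # Start a subscription; reusing a known subscription_ref crashes.
  def handle_msg(%__MODULE__{} = s, {:"$gen_producer", {pid, ref}, {:stage, _opts}}) do
    if Map.has_key?(s.subs, ref) do
      raise ArgumentError, "duplicate subscription_ref #{inspect(ref)}"
    end

    {[], %{s | subs: Map.put(s.subs, ref, {pid, 0})}}
  end

  # Accumulate demand for a known pair, or reply with a disconnect.
  def handle_msg(%__MODULE__{} = s, {:"$gen_producer", {pid, ref}, {:ask, count}}) do
    case Map.fetch(s.subs, ref) do
      {:ok, {^pid, demand}} ->
        dispatch(%{s | subs: Map.put(s.subs, ref, {pid, demand + count})}, ref)

      _ ->
        reply = {:"$gen_consumer", {self(), ref}, {:disconnect, :unknown_subscription}}
        {[{pid, reply}], s}
    end
  end

  # Buffer new events and dispatch them to every subscription with demand.
  def push(%__MODULE__{} = s, events) do
    s = %{s | buffer: s.buffer ++ events}

    Enum.reduce(Map.keys(s.subs), {[], s}, fn ref, {acc, st} ->
      {msgs, st} = dispatch(st, ref)
      {acc ++ msgs, st}
    end)
  end

  # Send buffered events, never more than the pending demand and never
  # an empty event list.
  defp dispatch(s, ref) do
    {pid, demand} = Map.fetch!(s.subs, ref)
    {events, rest} = Enum.split(s.buffer, demand)
    s = %{s | buffer: rest, subs: Map.put(s.subs, ref, {pid, demand - length(events)})}

    case events do
      [] -> {[], s}
      _ -> {[{pid, {:"$gen_consumer", {self(), ref}, events}}], s}
    end
  end
end
```

Returning the messages instead of calling `send/2` keeps the sketch easy to test; a real producer process would send each `{pid, msg}` pair and would also monitor its consumers.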
Consumer
The consumer is responsible for starting the subscription
and sending demand to producers.
A consumer MUST manage at least one subscription by sending a
subscription message to a producer. Once a subscription is
established, new connections MAY be established and demand MAY
be sent. Once demand is sent, messages may be received as
defined below:
- `{:"$gen_consumer", from :: {producer_pid, subscription_ref}, {:connect, producers :: [pid]}}` - sent by producers to consumers to start new connections.
  Once received, the consumer MAY immediately send demand to
  the producers. The `subscription_ref` uniquely identifies
  the subscription. If the subscription is unknown, a
  disconnect message must be sent back to each producer.
- `{:"$gen_consumer", from :: {producer_pid, subscription_ref}, {:disconnect, reason}}` - sent by producers to disconnect a given producer-subscription pair.
  It is used as a confirmation for consumer-initiated disconnects OR whenever
  the producer wants to cancel some upstream demand. The reason may be
  `:done`, `:halted` or `:unknown_subscription`.
- `{:"$gen_consumer", from :: {producer_pid, subscription_ref}, [event]}` - events sent by producers to consumers.
  The `subscription_ref` identifies the subscription. The third element
  of the tuple is a non-empty list of events. If the subscription is
  unknown, the events must be ignored.
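The consumer side can be sketched the same way. The hypothetical `ConsumerSketch` module below builds a `:stage` subscription message with a fresh `subscription_ref`, accepts events only for a known producer-subscription pair (ignoring the rest, as the spec requires), and forgets a subscription when it receives a disconnect with one of the listed reasons. Monitoring the producer and sending demand are omitted; the function names are illustrative only.

```elixir
defmodule ConsumerSketch do
  # Hypothetical, process-free model of a consumer. Monitoring the producer
  # is omitted. `subs` maps subscription_ref => producer_pid and `handled`
  # collects every event accepted so far.
  defstruct subs: %{}, handled: []

  # Builds a :stage subscription message and records the new subscription.
  # Returns {{producer, message}, subscription_ref, state}.
  def subscribe(%__MODULE__{} = s, producer, opts \\ []) do
    ref = make_ref()
    msg = {:"$gen_producer", {self(), ref}, {:stage, opts}}
    {{producer, msg}, ref, %{s | subs: Map.put(s.subs, ref, producer)}}
  end

  # Events: accepted only for a known producer-subscription pair.
  def handle_msg(%__MODULE__{} = s, {:"$gen_consumer", {pid, ref}, events})
      when is_list(events) and events != [] do
    if Map.get(s.subs, ref) == pid do
      {:ok, %{s | handled: s.handled ++ events}}
    else
      {:ignored, s}
    end
  end

  # Disconnect with one of the reasons listed above: forget the subscription.
  def handle_msg(%__MODULE__{} = s, {:"$gen_consumer", {_pid, ref}, {:disconnect, reason}})
      when reason in [:done, :halted, :unknown_subscription] do
    {{:disconnected, reason}, %{s | subs: Map.delete(s.subs, ref)}}
  end
end
```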
GenStage
GenStage is a generic stage that may act as a producer,
consumer or both. It is built on top of a GenServer with
the following changes:
- `init(args)` may return `{:ok, state, opts}`, where `opts` MAY contain keys such as:
  - `:subscribe_to` - the producer to subscribe to (enables the consumer role)
  - `:max_demand` - the maximum demand it may ask from the producer
  - `:min_demand` - the minimum demand which, once reached, triggers a request for more demand upstream
- `handle_event(event, from, state)` - invoked on consumers.
  Must return the same as `GenServer.handle_info/2`.
- `handle_call/3`, `handle_cast/2` and `handle_info/2` will
  be changed to allow emitting events (for producers).
- `handle_demand(demand, from, state)` - invoked on producers.
  Must return the same as `GenStage.handle_call/3`.
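The `:max_demand` and `:min_demand` options describe how a consumer refills its demand. The sketch below assumes one reading of them: ask for `max_demand` right after subscribing and, once the outstanding demand falls to `min_demand` or below, ask for enough to top it back up to `max_demand`. The `DemandSketch` module and its functions are hypothetical and only track counts.

```elixir
defmodule DemandSketch do
  # Hypothetical reading of :max_demand and :min_demand: ask for max_demand
  # right after subscribing and, once the outstanding demand falls to
  # min_demand or below, ask again to top it back up to max_demand.
  defstruct max_demand: 50, min_demand: 25, pending: 0

  # First :ask after subscribing. Returns {count_to_ask, state}.
  def initial(%__MODULE__{max_demand: max} = d), do: {max, %{d | pending: max}}

  # Called after `count` events arrive. Returns {count_to_ask, state},
  # where 0 means no new :ask message is needed yet.
  def received(%__MODULE__{pending: pending} = d, count) when count <= pending do
    left = pending - count

    if left <= d.min_demand do
      {d.max_demand - left, %{d | pending: d.max_demand}}
    else
      {0, %{d | pending: left}}
    end
  end
end
```

With the values 50 and 25 used in the consumer example, receiving 20 events leaves 30 outstanding and asks for nothing; receiving 25 more leaves 5, so the consumer asks for 45. Asking in batches like this avoids sending one `:ask` message per event.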
TODO: Should we copy all of the GenServer API (call, cast, multicall) into GenStage? Part of it?
Or should we ask them to use GenServer?
Consumer example
A simple consumer that inspects events:
```elixir
defmodule InspectConsumer do
  use GenStage

  def init(_) do
    # TODO: How to specify options for the subscription itself?
    # I.e. the options in {:"$gen_producer", from, {:stage, options}}?
    {:ok, %{}, subscribe_to: ..., max_demand: 50, min_demand: 25}
  end

  # Invoked for each event received from the producer.
  def handle_event(event, _from, state) do
    IO.inspect(event)
    {:noreply, state}
  end
end
```
Producer example
A simple producer that returns data according to a counter:
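```elixir
defmodule CounterProducer do
  use GenStage

  # The state is the next integer to emit.
  def init(_) do
    {:ok, 0}
  end

  # Emits `demand` consecutive integers starting at the current counter
  # (demand is assumed to be positive) and advances the counter.
  def handle_demand(demand, _from, counter) do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:dispatch, events, counter + demand}
  end
end
```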
Broker
The broker is responsible for connecting M producers to
N consumers. The connections between producers and consumers
are established directly and are not intermediated by the broker.
This means consumers will send demand to M producers and
producers will send events to N consumers. How each producer
handles that demand is determined by a broker strategy.
Subscribing a consumer to a broker is the same as subscribing it
to any other producer. A broker may also subscribe itself to a
producer; the only difference from the producer's perspective is
that the subscription message is tagged as `:broker` with a strategy
instead of `:stage` (as specified in the `"$gen_producer"` messages
defined in earlier sections).
A broker will never send demand to its producers, because
the producer is never expected to send events directly to the
broker. To avoid overhead, producers always receive demand directly
from consumers and send events directly to consumers.
A broker, however, will receive demand from consumers. That
demand is used to dynamically dispatch events through the broker.
Finally, a broker is responsible for monitoring all producers
and consumers and for relaying the proper connect and disconnect
messages to them.
Connection management
When a new producer is added to the broker, the broker will send
N connect messages to the producer, each referencing all the
existing N consumers. Afterwards, it will send a connect message to
each of the N existing consumers referencing the producer. The broker
will remain subscribed to the producer but never send demand upstream.
When a new consumer is added to the broker, a demand relationship
is established between the broker and the consumer, where the broker
is effectively a producer. This will be used for dynamic broker
dispatch. After the broker-consumer relationship is established,
the broker will send M subscribe messages to all existing M
producers referencing the consumer, as well as M messages to
the new consumer referencing all M producers.
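The message counts above (N to a new producer and one per consumer; M to the producers and M to a new consumer) suggest one connect message per producer-consumer pair in each direction. The hypothetical `BrokerConnSketch` below assumes that reading, and also assumes every message uses the `:connect` shapes from the message protocol section (the text mentions "subscribe messages" for the consumer case). `refs` is an assumed map from each peer to the `subscription_ref` of its subscription with the broker.

```elixir
defmodule BrokerConnSketch do
  # Hypothetical: lists the connect messages a broker sends when a peer joins,
  # assuming one {:connect, [pid]} message per producer-consumer pair in each
  # direction. `refs` maps each peer to the subscription_ref of its
  # subscription with the broker. Returns [{destination, message}].

  def producer_added(broker, producer, consumers, refs) do
    to_producer =
      for c <- consumers,
          do: {producer, {:"$gen_producer", {broker, refs[producer]}, {:connect, [c]}}}

    to_consumers =
      for c <- consumers,
          do: {c, {:"$gen_consumer", {broker, refs[c]}, {:connect, [producer]}}}

    to_producer ++ to_consumers
  end

  def consumer_added(broker, consumer, producers, refs) do
    to_producers =
      for p <- producers,
          do: {p, {:"$gen_producer", {broker, refs[p]}, {:connect, [consumer]}}}

    to_consumer =
      for p <- producers,
          do: {consumer, {:"$gen_consumer", {broker, refs[consumer]}, {:connect, [p]}}}

    to_producers ++ to_consumer
  end
end
```

Any terms work as peer identifiers in this sketch, so atoms such as `:p1` and `:c1` can stand in for pids when experimenting with it.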
Broker strategy
TODO: specify all callbacks in the broker strategy
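As a possible starting point, the four strategy calls the message protocol already names (`init(from, options)`, `connect(consumers, from, state)`, `disconnect(reason, from, state)` and `ask(count, from, state)`) can be collected into an Elixir behaviour. The module names and the `{:ok, state}` return values below are assumptions, and the example strategy only counts outstanding demand per consumer; sending disconnect messages is left out.

```elixir
defmodule BrokerStrategySketch do
  # Hypothetical behaviour collecting the strategy calls named in the message
  # protocol. The {:ok, state} return values are assumptions.
  @type from :: {pid(), reference()}

  @callback init(from :: from(), options :: keyword()) :: {:ok, term()}
  @callback connect(consumers :: [pid()], from :: from(), state :: term()) :: {:ok, term()}
  @callback disconnect(reason :: term(), from :: from(), state :: term()) :: {:ok, term()}
  @callback ask(count :: pos_integer(), from :: from(), state :: term()) :: {:ok, term()}
end

defmodule DemandCountingStrategy do
  # Hypothetical strategy that only tracks outstanding demand per consumer pid.
  @behaviour BrokerStrategySketch

  @impl true
  def init(_from, _opts), do: {:ok, %{}}

  # New consumers start with zero demand.
  @impl true
  def connect(consumers, _from, state),
    do: {:ok, Enum.reduce(consumers, state, &Map.put_new(&2, &1, 0))}

  # A disconnected consumer no longer has any demand.
  @impl true
  def disconnect(_reason, {consumer, _ref}, state), do: {:ok, Map.delete(state, consumer)}

  # Demand from the same consumer accumulates.
  @impl true
  def ask(count, {consumer, _ref}, state),
    do: {:ok, Map.update(state, consumer, count, &(&1 + count))}
end
```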
Dynamic broker dispatch
TODO: specify how dynamic dispatch through the broker works