arroyosystems / arroyo
Distributed stream processing engine in Rust
Home Page: https://arroyo.dev
License: Apache License 2.0
DataFusion supports md5, sha224, sha256, sha384, sha512.
The query below should always produce row_num = 1, since it partitions by counter as well, but instead it seems to partition only by the window.
SELECT counter, ROW_NUMBER() OVER (
PARTITION BY counter, window
ORDER BY counter DESC) AS row_num
FROM (
SELECT counter, hop(interval '1 second', interval '4 second') AS window
FROM impulse GROUP BY 1, 2)
Is there any plan to support Apache Beam in Arroyo?
When will StarRocks or Doris data sources be supported?
These functions come from DataFusion and are equivalent to the PostgreSQL regexp_match() and regexp_replace() functions. They are defined for Arrow arrays at https://github.com/apache/arrow-rs/blob/master/arrow-string/src/regexp.rs. Partial progress, such as a solution that does not support any of the optional flags, would also be acceptable.
Right now, I don't see any 404 view. This would be nice to add as we start doing more error handling on the console.
Thank you very much to your team for open-sourcing this project. When will the project be available?
This query which performs a self join using an alias:
select nexmark_50000.person as seller_record
from nexmark_50000
join nexmark_50000 as t2
on nexmark_50000.auction.seller = t2.person.id
where nexmark_50000.person is not null;
does not translate correctly from DataFusion SQL to a syn expression, failing with:
only support converting column expressions to columns.
I've used statig, which is open source, to implement FSMs with Rust in the past, and I loved the user experience. I noticed that the state/transition logic in the controller is not using any library as its basis; it's all code written for Arroyo.
Would it make more sense to use an external library for such logic, so that we keep Arroyo specific to streaming and let the FSM logic be maintained by a project that specifically focuses on it?
Users can view their available sources by looking at the catalog, on the left sidebar of the query editor.
Currently the catalog returns types from our internal naming schema, like Int64 and UnixMillis. These are returned by the GetSources gRPC call, which returns a list of SourceDef messages, each of which contains a list of SourceField. Ultimately, you end up returning various PrimitiveTypes with names like Int64, etc.
This API should instead return SQL types that will be familiar to SQL users, like bigint and timestamp.
As I tried to implement #25, I had to face two types of errors:
While it is probably true that the Arrow implementation returns an error because it receives a String rather than a compiled regexp and tries to build the regexp internally, I think unwrapping the result on the assumption that it will never panic is not ideal. That would be relying on internal implementation details rather than on the signature/types, so I think we should come up with a strategy for mapping lower-level errors into Arroyo-specific errors.
DataFusion also returns an algebraic data type from its API, so I think we need to clarify what the approach should be for Arroyo:
https://github.com/apache/arrow-datafusion/blob/8a112484ac7ae89afc7006d56c65fba2dab106ce/datafusion/physical-expr/src/regex_expressions.rs#L54
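One possible shape for such a strategy, sketched below with hypothetical names (SqlError, regexp_match, and the stand-in arrow_regexp_match are all illustrative, not the actual Arroyo or Arrow APIs): wrap the lower-level error in an Arroyo-specific error type at the boundary, instead of unwrapping.

```rust
use std::fmt;

// Hypothetical Arroyo-specific error type (illustrative only).
#[derive(Debug)]
enum SqlError {
    InvalidRegex(String),
    Internal(String),
}

impl fmt::Display for SqlError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SqlError::InvalidRegex(p) => write!(f, "invalid regex pattern: {p}"),
            SqlError::Internal(m) => write!(f, "internal error: {m}"),
        }
    }
}

// Stand-in for the lower-level call that builds the regex internally
// and can fail on a bad pattern.
fn arrow_regexp_match(pattern: &str) -> Result<bool, String> {
    if pattern.contains('(') && !pattern.contains(')') {
        Err(format!("unclosed group in {pattern}"))
    } else {
        Ok(true)
    }
}

// Map the lower-level error into the Arroyo-specific one at the
// boundary, rather than unwrapping and hoping it never panics.
fn regexp_match(pattern: &str) -> Result<bool, SqlError> {
    arrow_regexp_match(pattern).map_err(SqlError::InvalidRegex)
}

fn main() {
    assert!(regexp_match("a+b").is_ok());
    assert!(matches!(regexp_match("(oops"), Err(SqlError::InvalidRegex(_))));
    println!("ok");
}
```

The same mapping applies to whatever algebraic error type DataFusion returns: one From/map_err conversion per layer keeps the panic-free invariant in the types rather than in implementation assumptions.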
Currently non-windowed joins in SQL always have a retention of 24 hours. This can be both much too long and much too short, depending on the use case. Flink currently handles this with a configurable execution option, table.exec.state.ttl. We could follow suit by allowing settings to be passed into the GrpcApi, or figure out a way to have it specified within the body of the query.
Currently COUNT DISTINCT is done along a single key, which can become very expensive as the number of distinct elements within that key grows. Flink uses a bucketing method based on the hash of the key to distribute computation of distinct elements, which lets it scale out: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/tuning/#split-distinct-aggregation. We should implement something similar.
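The core of the split-distinct idea can be sketched as follows (illustrative, modeled on Flink's optimization rather than any Arroyo code): phase one partitions values by hash into buckets and tracks distinct values per bucket, so it can run in parallel; phase two just sums the per-bucket counts, which is correct because the buckets are disjoint.

```rust
use std::collections::{HashMap, HashSet};

// Sketch of split distinct aggregation (not Arroyo code).
fn count_distinct_split(values: &[u64], buckets: u64) -> usize {
    // Phase 1: distinct per bucket; each bucket could live on a
    // different parallel subtask keyed by hash(value) % buckets.
    let mut per_bucket: HashMap<u64, HashSet<u64>> = HashMap::new();
    for &v in values {
        per_bucket.entry(v % buckets).or_default().insert(v);
    }
    // Phase 2: buckets are disjoint, so the counts simply add up.
    per_bucket.values().map(|s| s.len()).sum()
}

fn main() {
    let vals = [1, 2, 2, 3, 17, 17, 42];
    // distinct values: 1, 2, 3, 17, 42
    assert_eq!(count_distinct_split(&vals, 4), 5);
    println!("distinct = {}", count_distinct_split(&vals, 4));
}
```

The win is that no single subtask has to hold the full distinct set for a hot key; each holds only its hash bucket's share.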
In the SQL planning logic, aggregates over sliding windows use a two-phase aggregation strategy: rolling up an intermediate aggregate over each step, then updating the overall window aggregate on each step. The current implementation requires that the step evenly divide the width; when that isn't true, it falls back to a much slower approach.
It is possible to modify the current algorithm so that it works with offset sliding windows, as follows:
Let the width be W, the step be S, and the nonzero remainder R = W % S. Rather than having everything happen at every step S, you take one set of actions at time T when T % S = 0, and another when T % S = R.
When T % S = 0: calculate the partial aggregate over [T - S, T). This is then processed by all currently active windows, and any new windows are initialized.
When T % S = R: calculate the final partial window over [T - R, T), then have the second-phase aggregation consume it and emit for all live windows. If this is the last time a given window will be emitted, evict it from tracking.
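The schedule above can be sketched as a small function (illustrative only; the names are not from the Arroyo codebase) that decides which actions fire at a given time T for width W and step S with nonzero remainder R:

```rust
// Which of the two action sets fire at time t, for a sliding window
// of width `width` and step `step` where step does not divide width.
fn actions_at(t: u64, width: u64, step: u64) -> Vec<&'static str> {
    let r = width % step;
    assert!(r != 0, "this path only applies when step does not divide width");
    let mut out = Vec::new();
    if t % step == 0 {
        // roll up the partial aggregate for [t - step, t) and
        // initialize any windows that open here
        out.push("rollup_partial");
    }
    if t % step == r {
        // close the final partial [t - r, t), then emit all live
        // windows and evict any window emitting for the last time
        out.push("emit_windows");
    }
    out
}

fn main() {
    // width 7, step 3 => r = 1: rollups at t = 3, 6, 9;
    // emits at t = 1, 4, 7.
    for t in 1..=9u64 {
        println!("t={t}: {:?}", actions_at(t, 7, 3));
    }
}
```

Each wall-clock second thus triggers at most one rollup and one emit, so the two-phase structure is preserved even though the step does not divide the width.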
With a basic developer setup and the simple tutorial pipeline, compiling is taking longer than 30 minutes.
I am seeing this repeated in the api logs when on the main page for the pipeline:
2023-04-13T22:26:11.887964Z ERROR response failed{code="Internal" path="/arroyo_api.ApiGrpc/GetJobMetrics"}: arroyo_server_common: new
...
2023-04-13T22:26:42.494625Z ERROR response failed{code="Internal" path="/arroyo_api.ApiGrpc/GetJobMetrics"}: arroyo_server_common: new
Running via postgres -D /usr/local/var/postgres
I made sure to do the setup with the arroyo database, the correct arroyo user/pass and ran the migrations.
cd arroyo/arroyo-api
cargo run
cd arroyo/arroyo-controller
cargo run
pnpm run dev
All in separate panes in an iTerm mux.
I'm going to let it run for an hour or so and check to see if it's made any progress.
Not seeing any logs that indicate such; the logs that I see on the api were added above, and here's what I'm seeing on the controller:
2023-04-13T22:20:54.478312Z INFO arroyo_controller: Using process scheduler
2023-04-13T22:20:54.486223Z INFO arroyo_controller: Starting arroyo-controller on 0.0.0.0:9190
2023-04-13T22:20:54.486307Z INFO arroyo_server_common: Starting arroyo-controller admin server on 0.0.0.0:9191
2023-04-13T22:20:54.507758Z INFO arroyo_controller::states: starting state machine job_id="hbmgfamd"
2023-04-13T22:20:54.509053Z INFO arroyo_controller::states: state transition job_id="hbmgfamd" from="Created" to="Compiling"
2023-04-13T22:20:54.516598Z INFO arroyo_controller::states::compiling: Compiling pipeline job_id="hbmgfamd" hash="f9ubevthoxw2g6c7"
2023-04-13T22:20:54.517804Z INFO arroyo_controller::compiler: digraph {
0 [ label = "source_0:UnboundedNexmarkSource<qps: 100>" ]
1 [ label = "watermark_1:Watermark" ]
2 [ label = "sink_4:NullSink" ]
3 [ label = "map_2:expression<fused<map,filter>:OptionalRecord>" ]
0 -> 1 [ label = "() → arroyo_types::nexmark::Event" ]
3 -> 2 [ label = "() → generated_struct_16429945049069439673" ]
1 -> 3 [ label = "() → arroyo_types::nexmark::Event" ]
}
2023-04-13T22:23:43.822146Z INFO arroyo_controller::states: starting state machine job_id="dtendbjv"
2023-04-13T22:23:43.823730Z INFO arroyo_controller::states: state transition job_id="dtendbjv" from="Created" to="Compiling"
2023-04-13T22:23:43.832809Z INFO arroyo_controller::states::compiling: Compiling pipeline job_id="dtendbjv" hash="kbnrqmwueowoiayb"
2023-04-13T22:23:43.834899Z INFO arroyo_controller::compiler: digraph {
0 [ label = "source_0:UnboundedNexmarkSource<qps: 100>" ]
1 [ label = "watermark_1:Watermark" ]
2 [ label = "sink_10:NullSink" ]
3 [ label = "map_2:expression<fused<map,aggregator_key>:Record>" ]
4 [ label = "aggregate_window_4:SlidingWindowAggregator<SlidingWindow(size: 1m, slide: 5s)>" ]
5 [ label = "aggregation_8:expression<fused<aggregation,map>:Record>" ]
6 [ label = "aggregation_5:expression<fused<aggregation,aggregator_key>:Record>" ]
7 [ label = "aggregate_window_7:TumblingWindowAggregator<TumblingWindow(0ms)>" ]
0 -> 1 [ label = "() → arroyo_types::nexmark::Event" ]
7 -> 5 [ label = "generated_struct_7077236267445092260 → generated_struct_11296174908488371850" ]
1 -> 3 [ label = "() → arroyo_types::nexmark::Event" ]
3 -> 4 [ label = "generated_struct_2862623353463512223 ⤨ generated_struct_16429945049069439673" ]
5 -> 2 [ label = "() → generated_struct_8568894031591935709" ]
4 -> 6 [ label = "generated_struct_2862623353463512223 → generated_struct_8556445246977061536" ]
6 -> 7 [ label = "generated_struct_7077236267445092260 ⤨ generated_struct_8241898945625653774" ]
}
The two pipelines that I'm trying to compile are:
SELECT count(distinct bid.price) AS count,
hop(interval '5 seconds', interval '60 seconds') AS window
FROM nexmark
GROUP BY window;
and
select bid from nexmark where bid is not null;
I noticed that most of the functions in the API run self.authenticate. For now, this is just a passthrough that returns some AuthData and always returns OK. Instead of having this logic run individually for each request, I'm wondering if it could be run as middleware or a tower::Layer, or somewhere else that abstracts it away from the requests.
On startup, the controller spawns a tokio task that watches the postgres job_configs table. This is what allows it to respond to changes in configuration, starting, stopping and modifying pipelines.
However, if that connection fails (because of a configuration issue or because the DB is not available) the task will panic, but the overall controller process will continue on. However, because that task is gone it will not do any useful work.
This should be fixed by:
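However the fix is ultimately structured, a common pattern is to supervise the watch task: retry transient connection failures with backoff, and surface a terminal failure so the process can exit loudly rather than keep running without the watcher. A minimal std-only sketch (the names and retry policy are illustrative, not Arroyo code; the real controller would use a tokio task and check its JoinHandle):

```rust
use std::thread;
use std::time::Duration;

// Supervise a fallible worker: retry with exponential backoff up to a
// cap, then return the error instead of silently losing the task.
fn supervise<F>(mut work: F, max_retries: u32) -> Result<(), String>
where
    F: FnMut() -> Result<(), String>,
{
    let mut attempt = 0;
    loop {
        match work() {
            Ok(()) => return Ok(()),
            Err(e) if attempt < max_retries => {
                attempt += 1;
                eprintln!("watch task failed (attempt {attempt}): {e}; retrying");
                // exponential backoff between reconnect attempts
                thread::sleep(Duration::from_millis(10u64 << attempt));
            }
            // Out of retries: bubble the error up so the controller can
            // exit loudly rather than continue without the watcher.
            Err(e) => return Err(format!("giving up after {attempt} retries: {e}")),
        }
    }
}

fn main() {
    // Simulate a connection that fails twice, then succeeds.
    let mut failures = 2;
    let result = supervise(
        || {
            if failures > 0 {
                failures -= 1;
                Err("connection refused".to_string())
            } else {
                Ok(())
            }
        },
        5,
    );
    assert!(result.is_ok());
    println!("watcher recovered");
}
```

Either outcome is acceptable: recover the watch loop, or crash the whole controller; what must not happen is a dead task inside a live process.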
Currently, if you stop a job while it is taking a checkpoint, it will launch a second checkpoint in CheckpointStopping mode. The controller doesn't like this, and the job ends up hung. Restarting the controller does put it back in a healthy state, but this is still pretty bad. The job controller should cleanly handle a stop signal mid-checkpoint.
Some sort of selection (20, 50, 100, etc.) to limit the records so the list doesn't grow infinitely, while also allowing more than the currently hardcoded 20 records, would be awesome!
I think that an impl GenericClient argument to these functions makes more sense, allowing them to use either a Transaction or a generic Postgres client.
Would it make sense to refactor all of the functions such as create_connection, delete_source, get_pipelines, etc. to use GenericClient instead of a concrete type such as Transaction?
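The shape of the proposed refactor, reduced to a toy (all names here are illustrative stand-ins, not the actual Arroyo or tokio-postgres API): functions take any client implementing a shared trait, so the same code path works inside and outside a transaction.

```rust
// Toy trait standing in for a generic database client.
trait GenericClient {
    fn execute(&self, sql: &str) -> u64;
}

struct Transaction;
struct PlainClient;

impl GenericClient for Transaction {
    fn execute(&self, _sql: &str) -> u64 { 1 } // pretend: 1 row affected
}
impl GenericClient for PlainClient {
    fn execute(&self, _sql: &str) -> u64 { 1 }
}

// Accepts either a transaction or a plain connection.
fn delete_source(client: &impl GenericClient, id: u64) -> u64 {
    client.execute(&format!("DELETE FROM sources WHERE id = {id}"))
}

fn main() {
    assert_eq!(delete_source(&Transaction, 7), 1);
    assert_eq!(delete_source(&PlainClient, 7), 1);
    println!("ok");
}
```

With tokio-postgres specifically, its GenericClient trait serves exactly this role for Client and Transaction.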
Arroyo version 0.3.0
Using confluent schema registry during kafka source setup, this is the only response I am able to retrieve:
Missing 'schemaType' field in schema registry response
My setup is standard Debezium + Postgres with Confluent Schema Registry v6.1.0 using Avro, self-hosted.
The input will be a possibly-optional SystemTime, as that is the current representation for timestamps. Follow the DataFusion documentation for date_trunc, date_part, and extract.
The most natural place is a new variant on Expression for DateFunction(), with a corresponding DateExpression. Testing should be done by adding additional assertions to arroyo-sql-testing.
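A minimal sketch of evaluating date_trunc over the SystemTime representation (the dispatch-by-string shape is illustrative, not the proposed DateExpression design; month and year truncation need calendar math and are elided here):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Truncate a timestamp down to the start of the given unit.
fn date_trunc(unit: &str, t: SystemTime) -> Option<SystemTime> {
    let since_epoch = t.duration_since(UNIX_EPOCH).ok()?;
    let secs = since_epoch.as_secs();
    let unit_secs = match unit {
        "second" => 1,
        "minute" => 60,
        "hour" => 3_600,
        "day" => 86_400,
        _ => return None, // month/year need calendar math, elided
    };
    Some(UNIX_EPOCH + Duration::from_secs(secs - secs % unit_secs))
}

fn main() {
    let t = UNIX_EPOCH + Duration::from_secs(3_725); // 01:02:05 UTC
    assert_eq!(
        date_trunc("hour", t),
        Some(UNIX_EPOCH + Duration::from_secs(3_600))
    );
    assert_eq!(
        date_trunc("minute", t),
        Some(UNIX_EPOCH + Duration::from_secs(3_720))
    );
    println!("ok");
}
```

The optional input maps naturally through Option: a null timestamp truncates to null, matching SQL semantics.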
We already support creating a Kafka connection with SASL auth, but it seems we lost support for the SSL certificate configs.
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md?plain=1#L63-L83
For me, the most important configs are:
ssl.certificate.pem
enable.ssl.certificate.verification
ssl.endpoint.identification.algorithm
Is there any planned release date for this feature?
It should be possible to use Kubernetes as a scheduler for jobs, implemented in a similar fashion to our current Nomad scheduler.
Besides blank values where a user would expect to see a number, a list of pipelines, etc., there isn't any indication or clear message that the web console could not connect to the API.
Just like what https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/values.yaml#L28 does.
Can we change the default image tag in the Helm values to tip?
If end users want to pin a specific version, they can override the field; it would default to tip (latest) for everyone else.
Click continue, and finally give the source a name, like “nexmark_100,” and click “Publish” to finish creating the source.
SELECT bid FROM nexmark WHERE bid IS NOT NULL;
is the query that is displayed for the user to use as their first pipeline; however, the previous step recommends the name nexmark_100.
I knew instinctively that I should change this, but for new users it could be confusing, especially those unfamiliar with SQL or with data systems such as Arroyo.
This seems pedantic, but I believe it could be a barrier to entry, and this first-pipeline tutorial needs to be as smooth as possible for adoption of the tool.
Currently Arroyo streams are of the style that Flink calls "append", where every new record is emitted from the operator. This limits the system in two ways:
First, it doesn't allow for updating single records. For example, doing a non-windowed outer join isn't currently supported because there is never a guarantee that the left or right side won't appear later.
Second, some sliding window applications only want to know when a key has entered or left the result set. Think, for example, of a query that detects unusual account behavior. Emitting all currently triggered keys could overload the downstream system. It should be possible to only emit the delta for such queries.
https://doc.arroyo.dev/developing/dev-setup
We use pnpm and vite for frotend development.
Should be frontend.
I had to run pnpm install before building.
$ cd arroyo-console
$ pnpm install
$ pnpm build
Instructions on the front-end console state:
However, I had to run pnpm run dev to start the local console.
When a node in the job graph is not keeping up with its input data, its input queue fills up and its upstreams cannot continue sending it data until it pulls items from the queue. This backpressures the upstream, which is blocked from doing work until it is able to write its messages downstream. In this way, we prevent faster upstreams from overloading slower downstreams. See this blog post for more on the general theory of backpressure in streaming systems (although note that the details are a bit different in Arroyo).
Figuring out whether and where backpressure is occurring is important for users to understand the behavior and performance of their pipelines.
In Arroyo, we have a metric arroyo_worker_tx_queue_rem that reports how much space remains in a task's transmit queue. When this is 0, the downstream node is applying backpressure to us.
This data should be visible in the UI. We already have infrastructure to pass metrics back to the UI (it currently powers the data-rate graphs), so this would involve extending that API to add the arroyo_worker_tx_queue_rem metric. For visualization, the simplest approach would be to color the nodes in the pipeline graph according to how backpressured they are (for example, as the ratio of remaining queue space to total queue size).
The nodes in the graph represent logical operators, but in the physical execution each operator is subdivided into N parallel subtasks. Similarly, each operator may have M downstream nodes if the edge between them is a shuffle. The backpressure for an operator will be some combination of the backpressure of its parallel subtasks (median or min?).
So it will also be helpful to see the per-subtask backpressure, for example in the operator detail view that currently shows the data rate graphs.
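The scoring described above can be sketched as follows (illustrative, not Arroyo's metrics code; the choice of max as the combiner is one option, alongside the median or min the issue mentions):

```rust
// Per-subtask backpressure score in 0.0..=1.0, derived from the
// remaining and total tx-queue sizes. 0 remaining slots => 1.0.
fn backpressure(queue_rem: u64, queue_size: u64) -> f64 {
    1.0 - queue_rem as f64 / queue_size as f64
}

// One option for combining subtasks: take the worst (max) score, so a
// single stuck subtask still shows up on the operator node.
fn operator_backpressure(subtasks: &[(u64, u64)]) -> f64 {
    subtasks
        .iter()
        .map(|&(rem, size)| backpressure(rem, size))
        .fold(0.0, f64::max)
}

fn main() {
    // three subtasks with queue size 8: healthy, half full, stuck
    let score = operator_backpressure(&[(8, 8), (4, 8), (0, 8)]);
    assert_eq!(score, 1.0);
    println!("operator backpressure: {score}");
}
```

Max makes hot subtasks visible at the operator level; a median would instead show the typical subtask and hide stragglers, which is why the per-subtask view is also needed.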
For example, GetJobs should probably paginate results.
Right now, a user cannot stop a pipeline unless they navigate through the console to the pipeline's view.
I propose adding a button in the pipelines list view to stop a pipeline.
Since pipelines cannot be deleted unless they are stopped, this would make the deletion process more streamlined.
This should mirror the default Postgres behavior. After that, add support for to_char so users can specify the output format.
Pipelines have an associated parallelism configuration that controls how many parallel subtasks we run for each operator. (Inside the dataflow itself we support operators having different parallelisms, but the current API only allows setting a single parallelism across the entire job.)
This parallelism is set to an inferred value at pipeline creation; however, that may be too high or too low depending on the actual data volume and the complexity of the query.
There is a gRPC API (UpdateJob) that allows users to change the parallelism of a running job, but it is not currently exposed in the web UI.
This issue covers adding the ability to change the parallelism from the job details page (http://localhost:8000/jobs/{job_id}).
Note that because we do not currently support dynamic rescaling of pipelines, changing the parallelism triggers this sequence in the controller:
This can take several seconds, and the UI should reflect that the change is rolling out.
I'd like to see a GH discussion section for questions for users and devs that aren't necessarily issues, bugs, or feature requests. Would this be possible with the Apache license now?
We'd like to support joins between a windowed aggregate and a non-windowed stream. This would allow us to run Nexmark Query 6. The Arroyo version of the query is below. Ideally we'd also inspect the WHERE clause in order to not keep data around after the window has passed, as the only data from B1 we care about is that within active windows.
WITH
auction as (
SELECT auction.category as category,
auction.datetime as datetime,
auction.expires as expires,
auction.id as id
FROM nexmark where auction is not null),
bid as (
SELECT bid.auction as auction,
bid.bidder as bidder,
bid.extra as extra,
bid.datetime as datetime,
bid.price as price
FROM nexmark where bid is not null)
SELECT B.auction, B.price, B.bidder, B.dateTime, B.extra
from bid B
JOIN (
SELECT MAX(B1.price) AS maxprice, tumble(INTERVAL '10' SECOND) as window
FROM bid B1
GROUP BY 2
) B1
ON B.price = B1.maxprice
WHERE B.dateTime BETWEEN B1.window.start_time AND B1.window.end_time;
This does not give an error:
select 101.0 / 100 from nexmark_50000;
While this does:
select round(bid.price * 100) / 100 from nexmark_50000;
no implementation for f64 / {integer}
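The error suggests the planner is missing an implicit numeric coercion for mixed-type division. A toy sketch of the coercion rule (illustrative only, not the Arroyo planner's actual types): when one operand is a float and the other an integer, widen the integer side to f64 before dividing.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Value {
    Int(i64),
    Float(f64),
}

fn divide(l: Value, r: Value) -> Value {
    use Value::*;
    let as_f64 = |v: Value| match v {
        Int(i) => i as f64,
        Float(f) => f,
    };
    match (l, r) {
        // both integers: keep integer division semantics
        (Int(a), Int(b)) => Int(a / b),
        // mixed: coerce both sides to f64, which is what
        // round(bid.price * 100) / 100 needs
        _ => Float(as_f64(l) / as_f64(r)),
    }
}

fn main() {
    assert_eq!(divide(Value::Float(101.0), Value::Int(100)), Value::Float(1.01));
    assert_eq!(divide(Value::Int(7), Value::Int(2)), Value::Int(3));
    println!("ok");
}
```

The first query presumably works because the literal 100 is coerced at plan time, while the round() result path misses the same coercion.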
It should be possible to deploy the entire Arroyo control plane (API, controller, compiler service) into Kubernetes, and along with #76 it should then also be possible to schedule jobs on Kubernetes.
Currently this is translated by a single call to get_program_from_operator().
However, this code path is overloaded: it is responsible for code generation, creation of the operator graph, edge construction, and several inlined optimizations. This results in unclear, complex code.
For this issue, the task is to define another representation, SqlPlanGraph, which can be more directly translated to the Program.
Current: [SqlOperator] - (graph construction, code gen, optimization) -> [Program]
Proposed: [SqlOperator] - (graph construction) -> [SqlPlanGraph] - (optimizations) -> [SqlPlanGraph] - (code gen) -> [Program]
This test passes, but shouldn't:
single_test_codegen!(
"mixed_type_addition",
"100.0",
arroyo_sql::TestStruct {
..Default::default()
},
100u64
);
And the correct version, where the right term is 100f64, fails. This is the root cause of #59. @alex-astronomer.
Hi there 👋 ! I saw in the docs to open an issue to propose new connectors. Have you heard of NATS.io (https://github.com/nats-io/nats-server) by chance? It would be a great addition to the set of connectors. What is the process for contributing one? Thanks!