Comments (11)
{
"read_hdfs_json": {
"desc": "test",
"strategy": "streaming.core.strategy.SparkStreamingRefStrategy",
"algorithm": [],
"ref": [],
"compositor": [
{
"name": "streaming.core.compositor.spark.udf.SQLUDFCompositor",
"params": [
{
"analysis": "streaming.core.compositor.spark.udf.func.MLFunctions"
}
]
},
{
"name": "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",
"params": [{
"topics":"ms",
"auto.offset.reset": "largest",
"metadata.broker.list": "xs69:9092"
}]
},
{
"name": "streaming.core.compositor.spark.streaming.transformation.RegexParserCompositor",
"params": [{
"patten": "^^([\\S]+)[\\s]+(.+?)[\\s]+(.+?)[\\s]+\\[(.+?)\\][\\s]+\"([\\S]+)[\\s]+(.+?)[\\s]+(.+?)\"[\\s]+([\\d]+)[\\s]+([\\d]+)[\\s]+([\\d]+)[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+(.+?)$",
"captureNames": ["ip", "uident", "uname", "time", "method", "path", "protocol", "code", "size", "bodysize", "refer", "ua", "x-forward", "upstream-addr", "domain", "req-id", "upstream-response-time", "resptime", "x-from-cdn", "x-from-qnm", "hitmiss", "scheme"]
}]
},
{
"name": "streaming.core.compositor.spark.streaming.transformation.ScalaMapToJSONCompositor",
"params": [{}]
},
{
"name": "streaming.core.compositor.spark.streaming.transformation.JSONTableCompositor",
"params": [
{
"tableName": "test"
}
]
},
{
"name": "streaming.core.compositor.spark.transformation.ScriptCompositor",
"params": [
{
"inputTableName": "test",
"outputTableName": "test3",
"useDocMap": true
},
{
"anykey": "hdfs://qiniuhadoop/streamingpro/minites.scala"
}
]
},
{
"name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
"params": [
{
"sql": "select * , locIsp(ip) as locIsp from test3",
"outputTableName": "test4"
}
]
}
],
"configParams": {
}
},
"traffic": {
"desc": "test",
"strategy": "streaming.core.strategy.SparkStreamingStrategy",
"algorithm": [],
"ref": ["read_hdfs_json"],
"compositor":[
{
"name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
"params": [
{
"sql": "select domain, locIsp, sum(size) as traffic, count(*) as requests from test4 group by domain, locIsp"
}
]
},
{
"name":"streaming.core.compositor.spark.streaming.output.SQLKafkaOutputCompositor",
"params":[
{
"topic":"streaming-traffic",
"metadata.broker.list":"xs69:9092,xs70:9092,xs71:9092"
}
]
}
]
}
}
from byzer-lang.
This is my configuration file.
from byzer-lang.
I suggest writing it like this:
{
"read_hdfs_json": {
"desc": "test",
"strategy": "streaming.core.strategy.SparkStreamingStrategy",
"algorithm": [],
"ref": ["udf_register"],
"compositor": [
{
"name": "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",
"params": [{
"topics":"ms",
"auto.offset.reset": "largest",
"metadata.broker.list": "xs69:9092"
}]
},
{
"name": "streaming.core.compositor.spark.streaming.transformation.RegexParserCompositor",
"params": [{
"patten": "^^([\\S]+)[\\s]+(.+?)[\\s]+(.+?)[\\s]+\\[(.+?)\\][\\s]+\"([\\S]+)[\\s]+(.+?)[\\s]+(.+?)\"[\\s]+([\\d]+)[\\s]+([\\d]+)[\\s]+([\\d]+)[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+(.+?)$",
"captureNames": ["ip", "uident", "uname", "time", "method", "path", "protocol", "code", "size", "bodysize", "refer", "ua", "x-forward", "upstream-addr", "domain", "req-id", "upstream-response-time", "resptime", "x-from-cdn", "x-from-qnm", "hitmiss", "scheme"]
}]
},
{
"name": "streaming.core.compositor.spark.streaming.transformation.ScalaMapToJSONCompositor",
"params": [{}]
},
{
"name": "streaming.core.compositor.spark.streaming.transformation.JSONTableCompositor",
"params": [
{
"tableName": "test"
}
]
},
{
"name": "streaming.core.compositor.spark.transformation.ScriptCompositor",
"params": [
{
"inputTableName": "test",
"outputTableName": "test3",
"useDocMap": true
},
{
"anykey": "hdfs://qiniuhadoop/streamingpro/minites.scala"
}
]
},
{
"name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
"params": [
{
"sql": "select * , locIsp(ip) as locIsp from test3",
"outputTableName": "test4"
}
]
},
{
"name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
"params": [
{
"sql": "select domain, locIsp, sum(size) as traffic, count(*) as requests from test4 group by domain, locIsp"
}
]
},
{
"name":"streaming.core.compositor.spark.streaming.output.SQLKafkaOutputCompositor",
"params":[
{
"topic":"streaming-traffic",
"metadata.broker.list":"xs69:9092,xs70:9092,xs71:9092"
}
]
}
],
"configParams": {
}
},
"udf_register": {
"desc": "UDF function library registration",
"strategy": "streaming.core.strategy.SparkStreamingRefStrategy",
"algorithm": [],
"ref": [],
"compositor": [
{
"name": "streaming.core.compositor.spark.udf.SQLUDFCompositor",
"params": [
{
"analysis": "streaming.core.compositor.spark.udf.func.MLFunctions"
}
]
}
]
}
}
from byzer-lang.
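In stream processing there is usually one main flow (that is, a job). The flow is streaming and typically has one input (for example Kafka, which is mapped to a table, say KafkaTable) and one output (for example Kafka, HDFS, or ES). SparkStreamingRefStrategy is then used to register tables that the main flow needs; inside the main flow, these tables are joined with KafkaTable.
UDF registration is similar to UDFs in Hive: you provide a jar containing UDF functions that meet the requirements, and they become available in SQL.
We do not recommend putting more than one executable job (that is, one whose strategy is SparkStreamingStrategy) in a single configuration file, because it makes the file too complex. Running multiple jobs inside one Spark Streaming application (a "job" here is unrelated to a Spark job) also makes maintenance and tuning difficult.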
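The "one main job per file" recommendation can be checked mechanically. The following is only a minimal sketch: `check_config` is a hypothetical helper, not part of streamingpro, and it assumes a config is a JSON object whose entries carry the `strategy` and `ref` keys shown in the examples above.

```python
# Hypothetical lint helper; streamingpro itself does not ship this function.
MAIN = "streaming.core.strategy.SparkStreamingStrategy"
REF = "streaming.core.strategy.SparkStreamingRefStrategy"


def check_config(config):
    """Return a list of warnings for a streamingpro-style config dict."""
    warnings = []
    # Jobs whose strategy is SparkStreamingStrategy are the executable ones.
    main_jobs = sorted(n for n, job in config.items() if job.get("strategy") == MAIN)
    if len(main_jobs) != 1:
        warnings.append("expected exactly one main job, found %d: %s"
                        % (len(main_jobs), main_jobs))
    # Every name listed under "ref" should exist and use the Ref strategy.
    for name, job in config.items():
        for ref in job.get("ref", []):
            if ref not in config:
                warnings.append("%s references unknown entry %s" % (name, ref))
            elif config[ref].get("strategy") != REF:
                warnings.append("%s references %s, which is not a Ref strategy"
                                % (name, ref))
    return warnings


# Skeleton of the suggested config above, with the compositor lists omitted.
suggested = {
    "read_hdfs_json": {"strategy": MAIN, "ref": ["udf_register"], "compositor": []},
    "udf_register": {"strategy": REF, "ref": [], "compositor": []},
}

# The same skeleton with a second executable job added.
two_jobs = dict(suggested, traffic={"strategy": MAIN, "ref": [], "compositor": []})

print(check_config(suggested))
print(check_config(two_jobs))
```

The suggested layout produces no warnings, while the variant with a second `SparkStreamingStrategy` entry is flagged.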
from byzer-lang.
BTW, your example is quite complex. If you run into performance problems, you can open a separate topic to discuss them.
from byzer-lang.
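I understand everything you said, but there is one problem it does not solve.
For example, my call chains are a -> b -> c -> d -> e
a -> b -> c -> f -> g
Both are main flows with the same input, and the processing up to step c is identical. So I wanted to register a -> b -> c as a SparkStreamingRefStrategy and have both d -> e and f -> g depend on it.
If I follow your suggestion of one main job per configuration, I have to start two Spark Streaming instances, and a -> b -> c cannot be reused; it all has to be processed twice.
I think this is a big limitation, because the a -> b -> c processing is expensive. Is there a good solution?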
from byzer-lang.
In other words, I cannot reuse the RDD lineage to compute multiple outputs.
from byzer-lang.
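The main flow you describe runs straight from input through processing to output.
There is no way to branch off in the middle and compute a side output, and I think that should be a very common scenario.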
from byzer-lang.
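Taking your earlier example, you can write it like this.
Name the chain a -> b -> c as K:
K -> d -> e
K -> f -> g
This is actually equivalent in performance to
a -> b -> c -> d -> e
a -> b -> c -> f -> g
unless you cache K (that is, Spark's RDD.cache/persist). Caching is very costly, and in general the data cannot be held in cache.
So what you mentioned earlier is only a configuration convenience and does not affect performance. In theory, streamingpro does support configuring it that way. But as I explained above, we recommend one configuration file per job, which is easier to manage.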
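The point about K can be illustrated without Spark. The sketch below is a toy model, not Spark code: `Lazy` is a hypothetical stand-in for a lazily evaluated RDD, and `expensive_c` plays the role of step c. It counts how often step c runs when two outputs share K, with and without caching.

```python
class Lazy:
    """Toy stand-in for an RDD: nothing runs until collect() is called."""

    def __init__(self, compute):
        self._compute = compute
        self._use_cache = False
        self._cached = None

    def map(self, fn):
        # The child re-evaluates its parent every time it is collected.
        return Lazy(lambda: [fn(x) for x in self.collect()])

    def cache(self):
        self._use_cache = True
        return self

    def collect(self):
        if self._use_cache:
            if self._cached is None:
                self._cached = self._compute()
            return self._cached
        return self._compute()


calls = {"c": 0}


def expensive_c(x):
    """Step c of the chain; counts its own invocations."""
    calls["c"] += 1
    return x * 2


def build_k(source):
    # K = a -> b -> c
    return Lazy(lambda: list(source)).map(lambda x: x + 1).map(expensive_c)


def run_two_outputs(cached):
    calls["c"] = 0
    k = build_k([1, 2, 3])
    if cached:
        k.cache()
    out_e = k.map(lambda x: x + 10).collect()  # K -> d -> e
    out_g = k.map(lambda x: x * 10).collect()  # K -> f -> g
    return out_e, out_g, calls["c"]


print(run_two_outputs(cached=False))  # step c runs once per output per element
print(run_two_outputs(cached=True))   # step c runs once per element
```

In this model, sharing the name K saves nothing by itself: each output walks the whole lineage again, so step c runs six times for three elements. Only the cached variant brings that down to three, and in a real cluster that saving has to be weighed against the memory cost mentioned above.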
from byzer-lang.
About the custom "streaming.transformation.SQLCompositor":
does this part write its output to a database?
from byzer-lang.
No. SQLCompositor is just a component that lets you write SQL. Components no longer need such long names: in version 0.4.7 you only need to use batch.sql or stream.sql.
from byzer-lang.