
Comments (11)

zheniantoushipashi commented on July 22, 2024
{
  "read_hdfs_json": {
    "desc": "测试",
    "strategy": "streaming.core.strategy.SparkStreamingRefStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
       {
        "name": "streaming.core.compositor.spark.udf.SQLUDFCompositor",
        "params": [
          {
            "analysis": "streaming.core.compositor.spark.udf.func.MLFunctions"
          }
        ]
       },
       {
        "name": "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",
        "params": [{
                 "topics":"ms",
                 "auto.offset.reset": "largest",
                 "metadata.broker.list": "xs69:9092"
                }]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.RegexParserCompositor",
        "params": [{
           "patten":   "^^([\\S]+)[\\s]+(.+?)[\\s]+(.+?)[\\s]+\\[(.+?)\\][\\s]+\"([\\S]+)[\\s]+(.+?)[\\s]+(.+?)\"[\\s]+([\\d]+)[\\s]+([\\d]+)[\\s]+([\\d]+)[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+(.+?)$",
           "captureNames": ["ip", "uident", "uname", "time", "method", "path", "protocol", "code", "size", "bodysize", "refer", "ua", "x-forward", "upstream-addr", "domain", "req-id", "upstream-response-time", "resptime", "x-from-cdn", "x-from-qnm", "hitmiss", "scheme"]
          }]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.ScalaMapToJSONCompositor",
        "params": [{}]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.JSONTableCompositor",
        "params": [
          {
            "tableName": "test"
          }
        ]
      },
      {
        "name": "streaming.core.compositor.spark.transformation.ScriptCompositor",
    "params": [
        {
        "inputTableName": "test",
        "outputTableName": "test3",
        "useDocMap": true
        },
        {
        "anykey": "hdfs://qiniuhadoop/streamingpro/minites.scala"
        }
    ]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
        "params": [
          {
            "sql": "select  *  , locIsp(ip) as  locIsp  from test3",
            "outputTableName": "test4"
          }
        ]
      }
    ],
    "configParams": {
    }
  },
  "traffic": {
    "desc": "测试",
    "strategy": "streaming.core.strategy.SparkStreamingStrategy",
    "algorithm": [],
    "ref": ["read_hdfs_json"],
    "compositor":[
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
        "params": [
          {
            "sql": "select  domain,  locIsp, sum(size)  as  traffic, count(*) as  requests  from test4  group by domain, locIsp"
          }
        ]
      },
      {
        "name":"streaming.core.compositor.spark.streaming.output.SQLKafkaOutputCompositor",
        "params":[
          {
            "topic":"streaming-traffic",
            "metadata.broker.list":"xs69:9092,xs70:9092,xs71:9092"
          }
        ]
     }
    ]
  }
}


zheniantoushipashi commented on July 22, 2024

This is my configuration file.


allwefantasy commented on July 22, 2024

I'd suggest writing it like this:

{
  "read_hdfs_json": {
    "desc": "测试",
    "strategy": "streaming.core.strategy.SparkStreamingStrategy",
    "algorithm": [],
    "ref": ["udf_register"],
    "compositor": [       
       {
        "name": "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",
        "params": [{
                 "topics":"ms",
                 "auto.offset.reset": "largest",
                 "metadata.broker.list": "xs69:9092"
                }]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.RegexParserCompositor",
        "params": [{
           "patten":   "^^([\\S]+)[\\s]+(.+?)[\\s]+(.+?)[\\s]+\\[(.+?)\\][\\s]+\"([\\S]+)[\\s]+(.+?)[\\s]+(.+?)\"[\\s]+([\\d]+)[\\s]+([\\d]+)[\\s]+([\\d]+)[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+(.+?)[\\s]+\"(.*?)\"[\\s]+\"(.*?)\"[\\s]+(.+?)$",
           "captureNames": ["ip", "uident", "uname", "time", "method", "path", "protocol", "code", "size", "bodysize", "refer", "ua", "x-forward", "upstream-addr", "domain", "req-id", "upstream-response-time", "resptime", "x-from-cdn", "x-from-qnm", "hitmiss", "scheme"]
          }]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.ScalaMapToJSONCompositor",
        "params": [{}]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.JSONTableCompositor",
        "params": [
          {
            "tableName": "test"
          }
        ]
      },
      {
        "name": "streaming.core.compositor.spark.transformation.ScriptCompositor",
    "params": [
        {
        "inputTableName": "test",
        "outputTableName": "test3",
        "useDocMap": true
        },
        {
        "anykey": "hdfs://qiniuhadoop/streamingpro/minites.scala"
        }
    ]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
        "params": [
          {
            "sql": "select  *  , locIsp(ip) as  locIsp  from test3",
            "outputTableName": "test4"
          }
        ]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",
        "params": [
          {
            "sql": "select  domain,  locIsp, sum(size)  as  traffic, count(*) as  requests  from test4  group by domain, locIsp"
          }
        ]
      },
      {
        "name":"streaming.core.compositor.spark.streaming.output.SQLKafkaOutputCompositor",
        "params":[
          {
            "topic":"streaming-traffic",
            "metadata.broker.list":"xs69:9092,xs70:9092,xs71:9092"
          }
        ]
     }
    ],
    "configParams": {
    }
  },
  "udf_register": {
    "desc": "UDF 函数库注册",
    "strategy": "streaming.core.strategy.SparkStreamingRefStrategy",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "streaming.core.compositor.spark.udf.SQLUDFCompositor",
        "params": [
          {
            "analysis": "streaming.core.compositor.spark.udf.func.MLFunctions"
          }
        ]
      }
    ]
  }
}


allwefantasy commented on July 22, 2024

In stream processing there is generally one main flow (that is, the job). The main flow is streaming: typically it has one input (for example Kafka, which gets mapped to a table, say KafkaTable) and one output (for example Kafka, HDFS, ES, and so on). You then use SparkStreamingRefStrategy to register the tables the main flow will need; inside the main flow they are joined against KafkaTable.
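
Roughly, in plain Spark terms, the idea looks like the sketch below. This is only an illustration, not StreamingPro's internals; the table names, the socket source standing in for Kafka, and the join are all made up for the example.

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MainFlowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[2]").appName("main-flow").getOrCreate()
    import spark.implicits._
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // Reference table, registered once outside the streaming flow
    // (the counterpart of what a SparkStreamingRefStrategy entry sets up).
    Seq(("1.2.3.4", "isp-a"), ("5.6.7.8", "isp-b")).toDF("ip", "isp").createOrReplaceTempView("ip_isp")

    val lines = ssc.socketTextStream("localhost", 9999) // stand-in for the Kafka source
    lines.foreachRDD { rdd =>
      // Each batch of the streaming input is mapped to a table ...
      rdd.toDF("ip").createOrReplaceTempView("kafka_table")
      // ... and the main flow joins it against the registered reference table.
      spark.sql("select k.ip, r.isp from kafka_table k left join ip_isp r on k.ip = r.ip").show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}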

UDF registration works like UDFs in Hive: you supply your own jar containing UDF functions that follow the required conventions, and they become available in SQL.
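
As a rough sketch of what such a UDF library could look like (the exact method signature SQLUDFCompositor expects is assumed here; check the MLFunctions source for the real contract):

import org.apache.spark.sql.UDFRegistration

// Hypothetical UDF library; locIsp is the function referenced by the SQLCompositor above.
object MyFunctions {
  def locIsp(udf: UDFRegistration): Unit = {
    // Registers a SQL function locIsp(ip) returning a String label.
    udf.register("locIsp", (ip: String) => {
      // Placeholder lookup; real logic would resolve the IP to an ISP/region.
      if (ip == null) "unknown" else "isp-of-" + ip
    })
  }
}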

We don't recommend putting more than one executable job (that is, an entry whose strategy is SparkStreamingStrategy) in a single configuration file; it makes the file overly complex. Running multiple jobs inside one Spark Streaming application (the "job" here is unrelated to Spark's own notion of a job) also makes maintenance and tuning painful.


allwefantasy commented on July 22, 2024

BTW, your example is still fairly complex... If you run into performance problems, feel free to open a separate topic to discuss them.


zheniantoushipashi commented on July 22, 2024

Everything you said is clear to me, but there is one problem I still can't solve.
Say my call chains are:
a -> b -> c -> d -> e
a -> b -> c -> f -> g
Both are main flows, the input is the same, and the processing up to step c is identical, so I'd like to register a -> b -> c as a SparkStreamingRefStrategy and have both d -> e and f -> g depend on it.
If, as you suggest, a configuration file holds only one main-flow job, then I have to start two Spark Streaming instances, and the a -> b -> c work can't be reused; it gets reprocessed in each.

That feels like a big limitation, because the a -> b -> c steps are expensive. Is there a good way to handle this?


zheniantoushipashi commented on July 22, 2024

In other words, I can't reuse the RDD lineage to fan out into multiple computations and outputs.


zheniantoushipashi commented on July 22, 2024

The main flow you describe runs from input through processing to output, straight through to the end; there is no way to branch off a side computation and emit output from the middle. I'd think that is a very common scenario.


allwefantasy commented on July 22, 2024

Taking your earlier example, you could write it like this:

name the chain a -> b -> c as K, then

K -> d -> e
K -> f -> g

This is effectively

a -> b -> c -> d -> e
a -> b -> c -> f -> g

and the two are equivalent in performance, unless you cache K (that is, Spark's RDD.cache/persist), which is expensive; in general you can't afford to cache it.

So what you are asking for is only a configuration convenience; it has no effect on performance. In theory StreamingPro does support that kind of configuration as well, but as I explained earlier, we recommend one job per configuration file because it is easier to manage.
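
To make the trade-off concrete, here is a minimal plain Spark Streaming sketch (not StreamingPro) of one shared prefix feeding two outputs. Without persist(), the shared lineage is recomputed for each output action; persist() trades memory for that reuse. The socket source and the transformations are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BranchingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("branching")
    val ssc = new StreamingContext(conf, Seconds(10))

    val raw = ssc.socketTextStream("localhost", 9999)  // stand-in for the Kafka source (a)
    val k = raw.map(_.trim).filter(_.nonEmpty)         // the shared prefix b -> c, i.e. "K"
    // k.persist()  // uncomment to cache K; otherwise each branch below recomputes it

    k.map(line => ("d-e", line)).print()               // K -> d -> e
    k.map(line => ("f-g", line.length)).print()        // K -> f -> g

    ssc.start()
    ssc.awaitTermination()
  }
}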


ww102111 commented on July 22, 2024

Is the custom "streaming.transformation.SQLCompositor" part the piece that writes output to a database?


allwefantasy commented on July 22, 2024

No. SQLCompositor is just a component that lets you write SQL. Components no longer need such long-winded names: as of version 0.4.7 you only need to use batch.sql or stream.sql.
