
ksqldb-recipes's Introduction

Stream Processing Use Cases with ksqlDB [DEPRECATED]

This repository contains the source for the collection of Stream Processing Use Cases with ksqlDB.

https://developer.confluent.io/tutorials/use-cases.html

Source moved to https://github.com/confluentinc/kafka-tutorials

Goals of the project:

  • Provide short, concrete descriptions of how ksqlDB is used in the real world—including SQL code.
  • Make it easy to replicate that code end-to-end, with a 1-click experience to populate the code into the ksqlDB editor in Confluent Cloud Console.

How to contribute

We welcome all contributions. Thank you!

Contributing an idea? Submit a GitHub issue.

Contributing a full recipe to be published?

  1. Self-assign a recipe idea from the list in GitHub issues.
  2. Create a new branch (based off main) for the new recipe.
  3. Create a new subfolder for the new recipe, e.g. docs/<industry>/<new-recipe-name>. Note: <new-recipe-name> is the slug in Confluent Cloud. Use hyphens, not underscores.
  4. The recipe should follow the structure of existing recipes. Copy the contents of an existing recipe (e.g. aviation) or the template directory as the basis for your new recipe.
  • index.md: explain the use case, why it matters, add a graphic if available
  • source.json: JSON configuration to create Confluent Cloud source connectors to pull from a real end system
  • source.sql: SQL-equivalent of source.json (this file is not referenced today in index.md, but getting ready for ksqlDB-connect integration)
  • manual.sql: SQL commands to insert mock data into Kafka topics, if a user does not have a real end system
  • process.sql: this is the core code of the recipe, the SQL commands that correspond to the event stream processing
  • sink.json: (optional) JSON configuration to create Confluent Cloud sink connectors to push results to a real end system
  • sink.sql: (optional unless sink.json is provided) SQL-equivalent of sink.json (this file is not referenced today in index.md, but getting ready for ksqlDB-connect integration)
  5. Validation: you do not need to create a real end system, real data, or a real source connector, but you should ensure the connector configuration is syntactically correct. Do validate that the core ksqlDB stream processing code works with the manual INSERT INTO statements, and that the last ksqlDB query returns the expected records (a minimal sketch of this check is shown after this list).

  6. Submit a GitHub Pull Request. Ensure the new recipe adheres to the checklist, and then tag confluentinc/devx for review.
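As a rough illustration of that validation pass (the stream, topic, and column names below are placeholders, not taken from any real recipe):

    -- Hypothetical source stream, normally declared by the recipe itself.
    CREATE STREAM orders (order_id VARCHAR KEY, item VARCHAR, amount DOUBLE)
      WITH (KAFKA_TOPIC = 'orders', VALUE_FORMAT = 'JSON', PARTITIONS = 6);

    -- Mock data, as manual.sql would provide it.
    INSERT INTO orders (order_id, item, amount) VALUES ('o1', 'widget', 19.99);
    INSERT INTO orders (order_id, item, amount) VALUES ('o2', 'gadget', 42.00);

    -- Re-run the final query from process.sql and confirm it returns the expected records.
    SELECT item, SUM(amount) AS total
      FROM orders
      GROUP BY item
      EMIT CHANGES;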

Handling connectors

A recipe is more compelling if it uses Confluent Cloud fully managed connectors, especially once the ksqlDB-connect integration is ready. But what if the recipe you want to write does not have a connector available in Confluent Cloud? Here are some options to consider, in order of preference:

  1. Stick with the original recipe idea, but use another connector available in Confluent Cloud that still fits the use case.
  2. Pick a different recipe, perhaps in the same industry, that uses a connector available in Confluent Cloud. This maximizes the impact of your contribution.
  3. Stick with your original recipe idea and use a self-managed connector that runs locally. Follow the precedent steps in this recipe.

Build recipes docs locally

To view your new recipes locally, you can build a local version of the recipes site with mkdocs.

  • Install mkdocs (https://www.mkdocs.org/)

    On macOS, you can use Homebrew:

    brew install mkdocs
    pip3 install mkdocs pymdown-extensions
    pip3 install mkdocs-material
    pip3 install mkdocs-exclude
    pip3 install mkdocs-redirects
  • Build and serve a local version of the site. In this step, mkdocs will report any errors in your new recipe file.

    python3 -m mkdocs serve  
    

    (If this doesn't work, try running mkdocs serve on its own.)

  • Point a web browser to the local site at http://localhost:8000 and navigate to your new recipe.

Publishing recipes to live site

If you are a Confluent employee, you can publish using the mkdocs GitHub integration. From the main branch (in the desired state):

ksqldb-recipes's People

Contributors

bbejeck, danicafine, daveklein, fifthposition, jnh5y, krisajenkins, rspurgeon, vsyu, ybyzek


ksqldb-recipes's Issues

Verify aviation recipe and whether `FLIGHT_ID` needs to be projected because it is used in the INNER JOIN

From @wicknicks

just a quick pointer: I was going through https://confluentinc.github.io/ksqldb-recipes/customer-360/aviation/#ksqldb-code and noticed that the CREATE STREAM customer_flight_updates ... statement must also project a FLIGHT_ID column; it'd be an error otherwise?

yes, I got an error (though I was using some internal ksqldb libs when working with this recipe), so you might want to take a run too, in case something was wrong with my setup.
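For reference, the question is whether the column used in the join condition must appear in the projection of the CREATE STREAM ... AS SELECT. A minimal sketch of the pattern being verified (stream and column names are illustrative, not the actual aviation recipe):

CREATE STREAM customer_flight_updates AS
  SELECT
    cf.flight_id,          -- the column used in the join condition, projected here
    cf.customer_name,
    fu.flight_update
  FROM customer_flights cf
  INNER JOIN flight_updates fu WITHIN 365 DAYS
    ON cf.flight_id = fu.flight_id
  EMIT CHANGES;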

Mainframe offload

Describe the use case
The recipe author needs to work on the use case description; pull it from https://www.confluent.io/use-case/mainframe-offload/

Provide the ksqlDB application

Src for Confluent-internal folks.

This needs to be converted from local Docker to Confluent Cloud (e.g. custom schemas, as used in 'schema.filename' = '/usr/share/java/confluent-common/deposit-val.avsc', are not supported in Confluent Cloud).

echo "CREATE SOURCE CONNECTOR datagen_transactions_01 WITH ( \
         'connector.class' = 'io.confluent.kafka.connect.datagen.DatagenConnector'  \
        , 'kafka.topic' = 'transactions'  \
        , 'schema.filename' = '/usr/share/java/confluent-common/deposit-val.avsc'  \
        , 'iterations' = '5000000'      \
        , 'tasks' = '2' \
        , 'tasks.max' = '2' \
        , 'topic.creation.default.partitions' = '2' \
        , 'topic.creation.default.replication.factor' = '1' \
        );" | ksql http://ksql1:8088

        echo " CREATE STREAM IF NOT EXISTS TRANSACTIONS  WITH (kafka_topic='transactions', value_format='avro');  " | ksql http://ksql1:8088

        echo " CREATE STREAM IF NOT EXISTS ST_TRANSACTIONS WITH (kafka_topic='st_transactions', value_format='avro') \
               AS SELECT CAST(DEP_ACCOUNT_NO as STRING) as DEP_ACCOUNT_NO \
              , CONCAT('$$',CAST(DEP_BALANCE_INT + (DEP_BALANCE_CENTS_INT/100) AS STRING)) as BALANCE \
              , UNIX_TIMESTAMP() - rowtime    AS delta_ms \
              , rowtime                       AS ts_1_capture \
              , UNIX_TIMESTAMP()              AS ts_2_stream \
              , TIMESTAMPTOSTRING(ROWTIME         , 'yyyy-MM-dd HH:mm:ss.SSS') AS ts_1 \
              , TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss.SSS') AS ts_2 \
              FROM TRANSACTIONS;" | ksql http://ksql1:8088

        echo "CREATE TABLE IF NOT EXISTS balance_cache WITH (kafka_topic='balance_cache', value_format='avro',key_format='avro') AS \
              SELECT  DEP_ACCOUNT_NO       \
              , latest_by_offset(balance) as BALANCE \
              , latest_by_offset(DELTA_MS) as DELTA_1_MS \
              , (Latest_by_offset(UNIX_TIMESTAMP()) - Latest_by_offset(ts_1_capture))  AS delta_2_ms \
              , latest_by_offset(TS_1_CAPTURE) as TS_1_CAPTURE \
              , latest_by_offset(TS_2_STREAM) as TS_2_STREAM \
              , Latest_by_offset(UNIX_TIMESTAMP()) AS TS_3_CACHE \
              , Latest_by_offset(ts_1) as TS_1 \
              , Latest_by_offset(ts_2) as TS_2 \
              , Latest_by_Offset(TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss.SSS')) AS ts_3 \
              FROM ST_TRANSACTIONS \
              GROUP BY DEP_ACCOUNT_NO; " | ksql http://ksql1:8088


        echo "CREATE TABLE IF NOT EXISTS influx_balance_cache_latency  WITH (kafka_topic='influx_balance_cache_latency',value_format='KAFKA') AS \
             SELECT 'X' as X, replace(replace(replace('{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"map\",  \
                                                     \"keys\":{\"type\":\"string\", \"optional\":false},  \
                                                   \"values\":{\"type\":\"string\",\"optional\":false},\"optional\":false,  \
                                                             \"field\":\"tags\"},{\"type\":\"string\",\"optional\":false,  \
                                                             \"field\":\"time\"}, {\"type\":\"double\",\"optional\":true,  \
                                                             \"field\":\"value\"}],  \
                                                  \"optional\":false,\"version\":1},  \
                                                  \"payload\":{\"tags\":{\"id\":\"REPLACEME_ID\"},  \
                                                                   \"time\":\"REPLACEME_TS\",  \
                                                                   \"value\":REPLACEME_VALUE}}'   \
              ,'REPLACEME_ID'     ,TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd-HH:mm:ss'))   \
              ,'REPLACEME_TS'     ,TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd-HH:mm:ss'))   \
              ,'REPLACEME_VALUE'  ,CAST(Avg( UNIX_TIMESTAMP() - ts_1_capture) AS STRING))   \
              as influx_json_row   \
              FROM ST_TRANSACTIONS \
              WINDOW TUMBLING (SIZE 5 SECONDS) \
              GROUP BY 'X'; " | ksql http://ksql1:8088



        echo "CREATE TABLE IF NOT EXISTS influx_balance_cache_count WITH (kafka_topic='influx_balance_cache_count',value_format='KAFKA') AS \
             SELECT 'X' as X, replace(replace(replace('{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"map\",  \
                                                     \"keys\":{\"type\":\"string\", \"optional\":false},  \
                                                   \"values\":{\"type\":\"string\",\"optional\":false},\"optional\":false,  \
                                                             \"field\":\"tags\"},{\"type\":\"string\",\"optional\":false,  \
                                                             \"field\":\"time\"}, {\"type\":\"double\",\"optional\":true,  \
                                                             \"field\":\"value\"}],  \
                                                  \"optional\":false,\"version\":1},  \
                                                  \"payload\":{\"tags\":{\"id\":\"REPLACEME_ID\"},  \
                                                                   \"time\":\"REPLACEME_TS\",  \
                                                                   \"value\":REPLACEME_VALUE}}'   \
              ,'REPLACEME_ID'     ,TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyy-MM-dd-HH:mm:ss'))   \
              ,'REPLACEME_TS'     ,TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyy-MM-dd-HH:mm:ss'))   \
              ,'REPLACEME_VALUE'  ,CAST(count(*) AS STRING)       )   \
              as influx_json_row   \
              FROM BALANCE_CACHE \
              GROUP BY 'X'; " | ksql http://ksql1:8088

        # turning on TRACE for influxDB connectors
        #sleep 5
        #curl -s -X PUT -H "Content-Type:application/json" http://connect1:8083/admin/loggers/io.confluent.influxdb -d '{"level": "TRACE"}'
        sleep 5
        echo " CREATE SINK CONNECTOR INFLUX_COUNT_BALANCE_CACHE  WITH ( \
                         'connector.class'='io.confluent.influxdb.InfluxDBSinkConnector'  \
                ,              'tasks.max'='2'  \
                ,                 'topics'='influx_balance_cache_count'  \
                ,           'influxdb.url'='http://influxdb:8086'  \
                ,            'influxdb.db'='telegraf'  \
                ,     'influxdb.username'='root'  \
                ,      'influxdb.password'='root'  \
                ,'measurement.name.format'='balance_cache_count'  \
                ,        'value.converter'='org.apache.kafka.connect.json.JsonConverter' \
                ); " | ksql http://ksql1:8088

        sleep 5
        echo " CREATE SINK CONNECTOR INFLUX_LATENCY_BALANCE_CACHE  WITH ( \
                         'connector.class'='io.confluent.influxdb.InfluxDBSinkConnector'  \
                ,              'tasks.max'='2'  \
                ,                 'topics'='influx_balance_cache_latency'  \
                ,           'influxdb.url'='http://influxdb:8086'  \
                ,            'influxdb.db'='telegraf'  \
                ,     'influxdb.username'='root'  \
                ,      'influxdb.password'='root'  \
                ,'measurement.name.format'='balance_cache_latency'  \
                ,        'value.converter'='org.apache.kafka.connect.json.JsonConverter' \
                ); " | ksql http://ksql1:8088

        sleep 10
        echo "CREATE SOURCE CONNECTOR datagen_transactions_02 WITH ( \
         'connector.class' = 'io.confluent.kafka.connect.datagen.DatagenConnector'  \
        , 'kafka.topic' = 'transactions'  \
        ,'schema.filename' = '/usr/share/java/confluent-common/deposit-val.avsc'  \
        , 'iterations' = '5000000'      \
        , 'tasks' = '1' \
        , 'tasks.max' = '1' \
        , 'topic.creation.default.partitions' = '2' \
        , 'topic.creation.default.replication.factor' = '1' \
        );" | ksql http://ksql1:8088
        sleep 3
        curl -X PUT http://connect1:8083/connectors/DATAGEN_TRANSACTIONS_02/pause
        curl -X GET http://connect1:8083/connectors/DATAGEN_TRANSACTIONS_02/status

        sleep 3
        echo "CREATE SOURCE CONNECTOR datagen_transactions_03 WITH ( \
         'connector.class' = 'io.confluent.kafka.connect.datagen.DatagenConnector'  \
        , 'kafka.topic' = 'transactions'  \
        ,'schema.filename' = '/usr/share/java/confluent-common/deposit-val.avsc'  \
        , 'iterations' = '5000000'      \
        , 'tasks' = '1' \
        , 'tasks.max' = '1' \
        , 'topic.creation.default.partitions' = '2' \
        , 'topic.creation.default.replication.factor' = '1' \
        );" | ksql http://ksql1:8088
        sleep 3
        curl -X PUT http://connect1:8083/connectors/DATAGEN_TRANSACTIONS_03/pause
        curl -X GET http://connect1:8083/connectors/DATAGEN_TRANSACTIONS_03/status


        echo "DROP CONNECTOR pull_queries_logs; " | ksql http://ksql1:8088
        echo " CREATE Source CONNECTOR pull_queries_logs WITH (     \
        'tasks.max'= '1'     \
        ,'name'='PULL_QUERIES_LOGS'     \
        ,'connector.class'='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector'    \
        ,'input.file.pattern'='.*\.pending_load'    \
        ,   'input.path'='/tmp/consumer-output'    \
        ,   'error.path'='/tmp/consumer-output/error'    \
        ,'finished.path'='/tmp/consumer-output/finished'    \
        ,'cleanup.policy'='DELETE' \
        ,'halt.on.error'='false'    \
        ,'topic'='pull-queries-logs'    \
        ,'confluent.topic.bootstrap.servers'= 'kafka1:9092'    \
        ,'confluent.topic.replication.factor'= '1'    \
        ,'schema.generation.enabled'='true' \
        ,'value.schema'='{\"name\" : \"pull.queries.timing\",  \"type\" : \"STRUCT\",  \"isOptional\" : false,  \"fieldSchemas\" : {    \"ACCOUNT_ID\" : {      \"type\" : \"STRING\",      \"isOptional\" : false    },    \"ACCOUNT_BALANCE\" : {      \"type\" : \"STRING\",      \"isOptional\" : true    },    \"PULL_QUERY_MS\" : {      \"type\" : \"STRING\" , \"isOptional\" : true    }   } }' \
        ,'key.schema'='{  \"name\" : \"pull.queries.timing\",  \"type\" : \"STRUCT\",  \"isOptional\" : false,  \"fieldSchemas\" : {    \"ACCOUNT_ID\" : {      \"type\" : \"STRING\",      \"isOptional\" : false    } } }' \
        );" | ksql http://ksql1:8088
 

        #sleep required for the connector to find the primer file; load it; register the schema
        sleep 30
        echo "create stream st_pull_queries_logs with (kafka_topic='pull-queries-logs', value_format='AVRO');" \
        | ksql http://ksql1:8088

        sleep 10
        echo "CREATE TABLE IF NOT EXISTS influx_balance_cache_query_latency WITH (kafka_topic='influx_balance_cache_query_latency',value_format='KAFKA') AS \
             SELECT 'X' as X, replace(replace(replace('{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"map\",  \
                                                     \"keys\":{\"type\":\"string\", \"optional\":false},  \
                                                   \"values\":{\"type\":\"string\",\"optional\":false},\"optional\":false,  \
                                                             \"field\":\"tags\"},{\"type\":\"string\",\"optional\":false,  \
                                                             \"field\":\"time\"}, {\"type\":\"double\",\"optional\":true,  \
                                                             \"field\":\"value\"}],  \
                                                  \"optional\":false,\"version\":1},  \
                                                  \"payload\":{\"tags\":{\"id\":\"REPLACEME_ID\"},  \
                                                                       \"value\":REPLACEME_VALUE}}'   \
              ,'REPLACEME_ID'     ,TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd-HH:mm:ss'))   \
              ,'REPLACEME_TS'     ,TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd-HH:mm:ss'))   \
              ,'REPLACEME_VALUE'  ,CAST(avg(CAST(PULL_QUERY_MS AS DOUBLE)) AS STRING)       )   \
              as influx_balance_cache_query_latency   \
              FROM st_pull_queries_logs \
              WINDOW TUMBLING (SIZE 5 SECONDS) \
              GROUP BY 'X'; " | ksql http://ksql1:8088

Partition count consistency: 6

docs/customer-360/online-dating/process.sql:  PARTITIONS = 3
docs/customer-360/loyalty-rewards/process.sql:  PARTITIONS = 3
docs/customer-360/loyalty-rewards/process.sql:  PARTITIONS = 3
docs/customer-360/loyalty-rewards/process.sql:  PARTITIONS = 3
docs/real-time-analytics/datacenter/process.sql:  PARTITIONS=3,
docs/real-time-analytics/datacenter/process.sql:  PARTITIONS=3,
docs/real-time-analytics/datacenter/index.md:  partitions=3,
docs/real-time-analytics/datacenter/index.md:  partitions=3,

These should all be 6, per the product defaults and the other recipes.
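In other words, the WITH clauses in those files should use the default partition count, along these lines (placeholder stream shown):

CREATE STREAM example_stream (id VARCHAR KEY, val VARCHAR)
  WITH (KAFKA_TOPIC = 'example_stream', VALUE_FORMAT = 'JSON', PARTITIONS = 6);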

Integrate disparate messaging system

Describe the use case
With Confluent you can integrate disparate messaging systems (IBM MQ, RabbitMQ, TIBCO, etc.) with modern data stores, both on-premises and in the cloud. This enables you to lower your TCO as you modernize your business. With Confluent, you can scale your capacity dramatically and integrate your messaging systems with Confluent's event streaming platform with ease. This paper outlines the solution architecture for the integration of messaging systems with Confluent.

Provide the ksqlDB application

https://github.com/confluentinc/pmm/tree/master/messaging-modernization/ksqldb

Track customer journeys throughout your SaaS

Describe the use case

You have a SaaS company whose customers browse pages online; maybe you're an online retailer in this case. You want to know what each customer's journey has been throughout the site, so that you can run analytics or understand exactly what they did if they call in for support. The ksqlDB output could be sent on to Elasticsearch.

This one seems pretty simple: make session windows per user and use a collect_list aggregator on all the pages they visited. Show some example queries accessing that page set.

Provide the ksqlDB application
Document the full set of ksqlDB statements that represent the use case.

N/A -- need to create ksqlDB code. Key is tumbling windows with COLLECT_LIST
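A minimal sketch of the idea, using a session window as described above (a tumbling window would work similarly); the stream, topic, and column names are assumptions, since the recipe's actual code is still to be written:

-- Placeholder source stream of page views keyed by user.
CREATE STREAM pageviews (user_id VARCHAR KEY, page VARCHAR)
  WITH (KAFKA_TOPIC = 'pageviews', VALUE_FORMAT = 'JSON', PARTITIONS = 6);

-- One row per user session, with the list of pages visited in that session.
CREATE TABLE customer_journeys AS
  SELECT
    user_id,
    COLLECT_LIST(page) AS pages_visited,
    COUNT(*) AS page_count
  FROM pageviews
  WINDOW SESSION (30 MINUTES)
  GROUP BY user_id
  EMIT CHANGES;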

Filter Confluent Audit Logs before sending to Splunk

Describe the use case
The Confluent Cloud audit logs cluster is read-only. Use ksqlDB to filter the audit log events of interest (in-Kafka SIEM processing on these logs), then use a connector to send them to Splunk.

Provide the ksqlDB application

/*********** WORKING QUERY ****************/
CREATE STREAM audit_log_events (
time VARCHAR,
data STRUCT<
    serviceName VARCHAR, 
    methodName VARCHAR, 
    resourceName VARCHAR, 
    authenticationInfo STRUCT<principal VARCHAR>, 
    authorizationInfo STRUCT<granted BOOLEAN,
        operation VARCHAR,
        resourceType VARCHAR,
        resourceName VARCHAR,
        patternType VARCHAR,
        rbacAuthorization STRUCT<role VARCHAR>
    >
 >)
WITH (kafka_topic='confluent-audit-log-events', value_format='JSON', timestamp='time', timestamp_format='yyyy-MM-dd''T''HH:mm:ss.SSSX');

/************* SELECT FROM THAT STREAM  **********/
CREATE STREAM audit_log_topics
WITH (KAFKA_TOPIC='ksql_topic_audit_log_topics', PARTITIONS=12, REPLICAS=1) 
AS SELECT *
FROM  AUDIT_LOG_EVENTS 
WHERE (DATA->AUTHORIZATIONINFO->RESOURCETYPE = 'Topic')
EMIT CHANGES;
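Along the same lines, a further filter could isolate denied authorizations from the same stream (a sketch, not part of the original statements above):

/* Sketch: surface only denied authorization attempts before sending to Splunk. */
CREATE STREAM audit_log_denied AS
  SELECT *
  FROM audit_log_events
  WHERE data->authorizationInfo->granted = false
  EMIT CHANGES;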

Logistics from orders and vehicle statuses

Describe the use case
This is a real-time logistics demo that uses ksqlDB in Confluent Cloud for stream processing. It works with two streams of data, orders and vehicle statuses, and showcases Confluent Cloud features such as:

  • Real-time order tracking from joins on ksqlDB with streams and tables
  • Masking data from customer orders
  • Schema registry tags on PII columns
  • Stream lineage showing the entire streaming flow
  • Tracking vehicles on a map in real time

(architecture diagram)

Provide the ksqlDB application
Document the full set of ksqlDB statements that represent the use case.

https://github.com/confluentinc/apac-workshops/blob/master/ccloud-logistics-ksql/docs/ksql/queries.sql

Cybersecurity: Detect Slowloris DDoS attack

Describe the use case

A Slowloris DDoS attack aims to shut down your service by creating and maintaining a large number of HTTP connections. To perform the attack, a malicious script initiates many small HTTP connections; this requires little effort on the attacker's side, but it can bring your server down. When the server has no more connections available, it drops new connections and sends a RESET flag in the TCP packet. You can detect a peak of RESET packets sent by a server to a specific destination by analyzing the tcp_flags_reset flag.

Provide the ksqlDB application

https://www.confluent.io/blog/build-a-intrusion-detection-using-ksqldb/
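The blog post above is the authoritative version. Purely as a hedged sketch of the shape of the detection, assuming a hypothetical network_traffic stream with source_ip, dest_ip, and tcp_flags_reset columns (the topic name and threshold are also assumptions):

-- Placeholder stream of packet metadata.
CREATE STREAM network_traffic (source_ip VARCHAR, dest_ip VARCHAR, tcp_flags_reset BOOLEAN)
  WITH (KAFKA_TOPIC = 'network-traffic', VALUE_FORMAT = 'JSON', PARTITIONS = 6);

-- Count RESET packets per source/destination pair in short windows and flag spikes.
CREATE TABLE possible_slowloris_attacks AS
  SELECT
    source_ip,
    dest_ip,
    COUNT(*) AS reset_count
  FROM network_traffic
  WINDOW TUMBLING (SIZE 60 SECONDS)
  WHERE tcp_flags_reset = true
  GROUP BY source_ip, dest_ip
  HAVING COUNT(*) > 100
  EMIT CHANGES;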

Location Based Alerting Demo (geohash version)

Describe the use case
https://github.com/confluentinc/apac-workshops/tree/d134877b5587bdfc842c7231abbb96fa92e30420/marketing-locationbased-geohash

Show personalized, location-based alerts in real time. User and merchant data from a database is joined against real-time user locations (e.g. from mobile devices) to alert users when they pass within 200 meters of a participating merchant. User locations are subdivided by geohash, and a user receives an alert if they come within 200 m of any participating merchant (the red circle).

Provide the ksqlDB application
Document the full set of ksqlDB statements that represent the use case.

https://github.com/confluentinc/apac-workshops/blob/master/marketing-locationbased-geohash/ksql-statements.sql
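The linked workshop statements are the real implementation. As a rough sketch of the core distance check using ksqlDB's GEO_DISTANCE function (the stream, table, and column names are assumptions, including the idea that the merchants table is keyed by a geohash prefix):

-- Sketch only; see the linked ksql-statements.sql for the actual queries.
CREATE STREAM nearby_merchant_alerts AS
  SELECT
    u.user_id,
    m.merchant_id,
    GEO_DISTANCE(u.latitude, u.longitude, m.latitude, m.longitude, 'KM') AS distance_km
  FROM user_locations u
  INNER JOIN merchants m
    ON u.geohash_prefix = m.geohash_prefix
  WHERE GEO_DISTANCE(u.latitude, u.longitude, m.latitude, m.longitude, 'KM') <= 0.2
  EMIT CHANGES;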

Flatten docs folder

Remove the subfolders, since we can specify groupings in the nav: https://github.com/confluentinc/ksqldb-recipes/blob/main/mkdocs.yml. The idea is to flatten https://github.com/confluentinc/ksqldb-recipes/tree/main/docs, which decouples a recipe's folder from its category assignment (the categories have changed in the past).

@colinhicks this could impact the ksqldb-recipes and in-product tutorial integration, so we can talk through this (what it should look like, timing, etc)

Cyber Security: Identify Firewall Deny events from Splunk

Describe the use case
The purpose of this project is to demonstrate how to optimize your Splunk data ingestion by using Confluent. This guide routes data from a Splunk Universal Forwarder running an event generator to the Confluent Splunk-2-Splunk Source Connector, bypassing the Splunk indexing layer while retaining all of the critical metadata associated with each data source (source, sourcetype, host, event, _meta).

(siem_optimization diagram)

Provide the ksqlDB application
Document the full set of ksqlDB statements that represent the use case.

https://github.com/JohnnyMirza/confluent_splunk_demo/blob/main/statements.sql
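The linked statements are the real demo. Purely as an illustration of the filtering step, here is a sketch against a hypothetical splunk_events stream; the sourcetype value and field names are assumptions and will differ per firewall:

-- Assumed field names; adjust to the actual firewall sourcetype in your Splunk data.
CREATE STREAM firewall_denies AS
  SELECT *
  FROM splunk_events
  WHERE sourcetype = 'cisco:asa' AND UCASE(action) = 'DENY'
  EMIT CHANGES;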

Convert Avro to JSON

The current onboarding tutorial workflow does not include enabling Schema Registry, so change the recipes that use Avro to use JSON instead.

  1. anomaly-detection/salesforce/process.sql: VALUE_FORMAT = 'AVRO',
  2. customer-360/online-dating/process.sql: VALUE_FORMAT = 'AVRO',
  3. customer-360/loyalty-rewards/process.sql: VALUE_FORMAT = 'AVRO',
  4. customer-360/aviation/process.sql: , FORMAT = 'AVRO'

Also update the template (see the sketch below).
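The change in each file is just the format in the WITH clause, along these lines (placeholder stream shown):

-- Before (requires Schema Registry):
CREATE STREAM example_stream (id VARCHAR KEY, val VARCHAR)
  WITH (KAFKA_TOPIC = 'example_stream', VALUE_FORMAT = 'AVRO', PARTITIONS = 6);

-- After (no Schema Registry needed):
CREATE STREAM example_stream (id VARCHAR KEY, val VARCHAR)
  WITH (KAFKA_TOPIC = 'example_stream', VALUE_FORMAT = 'JSON', PARTITIONS = 6);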

cc: @colinhicks thanks for surfacing this

Effective price based on time

Describe the use case

The database has a list of dates and prices. We want to store the effective price in a KTable, where the effective price is the price whose date is closest to the current date without being after it. Dashboards can then be created, for example with hourly revenue. (The recipe author needs to develop this use case a bit more...)

Provide the ksqlDB application

Src for Confluent-internal access; as a bonus, it uses lambdas.

CREATE STREAM CUSTOMER_BILLS 
WITH (TIMESTAMP='timestamp', TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ss''Z''') AS 
SELECT
  METRICS.LC_ID AS LC_ID,
  PRICES.KEY AS PRICE_KEY,
  (METRICS.VALUE * REDUCE(
      PRICES.PRICES, 
      STRUCT(TIME:=PARSE_TIMESTAMP('0001', 'yyyy'), PRICE:=CAST(0.0 AS DOUBLE)), 
      (S, K, V) => (
        CASE WHEN (PARSE_TIMESTAMP(K, 'yyyy-MM-dd''T''HH:mm:ss') < PARSE_TIMESTAMP(METRICS.TIMESTAMP, 'yyyy-MM-dd''T''HH:mm:ss''Z''')) 
          THEN STRUCT(TIME:=PARSE_TIMESTAMP(K, 'yyyy-MM-dd''T''HH:mm:ss'), PRICE:=V) 
          ELSE S 
        END))->PRICE) AS REVENUE,
  METRICS.TIMESTAMP AS TIMESTAMP,
  METRICS.ROWTIME AS METRICS_ROWTIME
FROM METRICS_WITH_CONFIG_KEY METRICS
INNER JOIN PRICES ON (((METRICS.METRICS_TYPE + ':') + LCASE(METRICS.CONFIG_KEY)) = PRICES.KEY)
EMIT CHANGES;

From @agavra

Basically PRICES.PRICES is the "price" table that contains date -> effective_price. What the REDUCE operation does is iterate through PRICES.PRICES, and if it finds a key in that map that is less than the target date, it sets the price to the price in the map (this leverages the fact that we knew the underlying data would be sorted on time, but we could just as easily have added the condition to be < target date AND > previous_selected_date). When that reduce operation is done, we have the value associated with the latest date, and we multiply it by the quantity "purchased".

This is especially useful if you can't use COLLECT_LIST (which supports only one column).

Unify upper/lower casing of names

Some recipes uppercase STREAM/TABLE names while others don't; it would be nice to align them.

Personally, I prefer to uppercase only KEYWORDS and keep names in lowercase...
