# Need to convert this from the local Docker setup to Confluent Cloud: custom
# Datagen schemas loaded from a local file, as used in
# 'schema.filename' = '/usr/share/java/confluent-common/deposit-val.avsc',
# are not supported in Confluent Cloud.
# Register the first Datagen source connector (tasks.max=2), producing records
# from the deposit-val.avsc schema into the 'transactions' topic through the
# ksqlDB server.
# NOTE(review): 'schema.filename' reads a file from the Connect worker's local
# filesystem; 'tasks' is set alongside 'tasks.max' and is presumably ignored —
# confirm.
datagen_01_stmt="CREATE SOURCE CONNECTOR datagen_transactions_01 WITH ( \
'connector.class' = 'io.confluent.kafka.connect.datagen.DatagenConnector' , \
'kafka.topic' = 'transactions' , \
'schema.filename' = '/usr/share/java/confluent-common/deposit-val.avsc' , \
'iterations' = '5000000' , \
'tasks' = '2' , \
'tasks.max' = '2' , \
'topic.creation.default.partitions' = '2' , \
'topic.creation.default.replication.factor' = '1' \
);"
printf '%s\n' "$datagen_01_stmt" | ksql http://ksql1:8088
# Declare a stream over the Avro 'transactions' topic produced by Datagen.
echo " CREATE STREAM IF NOT EXISTS TRANSACTIONS WITH (kafka_topic='transactions', value_format='avro'); " | ksql http://ksql1:8088
# Derive ST_TRANSACTIONS (topic 'st_transactions'), which carries:
#   DEP_ACCOUNT_NO  - account number cast to STRING
#   BALANCE         - '$' followed by whole units plus cents/100
#   delta_ms        - UNIX_TIMESTAMP() minus the record's ROWTIME
#   ts_1_capture    - the record's ROWTIME
#   ts_2_stream     - UNIX_TIMESTAMP() when this query processed the record
#   ts_1 / ts_2     - the two timestamps above, formatted for humans
# '\$' is escaped so the shell passes a literal dollar sign to ksqlDB; an
# unescaped $$ inside double quotes expands to this shell's PID.
# The cents are cast to DOUBLE before dividing so they are not truncated to 0
# by integer division (assumes DEP_BALANCE_CENTS_INT is an integer column, as
# its name suggests — confirm against deposit-val.avsc).
echo " CREATE STREAM IF NOT EXISTS ST_TRANSACTIONS WITH (kafka_topic='st_transactions', value_format='avro') \
AS SELECT CAST(DEP_ACCOUNT_NO as STRING) as DEP_ACCOUNT_NO \
, CONCAT('\$',CAST(DEP_BALANCE_INT + (CAST(DEP_BALANCE_CENTS_INT AS DOUBLE)/100) AS STRING)) as BALANCE \
, UNIX_TIMESTAMP() - rowtime AS delta_ms \
, rowtime AS ts_1_capture \
, UNIX_TIMESTAMP() AS ts_2_stream \
, TIMESTAMPTOSTRING(ROWTIME , 'yyyy-MM-dd HH:mm:ss.SSS') AS ts_1 \
, TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss.SSS') AS ts_2 \
FROM TRANSACTIONS;" | ksql http://ksql1:8088
# Materialize BALANCE_CACHE (Avro key and value): one row per DEP_ACCOUNT_NO,
# holding the latest BALANCE and latency markers seen on ST_TRANSACTIONS.
# DELTA_2_MS and TS_3_CACHE are taken from UNIX_TIMESTAMP() when the
# aggregation runs, so they capture the time at which the cache was updated.
balance_cache_stmt="CREATE TABLE IF NOT EXISTS balance_cache WITH (kafka_topic='balance_cache', value_format='avro',key_format='avro') AS \
SELECT DEP_ACCOUNT_NO , \
latest_by_offset(balance) as BALANCE , \
latest_by_offset(DELTA_MS) as DELTA_1_MS , \
(Latest_by_offset(UNIX_TIMESTAMP()) - Latest_by_offset(ts_1_capture)) AS delta_2_ms , \
latest_by_offset(TS_1_CAPTURE) as TS_1_CAPTURE , \
latest_by_offset(TS_2_STREAM) as TS_2_STREAM , \
Latest_by_offset(UNIX_TIMESTAMP()) AS TS_3_CACHE , \
Latest_by_offset(ts_1) as TS_1 , \
Latest_by_offset(ts_2) as TS_2 , \
Latest_by_Offset(TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss.SSS')) AS ts_3 \
FROM ST_TRANSACTIONS \
GROUP BY DEP_ACCOUNT_NO; "
printf '%s\n' "$balance_cache_stmt" | ksql http://ksql1:8088
# Build INFLUX_BALANCE_CACHE_LATENCY: a 5-second tumbling-window table whose
# value (KAFKA format, a single STRING column) is a Kafka Connect JSON
# envelope ({"schema":..., "payload":...}). The INFLUX_LATENCY_BALANCE_CACHE
# sink reads this topic with JsonConverter.
# Grouping by the constant 'X' puts every ST_TRANSACTIONS row into one group
# per window. The template's placeholders are filled by nested REPLACE calls:
#   REPLACEME_ID, REPLACEME_TS - window end time (yyyy-MM-dd-HH:mm:ss)
#   REPLACEME_VALUE            - average of UNIX_TIMESTAMP() - ts_1_capture
# NOTE(review): the "id" tag gets the window end time, so every point has a
# distinct tag value (one InfluxDB series per window) — confirm intended.
echo "CREATE TABLE IF NOT EXISTS influx_balance_cache_latency WITH (kafka_topic='influx_balance_cache_latency',value_format='KAFKA') AS \
SELECT 'X' as X, replace(replace(replace('{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"map\", \
\"keys\":{\"type\":\"string\", \"optional\":false}, \
\"values\":{\"type\":\"string\",\"optional\":false},\"optional\":false, \
\"field\":\"tags\"},{\"type\":\"string\",\"optional\":false, \
\"field\":\"time\"}, {\"type\":\"double\",\"optional\":true, \
\"field\":\"value\"}], \
\"optional\":false,\"version\":1}, \
\"payload\":{\"tags\":{\"id\":\"REPLACEME_ID\"}, \
\"time\":\"REPLACEME_TS\", \
\"value\":REPLACEME_VALUE}}' \
,'REPLACEME_ID' ,TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd-HH:mm:ss')) \
,'REPLACEME_TS' ,TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd-HH:mm:ss')) \
,'REPLACEME_VALUE' ,CAST(Avg( UNIX_TIMESTAMP() - ts_1_capture) AS STRING)) \
as influx_json_row \
FROM ST_TRANSACTIONS \
WINDOW TUMBLING (SIZE 5 SECONDS) \
GROUP BY 'X'; " | ksql http://ksql1:8088
# Build INFLUX_BALANCE_CACHE_COUNT: a non-windowed aggregate over the
# BALANCE_CACHE table (one row per DEP_ACCOUNT_NO), grouped by the constant
# 'X' so there is a single result row. Its value (KAFKA format, a single
# STRING column) is the same Kafka Connect JSON envelope as the latency
# table. The INFLUX_COUNT_BALANCE_CACHE sink reads it with JsonConverter.
#   REPLACEME_ID, REPLACEME_TS - UNIX_TIMESTAMP() at evaluation time
#   REPLACEME_VALUE            - count(*) of BALANCE_CACHE rows
# NOTE(review): as with the latency table, the "id" tag is a timestamp, so
# each point becomes its own InfluxDB series — confirm intended.
echo "CREATE TABLE IF NOT EXISTS influx_balance_cache_count WITH (kafka_topic='influx_balance_cache_count',value_format='KAFKA') AS \
SELECT 'X' as X, replace(replace(replace('{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"map\", \
\"keys\":{\"type\":\"string\", \"optional\":false}, \
\"values\":{\"type\":\"string\",\"optional\":false},\"optional\":false, \
\"field\":\"tags\"},{\"type\":\"string\",\"optional\":false, \
\"field\":\"time\"}, {\"type\":\"double\",\"optional\":true, \
\"field\":\"value\"}], \
\"optional\":false,\"version\":1}, \
\"payload\":{\"tags\":{\"id\":\"REPLACEME_ID\"}, \
\"time\":\"REPLACEME_TS\", \
\"value\":REPLACEME_VALUE}}' \
,'REPLACEME_ID' ,TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyy-MM-dd-HH:mm:ss')) \
,'REPLACEME_TS' ,TIMESTAMPTOSTRING(UNIX_TIMESTAMP(), 'yyyy-MM-dd-HH:mm:ss')) \
,'REPLACEME_VALUE' ,CAST(count(*) AS STRING) ) \
as influx_json_row \
FROM BALANCE_CACHE \
GROUP BY 'X'; " | ksql http://ksql1:8088
# turning on TRACE for influxDB connectors (uncomment to enable)
#sleep 5
#curl -s -X PUT -H "Content-Type:application/json" http://connect1:8083/admin/loggers/io.confluent.influxdb -d '{"level": "TRACE"}'

#######################################
# Create an InfluxDB sink connector through ksqlDB. The source topic is read
# with JsonConverter, so it must carry Kafka Connect JSON (schema + payload).
# Both sinks below share every setting except name, topic and measurement.
# NOTE(review): InfluxDB URL, database and root/root credentials are
# hard-coded for the local Docker setup.
# Arguments:
#   $1 - connector name
#   $2 - source Kafka topic
#   $3 - InfluxDB measurement name
# Outputs:
#   ksql CLI output on stdout
#######################################
create_influx_sink() {
  local name=$1
  local topic=$2
  local measurement=$3
  printf '%s\n' "CREATE SINK CONNECTOR ${name} WITH ( \
'connector.class'='io.confluent.influxdb.InfluxDBSinkConnector' \
, 'tasks.max'='2' \
, 'topics'='${topic}' \
, 'influxdb.url'='http://influxdb:8086' \
, 'influxdb.db'='telegraf' \
, 'influxdb.username'='root' \
, 'influxdb.password'='root' \
, 'measurement.name.format'='${measurement}' \
, 'value.converter'='org.apache.kafka.connect.json.JsonConverter' \
);" | ksql http://ksql1:8088
}

sleep 5
create_influx_sink INFLUX_COUNT_BALANCE_CACHE influx_balance_cache_count balance_cache_count
sleep 5
create_influx_sink INFLUX_LATENCY_BALANCE_CACHE influx_balance_cache_latency balance_cache_latency
#######################################
# Register a single-task Datagen source connector on the 'transactions' topic
# and pause it shortly afterwards via the Connect REST API.
# NOTE(review): presumably these are standby generators to be resumed later
# for extra load — confirm.
# Arguments:
#   $1 - connector-name suffix (e.g. 02 -> datagen_transactions_02)
# Outputs:
#   ksql CLI output, then the connector's status JSON, on stdout
#######################################
create_paused_datagen() {
  local suffix=$1
  # ksqlDB upper-cases unquoted identifiers, so Connect knows the connector as
  # DATAGEN_TRANSACTIONS_<suffix>.
  local connector_url="http://connect1:8083/connectors/DATAGEN_TRANSACTIONS_${suffix}"
  printf '%s\n' "CREATE SOURCE CONNECTOR datagen_transactions_${suffix} WITH ( \
'connector.class' = 'io.confluent.kafka.connect.datagen.DatagenConnector' \
, 'kafka.topic' = 'transactions' \
, 'schema.filename' = '/usr/share/java/confluent-common/deposit-val.avsc' \
, 'iterations' = '5000000' \
, 'tasks' = '1' \
, 'tasks.max' = '1' \
, 'topic.creation.default.partitions' = '2' \
, 'topic.creation.default.replication.factor' = '1' \
);" | ksql http://ksql1:8088
  # Give Connect time to register the connector before pausing it.
  sleep 3
  # -sS: no progress meter, but still report transport errors.
  curl -sS -X PUT "${connector_url}/pause"
  curl -sS -X GET "${connector_url}/status"
  # The status JSON has no trailing newline; keep the next output on its own line.
  echo
}

sleep 10
create_paused_datagen 02
sleep 3
create_paused_datagen 03
# (Re)create the SpoolDir CSV source connector that loads pull-query timing
# logs from /tmp/consumer-output (file names matching '.*\.pending_load') into
# the 'pull-queries-logs' topic. Processed files are deleted
# (cleanup.policy=DELETE).
# Dropping first lets a rerun recreate the connector. On a first run the DROP
# presumably just reports that the connector does not exist.
echo "DROP CONNECTOR pull_queries_logs; " | ksql http://ksql1:8088
echo " CREATE Source CONNECTOR pull_queries_logs WITH ( \
'tasks.max'= '1' \
,'name'='PULL_QUERIES_LOGS' \
,'connector.class'='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector' \
,'input.file.pattern'='.*\.pending_load' \
, 'input.path'='/tmp/consumer-output' \
, 'error.path'='/tmp/consumer-output/error' \
,'finished.path'='/tmp/consumer-output/finished' \
,'cleanup.policy'='DELETE' \
,'halt.on.error'='false' \
,'topic'='pull-queries-logs' \
,'confluent.topic.bootstrap.servers'= 'kafka1:9092' \
,'confluent.topic.replication.factor'= '1' \
,'schema.generation.enabled'='true' \
,'value.schema'='{\"name\" : \"pull.queries.timing\", \"type\" : \"STRUCT\", \"isOptional\" : false, \"fieldSchemas\" : { \"ACCOUNT_ID\" : { \"type\" : \"STRING\", \"isOptional\" : false }, \"ACCOUNT_BALANCE\" : { \"type\" : \"STRING\", \"isOptional\" : true }, \"PULL_QUERY_MS\" : { \"type\" : \"STRING\" , \"isOptional\" : true } } }' \
,'key.schema'='{ \"name\" : \"pull.queries.timing\", \"type\" : \"STRUCT\", \"isOptional\" : false, \"fieldSchemas\" : { \"ACCOUNT_ID\" : { \"type\" : \"STRING\", \"isOptional\" : false } } }' \
);" | ksql http://ksql1:8088
# Wait for the connector to find the primer file, load it and register the
# value schema; the stream below needs that schema to exist.
sleep 30
# IF NOT EXISTS matches every other CREATE in this script, so reruns do not
# fail on an already-existing stream.
echo "create stream IF NOT EXISTS st_pull_queries_logs with (kafka_topic='pull-queries-logs', value_format='AVRO');" \
| ksql http://ksql1:8088
sleep 10
echo "CREATE TABLE IF NOT EXISTS influx_balance_cache_query_latency WITH (kafka_topic='influx_balance_cache_query_latency',value_format='KAFKA') AS \
SELECT 'X' as X, replace(replace(replace('{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"map\", \
\"keys\":{\"type\":\"string\", \"optional\":false}, \
\"values\":{\"type\":\"string\",\"optional\":false},\"optional\":false, \
\"field\":\"tags\"},{\"type\":\"string\",\"optional\":false, \
\"field\":\"time\"}, {\"type\":\"double\",\"optional\":true, \
\"field\":\"value\"}], \
\"optional\":false,\"version\":1}, \
\"payload\":{\"tags\":{\"id\":\"REPLACEME_ID\"}, \
\"value\":REPLACEME_VALUE}}' \
,'REPLACEME_ID' ,TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd-HH:mm:ss')) \
,'REPLACEME_TS' ,TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd-HH:mm:ss')) \
,'REPLACEME_VALUE' ,CAST(avg(CAST(PULL_QUERY_MS AS DOUBLE)) AS STRING) ) \
as influx_balance_cache_query_latency \
FROM st_pull_queries_logs \
WINDOW TUMBLING (SIZE 5 SECONDS) \
GROUP BY 'X';