I am trying to follow the tutorials and also the closed issue #4, which helped me fix an earlier problem.
I get the following error when trying this example here https://github.com/jeremyber-aws/amazon-kinesis-data-analytics-java-examples/blob/master/python/TumblingWindow/tumbling-windows.py
java.lang.RuntimeException: RowTime field should not be null, please convert it to a non-null long value.
at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:115)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$23.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:946)
at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:108)
at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:329)
at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:346)
at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:343)
at software.amazon.kinesis.connectors.flink.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:930)
at software.amazon.kinesis.connectors.flink.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:194)
at software.amazon.kinesis.connectors.flink.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:120)
at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:115)
at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102)
at software.amazon.kinesis.connectors.flink.internals.ShardConsumer.run(ShardConsumer.java:112)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
I hit the same issue in another KDA application with the following tables:
def create_source_table(table_name, stream_name, region, stream_initpos):
    """Build the Flink SQL DDL for the Kinesis source table.

    Args:
        table_name: name of the table to create.
        stream_name: Kinesis stream to read from.
        region: AWS region hosting the stream.
        stream_initpos: initial stream position (e.g. LATEST, TRIM_HORIZON).

    Returns:
        The CREATE TABLE statement as a string.
    """
    # NOTE(review): TO_TIMESTAMP(FROM_UNIXTIME(ts)) evaluates to NULL whenever
    # `ts` is missing or NULL in the incoming JSON record, which would produce
    # the reported "RowTime field should not be null" error -- verify every
    # record on the stream actually carries a numeric `ts`.
    # NOTE(review): `key` looks like a reserved word in Flink SQL -- consider
    # backtick-quoting it; confirm against the Flink SQL grammar.
    return """CREATE TABLE {0} (
id VARCHAR(64),
key VARCHAR(64),
network VARCHAR(64),
status_code INTEGER,
duration BIGINT,
request_count BIGINT,
response_size BIGINT,
duration_min BIGINT,
duration_max BIGINT,
duration_sum BIGINT,
ts BIGINT,
ts_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
WATERMARK FOR ts_time AS ts_time - INTERVAL '5' SECOND
)
PARTITIONED BY (id)
WITH (
'connector' = 'kinesis',
'stream' = '{1}',
'aws.region' = '{2}',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.sessionName' = 'test',
'aws.credentials.role.arn' = 'arn:aws:iam::1234567890:role/test',
'scan.stream.initpos' = '{3}',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)""".format(table_name, stream_name, region, stream_initpos)
def create_sink_table(table_name, bucket_name):
    """Build the Flink SQL DDL for the S3 (filesystem connector) sink table.

    Args:
        table_name: name of the sink table to create.
        bucket_name: S3 bucket written to via the s3a:// scheme.

    Returns:
        The CREATE TABLE statement as a string.
    """
    # NOTE(review): declaring a WATERMARK on a sink-only table is unusual --
    # confirm the filesystem connector accepts/needs it in this Flink version.
    return """CREATE TABLE {0} (
id VARCHAR(64),
key VARCHAR(64),
network VARCHAR(64),
status_code INTEGER,
duration BIGINT,
request_count BIGINT,
response_size BIGINT,
duration_min BIGINT,
duration_max BIGINT,
duration_sum BIGINT,
ts_time TIMESTAMP(3),
WATERMARK FOR ts_time AS ts_time - INTERVAL '5' SECOND
)
PARTITIONED BY (id)
WITH (
'connector' = 'filesystem',
'path' = 's3a://{1}/',
'format' = 'csv',
'sink.partition-commit.policy.kind' = 'success-file',
'sink.partition-commit.delay' = '1 min'
)""".format(table_name, bucket_name)
and inserting:
# Tumbling-window aggregation: 1-minute windows keyed by id/key/network/
# status_code, written to the sink table.
# NOTE(review): the SELECT list has 10 expressions while the sink table
# declares 11 columns (`duration` is never selected) -- confirm the column
# lists line up in the deployed job.
insert_sql = """INSERT INTO {0}
SELECT
id,
key,
network,
status_code,
COUNT(*) as request_count,
SUM(response_size) as response_size,
MIN(duration) as duration_min,
MAX(duration) as duration_max,
SUM(duration) as duration_sum,
TUMBLE_START(ts_time, INTERVAL '1' MINUTE)
FROM {1}
GROUP BY TUMBLE(ts_time, INTERVAL '1' minute), id, key, network, status_code
""".format(output_table_name, input_table_name)
table_env.execute_sql(insert_sql)
Source: TableSourceScan(table=[[default_catalog, default_database, input_table]], fields=[id, key, network, status_code, duration, request_count, response_size, duration_min, duration_max, duration_sum, ts]) -> Calc(select=[id, key, network, status_code, duration, request_count, response_size, duration_min, duration_max, duration_sum, ts, TO_TIMESTAMP(FROM_UNIXTIME(ts)) AS ts_time]) -> WatermarkAssigner(rowtime=[ts_time], watermark=[(ts_time - 5000:INTERVAL SECOND)]) -> Calc(select=[ts_time, id, auth_key, network, status_code, response_size, duration]) (1/1)