// Kafka consumer configuration.
// A dedicated Properties instance is used instead of System.getProperties: setting keys on the
// object returned by System.getProperties mutates JVM-wide system properties and would also feed
// every unrelated system property into the Kafka consumer config.
val properties = new java.util.Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
// Kafka source that reads "test_topic" and deserializes each record with SimpleStringSchema.
val kafkaSource = new FlinkKafkaConsumer[String]("test_topic", new SimpleStringSchema(), properties)
// Event-time timestamp/watermark assigner with a bounded out-of-orderness of 10 seconds.
// It is applied after the parsing map in the pipeline, so its input element is the parsed
// (message, timestamp) tuple, not the raw "message,timestamp" string. Declaring it over String
// (as before) does not type-check against DataStream[(String, Long)]. Reading the already-parsed
// timestamp also avoids splitting the record a second time.
// NOTE(review): assumes the timestamp field is epoch milliseconds — TODO confirm with producers.
// NOTE(review): this extractor API is deprecated in newer Flink releases in favor of
// WatermarkStrategy.forBoundedOutOfOrderness — consider migrating.
val watermarkGenerator = new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(10)) {
  override def extractTimestamp(element: (String, Long)): Long = element._2
}
// Build the pipeline: raw Kafka records -> (message, timestamp) pairs -> event-time
// timestamps and watermarks.
val stream = {
  val kafkaRecords = env.addSource(kafkaSource)
  // Each record is expected to look like "message,timestamp". Field 0 becomes the message and
  // field 1 is parsed as a Long; a record without a second field, or with a non-numeric one, throws.
  val parsedRecords = kafkaRecords.map(new MapFunction[String, (String, Long)] {
    override def map(value: String): (String, Long) = {
      val fields = value.split(",")
      val message = fields(0)
      val eventTime = fields(1).toLong
      message -> eventTime
    }
  })
  parsedRecords.assignTimestampsAndWatermarks(watermarkGenerator)
}
// Sink: print every element of the stream.
stream.print()
// Launch the Flink job under the given name.
env.execute("Flink Kafka Example")