我正在尝试每500毫秒创建一个
JSON
数据集,并希望将其推送到Kafka
,以便在下游设置一些窗口并执行计算。下面是我的代码:
package KafkaAsSource
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer}
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
import java.util.{Optional, Properties}
object PushingDataToKafka {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setMaxParallelism(256)
env.enableCheckpointing(5000)
val stream: DataStream[String] = env.fromElements(createData())
stream.addSink(sendToTopic(stream))
}
def getProperties(): Properties = {
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
return properties
}
def createData(): String = {
val minRange: Int = 0
val maxRange: Int = 1000
var jsonData = ""
for (a <- minRange to maxRange) {
jsonData = "{\n \"id\":\"" + a + "\",\n \"Category\":\"Flink\",\n \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n \n}"
println(jsonData)
Thread.sleep(500)
}
return jsonData
}
def sendToTopic(): Properties = {
val producer = new FlinkKafkaProducer[String](
"topic"
,
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema())
,
getProperties(),
FlinkKafkaProducer.Semantic.EXACTLY_ONCE
)
return producer
}
}
它给了我以下错误:
type mismatch;
found : Any
required: org.apache.flink.streaming.api.functions.sink.SinkFunction[String]
stream.addSink(sendToTopic())
修改代码:
object FlinkTest {
def main(ars: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setMaxParallelism(256)
var stream = env.fromElements("")
//env.enableCheckpointing(5000)
//val stream: DataStream[String] = env.fromElements("hey mc", "1")
val myProducer = new FlinkKafkaProducer[String](
"maddy", // target topic
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
getProperties(), // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
val minRange: Int = 0
val maxRange: Int = 10
var jsonData = ""
for (a <- minRange to maxRange) {
jsonData = "{\n \"id\":\"" + a + "\",\n \"Category\":\"Flink\",\n \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n \n}"
println(a)
Thread.sleep(500)
stream = env.fromElements(jsonData)
println(jsonData)
stream.addSink(myProducer)
}
env.execute("hey")
}
def getProperties(): Properties = {
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
return properties
}
/*
def createData(): String = {
val minRange: Int = 0
val maxRange: Int = 10
var jsonData = ""
for (a <- minRange to maxRange) {
jsonData = "{\n \"id\":\"" + a + "\",\n \"Category\":\"Flink\",\n \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n \n}"
Thread.sleep(500)
}
return jsonData
}
*/
}
< code>Modified Code
提供了Kafka主题中的数据,但它不保留顺序。我在循环中做错了什么?此外,必须将Flink的版本从< code>1.13.5更改为< code>1.12.2。
我最初使用的是< code > Flink < code > 1 . 13 . 5 、< code >连接器和< code>2.11的< code>Scala。我到底错过了什么?
关于这个循环的几件事:
for (a <- minRange to maxRange) {
jsonData =
"{\n \"id\":\"" + a + "\",\n \"Category\":\"Flink\",\n \"eventTime\":\""
+ DateTimeFormatter
.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
.format(LocalDateTime.now) + "\"\n \n}"
println(a)
Thread.sleep(500)
stream = env.fromElements(jsonData)
println(jsonData)
stream.addSink(myProducer)
}
>
睡眠发生在Flink客户端,仅影响客户端在将作业图提交到集群之前组装作业图所需的时间。它对作业的运行方式没有影响。
这个循环创建了10个独立的管道,这些管道将独立并行运行,所有管道都产生相同的Kafka主题。这些管道将相互竞争。
若要获取所需的行为(跨单个管道的全局排序),您需要从单个源生成所有事件(当然,按顺序),并以 1 的并行度运行作业。像这样的东西会这样做:
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
object FlinkTest {
def main(ars: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setParallelism(1)
val myProducer = ...
val jsonData = (i: Long) => ...
env.fromSequence(0, 9)
.map(i => jsonData(i))
.addSink(myProducer)
env.execute()
}
}
您可以将 maxParallelism 保留为 256(或其默认值 128);它在这里不是特别相关。maxParallelism 是 keyBy
将把密钥散列到的哈希桶的数量,它定义了作业可伸缩性的上限。
我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr
假设我使用kafka streams(kafka-streams-scala库,版本2.2.0)。 我想出了一种可能的方法:创建流和可变映射,然后使用来跟踪每个键的N个最近值。 我的问题是是否有更好的方法来实现这一点--或者使用streams API,或者至少不使用可变映射。
我在Kafka中配置了3个代理运行在不同的端口上。我用的是春云流Kafka 我正在创建一个获得连续数据流的数据管道。我在kafka topic中存储3个代理运行的数据流。到目前为止没有问题。我担心的是假设3个经纪人倒下了5分钟,然后在那个时候我无法获得关于kafka主题的数据。将会有5分钟的数据丢失。从Spring开机我会得到警告 有没有一种方法可以在所有代理都停机时临时存储数据,并在代理再次启动
我使用的debezium带有一个重路由选项,它将所有表的更改发送到仅一个kafka主题。有了这样的配置,我确信我可以从spark中读到独特的Kafka主题。 但我的问题是:如果我使用debezium而不使用重路由选项,并且我在不同的Kafka主题中对每个表进行了更改,我如何保证我以正确的顺序阅读了所有主题的事件? 我知道我可以使用Spark来订购它,例如通过时间戳,但如果说,一个kafka主题离线
我有一个问题,我做了一个apache kafka消费者在Spring Boot消费3个不同的主题。但是我需要先使用来自第一个主题的所有数据,然后使用来自以下主题的数据,有什么方法可以做到这一点吗?还是你总是用同样的方式读它们?
我有一根绳子: 给定另一个字符串 我想做的是替换<code> bb_seq的顺序,在中添加一个字符,得到: <代码>的总长度?保证与< code>bb_seq相同。 我如何用R实现这一点? 我尝试了这个,但失败了: 我对非正则表达式解决方案持开放态度。