所以,我试图在我的Flink Kafka流媒体工作中启用EXACTLY_ONCE语义以及检查点。
但是我没有让它工作,所以我尝试从Github下载测试示例代码:https://github.com/apache/flink/blob/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java
所以运行这个很好。然而,当启用检查点时,我会出错。或者,如果我将EXACTLY\u ONCE更改为至少\u LEAST\u ONCE语义并启用检查点,则效果很好。但是,当将其更改为EXACTLY\u一次时,我再次出现此错误。
我得到的例外是:
org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1099)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1036)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360)
... 12 more
为了在我的环境中工作,我对代码进行了轻微的更改。我正在docker内的flink操作游乐场中运行它。(这https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/docker-playgrounds/flink-operations-playground.html)。最新版本,1.10和内部提供的kafka是verison 2.2.1
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1_000);
String inputTopic = "my-input";
String outputTopic = "my-output";
String kafkaHost = "kafka:9092";
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
DataStream<KafkaEvent> input = env
.addSource(new FlinkKafkaConsumer<>(inputTopic, new KafkaEventSchema(), kafkaProps)
.assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
.keyBy("word")
.map(new RollingAdditionMapper());
input.addSink(
new FlinkKafkaProducer<>(
outputTopic,
new KafkaEventSerializationSchema(outputTopic),
kafkaProps,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
env.execute("Modern Kafka Example");
}
示例中的其他类可以找到:https://github.com/apache/flink/tree/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base
我确实尝试将序列化更改为使用KafkaSerializationSchema而不是使用SerializationSchema的示例代码。然而,下面的代码也没有帮助。同样的错误。
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaEventSerializationSchema implements KafkaSerializationSchema<KafkaEvent> {
/**
*
*/
private static final long serialVersionUID = 1L;
private String topic;
public KafkaEventSerializationSchema(String topic) {
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(KafkaEvent element, Long timestamp) {
return new ProducerRecord<>(topic, element.toString().getBytes());
}
}
感谢所有帮助。我还没有在flink和kafka之间找到任何关于Justice\ONCE garantuee的在线工作代码。只加载有关它的文章,而不加载实际的实体工作代码。这就是我在这里想要实现的全部目标。
我遇到了同样的问题,并明确地为生产者设置了一个超时值<代码>属性。setProperty(“transaction.timeout.ms”,“900000”)
当我尝试使用RESTEasy检索数据时,会出现以下异常: 原因:org。科德豪斯。杰克逊。地图JsonMappingException:找不到类org的序列化程序。冬眠代理波乔。javassist。JavassistLazInitializer,未发现创建BeanSerializer的属性(为了避免异常,请禁用SerializationConfig.Feature.FAIL_ON_EMPTY_be
看起来杰克逊无法处理同名的属性和属性。
本文向大家介绍.net的序列化与反序列化实例,包括了.net的序列化与反序列化实例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了.net的序列化与反序列化的实现方法。分享给大家供大家参考。具体方法如下: 1.序列化与反序列化概述 C#中如果需要:将一个结构很复杂的类的对象存储起来,或者通过网路传输到远程的客户端程序中去,这时就需要用到序列化,反序列化(Serialization & De
问题内容: 我有一个发送到服务器或从服务器发送的用户对象。发送用户对象时,我不想将哈希密码发送给客户端。因此,我添加了password属性,但这也阻止了将其反序列化为密码,这使得在没有密码的情况下很难注册用户。 我怎样才能只应用序列化而不是反序列化?我使用的是Spring JSONView,因此对的控制不多。 我尝试过的事情: 添加到属性 仅添加getter方法 问题答案: 确切的操作方法取决于你
对于初学者,我希望将所有输入数据流转换为KeyedStreams。因此,我将输入数据流映射为一个元组,然后应用KeyBy将其转换为KeyStream。 我总是遇到序列化的问题,我试着按照本指南https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html操作,但没有成功。 我想知道的是: null 测试类 错
采用net.sf.json.JSONObject处理数据时,type字段序列化后能不能和采用Map处理数据时输出的结果一致呢? 输出 序列化两次type 输出 输出与采用Map还是不同,Map输出的type可以直接反序列化为字符串数组,但是序列化两次的不能直接反序列化为字符串数组