我想在Kafka上用Flink设置一个基本的生产者-消费者,但是我很难通过Java向现有消费者生成数据。
CLI解决方案
>
我设置了一个Kafka代理
使用kafka_2.11-2.4.0
zip fromhttps://kafka.apache.org/downloads
bin/zookeeper服务器启动。sh配置/动物园管理员。属性
和bin/kafka服务器启动。sh config/server。属性
我使用创建了一个名为transactions1的主题
bin/kafka主题。sh--创建--引导服务器localhost:9092--复制因子1--分区1--主题事务1
现在我可以在命令行上使用生产者和消费者来查看主题已经创建并工作。
设置我运行的消费者
bin/kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--主题事务1--从开始
现在,如果任何制作人向主题transactions1
发送数据,我将在消费者控制台中看到它。
我通过运行
bin/kafka控制台制作人。sh--代理列表本地主机:9092--主题事务1
并在cli中的生产者中输入以下数据线,这些数据线也显示在消费者cli中。
{“txnID”:1,“金额”:100.0,“账户”:“AC1”}
{"txnID": 2,"amt": 10.0,"帐户":"AC2"}
{"txnID": 3,"amt": 20.0,"帐户":"AC3"}
现在我想在Java代码中复制第三步,即生产者和消费者,这是这个问题的核心问题。
...
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
compile group: 'com.twitter', name: 'chill-thrift', version: '0.7.6'
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.11.0'
compile group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6'
compile group: 'org.apache.thrift', name: 'protobuf-java', version: '3.7.0'
}
...
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
public class Transaction {
public final int txnID;
public final float amt;
public final String account;
public Transaction(int txnID, float amt, String account) {
this.txnID = txnID;
this.amt = amt;
this.account = account;
}
public String toJSONString() {
Gson gson = new Gson();
return gson.toJson(this);
}
public static Transaction fromJSONString(String some) {
Gson gson = new Gson();
return gson.fromJson(some, Transaction.class);
}
public static MapFunction<String, String> mapTransactions() {
MapFunction<String, String> map = new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value != null || value.trim().length() > 0) {
try {
return fromJSONString(value).toJSONString();
} catch (Exception e) {
return "";
}
}
return "";
}
};
return map;
}
@Override
public String toString() {
return "Transaction{" +
"txnID=" + txnID +
", amt=" + amt +
", account='" + account + '\'' +
'}';
}
}
public class SetupSpike {
public static void main(String[] args) throws Exception {
System.out.println("begin");
List<Transaction> txns = new ArrayList<Transaction>(){{
add(new Transaction(1, 100, "AC1"));
add(new Transaction(2, 10, "AC2"));
add(new Transaction(3, 20, "AC3"));
}};
// This list txns needs to be serialized in Flink as Transaction.class->String->ByteArray
//via producer and then to the topic in Kafka broker
//and deserialized as ByteArray->String->Transaction.class from the Consumer in Flink reading Kafka broker.
String topic = "transactions1";
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", topic);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//env.getConfig().addDefaultKryoSerializer(Transaction.class, TBaseSerializer.class);
// working Consumer logic below which needs edit if you change serialization
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
DataStream<String> stream = env.addSource(myConsumer).map(Transaction::toJSONString);
//working Producer logic below which works if you are sinking a pre-existing DataStream
//but needs editing to work with Java List<Transaction> datatype.
System.out.println("sinking expanded stream");
MapFunction<String, String> etl = new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
if (value != null || value.trim().length() > 0) {
try {
return fromJSONString(value).toJSONString();
} catch (Exception e) {
return "";
}
}
return "";
}
};
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(topic,
new KafkaSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
try {
System.out.println(element);
return new ProducerRecord<byte[], byte[]>(topic, stringToBytes(etl.map(element)));
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}, properties, Semantic.EXACTLY_ONCE);
// stream.timeWindowAll(Time.minutes(1));
stream.addSink(myProducer);
JobExecutionResult execute = env.execute();
}
}
正如您所看到的,我无法使用提供的Listtxns
完成此操作。以上是我可以从Flink留档收集的工作代码,用于重定向主题流数据,并通过Cli生产者手动发送数据。问题是用java编写KafkaProducer代码,将数据发送到主题,这进一步加剧了以下问题
与Flink合作过的人能否帮助我在Flink中生成txns
列表以transactions1
主题,然后验证它是否与消费者合作?此外,在下沉前添加时间戳或进行一些处理等问题上的任何帮助都将大有裨益。您可以在上找到源代码https://github.com/devssh/kafkaFlinkSpike其目的是生成Flink样板文件,从内存存储中添加“AC1”的详细信息,并将其与实时出现的事务事件连接起来,以向用户发送扩展的输出。
以下几点没有特别的顺序:
最好不要将Flink 1.9.2版与1.9.0版混合在一起,就像您在这里所做的那样:
compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
有关如何使用时间戳、水印、keyBy、windows等的教程,请参阅Ververica提供的在线培训资料。
要使用List
DataStream<Transaction> transactions = env.fromCollection(txns);
有关使用Flink和Kafka时如何处理序列化/反序列化的示例,请参阅Flink操作平台,特别是在ClickEventCount中使用的
ClickEventDeserializationSchema
和ClickEventStatisticsSerializationSchema
。java,并在此处定义。(注意:此游乐场尚未更新为Flink 1.10。)
Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka
这是我的消费者: 所以当运行我的制作人时,它最终会出错。任何人都知道这意味着什么,如果这可能是错的。
我为Kafka建立了一个docker形象(Wurstmeister/Kafka-Docker)。在docker容器中,我可以使用内置的shell脚本创建主题、生成消息和使用消息。现在,我使用https://github.com/mapr-demos/kafka-sample-programs托管的代码从我的主机连接到kafka broker。在构建和运行程序之后,什么都没有发生,程序就会堆积起来。
我创建了一个带有三个分区的Kafka主题。使用Spring Kafka中的ProducerFactory,我可以创建一个producer实例。但是,我想创建三个生产者实例,因为我有三个分区。类似地,我想要三个consumer的实例。我该怎么做?请帮忙。
我有一个消费者作为生产者消费者模式的一部分: 简化: 如果我移除 通过将线程设置为睡眠,CPU使用率攀升到极高的水平(13%),而不是0%。 此外,如果我实例化该类的多个实例,则每个实例的CPU使用率都会以13%的增量攀升。 大约每分钟(可能每30秒)都会向BlockingCollection添加一个新的LogItem,并将适用的消息写入文件。 有没有可能线程以某种方式阻止了其他线程的运行,而系统
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?