我有一个小程序可以用阿帕奇Kafka来计算颜色的数量
public class FavouriteColor {
private static final String INPUT_TOPIC_NAME = "favourite-colour-input";
private static final String OUTPUT_TOPIC_NAME = "favourite-colour-output";
private static final String INTERMEDIATE_TOPIC_NAME = "favourite-colour-output";
private static final String APPLICATION_ID = "favourite-colour-java";
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(INPUT_TOPIC_NAME);
KStream<String, String> usersAndColours = textLines
.filter((key, value) -> value.contains(","))
.selectKey((key, value) -> value.split(",")[0].toLowerCase())
.mapValues(value -> value.split(",")[1].toLowerCase())
.filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour));
usersAndColours.to(INTERMEDIATE_TOPIC_NAME);
KTable<String, String> usersAndColoursTable = builder.table(INTERMEDIATE_TOPIC_NAME);
KTable<String, Long> favouriteColours = usersAndColoursTable
.groupBy((user, colour) -> new KeyValue<>(colour, colour))
.count(Named.as("CountsByColours"));
favouriteColours.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp();
streams.start();
System.out.println(streams);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
创建主题,生产者/消费者开始使用终端:
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-input
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user-keys-and-colours --config cleanup.policy=compact
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic favourite-colour-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
kafka-console-producer --bootstrap-server localhost:9092 --topic favourite-colour-input
我在终端中提供了以下输入:
stephane,blue
john,green
stephane,red
alice,red
我在消费者终端收到错误:
stephane Processed a total of 1 messages
[2021-11-27 21:31:58,155] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:26)
at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:21)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:519)
at scala.Option.map(Option.scala:242)
at kafka.tools.DefaultMessageFormatter.deserialize$1(ConsoleConsumer.scala:519)
at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:568)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
这里有什么问题?我做了简短的研究,发现了其他人提出的类似问题,但这些解决方案似乎对我不起作用。
您将值反序列化器定义为Long的值,但看起来您的数据是一个字符串。
我正在尝试Kafka Streams。编写一个简单的应用程序,我在其中计算重复的消息。 消息: 等。 我正在尝试通过。用它作为钥匙。然后将其用作值。然后按键分组,查看在每个会话中复制了哪些消息。 这是代码: KTable 主题 常规生产者生成重复。但是,当我使用控制台使用者查看它时,它会崩溃并显示错误。然后,我在控制台使用者上使用 --跳过错误消息标志。现在我看到成千上万的这样的线条 谁能帮帮我这
问题内容: 我正在将SocketServer模块用于TCP服务器。我在此函数遇到一些问题,因为传入的数据包总是具有不同的大小,所以如果我指定(我尝试使用更大的值,并且更小),则它在2或3个请求后卡住,因为数据包长度会变小(我认为),然后服务器卡住直到超时。 如果客户端通过同一源端口发送多个请求,但服务器卡住,则非常感谢您的帮助,谢谢! 问题答案: 网络 总是 不可预测的。TCP使许多这种随机行为为
当我尝试接收大量数据时,它会被切断,我必须按enter键才能获取其余数据。起初,我可以增加一点,但它仍然不会收到所有的。正如您所看到的,我增加了conn.recv()上的缓冲区,但它仍然无法获取所有数据。它会在某一点切断它。我必须在原始输入上按enter键才能接收其余数据。我是否可以一次获取所有数据?这是密码。
我正在使用Kafka源和接收器连接器创建一个数据管道。源连接器从SQL数据库消费并发布到主题,而接收器连接器订阅主题并放入其他SQL数据库。表有16 GB的数据。现在的问题是,数据不能从一个数据库传输到另一个数据库。但是,如果表的大小很小,比如1000行,那么数据可以成功传输。 源连接器配置: 源连接器日志: 有人能指导我如何调整Kafka源连接器以传输大数据吗?
我使用Android应用程序通过TCP套接字与同一局域网上的PC java应用程序进行通信、发送和接收消息。下面是我在android中使用的Asynctask的代码,用于发送消息并从PC接收回复: } 我在onPostExcecute中的祝酒词中显示PC的回复。 Android通过BufferedWriter发送消息,而PC上的java应用程序在BufferedReader中接收消息。 PC在收到
数据应该传递给我的portlet类,但它不会。这是我的类代码: 这个类旨在将数据传递回JSP。方法工作正常,就像在jsp中一样,我可以使用但是,它总是返回“no-param”,这意味着返回null。 因此,似乎正确调用了我的方法,但它没有接收表单数据。错误在哪里,我错了什么? 我将portlet下载到我的本地机器上,部署到本地演示Liferay安装中,它工作了!所以代码应该是可以的,这一定是一些服