我使用长序列化器作为键,使用字符串序列化器作为值,在我们检索到消息时将消息发布到kafka主题,并且将key和key一起视为垃圾值,如下所示
^@^@^@^AÏÃ<9a>ò
Kafka制作人配置有问题吗?
更新:
以下生产者配置
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
configProps.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutInMillis);
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, abcompressionType);
@Bean
public ProducerFactory<Long, String> longProducerFactory() {
return new DefaultKafkaProducerFactory<>(longProducerConfigs());
}
@Bean
public KafkaTemplate<Long, String> longKeyKafkaTemplate() {
return new KafkaTemplate<>(longProducerFactory());
}
和下面的发送呼叫
longKeyKafkaTemplate.send(topicName, key, message);
当我在Kafka工具kafkatool.com中看到键值时,我发现了那些垃圾值,
它不是“垃圾”,它是一个长值,显示为字符串,没有正确的反序列化。
我不熟悉该工具,也不熟悉您是否可以用它为密钥指定反序列化器,但使用命令行工具kafka-console-consumer.sh
您可以指定要使用的反序列化器。
$ kafka-console-consumer
This tool helps to read data from Kafka topics and outputs it to standard output.
Option Description
------ -----------
--bootstrap-server <String: server to REQUIRED: The server(s) to connect to.
connect to>
--consumer-property <String: A mechanism to pass user-defined
consumer_prop> properties in the form key=value to
the consumer.
--consumer.config <String: config file> Consumer config properties file. Note
that [consumer-property] takes
precedence over this config.
--enable-systest-events Log lifecycle events of the consumer
in addition to logging consumed
messages. (This is specific for
system tests.)
--formatter <String: class> The name of a class to use for
formatting kafka messages for
display. (default: kafka.tools.
DefaultMessageFormatter)
--from-beginning If the consumer does not already have
an established offset to consume
from, start with the earliest
message present in the log rather
than the latest message.
--group <String: consumer group id> The consumer group id of the consumer.
--help Print usage information.
--isolation-level <String> Set to read_committed in order to
filter out transactional messages
which are not committed. Set to
read_uncommitted to read all
messages. (default: read_uncommitted)
--key-deserializer <String:
deserializer for key>
--max-messages <Integer: num_messages> The maximum number of messages to
consume before exiting. If not set,
consumption is continual.
--offset <String: consume offset> The offset id to consume from (a non-
negative number), or 'earliest'
which means from beginning, or
'latest' which means from end
(default: latest)
--partition <Integer: partition> The partition to consume from.
Consumption starts from the end of
the partition unless '--offset' is
specified.
--property <String: prop> The properties to initialize the
message formatter. Default
properties include:
print.timestamp=true|false
print.key=true|false
print.value=true|false
key.separator=<key.separator>
line.separator=<line.separator>
key.deserializer=<key.deserializer>
value.deserializer=<value.
deserializer>
Users can also pass in customized
properties for their formatter; more
specifically, users can pass in
properties keyed with 'key.
deserializer.' and 'value.
deserializer.' prefixes to configure
their deserializers.
--skip-message-on-error If there is an error when processing a
message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms> If specified, exit if no message is
available for consumption for the
specified interval.
--topic <String: topic> The topic id to consume on.
--value-deserializer <String:
deserializer for values>
--version Display Kafka version.
--whitelist <String: whitelist> Regular expression specifying
whitelist of topics to include for
consumption.
日志中的错误 2020-05-28 15:52:53.597错误112469---[nio-8080-exec-1]O.a.C.C.C.[.[.[/].[dispatcherServlet]:servlet.Service()对于servlet[dispatcherServlet]在路径[]上下文中引发异常[请求处理失败;嵌套异常是java.lang.IllegalStateException:没
问题内容: 有人可以解释一下G1垃圾收集器的工作原理吗?我还无法在任何地方找到任何全面,易于理解的描述。 谢谢 问题答案: 收集器将堆分成固定大小的区域,并跟踪这些区域中的实时数据。它将一组指针(“记住的集”)保留在区域内和区域外。当认为有必要使用GC时,它将首先收集实时数据较少的区域(因此,“垃圾优先”)。通常,这意味着一步就可以收集整个区域:如果进入一个区域的指针数量为零,则无需对该区域进行标
我试图了解垃圾收集是如何工作的。我很清楚以下几点: 当JVM无法将对象分配给年轻一代时,它将触发小型GC 列表项 当堆满时,JVM将触发完整的GC(两个次要的主要GC)。 但是,年轻一代和老一代中不再引用、符合GC条件但没有触发GC的对象如何(即年轻/老一代堆空间未满,因此没有GC发生) 这是否意味着在GC发生之前,这些对象将保留在年轻/老一代堆空间中? 我的阅读材料 > http://www.c
问题内容: 如果我使用String.intern()来提高性能,因为我可以使用“ ==”来比较内部字符串,是否会遇到垃圾回收问题?内联字符串的垃圾回收机制与普通字符串有何不同? 问题答案: 实际上,这不是垃圾收集优化,而是字符串池优化。调用时,您用其基本引用(首次遇到此字符串的引用,如果尚不知道此引用,则为参考)替换对初始String的引用。 但是,一旦您的字符串不再在应用程序中使用,它将成为垃圾
阅读:Kafka Connect FileStreamSource忽略附加行 看来Kafka现在支持这一观点,他说: https://docs.confluent.io/5.5.0/connect/management/configuring.html#Standalone-示例 是否声明该文件被监视: 开始独立连接 将文件的所有内容添加到主题中,将新行添加到中不会将这些行添加到主题中。是否需要配
问题内容: JavaScript中的垃圾回收如何工作?它类似于.NET垃圾回收吗?难道是因为人们在VBScript中实现垃圾回收很不好,所以人们避免了垃圾回收并建立了对JavaScript作为其标准客户端语言的偏好? 问题答案: 垃圾收集如何工作? 简短的答案是:当某个内存块(例如某个对象)不再可访问时,有资格回收它。何时,如何回收或是否回收它完全取决于实现,并且不同的实现方式也不同。但是在语言级