当前位置: 首页 > 知识库问答 >
问题:

如何摆脱“用timeoutMillis = ...关闭Kafka制作人”

公羊宗清
2023-03-14

我通过以下代码将阿帕奇 Avro 格式的消息发送到 Kafka 代理实例:

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(kafkaTopic.getTopicName(), null, null,
                avroConverter.getSchemaId().toString(), convertRecordToByteArray(kafkaRecordToSend));

        String avroSchemaName = null;

        // some of my AVRO schemas are unions, some are simple:
        if (_avroSchema.getTypes().size() == 1) {
            avroSchemaName = _avroSchema.getTypes().get(0).getName();
        } else if (_avroSchema.getTypes().size() == 2) {
            avroSchemaName = _avroSchema.getTypes().get(1).getName();
        }

        // some custom header items...
        producerRecord.headers().add(MessageHeaders.MESSAGE_ID.getText(), messageID.getBytes());
        producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SUBJECT.getText(),
                avroSchemaName.getBytes());
        producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SCHEMA_ID.getText(),
                avroConverter.getSchemaId().toString().getBytes());
        if (multiline) {
            producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_MULTILINE_RECORD_NAME.getText(),
                    MULTILINE_RECORD_NAME.getBytes());
        }

        try {
            Future<RecordMetadata> result = kafkaProducer.send(producerRecord);
            RecordMetadata sendResult = result.get();
            MessageLogger.logResourceBundleMessage(_messages, "JAPCTOAVROKAFKAPRODUCER:DEBUG0002",
                    sendResult.offset());
        } catch (Exception e) {
            MessageLogger.logError(e);
            throw e;
        }

代码工作正常,消息最终在Kafka中并被处理以最终在ImphxDB中。问题是每次发送操作都会产生大量INFO消息(客户端ID号就是一个例子):

  • [生产者客户端 Id=生产者-27902] 关闭 Kafka 生产者超时Millis = 10000 毫秒。
  • [创建者客户端 Id=创建者-27902] 使用超时关闭 Kafka 创建者Millis = 9223372036854775807 毫秒。
  • Kafka开始计时: ...
  • Kafka commitId: ...
  • [创建者客户端 Id = 创建者-27902] 群集 ID:

这把我们的Graylog砸了。

我使用类似的代码发送字符串格式的消息。此代码在不产生INFO消息的情况下执行…

ProducerRecord<String, String> recordToSend = new ProducerRecord<>(queueName, messageText);
    recordToSend.headers().add("messageID", messageID.getBytes());

    Future<RecordMetadata> result = _producerConnection.send(recordToSend);
    

我知道INFO消息是从classorg.apache.kafka.clients.producer.KafkaProducer记录的。我需要删除这些消息,但我无法访问日志记录。mxl定义Graylog的记录器属性。

有没有办法通过POM条目或编程来消除这些消息?

共有1个答案

傅和璧
2023-03-14

代码行为的原因是一个设计缺陷:上面帖子中的代码被放在一个方法中,该方法被调用来向Kafka发送消息。KafkaProducer类在该方法中以及每次调用该方法时都被实例化。令人惊讶的是,KafkaProducer不仅在调用代码的显式close()处,而且在实例的强引用丢失时(在我的例子中,当代码离开方法时),发出关闭timeoutMillis=Kafka Producer的命令。在后一种情况下,timeoutMillits设置为9223372036854775807(最大的长数字)。

为了消除许多消息,我将KafkaProducer实例化移出了方法,并将实例变量设置为类属性,我不再在send(…)之后调用显式的

此外,我将实例化 KafkaProducer 的类的实例更改为强引用类成员。

通过这样做,我在实例化时收到Kafka生产者的一些消息,然后沉默了。

 类似资料:
  • 问题内容: 到目前为止,给我带来了很多麻烦,所以我想摆脱它。尽管spring框架文档清楚地说明了应该做的事情,但实际上 并没有摘要列表。 所以我一直坚持删除并得到错误 -在名称为的中找不到带有请求的映射 对于所有应该由控制器类解决的Url(在这种情况下:)。有什么建议可以让我了解更多信息吗?我非常想知道到底由代表什么标签。 问题答案: 你可以用来自定义定义的每个bean 。现在,javadocs详

  • 问题内容: 到现在为止,给我造成了很多麻烦,所以我想摆脱它。尽管spring框架文档清楚地说明了应该做的事情,但实际上 并没有摘要列表。 所以我坚持删除并现在得到错误 WARN osweb.servlet.PageNotFound-在DispatcherServlet中,名称为’workoutsensor’的URI [/ webapp / trainees]的HTTP请求未找到映射 对于所有应该由

  • 我有一个多线程应用程序,它使用producer类生成消息,之前我使用下面的代码为每个请求创建producer。其中KafkaProducer是新建的,每个请求如下: 然后我阅读了关于生产者的Kafka文档,并了解到我们应该使用单个生产者实例来获得良好的性能。 然后我在一个singleton类中创建了KafkaProducer的单个实例。 现在什么时候 或者我们如何在关闭后重新连接到生产者。问题是如

  • 我正在使用sping-kafka 2.2.7-RELEASE并编写生产者。我正在阅读从这里留档如下所述。 “从2.5版开始,每个版本都扩展了KafkaResourceFactory。这允许在运行时通过向它们的配置添加一个供应商来更改引导服务器:setBootstrapServersSupply ier(() → … ). 这将被调用以获取所有新连接的服务器列表。消费者和生产者通常是长期存在的。要关

  • 我有JavaWebService代码在我的eclipse。我使用了@WebService@Webmethod,@XmlElements,@XmlType,@XmlAccessorType 现在我正在使用cxf框架中的java2ws命令生成wsdl。这是命令 我的wsdl文件包含agr0作为我不想要的名称,因为当我将其导入SoapUI时。它正在字段周围添加标记。 下面是带有arg0的wsdl部分 下

  • 问题内容: 我正在使用Apache Derby嵌入式数据库在Maven项目中进行单元测试。不幸的是,每当运行测试时,我最终都会在项目根目录中找到该文件。数据库本身是在目录()中创建的,因此这不是问题。在查阅参考指南之后, 我尝试在JDBC url()上设置参数,但这似乎是针对其他日志的,因此仍然会出现。 任何帮助深表感谢。 问题答案: 您可以通过创建以下类来摆脱文件 并设置JVM系统属性,例如,使