当前位置: 首页 > 知识库问答 >
问题:

Kafka elasticsearch连接器-“刷新超时过期,记录未刷新:”

侯涵煦
2023-03-14

我有一个奇怪的问题与Kafka->elasticsearch连接器。当我第一次启动时,我在elasticsearch中收到了一个新数据,并通过kibana dashboard进行了检查,但当我使用同一个producer应用程序在kafka中生成新数据并尝试再启动一次connector时,我在elasticsearch中没有得到任何新数据。现在我遇到了这样的错误:

[2018-02-04 21:38:04,987] ERROR WorkerSinkTask{id=log-platform-elastic-0} Commit of offsets threw an unexpected exception for sequence number 14: null (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 15805

我正在使用next命令运行连接器:

/usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties log-platform-elastic.properties

connect-avro-standalone.properties:

bootstrap.servers=kafka-0.kafka-hs:9093,kafka-1.kafka-hs:9093,kafka-2.kafka-hs:9093
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
#rest.host.name=
rest.port=8084
#rest.advertised.host.name=
#rest.advertised.port=
plugin.path=/usr/share/java

和log-platform-elastic.properties:

name=log-platform-elastic
key.converter=org.apache.kafka.connect.storage.StringConverter
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=member_sync_log, order_history_sync_log # ... and many others
key.ignore=true
connection.url=http://elasticsearch:9200
type.name=log

我检查了与kafka brokers、elasticsearch和schema-registry的连接(此时schema-registry和connector在同一台主机上),一切正常。Kafka代理正在端口9093上运行,我可以使用kafka-avro-console-consumer从主题中读取数据。我将非常感激你在这方面的帮助!

共有1个答案

朱承载
2023-03-14

只需将flush.timeout.ms更新到大于10000(默认值为10秒)

根据文件

flush.timeout.ms用于定期刷新的超时(以毫秒为单位),以及在添加记录时等待已完成的请求提供可用的缓冲区空间。如果超过此超时,任务将失败。

 类似资料:
  • 使用Firebase.auth(),如果身份验证状态没有改变,如何刷新/重新获取用户? 我有一个使用电话号码进行身份验证的用户。我向点击该链接的用户发送电子邮件验证链接。该链接验证用户电子邮件(更新记录),并显示一个“继续”按钮,该按钮(通过deeplink)返回到应用程序。回到应用后,我想从 Firebase 重新提取用户记录,以便查看用户电子邮件是否已通过验证。我该怎么做?

  • 我制作了一个非常简单的登录和会话结构,以便在未来基于JSP的应用程序中重用。是这样的: 而位于WebContent根目录的login.jsp页面有一个 表单,其中包含用于身份验证的专用innerHTML和一个用于接收会话超时或登录失败消息的${failure}字段。 这个结构对我来说很管用。它拦截、请求登录、检查会话和身份验证,等等,但有一个小缺陷:如果您在登录页面并在超时后刷新它(F5或在URL

  • google oauth2刷新令牌何时过期? 我所说的过期是指过期是因为经过了某个时间跨度(不是因为用户已撤销访问权限或用户已请求新的刷新令牌) 我做了一些研究,没有一个引用官方的谷歌文档(我也找不到一个有效的谷歌文档) 其他一些问题表示,由于时间,它从未过期: 谷歌刷新令牌过期了吗? https://community.fitbit.com/t5/web-api-development/inva

  • 我已经阅读了跑道文档。我特别考虑了以下关于使用的声明: 此请求返回与上述相同的数据,您可以继续反复执行此操作,以保持应用程序的身份验证,而无需要求用户重新身份验证。 这是否意味着将无限期有效或过期: < li >签发后X天;或者 < li >最后一次使用它获取新的< code>access_token后的X天 编辑:请参阅此跑道线程,该线程提出相同的问题,但似乎没有给出任何关于Oauth2.0协议

  • 任何人都可以告诉我通过 OAuth2 生成刷新令牌的到期时间。实际上,它通常返回 2 个参数访问令牌和刷新令牌。我们使用刷新令牌,以便在访问令牌过期时生成新的访问 toke。但是谷歌日历版本3,我正在使用刷新令牌来调用日历API。但是在这里我遇到了令牌过期的问题。所以任何人都可以建议我当令牌过期时我该怎么办。据我说,刷新令牌没有过期时间。请检查下面的代码以使用刷新令牌创建日历服务:-