当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect源任务的轮询间隔

周学义
2023-03-14

我正在使用Kafka-Connect API实现一个自定义源连接器,可用于轮询REST-API并将JSON响应沉入Kafka主题。

现在我想知道如何实现SourceTask的轮询间隔,JDBC连接器如何提供一个轮询间隔。在某个地方,我必须将线程设置为睡眠状态,但是我必须在哪里执行此操作?

共有2个答案

澹台衡
2023-03-14

使用max.poll.interval.ms

请参考此链接:https://kafka.apache.org/documentation/

连俊智
2023-03-14

我在SourceTask实现中通过添加一个long类型的私有字段来存储时间戳,解决了这个用例。在第一次<code>poll()</code>调用时,字段尚未初始化,因此将轮询配置的REST API。在第一次调用时,所提到的<code>long</code>字段get使用当前时间戳初始化。在以下所有poll()调用中,都会检查上一次调用的时间戳。如果自上一次<code>poll()</code>以来经过的毫秒数小于两次轮询之间的配置间隔,则我会将线程发送到Hibernate状态,因为经过了配置的毫秒数。

 类似资料:
  • 客户使用此模式: Apache Camel和CXF JMS接收器 它们在内部使用Spring MDP(消息驱动POJO)来实现它们的消息接收器 它们部署在IBM WebSphere Application Server 7上 队列管理器是IBM Websphere MQ 6 Spring MDP使用JNDI队列连接工厂绑定到队列管理器--支持连接池和会话池

  • 问题是,我无法获取在此ArrayBlockingQueue等待获取db连接的线程数。我查看了文档,其中似乎与我想要的内容无关,因为想要获得db连接的任务是线程,我不确定如何手动计算等待db连接的任务数。 感谢任何帮助。

  • 和此xhtml: 我想动态更新primefaces轮询组件的间隔时间,但不改变我的示例代码。睡眠总是每1秒执行一次。 我的代码中有什么错误?

  • 问题内容: 我正在研究ajax长期轮询,但感到困惑。传统的ajax调用和长时间轮询有什么不同 此示例仅以递归方式调用服务器。与setInterval中的传统调用不同。 问题答案: 顾名思义,“ 长时间轮询” 意味着长时间轮询某些内容。 这是实际过程的开始,您对服务器上的某个脚本进行ajax调用,在本例中为,您需要使服务器脚本(例如)足够智能,以便仅在所需数据可用时才响应请求,该脚本应等待指定的时间

  • 我有一个Kafka主题,并为其附加了1个消费者(主题只有1个分区)。现在对于超时,我使用默认值(心跳:3秒,会话超时:10秒,轮询超时:5分钟)。 根据留档,轮询超时定义消费者必须在其他代理将该消费者从消费者组中删除之前处理消息。现在假设,消费者只需1分钟即可完成处理消息。 现在我有两个问题

  • 我已经设法编写了一个小型控制台应用程序来连接到Azure服务总线中的一个主题/订阅。我已经向订阅发送了消息并注册了一个消息处理程序。我看到消息处理程序每60秒轮询一次。例如,是否将轮询间隔延长至30或60分钟?