当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream手动投票器Kafka

邵刚洁
2023-03-14

我不想使用@KafKalistener或@StreamListener,但我想手动轮询Kafka。我正在使用spring-cloud-starter-stream-kafka库,我有以下Kafka制作人

  @Autowired
  private KafkaTemplate<byte[], byte[]> template;

  public void sendMessages() {
    IntStream.range(2)
             .forEach(val -> {
               template.send("kafka-topic", "hello".getBytes());
             });
  }
 @Autowired
  private ConsumerFactory consumerFactory;

  public void processKafkaRecords() throws InterruptedException {
    Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer("0", "consumer-1");
    consumer.subscribe(Arrays.asList("kafka-topic"));
    ConsumerRecords<byte[], byte[]> poll = consumer.poll(Duration.ofMillis(1000));
    poll.forEach(record -> {
      log.info("record {}", record);
    });
  }
spring.cloud.stream.bindings.pollableInput.destination=kafka-topic
spring.cloud.stream.bindings.pollableInput.group=kafka-topic
spring.cloud.stream.bindings.pollableInput.consumer.batch-mode=true
spring.cloud.stream.bindings.pollableInput.consumer.header-mode=none
spring.cloud.stream.bindings.pollableInput.consumer.use-native-decoding=true

spring.cloud.stream.kafka.bindings.pollableInput.consumer.autoCommitOffset=false

共有1个答案

卢黎明
2023-03-14

可能有几个原因:

  1. duration.ofmilis(1000)-尝试增加时间,除非您的客户端和kafka都运行在同一台计算机上,否则在某些情况下,1秒可能太低。因为poll(Duration)的文档说明如果超时到期,将返回一个空记录集
  2. 如果您首先启动了生产者,然后启动了使用者,并且您没有将偏移量重置策略设置为“最早”,那么您将看不到任何消息,因为默认情况下,使用者将从最新的偏移量进行使用。因此,请尝试设置以下auto.offset.reset=aristy
  3. 来自同一使用者组的另一个使用者可能正在运行,并且只有1个分区,或者使用者组已处于最后一个偏移量。在这种情况下,您可以尝试更改使用者组ID。
 类似资料:
  • 轻松满足企业在各种场景下的需要。“钉钉”通知、实时智能统计、可匿名、支持跨组织、可选图文&评分模式。 文字投票 文字投票的发起 点击新增 选择文字类型 编辑标题和投票项内容 设置参与人、截止日期和投票属性 保存或发布投票 图文投票 图文投票的发起 点击新增 选择图文类型 编辑标题和上传图片内容 设置参与人、截止日期和投票属性(是否多选、是否匿名、结果仅发起人可见、允许评论、评论匿名) 保存或发布投

  • 一、简介 系统的投票功能提供了两种投票类型,第一个是单选投票.第二种是多选投票.网站编辑人员可以根据实际的需求,选择类型进行操作。 何处使用投票: 常用于首页、内容页、及专题页面。所有你想放投票的区域。 如何使用: 只需要根据投票所放位置不同,复制对应代码到模版里即可。 系统信息发布页 和 专题管理内置提供了投票选择功能,只需手动点选,即可添加投票。 针对不同位置CSS样式不同,系统提供了三种常用

  • 现在我们的系统更完善了,但是想要找到最受欢迎的帖子有点难。我们需要一个排名系统来给我们的帖子排个序。 我们可以建立一个基于 karma 的复杂排名系统,权值随着时间衰减,和许多其他因素(很多功能都在 Telescope 中实现了,他是 Microscope 的大哥)。但是对于我们的例子 app, 我们尽量保持简单,我们只按照帖子收到的投票数为它们排序。 让我们实现一个给用户为帖子投票的方法。 数据

  • 获取投票信息 Mudu.Room.Vote.Get(function (response) { response = JSON.parse(response) if (response.status === 'y') { console.log('获取成功,数据为:', response.data) } if (response.status === 'n') {

  • 本文向大家介绍PHP+AJAX 投票器功能,包括了PHP+AJAX 投票器功能的使用技巧和注意事项,需要的朋友参考一下 终于到AJAX,翻译过来就是”异步Javascript和XML”,他可以实现网页内容的部分加载,可提高用户体验。现在有很多网站都有用这技术,反正你知道他能实现网页的异步更新就差不多了。当然下面的例子都相对简单,并没有体现它这一特点~ 投票器 新建文件【 AJAX投票.html】

  • 投票模块 投票模块PC标签调用说明 模块名:vote 模块提供的可用操作   操作名 说明 lists 获取投票列表 get_vote 获取投票概述 获取投票列表(lists): 可用参数: 参数名 是否必须 默认值 说明 siteid 是 null 站点ID enabled 否 1 显示方式 order 否 subjectid DESC 排序方式 代码例子: {pc:vote action="l