好的,目标是:我有一个应该发送邮件的服务,如果失败,我的Kafka制作人将把这封邮件发送到Kafka主题。第二个程序每两分钟查看一次主题,应该只使用一条消息(最早的一条),然后重试发送,如果失败,程序应该将此消息返回主题。
我已经有了一个消费者,但问题是,它会消耗我直到现在还没有使用消费者的所有消息。但我希望他只吃最老的,他以前从未吃过。
这是我的实际消费者:
“CustMessage”是我为测试而创建的一个类,它只是一个具有不同属性的对象,如日期、消息和其他东西。为了测试,代码有点修改,所以这个消费者正在运行endles。它也只显示消息而不是处理它,所以只有“System.out.println(message.displayMessage());
”。
公共级消费电子{
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "happyConsumer");
properties.put("auto.offset.reset", "earliest");
properties.put("enable.auto.commit", "false");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList("mymailtopic"));
while (true) {
ObjectMapper om = new ObjectMapper();
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
customMessage message=null;
try {
message = om.readValue(record.value(), customMessage.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
System.out.println(message.displayMessage());
}
}
}
}
对于实际的代码,服务也可以工作,但我至少可以尝试它是否也可以像我在第一段中提到的那样工作。所以,我的问题是:
我在用Java
您可以使用max.poll.records属性逐条使用消息,在配置中将此属性设置为1
对poll()的单个调用中返回的最大记录数。
properties.put("max.poll.records", 1);
我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。
2016-07-05 03:59:25.042 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Tick,ID:{},[30] 2016-07-05 03:59:25.946 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Metrics_Tick,ID:{},[60] 我的测试
这是否意味着即使有一个使用者实例,我们也能够并行地使用来自三个分区的消息。 这是否可以作为替代,以增加Kafka主题的消耗率,而不增加分区的数量。我知道的唯一的另一种方法是创建一个线程池,并将接收到的消息提交给它。我们是否可以使用ConcurrentKafkaListenerContainerFactory作为该流程的替代,因为这听起来显然更容易实现。
假设在Kafka中,我有一个主题“A”的4个分区,并且我有20个消费者组“AC”的消费者。我不需要任何排序,但我想通过扩展我的消费者实例来更快地处理消息。请注意,所有消息都是独立的,可以独立处理。 我查看了消费者配置分区。分配策略,但不确定是否可以根据消息可用性实现消费者到分区的动态分配。
我有一个Kafka集群(版本:0.10.1.0),有9个代理和10个分区。 我尝试使用camel kafka从java应用程序中获取消息。这是我的pom。xml 这只是我使用的与骆驼Kafka相关的依赖项。下面是骆驼Kafka消费者代码。 我正在使用文档中指定的KafkaURIhttps://camel.apache.org/components/latest/kafka-component.ht
我正在开发一个使用的软件。我有一个用户订阅了多个主题,我想知道是否有一个订单接收来自这些主题的消息。我在我的电脑上尝试了一些组合,但我需要确定这一点。例 null [编辑]我想指定这两个主题各有一个分区,并且只有一个生产者和一个消费者。我需要首先阅读来自第一个主题的所有消息,然后阅读来自另一个主题的消息