Spring Kafka

Spring 的 Kafka 抽象
授权协议 Apache-2.0
开发语言 Java
所属分类 服务器软件、 JMS/消息中间件
软件类型 开源软件
地区 不详
投 递 者 卜和悌
操作系统 跨平台
开源组织 Pivotal
适用人群 未知
 软件概览

Spring for Apache Kafka (spring-kafka) 项目将 Spring 核心概念应用于基于 Kafka 的消息传递解决方案的开发。它提供了一个“模板”作为发送消息的高级抽象。它还通过 @KafkaListener 注解和“侦听器容器(listener container)”为消息驱动的 POJO 提供支持。

这些库促进了依赖注入和声明的使用。在所有这些用例中,你将看到 Spring Framework 中的 JMS 支持和 Spring AMQP 中的 RabbitMQ 支持的相似之处。

功能特性

  • KafkaTemplate

  • KafkaMessageListenerContainer

  • @KafkaListener

  • KafkaTransactionManager

  • 带嵌入式 kafka 服务器的 spring-kafka-test jar 包

  • 2.1 SpringKafka配置 SpringBoot为Kafka提供了两种配置方式 KafkaProperties:Kafka配置类 Map<String, String> properties:配置kv值 2.1.1 Kafka配置类KafkaProperties SpringBoot提供了KafkaProperties配置类,且会将spring.

  • 消费者并发数量 spring.kafka.listener.concurrency @KafkaListener.concurrency 仅在多partition对应单个消费端时,用于多线程消费消息(concurrency <= partition数量), 当存在多个消费端时,优先考虑让新的消费端去消费(而不是多线程,即便设置concurrency > 1也仅有唯一消费线程生效), 所以如果消费端

  • 2.3 SpringKafka消费者 2.3.1 Kafka消息监听器MessageListener 之前已经介绍了通过kafka-topic-consumer.sh和kafka-tool工具来消费数据。下面介绍SpringKafka消费数据的方式——kafka消息监听器。 Kafka的消息监听一般可以分为:1.单条数据监听;2.批量数据监听。GenericMessageListener是Spri

  • 在《幂等与事务处理的》的文章中,我们已经为大家介绍使用apache kafka原生API 如何保证:数据被发送Exactly One,发送一次并且只发送一次,不重发不漏发。幂等性的设置仍然很简单,只需要将生产者客户端参数enable.idempotence设置为true即可。 spring: kafka: producer: properties: enable.idem

  • application-kafka.properties #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。 spring.kafka.consumer.auto-commit-interval; #当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最

  • 在介绍apache kafka原生java api的时候,我们曾详细介绍过消费者偏移量提交方式。这里不再过于详细的说明,只说明一下Spring Kafka消费者数据监听模式配置及使用方法。 Spring Kafka监听器模式(spring.kafka.listener.type配置属性)有两种:single - 监听器消息参数是一个对象;batch - 监听器消息参数是一个集合。 消费者偏移量提交

  • 前言 在上一篇博文中,我们介绍了message listener是如何创建的。接下来,我们将继续探讨Consumer创建、消息拉取和消息传递给listener。 一、消费者创建 ListenerConsumer KafkaMessageListenerContainer内部类,该内部类负责创建Kafka消费者、消息处理、将消息回传给对应的Method进行处理。 ListenerConsumer(G

  • SpringKafka文档地址:https://docs.spring.io/spring-kafka/reference/htmlsingle kafka文档地址:http://kafka.apache.org/documentation SpringKafka中配置的Java配置实现类:https://github.com/spring-projects/spring-boot/blob/v1

  • logback-spring.xml <?xml version="1.0" encoding="UTF-8"?> <!-- 从高到地低 OFF 、 FATAL 、 ERROR 、 WARN 、 INFO 、 DEBUG 、 TRACE 、 ALL --> <!-- 日志输出规则 根据当前ROOT 级别,日志输出时,级别高于root默认的级别时 会输出 --> <!-- 以下 每个配置的 filt

 相关资料
  • 问题内容: 我是一名 Java 开发人员,对Object Orientation概念有很好的了解( 或者也许我这样认为 )。现在我正在学习设计模式(从头开始设计模式)。我一直在阅读有关OOPS概念抽象的文章,以简要地理解它,而阅读更多有关它的内容,使我比以前更加困惑。 据我了解,抽象是指隐藏程序的内部细节,同时将接口暴露给其他程序员,而无需担心内部细节。但是我不明白 抽象类如何适应这种抽象概念,其

  • 本文向大家介绍php中的抽象方法和抽象类,包括了php中的抽象方法和抽象类的使用技巧和注意事项,需要的朋友参考一下 1、什么是抽象方法? 我们在类里面定义的没有方法提的方法就是抽象方法。所谓的没有方法体指的是,在声明的时候没有大括号以及其中的内容,而是直接在声明时在方法名后加上分号结束,另外在声明抽象方法时方法还要加一个关键字"abstract"来修饰。 例如: 2、什么是抽象类? 只要一个类里面

  • 我正在尝试对扩展抽象基的类进行单元测试。以下是“类似的类”,以供说明: 下面是我正在尝试的单元测试: 当我做这个测试的时候 java.lang.NullPointerException 在中 我知道自动连线的“滤水器”没有初始化。但接下来,我只想在我的单元测试中模拟抽象的“非抽象”方法。 我该如何使用EasyMock来实现这一点呢?另外,我不知道和应该做什么。

  • 嗨,我有一个抽象类,其中有一些公共方法和一些抽象方法。我让公众知道,他们实现了派生类的通用方法。 让我困惑的是,为什么我想定义一个公共抽象方法,而不是受保护的抽象方法。在抽象类中定义公共抽象方法对我来说毫无意义。。。。因为if是一个抽象,在派生类中会被重写,但if被定义为public也是一样的,但在某种程度上,将其定义为protected更有意义,因为我们知道,我们将在派生类中重写它。 在抽象类中

  • 问题内容: 我有一个抽象的DAO类,它使用参数化类型(实体)和(主键)。在每个实体中我都有一个。我想动态调用此命名查询而不知道其确切名称和参数名称。 例如,假设以下实体 和这个 我应该如何实现该方法,以便不需要知道确切的名称和参数名称? 问题答案: 在您的示例中,命名查询的命名约定通常为“ City.findByName”,因此,我将尝试更改命名查询以遵循此模式。然后,此查询的参数也应具有相同的名

  • 为什么我从下面的代码中得到这个编译错误消息? (程序根据键盘上按下的箭头键,在4个方向上移动箭头:d) Direction.java:41:错误:DirectionBoard。DirectionListener不是抽象的,并且不会覆盖KeyListener中的抽象方法keyReleated(KeyEvent)

  • 我有一个子类,它声明了我的抽象超类中的所有方法,但它仍然给我一个错误,说明我的类不是抽象的。我不知道为什么会抛出这个错误。 我得到的具体错误是 PhoneBookEntry.java: 1:错误:PhoneBookEntry不是抽象的,并且不会覆盖可比中的抽象方法compareTo(Object) 我的问题代码: 还有我的子类:

  • 当我宣布“抽象公共无效显示();”在抽象类测试中,这是创建一个全新的show()方法,还是只引用Inter接口中声明的show()方法?请澄清。