当前位置: 首页 > 知识库问答 >
问题:

Kafka春靴流

扶开诚
2023-03-14

我想在我的spring boot项目中使用Kafka Streams实时处理。所以我需要Kafka Streams配置,或者我想使用KStreams或KTable,但我在互联网上找不到示例。

我做了制作人和消费者现在我想流实时。

共有3个答案

何星鹏
2023-03-14

您可以使用https://start.spring.io/相应地选择必要的版本/依赖项,并生成/下载项目。

您可以开始实现kstream api方法(https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KStream.html)

孔砚
2023-03-14

使用Spring Boot上的Kafka Streams轻松入门:

>

  • 使用https://start.spring.io.选择Cloud Stream和Spring for Apache Kafka Streams作为依赖项。以下是预配置项目模板的链接:https://start.spring.io/#!语言=java

    在应用程序中定义KStream bean。例如,这是一个非常基本的消费者应用程序。它只需消耗数据并将记录从KStream记录到标准输出。

    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Main.class, args);
        }
    
        @Bean
        public java.util.function.Consumer<KStream<String, String>> process() {
            return stream -> stream.foreach((key, value) -> {
                System.out.println(key + ":" + value);
            });
        }
    }
    

    在这个应用程序中,我们定义了一个输入绑定。Spring将使用名称process-in-0创建此绑定,即bean函数的名称后跟-in-,后跟参数的顺序位置。您可以使用此绑定名称设置其他属性,例如主题名称。例如,spring。云流动绑定。0中的进程。目的地=我的主题

    请参阅此处的更多示例-Spring Cloud Stream Kafka活页夹参考,编程模型部分。

    配置应用程序。yaml如下:

    spring:
      cloud:
        stream:
          bindings:
            process-in-0.destination: my-topic
          kafka:
            streams:
              binder:
                applicationId: my-app
                brokers: localhost:9092
                configuration:
                  default:
                    key:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                    value:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    

  • 东郭子默
    2023-03-14

    首先,我要说的是,如果你是Kafka流的新手,在它上面添加spring boot会增加另一个层次的复杂性,Kafka流也有一个很大的学习曲线。以下是让你行动起来的基础:pom:

    <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>${spring.version}</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-streams</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    

    现在是配置对象。下面的代码假设您正在创建两个流应用程序,请记住每个应用程序代表其自己的处理拓扑:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
    import org.springframework.kafka.core.StreamsBuilderFactoryBean;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class KafkaStreamConfig {
    
      @Value("${delivery-stats.stream.threads:1}")
      private int threads;
    
      @Value("${delivery-stats.kafka.replication-factor:1}")
      private int replicationFactor;
    
      @Value("${messaging.kafka-dp.brokers.url:localhost:9092}")
      private String brokersUrl;
    
    
      @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
      public StreamsConfig kStreamsConfigs() {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
        setDefaults(config);
        return new StreamsConfig(config);
      }
    
    
      public void setDefaults(Map<String, Object> config) {
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
      }
    
      @Bean("app1StreamBuilder")
      public StreamsBuilderFactoryBean app1StreamBuilderFactoryBean() {
        Map<String, Object> config = new HashMap<>();
        setDefaults(config);
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
        config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
        return new StreamsBuilderFactoryBean(config);
      }
    
      @Bean("app2StreamBuilder")
      public StreamsBuilderFactoryBean app2StreamBuilderFactoryBean() {
        Map<String, Object> config = new HashMap<>();
        setDefaults(config);
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app2");
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
        config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
        config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
        return new StreamsBuilderFactoryBean(config);
      }
    }
    

    现在是有趣的部分,使用streamsBuilder构建应用程序(本例中为app1)。

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    @Component
    @Slf4j
    public class App1 {
      @SuppressWarnings("unchecked")
      @Bean("app1StreamTopology")
      public KStream<String, Long> startProcessing(@Qualifier("app1StreamBuilder") StreamsBuilder builder) {
    
        final KStream<String, Long> toSquare = builder.stream("toSquare", Consumed.with(Serdes.String(), Serdes.Long()));
        toSquare.map((key, value) -> { // do something with each msg, square the values in our case
          return KeyValue.pair(key, value * value);
        }).to("squared", Produced.with(Serdes.String(), Serdes.Long())); // send downstream to another topic
    
        return toSquare;
      }
    }
    

    希望这有帮助。

    Kafka命令创建主题并将数据发送到主题中

    创建主题:

    kafka-topics.bat --zookeeper localhost:2181 --create --topic toSquare --replication-factor 1 --partitions 1
    

    将数据发送到主题:

    kafka-console-producer --broker-list localhost:9092 --topic testStreamsIn --property parse.key=true --property key.separator=,
    test,12345678
    
     类似资料:
    • 我有一个Spring Boot2.25.1应用程序,它使用Camel 2.25.1与camel-kafka,一切都正常工作…在我的Kafka消费者中,我需要添加该功能以按需暂停消费,因此我升级到camel 3.18.1,以便我可以使用可暂停功能。升级到3.18.1后,我收到错误FileNotes与类文件TimeoutAwareAggregationStategy.class. 当我打开camel-

    • 我想在我的Junit5单元测试中将值注入到带有@Value注释的私有字段。 我引用了这个,并使用了ReflectionTestUtils。setField通过注入值解决了我的问题,但在验证方法被调用的次数时失败。 MyClass(我的类别): 测试类: 运行上述测试时出错 我想kafkaTemplate.sendMessage();被调用一次,但被调用两次后添加反射TestUtils。 需要关于如

    • 如果Kafka服务器(暂时)关闭,我的Spring Boot应用程序将继续尝试连接,但失败,从而导致不必要的通信量并扰乱日志文件: 我知道参数,这就是我如何创建bean: 消费者仍然试图每3秒连接一次。

    • 如spring boot博客所述 我尝试自定义我的对象序列化。 在我的配置中添加了一个新的配置bean之后 当我尝试输出类用户的实例时,json结果不在CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES中 也许我需要在我的Jersey配置中注册一些东西来激活我的自定义obejctMapper配置 谢谢

    • 我使用Spring的引导2.6.3和我试图使用thymeleaf最近得到一个与html的确认消息后,我通过邮件确认我的帐户(我只是使用确认邮件来验证新帐户),所以我看了一些关于thymeleaf的视频,我标记所有的视频都有一个名为模板的文件夹和静态在src/main/ressource和我没有他们,所以我创建了一个名为模板的文件夹,在文件夹中我创建了一个html文件只是为了尝试它,如果它(html

    • 当spring-boot应用程序运行时,如果我完全关闭代理程序(包括kafka和zookeeper),我会在控制台中看到这个警告,持续很长时间。 [org.springframework.kafka.kafkalistenerendpointcontainer#0-0-c-1]警告o.apache.kafka.clients.networkclient-[Consumer clientid=con