使用spring集成Kafka(2.1),我能够成功地发送消息到Kafka的一个主题。
本机Kafka客户端API提供了一个在成功发送时回调的选项。我怎样才能做到与spring一样--融合--Kafka。下面我的配置和代码供你参考。
XML配置
<int:publish-subscribe-channel id="inputToKafka" />
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
auto-startup="true"
channel="inputToKafka"
kafka-template="template"
topic="test"
sync="true">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<!--<entry key="retries" value="0" />
<entry key="batch.size" value="16384" />
<entry key="linger.ms" value="0" />
<entry key="buffer.memory" value="33554432" /> -->
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
发送消息的Java码
@Autowired
@Qualifier("inputToKafka")
MessageChannel channel;
channel.send(MessageBuilder.withPayload("Test Message").build());
当前没有“消息传递”样式的回调,但您可以使用Kafkatemplate
注册ProducerListener
;参见Kafkatemplate。
主要内容:Spark是什么?,与Spark整合在本章中,将讨论如何将Apache Kafka与Spark Streaming API集成。 Spark是什么? Spark Streaming API支持实时数据流的可扩展,高吞吐量,容错流处理。 数据可以从Kafka,Flume,Twitter等许多来源获取,并且可以使用复杂算法进行处理,例如:映射,缩小,连接和窗口等高级功能。 最后,处理后的数据可以推送到文件系统,数据库和现场仪表板上。 弹
主要内容:Storm是什么?,与Storm整合,提交到拓扑在本章中,我们将学习如何将Kafka与Apache Storm集成。 Storm是什么? Storm最初是由Nathan Marz和BackType团队创建的。 在很短的时间内,Apache Storm成为分布式实时处理系统的标准,用于处理大数据。 Storm速度非常快,每个节点每秒处理超过一百万个元组的基准时钟。 Apache Storm持续运行,从配置的源(Spouts)中消耗数据并将数据传递
一、背景 先说一下,为什么要使用 Flume + Kafka? 以实时流处理项目为例,由于采集的数据量可能存在峰值和峰谷,假设是一个电商项目,那么峰值通常出现在秒杀时,这时如果直接将 Flume 聚合后的数据输入到 Storm 等分布式计算框架中,可能就会超过集群的处理能力,这时采用 Kafka 就可以起到削峰的作用。Kafka 天生为大数据场景而设计,具有高吞吐的特性,能很好地抗住峰值数据的冲击
本文向大家介绍Spring Boot 整合mybatis 与 swagger2,包括了Spring Boot 整合mybatis 与 swagger2的使用技巧和注意事项,需要的朋友参考一下 之前使用springMVC+spring+mybatis,总是被一些繁琐的xml配置,有时候如果配置出错,还要检查各种xml配置,偶然接触到了spring boot 后发现搭建一个web项目真的是1分钟的事情
向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前
一、版本说明 Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下: spark-streaming-kafka-0-8 spark-streaming-kafka-0-10 Kafka 版本 0.8.2.1 or higher 0.10.0 or higher A