我使用的是spring boot 2.2.4版本,spring-kafka 2.4.2版本
我的场景是以下一个:
所以我写了folloqing代码
生产者微服务
spring kafka配置:
@Configuration
public class WebmailKafkaConfig {
@Autowired
private Environment environment;
@Bean
public KafkaAdmin kafkaAdmin(){
Map<String, Object> configuration = new HashMap<String, Object>();
configuration.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("webmail.be.messaging.kafka.bootstrap.address"));
KafkaAdmin result = new KafkaAdmin(configuration);
return result;
}
@Bean
public ProducerFactory<String, RicezioneMailMessage> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("webmail.be.messaging.kafka.bootstrap.address"));
configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean("ricezioneMailMessageKafkaTemplate")
public KafkaTemplate<String, RicezioneMailMessage> ricezioneMailMessageKafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
public class WebmailKafkaTopicSvcImpl implements WebmailKafkaTopicSvc {
private static final Logger logger = LoggerFactory.getLogger(WebmailKafkaTopicSvcImpl.class.getName());
@Autowired
private KafkaAdmin kafkaAdmin;
@Value("${webmail.be.messaging.kafka.topic.numero.partizioni}")
private int numeroPartizioni;
@Value("${webmail.be.messaging.kafka.topic.fattore.replica}")
private short fattoreReplica;
@Autowired
@Qualifier("ricezioneMailMessageKafkaTemplate")
private KafkaTemplate<String, RicezioneMailMessage> ricezioneMailMessageKafkaTemplate;
@Override
public void createKafkaTopic(String topicName) throws Exception {
if(!StringUtils.hasText(topicName)){
throw new IllegalArgumentException("Passato un topic name non valido ["+topicName+"]");
}
AdminClient adminClient = null;
try{
adminClient = AdminClient.create(kafkaAdmin.getConfig());
List<NewTopic> topics = new ArrayList<>(1);
NewTopic topic = new NewTopic(topicName, numeroPartizioni, fattoreReplica);
topics.add(topic);
CreateTopicsResult result = adminClient.createTopics(topics);
result.all().whenComplete(new KafkaFuture.BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
if( throwable != null ){
logger.error("Errore creazione topic", throwable);
}
}
});
}finally {
if( adminClient != null ){
adminClient.close();
}
}
}
@Override
public void sendMessage(RicezioneMailMessage rmm) throws Exception {
ListenableFuture<SendResult<String, RicezioneMailMessage>> future = ricezioneMailMessageKafkaTemplate.send(rmm.getPk(), rmm);
future.addCallback(new ListenableFutureCallback<SendResult<String, RicezioneMailMessage>>() {
@Override
public void onFailure(Throwable ex) {
if( logger.isWarnEnabled() ){
logger.warn("Impossibile inviare il messaggio=["
+ rmm + "] a causa di : " + ex.getMessage(),ex);
}
}
@Override
public void onSuccess(SendResult<String, RicezioneMailMessage> result) {
if(logger.isTraceEnabled()){
logger.trace("Inviato messaggio=[" + rmm +
"] con offset=[" + result.getRecordMetadata().offset() + "]");
}
}
});
}
}
在制作人方面所有的工作都很好。我能创造话题和发送信息。
消费者微服务
动态侦听器类
public class DynamicKafkaConsumer {
private final String brokerAddress;
private final String topicName;
private boolean stopTest;
private static final Logger logger = LoggerFactory.getLogger(DynamicKafkaConsumer.class.getName());
public DynamicKafkaConsumer(String brokerAddress, String topicName) {
if( !StringUtils.hasText(brokerAddress)){
throw new IllegalArgumentException("Passato un broker address non valido");
}
if( !StringUtils.hasText(topicName)){
throw new IllegalArgumentException("Passato un topicName non valido");
}
this.brokerAddress = brokerAddress;
this.topicName = topicName;
if( logger.isTraceEnabled() ){
logger.trace("Creato {} con topicName {} e brokerAddress {}", this.getClass().getName(), this.topicName, this.brokerAddress);
}
}
public final void start() {
MessageListener<String, RicezioneMailMessage> messageListener = (record -> {
RicezioneMailMessage messaggioRicevuto = record.value();
if( logger.isInfoEnabled() ){
logger.info("Ricevuto messaggio {} su topic {}", messaggioRicevuto, topicName);
}
stopTest = true;
});
ConcurrentMessageListenerContainer<String, RicezioneMailMessage> container =
new ConcurrentMessageListenerContainer<>(
consumerFactory(brokerAddress),
containerProperties(topicName, messageListener));
container.start();
}
private DefaultKafkaConsumerFactory<String, RicezioneMailMessage> consumerFactory(String brokerAddress) {
return new DefaultKafkaConsumerFactory<>(
consumerConfig(brokerAddress),
new StringDeserializer(),
new JsonDeserializer<>(RicezioneMailMessage.class));
}
private ContainerProperties containerProperties(String topic, MessageListener<String, RicezioneMailMessage> messageListener) {
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setMessageListener(messageListener);
return containerProperties;
}
private Map<String, Object> consumerConfig(String brokerAddress) {
return Map.of(
BOOTSTRAP_SERVERS_CONFIG, brokerAddress,
GROUP_ID_CONFIG, "groupId",
AUTO_OFFSET_RESET_CONFIG, "earliest",
ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"
);
}
public boolean isStopTest() {
return stopTest;
}
public void setStopTest(boolean stopTest) {
this.stopTest = stopTest;
}
}
public class TestRicezioneMessaggiCasellaPostale {
private static final Logger logger = LoggerFactory.getLogger(TestRicezioneMessaggiCasellaPostale.class.getName());
@Test
public void testRicezioneMessaggiMail() {
try {
String brokerAddress = "localhost:9092";
DynamicKafkaConsumer consumer = new DynamicKafkaConsumer(brokerAddress, "f586caf2-ffdc-4e3a-88b9-a262a502f8ac");
consumer.start();
boolean stopTest = consumer.isStopTest();
while (!stopTest) {
stopTest = consumer.isStopTest();
}
} catch (Exception e) {
logger.error("Errore nella configurazione della casella postale; {}", e.getMessage(), e);
}
}
}
当我在生产者端发送消息时,我可以看到以下日志:
2020-02-19 22:00:22,320 52822 [kafka-producer-network-thread | producer-1] TRACE i.e.t.r.p.w.b.s.i.WebmailKafkaTopicSvcImpl - Inviato messaggio=[RicezioneMailMessage{pk='c5c8f8a4-8ddd-407a-9e51-f6b14d84f304', tipoMessaggio='mail'}] con offset=[0]
在消费者方面,我没有看到任何信息。我只看到下面的指纹:
2020-02-19 22:00:03,194 1442 [main] INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = false
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = groupId
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes
= 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
2020-02-19 22:00:03,630 1878 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka version: 2.4.0
2020-02-19 22:00:03,630 1878 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka commitId: 77a89fcf8d7fa018
2020-02-19 22:00:03,630 1878 [main] INFO o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1582146003626
2020-02-19 22:00:03,636 1884 [main] INFO o.a.k.c.consumer.KafkaConsumer - [Consumer clientId=consumer-groupId-1, groupId=groupId] Subscribed to topic(s): f586caf2-ffdc-4e3a-88b9-a262a502f8ac
2020-02-19 22:00:03,645 1893 [main] INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
2020-02-19 22:00:03,667 1915 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2020-02-19 22:00:04,123 2371 [consumer-0-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-groupId-1, groupId=groupId] Cluster ID: hOOJH-WNTNiXD4il0Y7_0Q
2020-02-19 22:00:05,052 3300 [consumer-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2020-02-19 22:00:05,059 3307 [consumer-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] (Re-)joining group
2020-02-19 22:00:05,116 3364 [consumer-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] (Re-)joining group
2020-02-19 22:00:05,154 3402 [consumer-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] Finished assignment for group at generation 1: {consumer-groupId-1-41df9153-7c33-46b1-8274-2d7ee2bfb35c=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@a95df1b}
2020-02-19 22:00:05,327 3575 [consumer-0-C-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] Successfully joined group with generation 1
2020-02-19 22:00:05,335 3583 [consumer-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] Adding newly assigned partitions: f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0
2020-02-19 22:00:05,363 3611 [consumer-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] Found no committed offset for partition f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0
2020-02-19 22:00:05,401 3649 [consumer-0-C-1] INFO o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-groupId-1, groupId=groupId] Resetting offset for partition f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0 to offset 0.
2020-02-19 22:00:05,404 3652 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing on assignment: {f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}}
2020-02-19 22:00:05,432 3680 [consumer-0-C-1] INFO o.s.k.l.ConcurrentMessageListenerContainer - groupId: partitions assigned: [f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0]
2020-02-19 22:00:08,669 6917 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2020-02-19 22:00:08,670 6918 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2020-02-19 22:00:13,671 11919 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2020-02-19 22:00:13,671 11919 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2020-02-19 22:00:18,673 16921 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2020-02-19 22:00:18,673 16921 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2020-02-19 22:00:23,674 21922 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2020-02-19 22:00:23,674 21922 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2020-02-19 22:00:28,676 26924 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2020-02-19 22:00:28,676 26924 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2020-02-19 22:00:33,677 31925 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2020-02-19 22:00:33,677 31925 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2020-02-19 22:00:38,678 36926 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2020-02-19 22:00:38,678 36926 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
2020-02-19 22:00:43,678 41926 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records
2020-02-19 22:00:43,679 41927 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}
谁能告诉我我错在哪?
我找到了你代码的根本原因。
从客户端发送消息和日志的代码:
ricezioneMailMessageKafkaTemplate.send(rmm.getPk(), rmm);
Inviato messaggio=[RicezioneMailMessage{pk='c5c8f8a4-8ddd-407a-9e51-f6b14d84f304', tipoMessaggio='mail'}] con offset=[0]
来自使用者的代码和日志:
DynamicKafkaConsumer consumer = new DynamicKafkaConsumer(brokerAddress, "f586caf2-ffdc-4e3a-88b9-a262a502f8ac");
2020-02-19 22:00:03,636 1884 [main] INFO o.a.k.c.consumer.KafkaConsumer - [Consumer clientId=consumer-groupId-1, groupId=groupId] Subscribed to topic(s): f586caf2-ffdc-4e3a-88b9-a262a502f8ac
您发送到的主题:c5c8f8a4-8ddd-407a-9e51-f6b14d84f304
生产者/消费者正在发送/监听不同的主题。
我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d
我正在尝试把阿帕奇Storm和Kafka整合在一起。连接似乎建立良好,但没有收到任何消息。但是这些消息似乎也被发送到了Kafka服务器,而Kafka服务器中相应主题的索引文件显示存在一些数据。有没有一种方法可以在Storm End上调试这个更多的..?我正在使用Storm的客户解码器来处理信息。Storm的实现是:
我正在尝试运行一个简单的Spring Boot Kafka应用程序,但我无法使其工作。我遵循了各种教程,现在我正在实现这个教程,但当我启动应用程序时,会发生以下情况: 我可以在控制台中写入,但消费者没有收到任何消息。 这是我的SpringApplication类: application.yml: 消费者类、生产者类及其配置类与教程中所写的相同。< br >在我的server.properties
我正在使用FCM通知。由于某些问题,当我使用FCM云控制台发送时,我没有收到通知。 这是我的两节课。 我注意到从未调用public void onTokenRefresh()。所以我用了它 字符串refreshedToken=FireBaseInstanceId.getInstance().getToken(); 在我的主要活动中我得到了标记。可能FirebaseInstanceIdService
我正在构建一个从Firebase接收通知的应用程序。应用程序收到通知但没有弹出它。 任何帮助! 类MyFirebaseMessagingService:FirebaseMessagingService(){
我正在使用Amazon Textract的StartDocumentAnalysis函数异步扫描文件。S3存储桶中的pdf文件。正如文档所说,我应该会收到一份关于工作状态的通知,发送到提供的SNS主题。 返回用于获取操作结果的作业标识符(JobId)。当文本分析完成后,亚马逊文本将完成状态发布到您在中指定的亚马逊简单通知服务(Amazon SNS)主题。 我用来开始分析的代码如下: 我在AWS控制