@Bean
public ProducerFactory producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaProducerFactory<String, User> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(config);
defaultKafkaProducerFactory.setTransactionIdPrefix("trans");
//defaultKafkaProducerFactory.transactionCapable();
return defaultKafkaProducerFactory;
//return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public KafkaTransactionManager<String, User> transactionManager() {
KafkaTransactionManager transactionManager = new KafkaTransactionManager(producerFactory());
transactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
transactionManager.setNestedTransactionAllowed(true);
return transactionManager;
}
/**
* New configuration for the consumerFactory added
*
* @return
*/
@Bean
public ConsumerFactory<String, User> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "firstTopic-group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<User>(User.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, User>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setTransactionManager(transactionManager());
factory.setRetryTemplate(kafkaRetry());
factory.setStatefulRetry(true);
factory.setErrorHandler(getErrorHandler());
factory.setRecoveryCallback(retryContext -> {
//implement the logic to decide the action after all retries are over.
ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
System.out.println("Recovery is called for message " + consumerRecord.value());
return Optional.empty();
});
return factory;
}
public RetryTemplate kafkaRetry() {
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(60 * 1000);
backOffPolicy.setMultiplier(3);
backOffPolicy.setMaxInterval(4 * 60 * 1000); // original 25 * 60 * 1000
retryTemplate.setBackOffPolicy(backOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(4);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
public SeekToCurrentErrorHandler getErrorHandler() {
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler() {
@Override
public void handle(Exception thrownException,
List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer,
MessageListenerContainer container) {
//super.handle(thrownException, records, consumer, container);
if (!records.isEmpty()) {
ConsumerRecord<?, ?> record = records.get(0);
String topic = record.topic();
long offset = record.offset();
int partition = record.partition();
if (thrownException instanceof DeserializationException) {
System.out.println("------1111------deserialization exception ");
} else {
System.out.println("------xxxx------Record is empty ");
consumer.seek(new TopicPartition(topic, partition), offset);
}
} else {
System.out.println("------4444------Record is empty ");
}
}
};
return errorHandler;
}
@Autowired
KafkaTemplate<String, User> kafkaTemplate;
@KafkaListener(topics = "firstTopic", groupId = "firstTopic-group")
@Transactional
public void onCustomerMessage(User user, Acknowledgment acknowledgment) throws Exception {
/*System.out.println("get the message " +user.getFirstName());
if (user.getFirstName().equalsIgnoreCase("Test")) {
throw new RuntimeException("Incompatible message " + user.getFirstName());
}
*/
//postToSecondTopic(acknowledgment, user);
System.out.println("NOT In transaction");
kafkaTemplate.executeInTransaction(t -> {
System.out.println("---------------------->");
int number = (int) (Math.random() * 10);
t.send("secondtopic", user);
if (number % 5 == 0)
throw new RuntimeException("fail");
acknowledgment.acknowledge();
return true;
});
System.out.println("*** exit ***");
}
日志中的错误
2020-05-28 15:52:53.597错误112469---[nio-8080-exec-1]O.a.C.C.C.[.[.[/].[dispatcherServlet]:servlet.Service()对于servlet[dispatcherServlet]在路径[]上下文中引发异常[请求处理失败;嵌套异常是java.lang.IllegalStateException:没有事务在处理中;可能的解决方案:在template.executeinTransaction()操作范围内运行模板操作,在调用模板方法之前使用@transactional启动事务,在使用记录时在侦听器容器启动的事务中运行
IllegalStateException:没有事务在处理中;可能的解决方案:在template.executeintransaction()操作范围内运行模板操作,在调用模板方法之前用@transactional启动事务,在使用org.springframework.util.assert.state(assert.java:73)~[spring-core-5.2.5.release.jar:5.2.5.release]在org.springframework.kafka.core.kafkatemplate.doSend(kafkatemplate.java:394)~[spring-kafka-2.3.7.release.jar:2.3.7.plate.java:216)~[spring-kafka-2.3.7.release.jar:2.3.7.release]在com.barade.sandesh.springkafka.userresource.postcomments(userresource.java:26)~[classes/:na]在sun.reflect.nativeMethodAccessorimpl.invoke0(原生方法)~[na:1.8.0_252]在在java.lang.reflect.method.invoke(method.java:498)~[NA:1.8.0_252]在org.springframework.web.method.support.invocablehandlermethod.doinvoke(invocablehandlermethod.java:190)~[spring-web-5.2.5.release.jar:5.2.5.release]在AbleHandlerMethod.invokeandHandland(servletInvocableHandlerMethod.java:105)~[spring-webmvc-5.2.5.release.jar:5.2.5.release]在org.springframework.web.servlet.mvc.method.annotation.requestMappingHandlerAdapter.invale(requestMappingHandlerAdapter.invale(requestMappingHandlerAdapter.java:879)~[spring-webmapping-5.2.5.release]在793)~[spring-webmvc-5.2.5.release.jar:5.2.5.release]位于org.springframework.web.servlet.mvc.methOd.abstractHandlerMethodAdapter.handle(abstractHandlerMethodAdapter.java:87)~[spring-webmvc-5.2.5.release.jar:5.2.5.release]在org.springframework.web.servlet.dispatcherServlet.doDispatcherServlet.java:1040)~[spring-webmvc-5.2.5.release]在org.springframework.web.servlet.dispatcherServlet:5.2.5.release]在(frameworkservlet.java:1006)~[spring-webmvc-5.2.5.release.jar:5.2.5.release]在org.springframework.web.servlet.frameworkservlet.doPost(frameworkservlet.java:909)~[spring-webmvc-5.2.5.release.jar:5.2.5.release]
用户资源
@RestController
@RequestMapping("accounts")
public class UserResource {
@Autowired
KafkaTemplate <String, User> kafkaTemplate;
@PostMapping("/users")
public String postComments(@RequestParam ("firstName") final String firstName,
@RequestParam ("lastName") final String lastName,
@RequestParam ("userName") final String userName ) {
List<String> accountTypes = new ArrayList<String>();
kafkaTemplate.send("firstTopic", new User(firstName,lastName,userName));
return "Message sent to the Error queue";
}
}
我有一个springboot应用程序,它侦听Kafka流并将记录发送到某个服务以进行进一步处理。服务有时可能会失败。注释中提到了异常情况。到目前为止,我自己模拟了服务成功和异常场景。 侦听器代码: 用户工厂配置如下: 由于REST服务正在抛出RestClientException,它应该进入上面提到的if块。关于FixedBackOff,我不希望SeekToCurrentErrorHandler执
我希望有一个简单的命令,就像我在bash中使用的一样,在AWS Lambda函数中发布一些内容到MQTT上的主题。按照:mosquitto_pub-h my.server.com-t“light/set”-m“on” 背景:我想和Alexa一起打开和关闭一盏灯。Alexa可以启动一个Lambda函数,在这个Lambda函数中,我想启动一个MQTT发布,因为lamp可以侦听MQTT主题并对那里的消息
背景资料 AWS服务是区域性的(例如,),boto3库要求您在访问客户端或资源之前设置默认区域。但是,这里的留档显示您可以使用通配符替换区域的SNS主题ARN。留档说: 文档:Amazon简单通知服务(Amazon SNS) 语法: 示例: 密码 当我使用boto3的SNS资源/客户端发布到主题ARN(该区域具有通配符)时,我得到以下错误。当我没有该区域的通配符时(例如,我指定了),一切正常。我查
我创建了一个开源项目,希望将其发布到maven central,以便用户只需在POM中引用它就可以使用该库。例如: 我找到了几个在线教程,但其中一些已经过时,一些建议自动化整个过程,从而公开复杂化它。 例如,一篇教程建议为github帐户创建SSH键,并让maven在推送到maven Central时自动创建一个git标记。虽然这是有用的,但没有必要开始。 另一个例子,试图直接通过maven发布它
阅读:Kafka Connect FileStreamSource忽略附加行 看来Kafka现在支持这一观点,他说: https://docs.confluent.io/5.5.0/connect/management/configuring.html#Standalone-示例 是否声明该文件被监视: 开始独立连接 将文件的所有内容添加到主题中,将新行添加到中不会将这些行添加到主题中。是否需要配
在本章中,让我们研究一下Themes and Layouts 。 Drupal将在安装过程中将Bartik主题安装为默认主题。 您可以从Drupal官方网站选择付费或免费主题。 通常,布局是文本和图形的排列。 选择主题是一个好主意,请记住在您的网站上使用不同的布局。 Step 1 - 访问Drupal官方网站 ,然后单击Get Started 。 Step 2 - 单击All Themes ,如以