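My goal is to consume from topic A, do some processing, and produce to topic B as a single atomic action. To achieve this I see two options:
I have successfully verified option #1. By success I mean that if my processing fails (an IllegalArgumentException is thrown), the message consumed from topic A keeps being re-consumed by the KafkaListener. This is what I expect, because the offset is not committed and DefaultAfterRollbackProcessor is used.
I expected to see the same behavior when I use a stream, rather than a KafkaListener, to consume from topic A, process, and send to topic B (option #2). However, even though an IllegalArgumentException is thrown during processing, the message is consumed by the stream only once. Is this the expected behavior?
In the Streams case I only have the following configuration: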
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "calculate-tax-sender-invoice-stream");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }

    // required to create and start a new KafkaStreams, as when an exception is thrown the stream dies
    // see here: https://docs.spring.io/spring-kafka/reference/html/_reference.html#after-rollback
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
    public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
        streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                // pass the throwable as the last argument so the stack trace is logged
                log.debug("StopStartStreamsUncaughtExceptionHandler caught exception, stopping StreamsThread ..", e);
                streamsBuilderFactoryBean.stop();
                log.debug("creating and starting a new StreamsThread ..");
                streamsBuilderFactoryBean.start();
            }
        });
        return streamsBuilderFactoryBean;
    }

    @Autowired
    public SpecificAvroSerde<InvoiceEvents> eventSerde;

    @Autowired
    private TaxService taxService;

    @Bean
    public KStream<String, InvoiceEvents> kStream(StreamsBuilder builder) {
        KStream<String, InvoiceEvents> kStream = builder.stream("A",
                Consumed.with(Serdes.String(), eventSerde));
        kStream
            .mapValues(v -> {
                // get tax from possibly remote service
                // an IllegalArgumentException("Tax calculation failed") is thrown by getTaxForInvoice()
                int tax = taxService.getTaxForInvoice(v);
                // create a TaxCalculated event
                InvoiceEvents taxCalculatedEvent = InvoiceEvents.newBuilder().setType(InvoiceEvent.TaxCalculated).setTax(tax).build();
                log.debug("Generating TaxCalculated event: {}", taxCalculatedEvent);
                return taxCalculatedEvent;
            })
            .to("B", Produced.with(Serdes.String(), eventSerde));
        return kStream;
    }
}
My unit test:
@Test
public void calculateTaxForInvoiceTaxCalculationFailed() throws Exception {
    log.debug("running test calculateTaxForInvoiceTaxCalculationFailed..");
    Mockito.when(taxService.getTaxForInvoice(any(InvoiceEvents.class)))
            .thenThrow(new IllegalArgumentException("Tax calculation failed"));
    InvoiceEvents invoiceCreatedEvent = createInvoiceCreatedEvent();
    List<KeyValue<String, InvoiceEvents>> inputEvents = Arrays.asList(
            new KeyValue<String, InvoiceEvents>("A", invoiceCreatedEvent));
    Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 1);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    producerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "unit-test-producer");
    // produce with key
    IntegrationTestUtils.produceKeyValuesSynchronously("A", inputEvents, producerConfig);
    // wait for 30 seconds - I should observe re-consumptions of invoiceCreatedEvent, but I do not
    Thread.sleep(30000);
    // ...
}
Update: in my unit test I now send 50 invoiceEvents (orderId=1,...,50), process them, and send them to the destination topic.
In my logs I see the following behavior:
invoiceEvent.orderId = 43 → consumed and successfully processed
invoiceEvent.orderId = 44 → consumed and IllegalArgumentException thrown
..new stream starts..
invoiceEvent.orderId = 44 → consumed and successfully processed
invoiceEvent.orderId = 45 → consumed and successfully processed
invoiceEvent.orderId = 46 → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and IllegalArgumentException thrown
...
[29-0_0-producer] task [0_0] Error sending record (key A value {"type": ..., "payload": {**"id": "46"**, ... }}} timestamp 1529583666036) to topic invoice-with-tax.t due to {}; No more records will be sent and no more offsets will be recorded for this task.
..new stream starts..
invoiceEvent.**orderId = 46** → consumed and successfully processed
invoiceEvent.orderId = 47 → consumed and successfully processed
invoiceEvent.orderId = 48 → consumed and successfully processed
invoiceEvent.orderId = 49 → consumed and successfully processed
invoiceEvent.orderId = 50 → consumed and successfully processed
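A toy model may help in reading the log above. It is only a sketch under assumptions that the log suggests but does not prove: that output and offsets are committed together every so often, and that a restarted stream resumes from the last committed offset, so records processed after that commit are processed again. Kafka Streams commits on a time interval (`commit.interval.ms`); the count-based `commitEvery` used here is a simplification, and the class name, `commitEvery`, and `failOnFirstAttempt` are hypothetical names, not part of any Kafka API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy simulation only: no Kafka classes are involved.
final class RestartFromCommittedOffsetDemo {

    /**
     * Processes offsets [0, total) and returns every offset in the order it was attempted.
     *
     * @param commitEvery        hypothetical count-based commit interval
     * @param failOnFirstAttempt offsets whose first processing attempt throws
     */
    static List<Integer> run(int total, int commitEvery, Set<Integer> failOnFirstAttempt) {
        List<Integer> attempts = new ArrayList<>();
        Set<Integer> alreadyFailed = new HashSet<>();
        int committed = 0; // next offset to read after a (re)start

        while (committed < total) {
            int position = committed; // a restarted stream resumes from the last commit
            int uncommitted = 0;
            try {
                while (position < total) {
                    attempts.add(position);
                    // fail only the first time this offset is seen
                    if (failOnFirstAttempt.contains(position) && alreadyFailed.add(position)) {
                        throw new IllegalArgumentException("Tax calculation failed for offset " + position);
                    }
                    position++;
                    uncommitted++;
                    if (uncommitted == commitEvery || position == total) {
                        committed = position; // output and offsets become visible together
                        uncommitted = 0;
                    }
                }
            } catch (IllegalArgumentException e) {
                // work since the last commit is discarded; the outer loop "restarts the stream"
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        Set<Integer> failing = new HashSet<>(Arrays.asList(3));
        System.out.println(run(6, 2, failing));
    }
}
```

With 6 records, a commit every 2 records, and a one-time failure at offset 3, the attempts are 0, 1, 2, 3, 2, 3, 4, 5: offset 2 is processed again even though it never failed, much like orderId 46 to 49 in the log.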
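The key points for getting option #2 (Streams transactions) to work are:
Consider how serialization exceptions (or exceptions during production in general) are handled (here and here).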
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public StreamsConfig kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "blabla");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8082");
        // this should be enough to enable transactions
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return new StreamsConfig(props);
    }

    // restart the stream when a stream thread dies from an uncaught exception
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
    public StreamsBuilderFactoryBean myKStreamBuilder(StreamsConfig streamsConfig) {
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfig);
        streamsBuilderFactoryBean.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                // pass the throwable as the last argument so the stack trace is logged
                log.debug("StopStartStreamsUncaughtExceptionHandler caught exception, stopping StreamsThread ..", e);
                streamsBuilderFactoryBean.stop();
                log.debug("creating and starting a new StreamsThread ..");
                streamsBuilderFactoryBean.start();
            }
        });
        return streamsBuilderFactoryBean;
    }
}
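The point above about serialization and production exceptions could be expressed as configuration roughly as follows. This is a sketch, not the configuration used in this answer: it uses plain string keys so that it runs without Kafka on the classpath, the key names and handler classes are the ones Kafka Streams shipped around the time of this post (check them against your version), and choosing `LogAndContinueExceptionHandler` is an assumption about the desired policy (skip records that cannot be deserialized instead of stopping the stream). The class name `StreamsErrorHandlingProps` is hypothetical.

```java
import java.util.Properties;

// Builds Kafka Streams properties with explicit exception-handling policies.
final class StreamsErrorHandlingProps {

    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "calculate-tax-sender-invoice-stream");
        props.put("bootstrap.servers", "localhost:9092");
        // transactions: consume, process, and produce commit atomically
        props.put("processing.guarantee", "exactly_once");
        // policy for records that cannot be deserialized when consuming
        // (assumed choice: log and skip; the default handler fails the stream instead)
        props.put("default.deserialization.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        // policy for errors raised while producing to the output topic
        // (this default handler fails the task)
        props.put("default.production.exception.handler",
                "org.apache.kafka.streams.errors.DefaultProductionExceptionHandler");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In the Spring configuration above, the same entries would go into the `props` map in `kStreamsConfigs()`. Note that these handlers cover deserialization and production errors only; an exception thrown by business logic inside `mapValues` still reaches the uncaught exception handler.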