我编写了一个 Spring Kafka 生产者,使用 Spring Boot 将消息发送到 Kafka 主题,其中键(Key)为字符串,值为 `ArrayList<自定义对象>`。这里的自定义对象(`UpdateItemCount`)是一个具有 item id、item name 和 item ordered count 属性的类。
Kafka 生产者(producer)日志如下所示。
2021-10-29 00:09:34.147 INFO 16496 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [172.26.77.192:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = true
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class io.springbootlearn.orders.customerorders.util.kafkaProducer.KafkaArrayListSerializer
2021-10-29 00:09:34.180 INFO 16496 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 0 ms.
2021-10-29 00:09:34.180 INFO 16496 --- [ main] org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
2021-10-29 00:09:34.180 INFO 16496 --- [ main] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2021-10-29 00:09:34.181 INFO 16496 --- [ main] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2021-10-29 00:09:34.182 INFO 16496 --- [ main] o.a.kafka.common.utils.AppInfoParser : App info kafka.producer for producer-1 unregistered
2021-10-29 00:09:34.182 DEBUG 16496 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Kafka producer has been closed
ProcessOrders: Exception found.....org.apache.kafka.common.KafkaException: Failed to construct kafka producer
我编写了一个自定义序列化程序,如下所示。
package io.springbootlearn.orders.customerorders.util.kafkaProducer;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import io.springbootlearn.orders.customerorders.models.UpdateItemCount;
/**
 * Kafka {@link Serializer} that writes an {@link ArrayList} as a length-prefixed sequence of
 * elements, delegating the encoding of each element to an inner serializer.
 *
 * <p>Layout of the produced bytes, using {@link DataOutputStream#writeInt(int)} for every
 * integer: the element count, then for each element its byte length followed by its bytes.
 *
 * <p>NOTE: this class only has a constructor that takes the inner serializer. Kafka can only
 * instantiate serializers named in the {@code value.serializer} property through a
 * no-argument constructor, so an instance of this class has to be created in code and passed
 * to the producer factory instead of being configured by class name.
 *
 * @param <T> element type of the lists being serialized (previously this type parameter was
 *     named {@code UpdateItemCount}, which shadowed the imported model class)
 */
public class KafkaArrayListSerializer<T> implements Serializer<ArrayList<T>> {

    /** Serializer used for each individual list element. */
    private final Serializer<T> arrayListSerializer;

    /**
     * Creates a list serializer around an element serializer.
     *
     * @param paramArrayListSerializer serializer applied to every element of the list
     */
    public KafkaArrayListSerializer(final Serializer<T> paramArrayListSerializer) {
        System.out.println("KafkaArrayListSerializer: Inside constructor.......");
        this.arrayListSerializer = paramArrayListSerializer;
    }

    /**
     * Serializes the list as: element count, then (length, bytes) for each element.
     *
     * @param topic topic the record is being sent to; forwarded to the element serializer
     * @param customerOrderData list to serialize; may be {@code null}
     * @return the encoded bytes, or {@code null} when {@code customerOrderData} is {@code null}
     * @throws RuntimeException if writing to the in-memory stream fails
     */
    @Override
    public byte[] serialize(String topic, ArrayList<T> customerOrderData) {
        System.out.println("KafkaArrayListSerializer: Inside serialize.......");
        System.out.println("Topic name......."+topic);
        // A null value is passed through unchanged instead of failing with a
        // NullPointerException on size().
        if (customerOrderData == null) {
            return null;
        }
        int dataSize = customerOrderData.size();
        System.out.println("dataSize......."+dataSize);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // try-with-resources closes (and flushes) the stream on both the success and error paths.
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(dataSize);
            for (T element : customerOrderData) {
                final byte[] elementBytes = arrayListSerializer.serialize(topic, element);
                // Each element is prefixed with its own length.
                out.writeInt(elementBytes.length);
                out.write(elementBytes);
            }
        } catch (IOException e) {
            throw new RuntimeException("unable to serialize ArrayList", e);
        }
        return baos.toByteArray();
    }
}
用于 `ArrayList` 的 Serde 类如下所示。
package io.springbootlearn.orders.customerorders.util.kafkaProducer;
import java.util.ArrayList;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
/**
 * {@link Serde} for {@code ArrayList<T>} assembled from the serde of a single element.
 *
 * <p>The constructor wraps a {@link KafkaArrayListSerializer} and a
 * {@code KafkaArrayListDeserializer}, each built around the element serde's own
 * serializer/deserializer, into one serde via {@link Serdes#serdeFrom}.
 *
 * @param <T> element type of the lists handled by this serde
 */
public class ArrayListSerde<T> implements Serde<ArrayList<T>> {

    /** Combined list serializer/deserializer pair that this serde delegates to. */
    private final Serde<ArrayList<T>> arrayListSerdeObj;

    /**
     * Builds the list serde from an element serde.
     *
     * @param SerdeObj serde for a single list element; its serializer and deserializer are
     *     used to encode/decode each element
     */
    public ArrayListSerde(final Serde<T> SerdeObj) {
        this.arrayListSerdeObj = Serdes.serdeFrom(
                new KafkaArrayListSerializer<>(SerdeObj.serializer()),
                new KafkaArrayListDeserializer<>(SerdeObj.deserializer()));
    }

    /**
     * Returns the list serializer created in the constructor.
     *
     * @return the list serializer
     */
    @Override
    public Serializer<ArrayList<T>> serializer() {
        System.out.println("ArrayListSerde: Inside Serializer.......");
        return arrayListSerdeObj.serializer();
    }

    /**
     * Returns the list deserializer created in the constructor.
     *
     * <p>This previously returned {@code null} even though a deserializer had been built,
     * which would break any consumer that used this serde.
     *
     * @return the list deserializer
     */
    @Override
    public Deserializer<ArrayList<T>> deserializer() {
        return arrayListSerdeObj.deserializer();
    }

    /** Closes both halves of the serde; previously only the serializer was closed. */
    @Override
    public void close() {
        arrayListSerdeObj.serializer().close();
        arrayListSerdeObj.deserializer().close();
    }
}
Kafka生产者服务代码如下所示:-
package io.springbootlearn.orders.customerorders.util.kafkaProducer;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import io.springbootlearn.orders.customerorders.models.UpdateItemCount;
/**
 * Service that publishes lists of {@code UpdateItemCount} to the {@code ItemsOrdered} topic
 * through an injected {@link KafkaTemplate}.
 */
@Service
@ConfigurationProperties
public class KafkaProducerService {

    /** Topic every record from this service is sent to. */
    private static final String KAFKA_TOPIC_NAME = "ItemsOrdered";

    /** Template used for all sends; supplied through the constructor. */
    private final KafkaTemplate<String, ArrayList<UpdateItemCount>> kafkaTemplateObj;

    /**
     * Creates the service.
     *
     * @param kafkaTemplateParam template used to send records
     */
    public KafkaProducerService(KafkaTemplate<String,ArrayList<UpdateItemCount>> kafkaTemplateParam) {
        this.kafkaTemplateObj = kafkaTemplateParam;
    }

    /**
     * Registers a producer listener on the underlying template.
     *
     * @param producerListener listener to install on the template
     * @return the same template instance this service uses, with the listener set
     */
    public KafkaTemplate<String, ArrayList<UpdateItemCount>> producerListener(ProducerListener<String, ArrayList<UpdateItemCount>> producerListener) {
        kafkaTemplateObj.setProducerListener(producerListener);
        return kafkaTemplateObj;
    }

    /**
     * Sends one record to the topic and flushes the template.
     *
     * <p>The send itself is asynchronous. A failure callback is attached to the returned
     * future so that delivery errors reported through it are printed instead of being
     * silently dropped. Only {@link KafkaProducerException} is caught here; any other
     * exception propagates to the caller, as before.
     *
     * @param PartitionId partition to send to, passed straight to {@link ProducerRecord}
     * @param key record key
     * @param UpdateItemCountArr record value
     */
    public void sendMessage(Integer PartitionId, String key, ArrayList<UpdateItemCount> UpdateItemCountArr) {
        try {
            System.out.println("KafkaProducerService:sending message");
            final ProducerRecord<String, ArrayList<UpdateItemCount>> record =
                    new ProducerRecord<String, ArrayList<UpdateItemCount>>(KAFKA_TOPIC_NAME, PartitionId, key, UpdateItemCountArr);
            System.out.println("KafkaProducerService: sending async call...");
            ListenableFuture<SendResult<String, ArrayList<UpdateItemCount>>> future = kafkaTemplateObj.send(record);
            // Observe the asynchronous outcome; without this, a failed delivery goes unnoticed.
            future.addCallback(
                    result -> { },
                    failure -> System.out.println("KafkaProducerService: async send failed...." + failure));
            kafkaTemplateObj.flush();
        } catch (KafkaProducerException ex) {
            System.out.println("Exception...." + ex);
            System.out.println("Exception msg...." + ex.getMessage());
            System.out.println("Exception root cause..." + ex.getRootCause());
            // Concatenating the array directly printed only its identity hash, not the frames.
            System.out.println("Exception stack trace..." + java.util.Arrays.toString(ex.getStackTrace()));
        }
    }
}
kafka配置如下所示
spring.kafka.producer.bootstrap-servers = 172.26.77.192:9092
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = io.springbootlearn.orders.customerorders.util.kafkaProducer.KafkaArrayListSerializer
logging.level.org.apache.kafka=debug
logging.level.org.apache.kafka.clients=debug
我已经确认我的Kafka设置工作正常,方法是编写另一个程序,该程序将消息发送到不同的Kafka主题,其中“Key”和“Value”都作为字符串。消费者能够从此主题获取消息。
有人能帮忙吗?
非常感谢。
Kafka只能使用无参数构造函数创建序列化程序。对于更复杂的对象,您必须自己构造它们,并通过构造函数或setter传递到默认生产者工厂。
我先手动启动 ZooKeeper,然后启动 Kafka 服务器,最后启动 Kafka-Rest 服务器,它们各自使用自己的属性文件。接下来,我在 Tomcat 上部署我的 Spring Boot 应用程序。在 Tomcat 的日志跟踪中,我看到了错误 org.springframework.context.ApplicationContextException:无法启动 bean org.springframework.kafka.config.inte…
我正在用 Spring Boot 做一个简单的 Kafka 示例项目,遇到了一个错误:生产者(producer)没有被创建,但其余部分工作正常。从错误信息来看,异常是因为生产者未能创建而抛出的,但没有说明原因,我也不清楚原因是什么: 这是我的 Kafka 配置: 这是控制器,端点为“/api/kafka”:
我有一个主题中的多个事件,我试图在这些步骤中处理: 根据标题值过滤事件 应用反序列化程序 按键分组 聚合以生成新的KTable 新KTable将以流式传输方式传输到与具有新标题的新事件相同的主题 我可以使用transformValues访问标题,但不确定在执行toStream时如何注入新的标题值。 注意:我是KStream的新手。
我有一个kafka streams应用程序 或 这是一个类,用于将消息分发到不同的分区,即使在kafka 2.4版本中使用相同的键 RoundRobinPartitioner具有以下实现: 我的分区器由完全相同的代码组成,但分区方法实现不同,我的代码块是: 当我这样配置时,消息在两种实现中都被分发到不同的分区,但决不使用某些分区。 我有50个分区,而分区14和34从未收到消息。我的分区不是没有价值
我想在Kafka上用Flink设置一个基本的生产者-消费者,但是我很难通过Java向现有消费者生成数据。 CLI解决方案 > 我设置了一个使用zip from 和 我使用创建了一个名为transactions1的主题 现在我可以在命令行上使用生产者和消费者来查看主题已经创建并工作。 设置我运行的消费者 现在,如果任何制作人向主题发送数据,我将在消费者控制台中看到它。 我通过运行 并在cli中的生产
我创建了一个带有三个分区的Kafka主题。使用Spring Kafka中的ProducerFactory,我可以创建一个producer实例。但是,我想创建三个生产者实例,因为我有三个分区。类似地,我想要三个consumer的实例。我该怎么做?请帮忙。