当前位置: 首页 > 知识库问答 >
问题:

如何生产一个Kafkaavro唱片,就像使用avro控制台制作人生产的一样?

敖涵容
2023-03-14

我使用的是汇流3.3.0。我的意图是使用kafka-connect将Kafka主题中的值插入Oracle表中。我的连接器与我使用avro console producer生成的avro记录工作良好,如下所示:

./kafka-avro-console-producer --broker-list 192.168.0.1:9092 --topic topic6 --property value.schema='{"type":"record","name":"flights3","fields":[{"name":"flight_id","type":"string"},{"name":"flight_to", "type": "string"}, {"name":"flight_from", "type": "string"}]}'
{"flight_id":"1","flight_to":"QWE","flight_from":"RTY"}
public class Sender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "serializers.custom.FlightSerializer");
        props.put("schema.registry.url", "http://192.168.0.1:8081");
        Producer<String, Flight> producer = new KafkaProducer<String, Flight>(props);
        Flight myflight = new Flight("testflight1","QWE","RTY");
        ProducerRecord<String, Flight> record = new ProducerRecord<String, Flight>("topic5","key",myflight);

        try {
            producer.send(record).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package vo;

public class Flight {
    String flight_id,flight_to,flight_from;

    public Flight(String flight_id, String flight_to, String flight_from) {
        this.flight_id = flight_id;
        this.flight_to = flight_to;
        this.flight_from = flight_from;
    }

    public Flight(){
    }

    public String getFlight_id() {
        return flight_id;
    }

    public void setFlight_id(String flight_id) {
        this.flight_id = flight_id;
    }

    public String getFlight_to() {
        return flight_to;
    }

    public void setFlight_to(String flight_to) {
        this.flight_to = flight_to;
    }

    public String getFlight_from() {
        return flight_from;
    }

    public void setFlight_from(String flight_from) {
        this.flight_from = flight_from;
    }
}

最后是序列化程序:

package serializers.custom;

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import vo.Flight;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FlightSerializer implements Serializer<Flight> {
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {
    }

    @Override
    public byte[] serialize(String arg0, Flight arg1) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();

        try {
            retVal = objectMapper.writeValueAsString(arg1).getBytes();
        } catch (Exception e) {
            e.printStackTrace();
        }

        return retVal;
    }
}

但我所理解的是,需要定义一些类似模式的东西,并使用一些avro序列化器来获得确切的数据,就像我使用avro console Consumer所做的那样。我读过一些示例代码,但没有一个对我有效。

我尝试了下面的代码。但在avro console Consumer中什么都没有。

package producer.serialized.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import vo.Flight;
import java.util.Properties;

public class Sender {
public static void main(String[] args) {
String flightSchema = "{\"type\":\"record\"," + "\"name\":\"flights\","
+ "\"fields\":[{\"name\":\"flight_id\",\"type\":\"string\"},{\"name\":\"flight_to\",\"type\":\"string\"},{\"name\":\"flight_from\",\"type\":\"string\"}]}";
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://192.168.0.1:8081");
KafkaProducer producer = new KafkaProducer(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(flightSchema);
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("flight_id", "1");
avroRecord.put("flight_to", "QWE");
avroRecord.put("flight_from", "RTY");
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("topic6", avroRecord);

try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}
}

共有1个答案

厉文栋
2023-03-14

模式没有定义,因此当kafkaavroserializer必须联系模式注册中心来提交模式时,它将不会有模式。

您必须为对象flight创建架构

下面的file.avdl(avro扩展文件之一)示例就可以了:

@namespace("vo")
protocol FlightSender {

    record Flight {
       union{null, string} flight_id = null;
       union{null, string} flight_to = null;
       union{null, string} flight_from = null;
    }
}
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class); 

和生产者,可以使用生成的特定Avro类

Producer<String, Flight> producer = new KafkaProducer<String, Flight>(props);

希望有帮助:-)

 类似资料:
  • 我试图使用apache kafka二进制文件中的kafka控制台生成器生成消息,并在spring boot中使用消费者设置。消费者使用avro模式。 当消息以json格式生成时,我的消费者抛出异常-“无法序列化”。 我找到了一个解决方案,可以使用“ConFluent Platform 7.1”,它有kafka-avro-console-生产者。它支持avro,但它是企业版。 有没有一种方法可以使用

  • 我有一个需要使用Kafka Console Producer发送键值消息的用例。那么如何通过命令实现这一点呢?

  • 最新版本的kafka支持精确一次语义(EoS)。为了支持这一概念,在每个消息中都添加了额外的详细信息。这意味着在你的消费者;如果打印消息的偏移量,它们不一定是连续的。这使得轮询一个主题以阅读最后提交的消息变得更加困难。 在我的例子中,consumer打印了如下所示的内容 问题:为了编写可重启的proudcer;我对话题进行了投票,并阅读了上一条消息的内容。在这种情况下;最后一条消息将是偏移量#5,

  • 问题内容: This is the CSS: How does it produce the circle below? 假设,如果矩形的宽度为180像素,高度为180像素,则它将矩形变得越来越小,也就是说,如果半径大小增加,矩形几乎会消失。 那么,如何将180像素的边界height/width-> 0px变成半径为180像素的圆? 问题答案: 高度/宽度-> 0px的180像素边框如何变成半径为

  • 我试着写一些关于主题的消息,但是控制台不允许(生产者不等待标准输入)。我也看不到任何错误日志。尽管主题已成功创建。我正在使用: 动物园管理员和Kafka服务器运行良好。我使用的是苹果电脑。可能的问题是什么。我正在关注阿帕奇Kafka文档 http://kafka.apache.org/documentation.html#quickstart。