>
简单说明一下我想实现的目标:我想对一个kafka流拓扑(使用拓扑测试驱动程序)进行功能测试,用于avro记录。
问题:无法“模拟”模式注册表尝试自动发布/读取模式
到目前为止,我尝试使用MockSchemaRegistryClient来模拟schemaRegistry,但我不知道如何将其链接到Avro Serde。
代码
public class SyncronizerIntegrationTest {
private ConsumerRecordFactory<String, Tracking> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());
MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
@Test
void integrationTest() throws IOException, RestClientException {
Properties props = new Properties();
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); //Dunno if this do anything? :/
StreamsBuilder kStreamBuilder = new StreamsBuilder();
Serde<Tracking> avroSerde = getAvroSerde();
mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());
KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
"topic",
Consumed.with(Serdes.String(), avroSerde));
unmappedOrdersStream
.filter((k, v) -> v != null).to("ouput");
Topology topology = kStreamBuilder.build();
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));
}
}
阿夫罗斯德法
private <T extends SpecificRecord> Serde<T> getAvroSerde() {
// Configure Avro ser/des
final Map<String,String> avroSerdeConfig = new HashMap<>();
avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");
final Serde<T> avroSerde = new SpecificAvroSerde<>();
avroSerde.configure(avroSerdeConfig, false); // `false` for record values
return avroSerde;
}
如果我运行测试没有testDriver.pipe输入(记录actory.create("主题","1", createValidMepdTrack()));
它工作得很好(看起来一切都解决了)
但是
当我尝试插入数据(pipeInput)时,它抛出以下异常:对象“跟踪”已满。
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)
编辑,我没有删除这个,为“历史日志”提供遵循的路径。
对我们来说最有效的方法是java测试容器,它融合了平台docker映像。您可以设置以下docker撰写文件:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.0.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:5.0.0
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- 9092:9092
depends_on:
- zookeeper
schema-registry:
image: confluentinc/cp-schema-registry:5.0.0
environment:
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
ports:
- 8081:8081
depends_on:
- zookeeper
- kafka
您唯一需要做的是将127.0.0.1kafka
添加到/etc/host
中。使用这种方法,您基本上已经为集成测试启动并运行了整个集群。集成测试完成后,集群将被销毁。
编辑:
在不修改/etc/host
的情况下更好地编写docker-comush
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.0.0
hostname: zookeeper
ports:
- '32181:32181'
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
extra_hosts:
- "moby:127.0.0.1"
kafka:
image: confluentinc/cp-kafka:5.0.0
hostname: kafka
ports:
- '9092:9092'
- '29092:29092'
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
extra_hosts:
- "moby:127.0.0.1"
schema-registry:
image: confluentinc/cp-schema-registry:5.0.0
hostname: schema-registry
depends_on:
- zookeeper
- kafka
ports:
- '8081:8081'
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181
extra_hosts:
- "moby:127.0.0.1"
Kafka将在本地主机上提供:9092
免责声明:我还没有测试过这个。这只是我分享的一些想法,你可以如何使它工作。希望这有所帮助。如果你能对此答案提供反馈,那将是一个很好的正确和有效的解决方案。
我认为你不能通过配置使用常规的Avro Serde:
props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
据我所知,这将尝试连接到
props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");
但是,使用MockSchemaRegistryClient
时,没有要连接的httpendpoint。相反,您需要在创建模拟客户端时将其传递到Serde:
MockSchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
// add the schemas you want to use
schemaRegistryClient.register(...);
SpecificAvroSerde<T> serde = new SpecificAvroSerde<>(schemaRegistryClient);
因此,您只需配置一个“虚拟”httpendpoint,因为提供模拟客户端无论如何都不会使用它。
通过如下代码传递相应的Serde似乎是正确的:
StreamBuilder.stream("topic", Consumed.with(Serdes.String(), avroSerde));
Confluent提供了大量用于测试Kafka(Streams)以及模式注册表的示例代码。
https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java
最重要的是,mocking并不是一个完整的集成测试--使用内存中的模式注册表启动一个实际的Kafka代理才是。
在上面的代码中,请参见
@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
而且
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
在探索如何对Kafka流进行单元测试时,我遇到了,不幸的是,这个类似乎被版本(KAFKA-4408)破坏了 对于KTable的问题,是否有一个解决方案? 我看到了“mocked streams”项目,但首先它使用的是,而我使用的是,其次它是Scala,而我的测试是Java/Groovy。 这里的任何关于如何在不需要引导zookeeper/kafka的情况下对流进行单元测试的帮助都将非常棒。 注意:
我正在尝试(单元)测试使用Kafka DSL的Spring Cloud Stream Kafka处理器,但收到以下错误“无法建立到节点1的连接。代理可能不可用。”。此外,测试不会停止。我尝试了EmbeddedKafka和TestBinder,但我有同样的行为。我试图从Spring Cloud团队(有效)给出的响应开始,我将应用程序改编为使用Kafka DSL,并将测试类保持原样。EmbeddedK
本文向大家介绍您如何进行功能测试?,包括了您如何进行功能测试?的使用技巧和注意事项,需要的朋友参考一下 功能测试 作为功能测试的一部分,应该对业务指定的所有用户要求进行良好的测试。功能测试要求在类似于客户要求的环境中进行测试。所有边界条件,负面情景都被考虑在内。应用程序的每个功能都应该可以很好地运行,并且不会出现任何错误。 功能测试的目的 应该测试运行良好的应用程序预期的基本要求。用户界面应易于使
问题内容: 我正在尝试使用Avro来读取和写入Kafka的邮件。有没有人有使用Avro二进制编码器对将放入消息队列中的数据进行编码/解码的示例? 我需要的是Avro而不是Kafka。或者,也许我应该考虑其他解决方案?基本上,我试图在空间方面找到一种更有效的JSON解决方案。刚刚提到了Avro,因为它可以比JSON紧凑。 问题答案: 我终于想起要询问Kafka邮件列表,并得到以下答复,效果很好。 是
问题内容: 我正在使用Mocha以便对为node.js编写的应用程序进行单元测试 我想知道是否可以对模块中未导出的功能进行单元测试。 例: 我在其中定义了很多功能 以及一些作为公共导出的功能: 测试用例的结构如下: 显然,这是行不通的,因为没有导出。 对私有方法进行单元测试的正确方法是什么?摩卡咖啡有内置的方法吗? 问题答案: 如果模块未导出功能,则模块外部的测试代码无法调用该功能。那是由于Jav
第1流: 流-2: 现在,我想对这两个流执行JOIN操作,并希望仅检索流-1中不存在于流-2中的行。我的输入流数据是AVRO格式 预期产出: 那么,我应该执行哪个连接操作,以及如何实现预期的输出呢?谁能帮我实现这个目标