我有一个应用程序(spring-boot-shipping-service),其中包含一个KStream,它获取由外部生产者(spring-boot-order-service)生成的OrderCreatedEvent消息。此生成器使用以下架构:
Order-Created-Event.avsc
{
"namespace" : "com.codependent.statetransfer.order",
"type" : "record",
"name" : "OrderCreatedEvent",
"fields" : [
{"name":"id","type":"int"},
{"name":"productId","type":"int"},
{"name":"customerId","type":"int"}
]
}
我的kstream
与ktable
联接,并向order主题发布一种新的消息:Ordershippedevent。
Order-Shiped-Event.avsc
{
"namespace" : "com.codependent.statetransfer.order",
"type" : "record",
"name" : "OrderShippedEvent",
"fields" : [
{"name":"id","type":"int"},
{"name":"productId","type":"int"},
{"name":"customerName","type":"string"},
{"name":"customerAddress","type":"string"}
]
}
出于某种原因,新的OrderShippedEvent消息生成时没有标头application/vnd.OrderShippedEvent.v1+avro
,而是标头application/vnd.orderCreatedEvent.v1+avro
。
这是order主题中的原始OrderCreatedEvent:
Key (4 bytes): +
Value (4 bytes): V?
Timestamp: 1555943926163
Partition: 0
Offset: 34
Headers: contentType="application/vnd.ordercreatedevent.v1+avro",spring_json_header_types={"contentType":"java.lang.String"}
生成的OrderShippedEvent的模式不正确:
Key (4 bytes): +
Value (26 bytes): V?
JamesHill Street
Timestamp: 1555943926163
Partition: 0
Offset: 35
Headers: contentType="application/vnd.ordercreatedevent.v1+avro",spring_json_header_types={"contentType":"java.lang.String"}
我已经检查了Confluent Schema注册表内容,其中有Order-Shipped-Event.avsc模式:
为什么它不在生成的消息中使用正确的shema?
application.yml
server:
port: 8085
spring:
application:
name: spring-boot-shipping-service
cloud:
stream:
kafka:
streams:
binder:
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
bindings:
input:
destination: customer
contentType: application/*+avro
order:
destination: order
contentType: application/*+avro
output:
destination: order
contentType: application/*+avro
schema-registry-client:
endpoint: http://localhost:8081
ShippingkStreamProcessor
interface ShippingKStreamProcessor {
@Input("input")
fun input(): KStream<Int, Customer>
@Input("order")
fun order(): KStream<String, OrderCreatedEvent>
@Output("output")
fun output(): KStream<String, OrderShippedEvent>
ShippingkStreamConfiguration
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderCreatedEvent>): KStream<Int, OrderShippedEvent> {
val serdeConfig = mapOf(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081")
val intSerde = Serdes.IntegerSerde()
val customerSerde = SpecificAvroSerde<Customer>()
customerSerde.configure(serdeConfig, true)
val orderCreatedSerde = SpecificAvroSerde<OrderCreatedEvent>()
orderCreatedSerde.configure(serdeConfig, true)
val orderShippedSerde = SpecificAvroSerde<OrderShippedEvent>()
orderShippedSerde.configure(serdeConfig, true)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)
return (orderEvent.filter { _, value -> value is OrderCreatedEvent && value.id != 0 }
.selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
.join(customerTable, { orderIt, customer ->
OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
}, Joined.with(intSerde, orderCreatedSerde, customerSerde))
.selectKey { _, value -> value.id }
}
2019-04-22 23:40:39.953 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : HTTP GET http://localhost:8081/subjects/ordercreatedevent/versions/1
2019-04-22 23:40:39.971 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Accept=[application/json, application/*+json]
2019-04-22 23:40:39.972 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Writing [] as "application/vnd.schemaregistry.v1+json"
2019-04-22 23:40:39.984 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Response 200 OK
2019-04-22 23:40:39.985 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Reading to [java.util.Map<?, ?>]
2019-04-22 23:40:40.186 INFO 46039 --- [read-1-producer] org.apache.kafka.clients.Metadata : Cluster ID: 5Sw6sBD0TFOaximF3Or-dQ
2019-04-22 23:40:40.318 DEBUG 46039 --- [-StreamThread-1] AvroSchemaRegistryClientMessageConverter : Obtaining schema for class class com.codependent.statetransfer.order.OrderShippedEvent
2019-04-22 23:40:40.318 DEBUG 46039 --- [-StreamThread-1] AvroSchemaRegistryClientMessageConverter : Avro type detected, using schema from object
2019-04-22 23:40:40.342 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : HTTP POST http://localhost:8081/subjects/ordershippedevent/versions
2019-04-22 23:40:40.342 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Accept=[application/json, application/*+json]
2019-04-22 23:40:40.342 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Writing [{"schema":"{\"type\":\"record\",\"name\":\"OrderShippedEvent\",\"namespace\":\"com.codependent.statetransfer.order\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"productId\",\"type\":\"int\"},{\"name\":\"customerName\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"customerAddress\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"}] as "application/json"
2019-04-22 23:40:40.348 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Response 200 OK
2019-04-22 23:40:40.348 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Reading to [java.util.Map<?, ?>]
2019-04-22 23:40:40.349 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : HTTP POST http://localhost:8081/subjects/ordershippedevent
2019-04-22 23:40:40.349 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Accept=[application/json, application/*+json]
2019-04-22 23:40:40.349 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Writing [{"schema":"{\"type\":\"record\",\"name\":\"OrderShippedEvent\",\"namespace\":\"com.codependent.statetransfer.order\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"productId\",\"type\":\"int\"},{\"name\":\"customerName\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"customerAddress\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"}] as "application/json"
2019-04-22 23:40:40.361 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Response 200 OK
2019-04-22 23:40:40.362 DEBUG 46039 --- [-StreamThread-1] o.s.web.client.RestTemplate : Reading to [java.util.Map<?, ?>]
2019-04-22 23:40:40.362 DEBUG 46039 --- [-StreamThread-1] AvroSchemaRegistryClientMessageConverter : Finding correct DatumWriter for type com.codependent.statetransfer.order.OrderShippedEvent
return
messageConverter.toMessage(message.getPayload(),
messageHeaders).getPayload();
collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner);
更新3:
复制的步骤:
>
下载并开始合流5.2.1合流开始
您可以检查添加到order主题的新创建的OrderShippedEvent消息是否有错误的标题。这可以在合流控制中心(localhost:9092->topics->order
)或运行kafkacat中看到:
$> kafkacat -b localhost:9092 -t order -C \
-f '\nKey (%K bytes): %k
Value (%S bytes): %s
Timestamp: %T
Partition: %p
Offset: %o
Headers: %h\n'
@相互依赖这确实是一个我们需要在活页夹中解决的问题,我们很快就会解决。同时,作为一种变通方法,您可以让处理器不返回kstream
,而是在方法本身中执行发送。您可以对当前返回的KStream
调用to(TopicNameExtractor)
。TopicNameExtractor
将使您能够访问记录上下文,您可以使用该上下文手动设置内容类型。
有没有办法从Apache spark生成无模式的avro?我可以看到一种使用apache avro库通过Java/Scala和融合avro生成它的方法。当我用下面的方式从Spark编写Avro时,它用模式创建了Avro。我想在没有模式的情况下创建,以减少最终数据集的大小。
我有一个带有日期的简单POJO,在导入Google BigQuery之前,它将作为Avro存储在存储器中。日期转换为long,我试图使用@AvroSchema覆盖日期字段的模式生成,以便BigQuery了解字段的类型。 简单的POJO: 这最终得到以下AVRO-Schema: 这些似乎是错误的,应该是简单的{“name”:“tm”,“type”:“long”,“logicalType”:“time
我有一个quarkus应用程序,当我用下面的命令构建应用程序时,进程第一次开始完美地编译quarkus:dev-DskipTests=true 成功启动的日志: 但是当我停下来重新开始这个过程时,过程并没有开始... 启动失败的日志: 当我尝试重新启动机器,然后启动quarkus服务时,它会再次工作。 pom。xml: 如果有人遇到过类似的行为,请告诉我,这可能是什么根源。
我试图使用ConsumerSeeKaware,阅读kafka主题中可用的最后一条消息。消息类型是Avro对象列表。我能成功地做到这一点。但在反序列化过程中会失败。该消息使用spring-cloud-stream-kafka框架生成。消息具有contentType。 我知道avro消息可以像下面这样反序列化。 但不管用。可能是因为两件事。 > 消息是avro对象的列表。但我正在尝试使用Avro模式创
我正在尝试使用以下方法在运行时获取Avro模式: 但由于我的POJO类包含以下泛型: 我得到以下例外情况: 我知道Avro不会支持泛型类型。是否有一种方法可以在运行时生成架构时从类中省略某些类字段?
当我发布windows窗体应用程序时,它在我开发它的机器上运行良好。但当我把它放在另一台笔记本电脑上就不工作了。双击.exe文件不会出错,也不会发生任何事情我的应用程序中有一个本地数据库。连接字符串为“data source=(LocalDB)\mssqllocaldb;attachdbfilename=datadirectory\database1.mdf;integrated security