Question:

KTable-KTable FK join: unable to select foreign key serde

戚升
2023-03-14

I am trying to do a KTable-KTable foreign-key join, but I get an error because Kafka Streams is trying to use a String serde for the foreign key.

I want it to use a Kotlinx Serialization serde. How can I specify that?

I want to join the data of the two KTables using a foreign-key selector, remapping the values into one aggregate object:

tilesGroupedByChunk
  .join<ChunkTilesAndProtos, SurfaceIndex, SurfacePrototypesData>(
    tilePrototypesTable, // join the prototypes KTable
    { cd: MapChunkData -> cd.chunkPosition.surfaceIndex }, // FK join on SurfaceIndex
    { chunkTiles: MapChunkData, protos: SurfacePrototypesData ->
      ChunkTilesAndProtos(chunkTiles, protos) // remap value 
    },
    namedAs("joining-chunks-tiles-prototypes"),
    materializedAs(
      "joined-chunked-tiles-with-prototypes",
      // `.serde()`- helper function to make a Serde from a Kotlinx Serialization JSON module 
      // see https://github.com/adamko-dev/kotka-streams/blob/38388e74b16f3626a2733df1faea2037b89dee7c/modules/kotka-streams-kotlinx-serialization/src/main/kotlin/dev/adamko/kotka/kxs/jsonSerdes.kt#L48
      jsonMapper.serde(),
      jsonMapper.serde(),
    ),
  )

However, I get an error, because Kafka Streams is using Serdes.String() (my default serde) to deserialize the foreign key. But the key is a JSON object, and I want it to use Kotlinx Serialization.

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: myproject.MyTopology$MapChunkDataPosition, and value: org.apache.kafka.streams.kstream.internals.Change.

Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
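
For context, "my default serde" refers to the application-wide defaults. A configuration roughly like the sketch below (an assumption - the actual properties are not shown here) is what makes Kafka Streams fall back to StringSerializer anywhere an operator is not given explicit serdes:

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

val props = Properties().apply {
  // any step that introduces a new key/value type without explicit serdes
  // silently falls back to these defaults
  put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde::class.java)
  put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde::class.java)
}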

The data I am processing comes from a computer game. The game has maps, called surfaces. Each surface is uniquely identified by a surface index. Each surface has tiles on an x/y plane. Each tile has a "prototype name", which is the ID of a TilePrototype. Each TilePrototype holds information about what that tile does or looks like. I need it for the tile's colour.

First, I group the tiles into 32x32 chunks, and then group those into a KTable.

/** Each chunk is identified by the surface, and an x/y coordinate */
@Serializable
data class MapChunkDataPosition(
  val position: MapChunkPosition,
  val surfaceIndex: SurfaceIndex,
)

/** Each chunk has 32x32 tiles */
@Serializable
data class MapChunkData(
  val chunkPosition: MapChunkDataPosition,
  val tiles: Set<MapTile>,
)

// get all incoming tiles and group them by chunk,
// this works successfully
val tilesGroupedByChunk: KTable<MapChunkDataPosition, MapChunkData> =
  buildChunkedTilesTable(tilesTable)
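
(The body of buildChunkedTilesTable is not shown here. For illustration only, a minimal sketch of such a regrouping is below; the MapTilePosition key type, the MapChunkPosition(x, y) constructor, and the serde parameters are all assumptions, not project code.)

import kotlinx.serialization.Serializable
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.KeyValueStore

/** Hypothetical per-tile key: an x/y coordinate on a given surface */
@Serializable
data class MapTilePosition(
  val x: Int,
  val y: Int,
  val surfaceIndex: SurfaceIndex,
)

/** Floor division, so negative coordinates land in the correct 32x32 chunk */
fun MapTilePosition.toChunkPosition(): MapChunkPosition =
  MapChunkPosition(Math.floorDiv(x, 32), Math.floorDiv(y, 32))

fun buildChunkedTilesTable(
  tilesTable: KTable<MapTilePosition, MapTile>,
  chunkPosSerde: Serde<MapChunkDataPosition>,
  tileSerde: Serde<MapTile>,
  chunkSerde: Serde<MapChunkData>,
): KTable<MapChunkDataPosition, MapChunkData> =
  tilesTable
    .groupBy(
      // re-key each tile by its enclosing chunk; Grouped.with() supplies the serdes
      // for the repartition topic, so nothing falls back to the default serdes
      { pos, tile ->
        KeyValue(MapChunkDataPosition(pos.toChunkPosition(), pos.surfaceIndex), tile)
      },
      Grouped.with(chunkPosSerde, tileSerde),
    )
    .aggregate(
      // placeholder initializer - the adder below replaces it using the real key
      { MapChunkData(MapChunkDataPosition(MapChunkPosition(0, 0), SurfaceIndex(-1)), emptySet()) },
      { chunkPos, tile, chunk -> MapChunkData(chunkPos, chunk.tiles + tile) }, // tile upserted
      { chunkPos, tile, chunk -> MapChunkData(chunkPos, chunk.tiles - tile) }, // tile retracted
      Materialized.`as`<MapChunkDataPosition, MapChunkData, KeyValueStore<Bytes, ByteArray>>("tiles-grouped-by-chunk")
        .withKeySerde(chunkPosSerde)
        .withValueSerde(chunkSerde),
    )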

Then, I collect all the prototypes by surface index, aggregating them into a list:

/** Identifier for a surface (a simple wrapper, so I can use a Kotlinx Serialization serde everywhere)*/
@Serializable
data class SurfaceIndex(
  val surfaceIndex: Int
)

/** Each surface has some 'prototypes' - I want this because each tile has a colour */
@Serializable
data class SurfacePrototypesData(
  val surfaceIndex: SurfaceIndex,
  val mapTilePrototypes: Set<MapTilePrototype>,
)

// get all incoming prototypes and group them by surface index,
// this works successfully
val tilePrototypesTable: KTable<SurfaceIndex, SurfacePrototypesData> =
  tilePrototypesTable()
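
(Again, tilePrototypesTable() itself is not shown. A compact sketch of an equivalent aggregation, assuming the prototypes arrive on a KStream already keyed by SurfaceIndex; the stream and serde parameters are assumptions, and the imports match the previous sketch plus org.apache.kafka.streams.kstream.KStream.)

fun tilePrototypesTable(
  protos: KStream<SurfaceIndex, MapTilePrototype>,
  surfaceIndexSerde: Serde<SurfaceIndex>,
  protoSerde: Serde<MapTilePrototype>,
  protosDataSerde: Serde<SurfacePrototypesData>,
): KTable<SurfaceIndex, SurfacePrototypesData> =
  protos
    .groupByKey(Grouped.with(surfaceIndexSerde, protoSerde))
    .aggregate(
      // placeholder - the aggregator overwrites the surface index with the real key
      { SurfacePrototypesData(SurfaceIndex(-1), emptySet()) },
      { surfaceIndex, proto, agg ->
        SurfacePrototypesData(surfaceIndex, agg.mapTilePrototypes + proto)
      },
      Materialized.`as`<SurfaceIndex, SurfacePrototypesData, KeyValueStore<Bytes, ByteArray>>("tile-prototypes-by-surface")
        .withKeySerde(surfaceIndexSerde)
        .withValueSerde(protosDataSerde),
    )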

Here is the code that causes the error:

/** For each chunk, get all tiles in that chunk, and all prototypes */
@Serializable
data class ChunkTilesAndProtos(
  val chunkTiles: MapChunkData,
  val protos: SurfacePrototypesData
)

tilesGroupedByChunk
  .join<ChunkTilesAndProtos, SurfaceIndex, SurfacePrototypesData>(
    tilePrototypesTable, // join the prototypes
    { cd: MapChunkData -> cd.chunkPosition.surfaceIndex }, // FK join on SurfaceIndex
    { chunkTiles: MapChunkData, protos: SurfacePrototypesData ->
      ChunkTilesAndProtos(chunkTiles, protos) // remap value 
    },
    namedAs("joining-chunks-tiles-prototypes"),
    materializedAs(
      "joined-chunked-tiles-with-prototypes",
      // `.serde()`- helper function to make a Serde from a Kotlinx Serialization JSON module 
      // see https://github.com/adamko-dev/kotka-streams/blob/38388e74b16f3626a2733df1faea2037b89dee7c/modules/kotka-streams-kotlinx-serialization/src/main/kotlin/dev/adamko/kotka/kxs/jsonSerdes.kt#L48
      jsonMapper.serde(),
      jsonMapper.serde(),
    ),
  )
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: MyProject.processor.Topology$MapChunkDataPosition, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:150)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:131)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:105)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:186)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:54)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:29)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$1.apply(MeteredKeyValueStore.java:182)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$1.apply(MeteredKeyValueStore.java:179)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:107)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:87)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:136)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flushCache(CachingKeyValueStore.java:345)
at org.apache.kafka.streams.state.internals.WrappedStateStore.flushCache(WrappedStateStore.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flushCache(ProcessorStateManager.java:487)
at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:402)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1043)
at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1016)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1017)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
Caused by: java.lang.ClassCastException: class MyProjectTopology$MapChunkData cannot be cast to class java.lang.String (MyProject.processor.MyProject$MapChunkData is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29)
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:99)
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:69)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
... 30 common frames omitted

  • Kotlin 1.6.10
  • Kafka Streams 3.0.0
  • Kotlinx Serialization 1.3.2

1 Answer

蒙经纶
2023-03-14

Somewhat unexpectedly, the cause was a mistake I had made in my topology definition.

In the final stage of building one of the tables I mapped the values - but I did not specify the serdes.

    .mapValues { _, v ->
      ChunkTilesAndProtos(v.tiles, v.protos)
    }

So I changed it to specify the serdes:

    .mapValues(
      "finalise-web-map-tile-chunk-aggregation",
      materializedAs("web-map-tile-chunks", jsonMapper.serde(), jsonMapper.serde())
    ) { _, v ->
      ChunkTilesAndProtos(v.tiles, v.protos)
    }
// note: this uses extension functions from github.com/adamko-dev/kotka-streams 
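
For readers not using the kotka-streams extensions, the same fix written against the plain Kafka Streams API might look roughly like this; ChunkAggregate and the serde variables are stand-ins, not real types from my project:

    .mapValues(
      { _: MapChunkDataPosition, v: ChunkAggregate ->   // `ChunkAggregate` stands in for
        ChunkTilesAndProtos(v.tiles, v.protos)          // the real intermediate value type
      },
      Named.`as`("finalise-web-map-tile-chunk-aggregation"),
      Materialized.`as`<MapChunkDataPosition, ChunkTilesAndProtos, KeyValueStore<Bytes, ByteArray>>("web-map-tile-chunks")
        .withKeySerde(chunkPosSerde)              // explicit key serde
        .withValueSerde(chunkTilesAndProtosSerde) // explicit serde for the new value type
    )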

Finding this was not easy. I tracked it down by putting a breakpoint in the constructor of AbstractStream.java (among other constructors) to see when the keySerde and valueSerde fields were not being set.

Sometimes a null serde turns up (I believe some KTables/KStreams are "virtual" and never encode/decode to a Kafka topic). However, I was able to find the operation that caused my problem and define serdes there, since I was changing the value type.
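
More generally, the serdes passed to the join's materializedAs(...) only cover the join result; every upstream step that introduces a new key or value type needs its own serdes, starting from where a table is first consumed. A minimal sketch (builder is a StreamsBuilder; the topic name and serde variables are assumptions; Consumed is org.apache.kafka.streams.kstream.Consumed):

val prototypesSource: KTable<SurfaceIndex, SurfacePrototypesData> =
  builder.table(
    "tile-prototypes",                                  // hypothetical topic name
    Consumed.with(surfaceIndexSerde, protosDataSerde),  // explicit serdes at the source
  )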
