当前位置: 首页 > 知识库问答 >
问题:

Flink为单个源使用多个数据类

阴靖
2023-03-14

一些代码:

implicit val formats = Serialization.formats(NoTypeHints)

case class DataClass(id: String, name: String)    

val dataSource = env
      .addSource(new FlinkKinesisConsumer[String](s"data-stream-$stage", new SimpleStringSchema, consumerConfig))
      .uid(s"data-stream-$stage-source-id").name("dataSource")
      .map(json => read[DataClass](json))

这里,我从kinesis流中获取数据,并将其序列化到我的数据类中。一切正常,但现在需要增加以另一种格式接收数据的能力(例如,DataClassSecond)

其中一个选项是,添加一个额外的数据源并在您自己的流中处理它们。

但是这需要一个额外的运动队列。我不确定这是否是一个好方法有没有什么方法可以从运动接收不同的数据,然后根据类型分割流?

共有1个答案

凌朗
2023-03-14

您可以尝试根据字段过滤数据流,这样您将得到两个或多个仅包含正确JSON格式元素的流。

因此,最简单的方法是:

val streamDataClass = sourceStream.filter(_.contains("name"))
val streamDataClassSecond = sourceStream.filter(_.contains("surname"))

只有当namesurname对每个DataClass都是唯一的时,这才会起作用。更有效的做法可能是首先将映射DataStream到某种通用格式,或者使用要么之类的东西作为反序列化结果,然后检查它是否成功。

 类似资料:
  • 现在我已经添加了一个新的jasper模板,它必须在同一个文件夹中使用基于year参数的不同数据源。 我所能做的就是为该文件夹中的每个模板向jasper发送的参数列表中添加今年,在这种情况下,它将为所有现有模板使用id,为这个新模板使用year。 但是,为了添加jasper模板,我宁愿不部署这个应用程序的新版本,所以我想做以下事情: 在datasource 1中使用参数id查询所需年份 使用this

  • 当我试图在spring-boot上使用多个数据源时,我面临着一个巨大的问题。我的问题是因为我正在使用spring batch,而我没有足够的权限在我的生产数据库上从spring-batch创建元数据表,所以我需要使用例如H2来创建这些表,但是当我试图在我的模型中加载一个在我的作业处理器中具有关系为@OneToMany的字段时,我收到了LazyInitializationException Spri

  • 我正试图使用INFOQ提供的本教程设置一个包含多个数据源的Springboot(v2.0.0.BUILD-SNAPSHOT)项目 https://www.infoq.com/articles/Multiple-Databases-with-Spring-Boot 但是我需要使用多个EntityManager来代替JdbcTemplate 这是我目前掌握的情况 应用属性 一个pplication.j

  • 问题内容: 我有两个名为simple-core-impl和的项目simple-core-web。 这两个项目都是,spring based并且都具有一个父项目名称simple-core。 我simple-impl-config.xml在simple-core-impl项目simple-web-config.xml中simple-impl-config.xml。 我有一个具有类的bean:simpl

  • 在能够根据add custom data source to Jaspersoft Studio将自定义数据源从java bean添加到报表中之后,我将进入使用Jasper报告的第二点。 我有一个主要的报告,它使用一个数据库作为它的数据源。然后,我将一个bean.xml数据源添加到报表中,并将一个表添加到主报表中,主报表使用这个bean.xml数据源来获取java Beans。 我的目标是从主报表