当前位置: 首页 > 知识库问答 >
问题:

Spark Streaming Kafka流批量执行

穆浩皛
2023-03-14

我是spark streaming的新手,我有一个关于其用法的一般性问题。我目前正在实现一个应用程序,它从一个Kafka主题流式传输数据。

使用应用程序只运行一次批处理是一种常见的场景吗,例如,一天结束,收集主题中的所有数据,做一些聚合和转换等等?

这意味着在用spark-submit启动应用程序后,所有这些东西将在一批中执行,然后应用程序将被关闭。或者spark stream build是为了在连续的批处理中运行无尽的和永久的数据流?

共有1个答案

姬雪松
2023-03-14

您可以使用kafka-stream api,并固定一个窗口时间,一次只对主题中的事件执行一个批处理的聚合和转换。有关窗口化的移动信息,请查看https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#windowing

 类似资料:
  • 本文向大家介绍如何执行批量插入?相关面试题,主要包含被问及如何执行批量插入?时的应答技巧和注意事项,需要的朋友参考一下 首先,创建一个简单的 insert 语句:   然后在 java 代码中像下面这样执行批处理插入:

  • 以下是使用对象进行批处理的典型步骤顺序 - 使用占位符创建SQL语句。 使用方法创建对象。 使用将自动提交设置为。 使用方法在创建的对象上添加SQL语句到批处理中。 在创建的对象上使用方法执行所有SQL语句。 最后,使用方法提交所有更改。 此示例代码是基于前面章节中完成的环境和数据库设置编写的。 以下代码片段提供了使用对象的批量更新示例,将下面代码保存到文件:BatchingWithPrepare

  • 以下是使用对象的批处理的典型步骤序列 - 使用方法创建对象。 使用将自动提交设置为。 使用方法在创建的对象上添加SQL语句到批处理中。 在创建的对象上使用方法执行所有SQL语句。 最后,使用方法提交所有更改。 此示例代码是基于前面章节中完成的环境和数据库设置编写的。 以下代码片段提供了使用对象的批量更新示例,将下面代码保存到文件:BatchingWithStatement.java - 编译上面代

  • 使用telnet也可以连接redis-server。并且在脚本中使用nc命令进行redis操作也是很有效的: gnuhpc@gnuhpc:~$ (echo -en "ping\r\nset key abc\r\nget key\r\n";sleep 1) | nc 127.0.0.1 6379 +PONG +OK $3 abc 另一个方式是使用pipeline: 在一个脚本中批量执行多个写入操作:

  • BatchResult partialAllowedBatch(BatchRequest request) 功能 一般的batch请求,当表quota不足时会全部拒绝;而partialAllowedBatch允许在quota满足的范围内,部分记录可以执行, 通过返回值告知用户哪些记录执行成功,哪些记录没有执行, 若当前quota不满足甚至1条记录执行,会抛出THROUGHPUT_EXCEED异常;

  • 我需要使用服务帐户执行数据流作业,下面是同一平台中提供的一个非常简单和基本的wordcount示例。 根据这一点,GCP要求服务号具有数据流工作者的权限,以便执行我的作业。即使我已经设置了所需的权限,错误仍然出现时,堰部分会出现: 有人能解释这种奇怪的行为吗?太感谢了