:)
我已经在一个(奇怪的)情况中结束了自己,简单地说,我不想使用来自Kafka的任何新记录,因此暂停主题中所有分区的sparkStreaming消费(InputStream[ConsumerRecord]),执行一些操作,最后,恢复消费记录。
首先这可能吗?
我一直在尝试这样的事情:
var consumer: KafkaConsumer[String, String] = _
consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(java.util.Arrays.asList(topicName))
consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())
但是我得到了这个:
println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]
任何帮助我理解我遗漏了什么,以及为什么当消费者明确分配了分区时我会得到空结果的帮助都将受到欢迎!
版本:Kafka:0.10火花:2.3。0斯卡拉:2.11。8.
是的,可以在代码中添加检查点并传递持久存储(本地磁盘、S3、HDFS)路径
每当您开始/恢复工作时,它将从检查点获取带有消费者偏移的Kafka消费者组信息,并从停止的地方开始处理。
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
Spark Check-=定点不仅是用于保存偏移量的机制,而且还用于保存阶段和作业的DAG的序列化状态。因此,每当您使用新代码重新启动作业时,它都会
现在,从磁盘读取数据只是Spark加载Kafka偏移量、DAG和旧的未完成处理数据所需的一次性操作。
一旦完成,它将始终按照默认或指定的检查点间隔将数据保存到磁盘。
Spark streaming提供了指定Kafka组id的选项,但Spark structured stream没有。
问题内容: 我注意到,有很多主题是有关使用暂停/恢复MP3的,因此为了帮助所有人,我专门为此设计了整个课堂!请参阅下面的答案。 注意:这是供我个人使用的,因此它可能不如某些人希望的那样健壮。但是由于其简单性,进行简单的修改并不难。 问题答案: 播放器的一个非常简单的实现,实际上是暂停播放。它通过使用单独的线程播放流并告诉播放器线程是否/何时暂停和继续工作来工作。
我相信答案是否定的,但是Twilio提供暂停/恢复录音的能力吗?用例是记录一个呼叫,但在收集敏感信息时暂停记录。从REST文档来看,它似乎不是一个受支持的功能。我想有人可能已经为这个要求找到了一些选择。
我要做的是暂停< code>KafkaConsumer,如果在使用消息的过程中出现错误。 这是我写的 然后我写了一个REST服务来恢复消费者 现在,我有两个问题。第一个问题:当我打电话给消费者时。来自<code>@KafkaListener</code>注释方法的pause()会发生什么?消费者立即暂停,或者我可以接收到同一主题分区的其他偏移量上的其他消息。例如,我有偏移量为3的“message1
问题内容: 我有一个基本的Swing UI,带有一个标记为“播放”的按钮。按下按钮后,标签变为“暂停”。现在,当按下按钮时,它变为“继续”。 在“播放”中,我将实例化并执行一个SwingWorker。我想要的是能够暂停该线程(不要取消该线程),并根据上述按钮按下来恢复它。但是,我不想在doInBackground()中求助于Thread.sleep()。这似乎有点骇人听闻。有什么方法可以阻止运行d
有些情况下,例如爬取大的站点,我们希望能暂停爬取,之后再恢复运行。 Scrapy通过如下工具支持这个功能: 一个把调度请求保存在磁盘的调度器 一个把访问请求保存在磁盘的副本过滤器[duplicates filter] 一个能持续保持爬虫状态(键/值对)的扩展 Job 路径 要启用持久化支持,你只需要通过 JOBDIR 设置 job directory 选项。这个路径将会存储 所有的请求数据来保持一
我已经实现了Kafka消费者,现在我有了一个场景。 从Kafka流2.2.5中读取数据。通过Srpingboot发布 加载数据库表1 将数据从表1复制到表2 清理桌子1 要执行上述操作,我需要使用quartz的调度作业(已编写)暂停/恢复Kafka使用者,该作业将数据从表1复制到表2。但是在这个活动中,我希望我的Kafka听众暂停,一旦复制完成,它应该继续。 我的实施: