当前位置: 首页 > 知识库问答 >
问题:

使用线程在消费流中使用Flink producer in循环运行Flink消费程序

颜德馨
2023-03-14

我希望flink consumer stream中的每条消息都能使用flink kafka producer生成多条消息,每条消息都通过一个单独的线程指向Kafka中的某个主题。我正在用Scala编写程序,但用Java就可以了

类似这样:

def thread(x:String): Thread =
    {   
    val thread_ = new Thread {
          override def run {

              val str = some_processing(x)
              flink_producer(str)

      }
    }
    return thread_
 }

val stream = flink_consumer()

stream.map(x =>{

                 var i = 0
                 while(i < 10){

                                val th = thread(x)
                                th.start()
                                i = i+1
                                }

           })

因此,对于flink消费者中的每个输入,我希望使用多线程向其他队列生成10条消息。

共有1个答案

袁青青
2023-03-14

大多数Flink操作符都是并行操作符,因此您没有理由在数据管道中创建任何类型的线程,Flink应该是管理一个操作符可以存在多少个并行实例的操作符,如果您想设置该值,应该使用以下API方法。

<代码>。setParallelism(N)//N对您来说是10,

您可以在Fink文档中获得更多信息

你应该这样做:

  1. 向群集配置中添加更多任务管理器插槽

您的代码应如下所示:

val stream = flink_consumer()

stream.flatMap((x, out) =>{
                 var i = 0
                 while(i < 10){
                      val valueToCollect = process(x,i)
                      out.collect(valueToCollect)
                 }

           }).setParallelism(10)
           .map(doSomethingWithGeneratedValues)
           .addSink(sinkThatSendsDataToYourDesiredSystem)

如果您知道要有多少个并行任务,请再进行一次尝试

val stream = flink_consumer()

val resultStream = stream.map(process)
val sinkStream = resultStream.union(resultStream,resultStream,resultStream,...) // joins resultStream N times
sinkStream.addSink(sinkThatSendsDataToYourDesiredSystem)

最后,您还可以为一个数据流设置多个接收器

val stream = flink_consumer()

val resultStream = stream.map(process)
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)
...
N
...
resultStream.addSink(sinkThatSendsDataToYourDesiredSystem)

如果要对数据接收器执行并行写入,必须确保所使用的接收器支持此类写入操作。

 类似资料:
  • Kafka的doc给出了一种方法,大约用以下描述: 每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者 我的代码: 但它不起作用,引发了一个异常: JAVAutil。ConcurrentModificationException:KafkaConsumer对于多线程访问不安全 此外,我还阅读了Flink(一个用于分布式流和批处理数据的开源平台)的源代码。Flink使用多线程消费程序与我

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我的Ruby项目中有一个模型重发,它包含内容和状态列。 使用EventMachine使用状态为0的所有记录的最佳/最快方式是什么? 我想创建一个简单的worker,它尝试在每个时段(比如每5分钟)查找status==0的记录 我对EventMachine还是新手,找不到那么多关于如何处理DB的例子。 到目前为止,我做了如下工作,但不确定这是否是最好的实现: 任何帮助都将不胜感激

  • 问题内容: 我对于如何使用特定的生产者-消费者模式感到困惑,在该模式中,生产者和消费者都可以同时并独立地进行操作。 首先,考虑以下示例,该示例紧随docs中的示例: 关于此脚本,有一个更详细的细节:通过常规的for循环将项目同步放入队列。 我的目标是创建一个使用(或)和的脚本。两者都应安排为同时运行。没有一个消费者协程明确地与生产者绑定或链接。 我如何修改上面的程序,以便生产者是可以与消费者/工人

  • 我正在使用Spring Kafka消费者。我已将并发设置为10,并创建了5个消费者(用于5个主题)。所以有50个Spring Kafka消费者线程。 Kafka消费者可以使用的最大线程数是多少?如何增加此线程池的大小?我查阅了spring文档,但没有发现任何相关内容。

  • 问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别