当前位置: 首页 > 知识库问答 >
问题:

将AWS SQS连接到Apache-Flink

漆雕育
2023-03-14

为什么AWS SQS不是Apache Flink的默认连接器?这样做有技术限制吗?还是只是一些没有完成的事情?我想实现这一点,任何指点都将不胜感激

共有2个答案

田向荣
2023-03-14

对于原始问题的答案可能太晚了...我使用SQS的Java消息服务库将SQS消费者编写为SourceFunction:

SQSConsumer extends RichParallelSourceFunction<String> {
   private volatile boolean isRunning;
   private transient AmazonSQS sqs;
   private transient SQSConnectionFactory connectionFactory;
   private transient ExecutorService consumerExecutor;

   @Override
   public void open(Configuration parameters) throws Exception {
      String region = ...
      AWSCredentialsProvider credsProvider = ...
      // may be use a blocking array backed thread pool to handle surges?
      consumerExecutor = Executors.newCachedThreadPool();
      ClientConfiguration clientConfig = PredefinedClientConfigurations.defaultConfig();
      this.sqs = AmazonSQSAsyncClientBuilder.standard().withRegion(region).withCredentials(credsProvider)
            .withClientConfiguration(clientConfig)
            .withExecutorFactory(()->consumerExecutor).build();
      this.connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), sqs);
      this.isRunning = true;
   }

   @Override
   public void run(SourceContext<String> ctx) throws Exception {
      SQSConnection connection = connectionFactory.createConnection();
      // ack each msg explicitly
      Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);
      Queue queue = session.createQueue(<queueName>);
      MessageConsumer msgConsumer = session.createConsumer(queue);
      msgConsumer.setMessageListener(msg -> {
          try {
              String msgId = msg.getJMSMessageID();
              String evt = ((TextMessage) msg).getText();
              ctx.collect(evt);
              msg.acknowledge();
          } catch (JSMException e) {
              // log and move on the next msg or bail with an exception
              // have a dead letter queue is configured so this message is not lost
              // msg is not acknowledged so it may be picked up again by another consumer instance
          }
      };
      // check if we were canceled
      if (!isRunning) {
          return;
      }
      connection.start();
      while (!consumerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
          // keep waiting
      }
  }
            

  @Override
  public void cancel() {
      isRunning = false;
      // this method might be called before the task actually starts running
      if (sqs != null) {
          sqs.shutdown();
      }
      if(consumerExecutor != null) {
           consumerExecutor.shutdown();
           try {
               consumerExecutor.awaitTermination(1, TimeUnit.MINUTES); 
           } catch (Exception e) {
               //log e
           }
      }
   }

   @Override
   public void close() throws Exception {
       cancel();
       super.close();
   }
}

注意:如果您使用的是标准SQS队列,则可能需要根据是否需要精确的一次保证对消息进行重复数据消除。

参考:使用JMS和Amazon SQS

连厉刚
2023-03-14

目前,Apache Flink中没有用于AWS SQS的连接器。查看现有的连接器。我想你已经知道了这一点,并想给出一些建议。我最近还在寻找一个SQS连接器,发现了这个邮件线程

Apache Kinesis连接器在某种程度上类似于您可以在此上实现的。查看是否可以使用此连接器开始此操作。

 类似资料:
  • 我是nifi的新手,我想将SQL server数据库连接到nifi,并用处理器创建数据流。我怎样才能做到这一点,有没有人能帮我弄清楚这一点。 事先谢谢山姆

  • 在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。

  • 我对flink/Java/Scala还比较陌生,所以这可能不是问题,但非常感谢您的帮助。我还没有找到一个将Flink Kafka连接器与Flink 1.13结合使用的示例(对我适用)。 我的项目在这里:https://github.com/sysarcher/flink-scala-tests 我想我无法使用我想试用的FlinkKafkaConsumer(链接)。 我正在使用IntelliJ Id

  • 我有一个正在运行的Ignite集群,并且我使用进行节点发现: 它工作得很好,我可以使用节点连接到这个集群。 null

  • 问题内容: 我需要使用angular + bootstrap创建一个带有日期和时间的输入字段。我发现这个日期时间选择器看起来确实很需要我- 日期和时间在一个字段中,并且阻止了用户错误的版本。我写了一条指令,启动了datepickers,但是它改变了视图,并且模型没有改变……我也尝试了onSelect,但是也没有任何反应 js 如何解决?建立联系? 问题答案: 因此,问题是: 改变每在元件; 从元素

  • 问题内容: 如何使用SQLAlchemy连接到MS Access?在他们的网站上,它说连接字符串是access + pyodbc。这是否意味着我需要连接pyodbc?由于我是新手,请保持温柔。 问题答案: 从理论上讲,这将通过create_engine(“ access:/// some_odbc_dsn”)进行,但是自从SQLAlchemy 0.5以来,Access后端就一直没有使用过,并且尚不