为什么AWS SQS不是Apache Flink的默认连接器?这样做有技术限制吗?还是只是一些没有完成的事情?我想实现这一点,任何指点都将不胜感激
对于原始问题的答案可能太晚了...我使用SQS的Java消息服务库将SQS消费者编写为SourceFunction:
SQSConsumer extends RichParallelSourceFunction<String> {
private volatile boolean isRunning;
private transient AmazonSQS sqs;
private transient SQSConnectionFactory connectionFactory;
private transient ExecutorService consumerExecutor;
@Override
public void open(Configuration parameters) throws Exception {
String region = ...
AWSCredentialsProvider credsProvider = ...
// may be use a blocking array backed thread pool to handle surges?
consumerExecutor = Executors.newCachedThreadPool();
ClientConfiguration clientConfig = PredefinedClientConfigurations.defaultConfig();
this.sqs = AmazonSQSAsyncClientBuilder.standard().withRegion(region).withCredentials(credsProvider)
.withClientConfiguration(clientConfig)
.withExecutorFactory(()->consumerExecutor).build();
this.connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), sqs);
this.isRunning = true;
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
SQSConnection connection = connectionFactory.createConnection();
// ack each msg explicitly
Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE);
Queue queue = session.createQueue(<queueName>);
MessageConsumer msgConsumer = session.createConsumer(queue);
msgConsumer.setMessageListener(msg -> {
try {
String msgId = msg.getJMSMessageID();
String evt = ((TextMessage) msg).getText();
ctx.collect(evt);
msg.acknowledge();
} catch (JSMException e) {
// log and move on the next msg or bail with an exception
// have a dead letter queue is configured so this message is not lost
// msg is not acknowledged so it may be picked up again by another consumer instance
}
};
// check if we were canceled
if (!isRunning) {
return;
}
connection.start();
while (!consumerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
// keep waiting
}
}
@Override
public void cancel() {
isRunning = false;
// this method might be called before the task actually starts running
if (sqs != null) {
sqs.shutdown();
}
if(consumerExecutor != null) {
consumerExecutor.shutdown();
try {
consumerExecutor.awaitTermination(1, TimeUnit.MINUTES);
} catch (Exception e) {
//log e
}
}
}
@Override
public void close() throws Exception {
cancel();
super.close();
}
}
注意:如果您使用的是标准SQS队列,则可能需要根据是否需要精确的一次保证对消息进行重复数据消除。
参考:使用JMS和Amazon SQS
目前,Apache Flink中没有用于AWS SQS的连接器。查看现有的连接器。我想你已经知道了这一点,并想给出一些建议。我最近还在寻找一个SQS连接器,发现了这个邮件线程。
Apache Kinesis连接器在某种程度上类似于您可以在此上实现的。查看是否可以使用此连接器开始此操作。
我是nifi的新手,我想将SQL server数据库连接到nifi,并用处理器创建数据流。我怎样才能做到这一点,有没有人能帮我弄清楚这一点。 事先谢谢山姆
在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。
我对flink/Java/Scala还比较陌生,所以这可能不是问题,但非常感谢您的帮助。我还没有找到一个将Flink Kafka连接器与Flink 1.13结合使用的示例(对我适用)。 我的项目在这里:https://github.com/sysarcher/flink-scala-tests 我想我无法使用我想试用的FlinkKafkaConsumer(链接)。 我正在使用IntelliJ Id
我有一个正在运行的Ignite集群,并且我使用进行节点发现: 它工作得很好,我可以使用节点连接到这个集群。 null
问题内容: 我需要使用angular + bootstrap创建一个带有日期和时间的输入字段。我发现这个日期时间选择器看起来确实很需要我- 日期和时间在一个字段中,并且阻止了用户错误的版本。我写了一条指令,启动了datepickers,但是它改变了视图,并且模型没有改变……我也尝试了onSelect,但是也没有任何反应 js 如何解决?建立联系? 问题答案: 因此,问题是: 改变每在元件; 从元素
问题内容: 如何使用SQLAlchemy连接到MS Access?在他们的网站上,它说连接字符串是access + pyodbc。这是否意味着我需要连接pyodbc?由于我是新手,请保持温柔。 问题答案: 从理论上讲,这将通过create_engine(“ access:/// some_odbc_dsn”)进行,但是自从SQLAlchemy 0.5以来,Access后端就一直没有使用过,并且尚不