我希望有一个包含大量Lambda消费者的Kinesis流(不同的Lambda函数,而不仅仅是同一个函数的多个实例)。
Kinesis限制为每秒5次读取事务(docs)。Lambda函数将每秒轮询一个碎片。
这是否意味着,当我向流中添加第六个Lambda消费者时,我的读取应该受到限制?
我知道一个流上超过5个消费者可以使用新的增强扇出功能,但是我找不到任何关于这个新功能的Lambda函数的提及。他们只谈论KCL2。
相反,我发现了一些老文章,其中描述了他们自己的扇出实现,以避免落后于Lambda消费者,例如这篇文章。我想知道这是否仍然需要,或者Lambda消费者也可以利用新的增强扇出。
你能试试这样的吗?
/* in the parent lambda */
var consumers = [
{
id: "consumer",
eventData: someData // stream data inserted here
}
]
var params = {
ClientContext: "MyApp",
FunctionName: "MyFunction",
InvocationType: "Event",
LogType: "Tail",
Payload: null, // <Binary String>
Qualifier: "1"
};
let invoke = () => {
return new Promise((resolve, reject) => {
lambda.invoke(params, function(err, data) {
if (err) reject(err);
else resolve(data);
});
})
}
invokeArr = []
consumers.map((consumer) => {
params.Payload = consumer.eventData
invokeArr.push(invoke)
})
Promise.all(invokeArr).then((data) => console.log('were done'))
我们运行一个集群工作线程应用程序,该应用程序依赖于 Kafka 使用高级消费者 API 使用消息。群集中的所有节点共享同一个使用者组。现在我们想要的是将该逻辑的一部分迁移到 Kafka 流处理器 API。这里的方法是什么?如果分配了相同的 groupId/clientId,流拓扑是否会与现有使用者就消息进行斗争?我们应该分配不同的 groupId/clientId 吗?流式传输拓扑?说“组”。 “
我有一个注册了ActionListener的JTextField。当这个JTextField有焦点时,按下enter键,就会创建一个ActionEvent。 如何将此事件链接到另一个组件ActionListener(如JButton),以便运行其代码?
我以前认为设置我的消费者将始终收到他们尚未收到的消息,但最近我发现情况并非如此。这只在使用者尚未提交抵消时才起作用。在任何其他情况下,使用者将继续接收偏移大于其提交的最后偏移的消息。 由于我总是使用随机的组ID创建新的使用者,我意识到我的使用者“没有内存”,他们是新的使用者,并且他们永远不会提交偏移,因此策略将始终适用。我的疑虑就从这里开始了。假设以下场景: 我有两个客户端应用程序,A和B,每个客
我有1个消费者群体和5个消费者。也有5个分区,因此每个消费者得到1个分区。 CLI还显示 bin/Kafka-console-consumer . sh-bootstrap-server localhost:9092-Topic Topic-1-from-beginning-partition { n }正确显示每个分区的不同消息。 然而,我经常看到两个或两个以上的消费者在处理同一条信息,而且对于
我想从多个SQS队列中触发一个lambda函数。lambda将进行的大部分处理都是相同的,只是一个小步骤将基于表名。我不想为此保留两个单独的lambda。拥有相同/多个lambda的利弊是什么?
我是Kafka的初学者。我知道具有相同组id的多个消费者不能在一个主题中使用来自同一个分区的消息。我想知道如果来自一个消费组的多个Kafka消费者从一个分区读取相同的消息会发生什么,为什么这是一件坏事。 。