当前位置: 首页 > 知识库问答 >
问题:

Amazon Kinesis KCL客户端,适用于不在. NET中工作的消费者

阳长恨
2023-03-14

请帮忙。我在为中的Kinesis数据流设置消费者时遇到问题。NET控制台应用程序。

我已经按照留档做了所有的事情,但是每当我运行消费者时,我仍然会得到一个空白的控制台屏幕。到目前为止,生产者工作正常,AWS凭据也在工作。

  1. 我的系统上的JDK配置良好(对Java开发来说并不新鲜)
  2. 我有所有必要的政策附加到我的IAM用户
  3. 我可以看到生产者可以使用相同的AWS凭据以编程方式创建流、desc流等

我可以在创建KclProcess时点击程序中的断点,但我不能点击下面Kinesis Test类中的任何断点

至于消费者,我已经创建了一个类程序。cs如下:

class Program  
{

    public static void Main(string[] args)
    {
        //added these lines after trying everything
        Environment.SetEnvironmentVariable("AWS_ACCESS_KEY_ID", "***");
        Environment.SetEnvironmentVariable("AWS_SECRET_ACCESS_KEY", "***");
        Environment.SetEnvironmentVariable("AWS_REGION", "us-east-1");

        try
        {
            KclProcess.Create(new KinesisTest()).Run();
        }
        catch (Exception e)
        {
            Console.Error.WriteLine("ERROR: " + e);
        }
    }

}

和另一类

public class KinesisTest: IRecordProcessor
{

    private static readonly TimeSpan Backoff = TimeSpan.FromSeconds(3);
    private static readonly TimeSpan CheckpointInterval = TimeSpan.FromMinutes(1);
    private static readonly int NumRetries = 10;

    /// <value>The shard ID on which this record processor is working.</value>
    private string _kinesisShardId;

    private DateTime _nextCheckpointTime = DateTime.UtcNow;


    public void Initialize(InitializationInput input)
    {
        Console.Error.WriteLine("Initializing record processor for shard: " + input.ShardId);
        this._kinesisShardId = input.ShardId;
    }

    public void ProcessRecords(ProcessRecordsInput input)
    {
        Console.Error.WriteLine("Processing " + input.Records.Count + " records from " + _kinesisShardId);
        ProcessRecordsWithRetries(input.Records);

        // Checkpoint once every checkpoint interval.
        if (DateTime.UtcNow >= _nextCheckpointTime)
        {
            Checkpoint(input.Checkpointer);
            _nextCheckpointTime = DateTime.UtcNow + CheckpointInterval;
        }
    }

    public void Shutdown(ShutdownInput input)
    {
        Console.Error.WriteLine("Shutting down record processor for shard: " + _kinesisShardId);
        // Checkpoint after reaching end of shard, so we can start processing data from child shards.
        if (input.Reason == ShutdownReason.TERMINATE)
        {
            Checkpoint(input.Checkpointer);
        }
    }

    private void ProcessRecordsWithRetries(List<Record> records)
    {
        foreach (Record rec in records)
        {
            bool processedSuccessfully = false;
            string data = null;
            for (int i = 0; i < NumRetries; ++i)
            {
                try
                {
                    data = System.Text.Encoding.UTF8.GetString(rec.Data);

                    Console.Error.WriteLine( String.Format("Retrieved record:\n\tpartition key = {0},\n\tsequence number = {1},\n\tdata = {2}", rec.PartitionKey, rec.SequenceNumber, data));

                    // Your own logic to process a record goes here.

                    processedSuccessfully = true;
                    break;
                }
                catch (Exception e)
                {
                    Console.Error.WriteLine("Exception processing record data: " + data, e);
                }

                //Back off before retrying upon an exception.
                Thread.Sleep(Backoff);
            }

            if (!processedSuccessfully)
            {
                Console.Error.WriteLine("Couldn't process record " + rec + ". Skipping the record.");
            }
        }
    }

    private void Checkpoint(Checkpointer checkpointer)
    {
        Console.Error.WriteLine("Checkpointing shard " + _kinesisShardId);

        checkpointer.Checkpoint(RetryingCheckpointErrorHandler.Create(NumRetries, Backoff));
    }
}

最后是kcl。属性文件:

executableName = dotnet KinesisTest.dll

streamName = testStream

applicationName = KinesisTest

AWSCredentialsProvider = DefaultAWSCredentialsProviderChain

processingLanguage = C#

initialPositionInStream = TRIM_HORIZON

regionName = us-east-1

maxRecords = 5000

idleTimeBetweenReadsInMillis = 1000

# failoverTimeMillis = 10000
# workerId =
# shardSyncIntervalMillis = 60000
# callProcessRecordsEvenForEmptyRecordList = false
# parentShardPollIntervalMillis = 10000
# cleanupLeasesUponShardCompletion = true
# taskBackoffTimeMillis = 500
# metricsBufferTimeMillis = 10000
# metricsMaxQueueSize = 10000
# validateSequenceNumberBeforeCheckpointing = true
# maxActiveThreads = 0

如果我做错了什么,请告诉我。

我希望看到消费者处理流中的数据,但它只是一个空控制台

共有1个答案

龚玄天
2023-03-14

虽然我从未找到这个问题的答案,但我找到了一种更好、更合适的方法来使用lambdas实现这一点。

我的最终设置涉及使用SNS从Kinesis获取消息/事件,然后将消息散开给任何订阅者(在我的情况下是SQS队列),然后订阅者将排队的消息提供给lambdas。

类似这样:

                             SQS Queue ----------------> Lambda
                             ^
                            /
                           /
                          /
KINESIS STREAM ------->SNS-------->SQS Queue ------> Lambda
                          \
                           \
                            \
                             >SQS Queue ----------------> Lambda

您可能会问,为什么不直接向SQS进行动觉?或者,为什么SNS不能直接向Lambda发送?

第一个问题的答案是Kinesis不是为多次阅读而构建的。SNS允许您阅读一次,并根据需要将其分发给尽可能多的订阅者。如果你有20个听众在等待对一个事件采取行动,你不能将他们都插入Kinesis。所以去SNS吧。

第二个问题的答案是,SNS有时会丢失消息,而不会传递或触发lambda(非常正确)。然而(根据经验推测),如果将其连接到SQS,这种可能性会降低,它几乎总是落在队列中。然后,队列可以触发lambda。。反正他们只有一份工作。

所以我希望这能帮助到某人。

 类似资料:
  • 我有一个连接到WebSocket服务器并从服务器接收消息的微服务。我想在这个应用程序中集成Sleuth,因为这是我的一组微服务的网关。但是当我连接到WebSocket服务器并开始接收消息时,我无法在日志中跟踪ID,spanid。我期待着这样的事情: 但我得到了: 我的代码如下: 我们的pom条目包括:

  • 我在kafka消费者文档中看到了这个注释-

  • 问题内容: 我正在开发一个项目,该项目需要解析一些受保护的网页中的数据。为了获得对这些页面的访问权限,我必须克服SAML身份验证形式(Shibboleth)。是否有人能够在Android(Java)中实现此标准?我已经读过这个线程:Android的SAML客户端实现? 但这并不能给我一个很好的解决方案。实际上,我需要 获取某些受保护网页的数据 以便对其进行解析,而不是让用户看到此类网页的内容。因此

  • 问题内容: 有人知道iOS的Elasticsearch客户端库吗?如果它也是迅速写的,那将是一个好处。 elasticsearch的“客户端”部分显示了多个平台的多个库,但对于iOS没有任何显示,我觉得有人必须这样做吗? 干杯 问题答案: 我怀疑是否有人- 上次我检查时没有,并且有充分的理由。请记住,为了允许IOS客户端(或Android)使用客户端库连接到Elasticsearch,您必须打开集

  • 我有一个wsdl: 我想提交信息以获得回应。我创建了client.php如下: 但它在浏览器中显示错误: SoapFault对象([消息:受保护]= 我错在哪里?对此,可能的解决方案是什么? 编辑: 我已经创建了一个php文件:client。php 但它产生了这个错误: 调用错误:响应不是文本/xml类型:应用程序/wsdl xmlHTTP/1.1 200确定日期:星期二,9月17日2013 15

  • 我正试图通过hbc-twitter4j-v3获得推文。示例代码为:https://github.com/twitter/hbc/blob/master/hbc-example/src/main/java/com/twitter/hbc/example/Twitter4jSampleStreamExample.java为了在代理上启用身份验证,我还设置了主机、端口和身份验证的系统属性。但它显示出以下