当前位置: 首页 > 知识库问答 >
问题:

Apache Spark Kinesis集成:已连接,但未收到记录

王高超
2023-03-14
    null

    -------------------------------------------
    Time: 1448645109000 ms
    -------------------------------------------

    15/11/27 17:25:09 INFO JobScheduler: Finished job streaming job 1448645109000 ms.0 from job set of time 1448645109000 ms
    15/11/27 17:25:09 INFO KinesisBackedBlockRDD: Removing RDD 102 from persistence list
    15/11/27 17:25:09 INFO JobScheduler: Total delay: 0.002 s for time 1448645109000 ms (execution: 0.001 s)
    15/11/27 17:25:09 INFO BlockManager: Removing RDD 102
    15/11/27 17:25:09 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[102] at createStream at NewClass.java:25 of time 1448645109000 ms
    15/11/27 17:25:09 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1448645107000 ms)
    15/11/27 17:25:09 INFO InputInfoTracker: remove old batch metadata: 1448645107000 ms
    15/11/27 17:25:10 INFO JobScheduler: Added jobs for time 1448645110000 ms
    15/11/27 17:25:10 INFO JobScheduler: Starting job streaming job 1448645110000 ms.0 from job set of time 1448645110000 ms
    -------------------------------------------
    Time: 1448645110000 ms
    -------------------------------------------
          <----- Some data expected to show up here!
    15/11/27 17:25:10 INFO JobScheduler: Finished job streaming job 1448645110000 ms.0 from job set of time 1448645110000 ms
    15/11/27 17:25:10 INFO JobScheduler: Total delay: 0.003 s for time 1448645110000 ms (execution: 0.001 s)
    15/11/27 17:25:10 INFO KinesisBackedBlockRDD: Removing RDD 103 from persistence list
    15/11/27 17:25:10 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[103] at createStream at NewClass.java:25 of time 1448645110000 ms
    15/11/27 17:25:10 INFO BlockManager: Removing RDD 103
    15/11/27 17:25:10 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1448645108000 ms)
    15/11/27 17:25:10 INFO InputInfoTracker: remove old batch metadata: 1448645108000 ms
    15/11/27 17:25:11 INFO JobScheduler: Added jobs for time 1448645111000 ms
    15/11/27 17:25:11 INFO JobScheduler: Starting job streaming job 1448645111000 ms.0 from job set of time 1448645111000 ms


    15/11/27 17:23:26 INFO KinesisInputDStream: metadataCleanupDelay = -1
    15/11/27 17:23:26 INFO KinesisInputDStream: Slide time = 1000 ms
    15/11/27 17:23:26 INFO KinesisInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
    15/11/27 17:23:26 INFO KinesisInputDStream: Checkpoint interval = null
    15/11/27 17:23:26 INFO KinesisInputDStream: Remember duration = 1000 ms
    15/11/27 17:23:26 INFO KinesisInputDStream: Initialized and validated org.apache.spark.streaming.kinesis.KinesisInputDStream@74b21a6

源代码(java):


    public class NewClass {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("appname").setMaster("local[3]");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
            JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream(
                    ssc, "webassist-test", "test", "https://kinesis.us-west-1.amazonaws.com", "us-west-1",
                    InitialPositionInStream.LATEST,
                    new Duration(20000),
                    StorageLevel.MEMORY_AND_DISK_2()
            );
            kinesisStream.print();
            ssc.start();
            ssc.awaitTermination();
        }
    }

Python代码(以前尝试pprinting和发送到MongoDB):


    from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    from pyspark import SparkContext, StorageLevel
    from pyspark.streaming import StreamingContext
    from sys import argv

    sc = SparkContext(appName="webassist-test")
    ssc = StreamingContext(sc, 5)

    stream = KinesisUtils.createStream(ssc,
         "appname",
         "test",
         "https://kinesis.us-west-1.amazonaws.com",
         "us-west-1",
         InitialPositionInStream.LATEST,
         5,
         StorageLevel.MEMORY_AND_DISK_2)

    stream.pprint()
    ssc.start()
    ssc.awaitTermination()

注意:我还尝试用stream.foreachrdd(lambda rdd:rdd.foreachpartition(send_partition))向MongoDB发送数据,但没有将其粘贴在这里,因为您需要一个MongoDB实例,而且这与问题无关--输入中已经没有记录了。

leaseKey  checkpoint  leaseCounter  leaseOwner  ownerSwitchesSinceCheckpoint
shardId-000000000000  LATEST  614  localhost:d92516...  8
spark-submit --executor-memory 1024m --master spark://IpAddress:7077 /path/test.py
 Input Rate
   Receivers: 1 / 1 active
   Avg: 0.00 events/sec
 KinesisReceiver-0
   Avg: 0.00 events/sec
...
 Completed Batches (last 76 out of 76)

谢谢你的帮助!

共有1个答案

易招
2023-03-14

在过去与Kinesis连接时,我曾遇到过Spark Streaming中没有显示记录活动的问题。

我会尝试这些东西来从Spark获得更多的反馈/不同的行为:

>

  • 请确保使用foreachRDD、print、saveas等输出操作强制计算DStream转换操作...

    Python示例

    另一个选择是在不同的集群中运行Spark Streaming应用程序并进行比较。

    附注:我目前在不同的集群中使用Spark Streaming 1.5.2和Kinesis,它按照预期处理记录/显示活动。

  •  类似资料:
    • 你好,我一直在使用Spring Kafka活页夹作为消费者。通过查看日志,我能够连接到主题,尽管我不确定它为什么不处理来自制作人的任何消息。 你知道可能遗漏了什么吗?非常感谢。 聚甲醛 应用程序YML 消费者阶层 侦听器类 日志 从日志中可以看到,它能够连接到主题。虽然我不确定为什么我没有收到来自生产者的任何消息。是因为分区被撤销吗?这与为什么我没有收到任何消息有关吗?生产者来自第三方,他需要做些

    • 我配置了一个基于Web服务的入站消息传递网关。我想记录传入的SOAP消息(信封和里面的所有消息)。最好的方法是什么? 我曾尝试使用带有日志通道适配器的有线抽头,但不知道一个好的表达式值来获取实际的SOAP XML。如果入站网关配置为不提取有效负载,则我将SaajSoapMessage视为有效负载,否则将DOMSource视为有效负载。是否有一个表达式将SaajSoapMessage作为XML字符串

    • 问题内容: 我有一个带有redis的laravel(5.3)应用程序,用于会话(使用predis)。只要我使用单个redis节点(使用 config / database.php中的 默认方法),一切都可以正常工作。一旦我切换到Redis集群,尽管我开始像50%的时间一样出现MOVED错误(基于谷歌搜索,我知道这应该由predis管理,但不是这样)。 我尝试将cluster参数更改为true,但是

    • 我正在使用坎内尔向短信发送短信: > < li> 发送包含一部分的短信( 发送多部分(长)短信息是: A. if:由Kannel拆分,零件被ACKed并交付(作为唯一的SMS)。 B.如果:由 Kannel 拆分,并且部件已 ACK 化,但未被删除。 此外,发送到目的地号码的任何后续短信,只要遇到这种情况[2.B],也会得到确认,但不会发送,即使它是由1部分组成的短信! 有人有过类似的经历吗? 由

    • 我正在尝试创建一个TCP服务器,该服务器在端口5002上接受来自外部程序的消息。但是,它不接收来自外部程序的消息。 为了验证我的TCP服务器是否正常工作,我像这样使用了telnet,程序确实收到了文本“hello”。 设置wireshark时,我可以看到计算机正在端口5002上接收来自外部程序(我期待)的消息。为什么我的程序无法接收这些消息? 关于最终解决方案的最新情况: 由于负载没有停止线,我必

    • 我在实现UDP连接时遇到了麻烦,因为当我在局域网内尝试它时,它是有效的,但是当NAT内部的人试图连接到公共服务器地址时,它会失败,因为从服务器作为响应发送的数据包永远不会到达客户端。 我的协议如下: 客户端A向服务器发送一个字节作为连接请求 服务器B为客户端创建一个新的套接字,并从那里向recvfrom()调用中报告的客户端端口响应一个字节。永远不会联系到客户 我也试过: 执行许多调用,每个调用在

    • 我正在运行一个POC来查看将我们的j2ee应用程序迁移到Logback的影响。我花了一些时间在官方网站上,显然,除了新的JAR之外,唯一的变化就是logback.xml文件。不幸的是,这似乎还不够,部署工作了,日志文件也创建了,但没有记录任何内容(空)。 我的代码有以下语句 pom.xml现在具有以下内容 xml(使用官方网站上的web实用程序创建) 这让我想到了weblogic中的log4j。

    • 我对Kafka connect很陌生。我想把我的信息从Kafka主题推到弹性搜索。在阅读了可用的文档之后,我从发行版tar下载并编译了弹性搜索接收器。拉链(https://github.com/confluentinc/kafka-connect-elasticsearch/releases) 我添加了弹性搜索属性文件,并将上述jar包含在类路径中。当我在独立模式下运行kafka connect时