------------------------------------------- Time: 1448645109000 ms ------------------------------------------- 15/11/27 17:25:09 INFO JobScheduler: Finished job streaming job 1448645109000 ms.0 from job set of time 1448645109000 ms 15/11/27 17:25:09 INFO KinesisBackedBlockRDD: Removing RDD 102 from persistence list 15/11/27 17:25:09 INFO JobScheduler: Total delay: 0.002 s for time 1448645109000 ms (execution: 0.001 s) 15/11/27 17:25:09 INFO BlockManager: Removing RDD 102 15/11/27 17:25:09 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[102] at createStream at NewClass.java:25 of time 1448645109000 ms 15/11/27 17:25:09 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1448645107000 ms) 15/11/27 17:25:09 INFO InputInfoTracker: remove old batch metadata: 1448645107000 ms 15/11/27 17:25:10 INFO JobScheduler: Added jobs for time 1448645110000 ms 15/11/27 17:25:10 INFO JobScheduler: Starting job streaming job 1448645110000 ms.0 from job set of time 1448645110000 ms ------------------------------------------- Time: 1448645110000 ms ------------------------------------------- <----- Some data expected to show up here! 15/11/27 17:25:10 INFO JobScheduler: Finished job streaming job 1448645110000 ms.0 from job set of time 1448645110000 ms 15/11/27 17:25:10 INFO JobScheduler: Total delay: 0.003 s for time 1448645110000 ms (execution: 0.001 s) 15/11/27 17:25:10 INFO KinesisBackedBlockRDD: Removing RDD 103 from persistence list 15/11/27 17:25:10 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[103] at createStream at NewClass.java:25 of time 1448645110000 ms 15/11/27 17:25:10 INFO BlockManager: Removing RDD 103 15/11/27 17:25:10 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1448645108000 ms) 15/11/27 17:25:10 INFO InputInfoTracker: remove old batch metadata: 1448645108000 ms 15/11/27 17:25:11 INFO JobScheduler: Added jobs for time 1448645111000 ms 15/11/27 17:25:11 INFO JobScheduler: Starting job streaming job 1448645111000 ms.0 from job set of time 1448645111000 ms
15/11/27 17:23:26 INFO KinesisInputDStream: metadataCleanupDelay = -1 15/11/27 17:23:26 INFO KinesisInputDStream: Slide time = 1000 ms 15/11/27 17:23:26 INFO KinesisInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/11/27 17:23:26 INFO KinesisInputDStream: Checkpoint interval = null 15/11/27 17:23:26 INFO KinesisInputDStream: Remember duration = 1000 ms 15/11/27 17:23:26 INFO KinesisInputDStream: Initialized and validated org.apache.spark.streaming.kinesis.KinesisInputDStream@74b21a6
源代码(java):
public class NewClass { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("appname").setMaster("local[3]"); JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000)); JavaReceiverInputDStream kinesisStream = KinesisUtils.createStream( ssc, "webassist-test", "test", "https://kinesis.us-west-1.amazonaws.com", "us-west-1", InitialPositionInStream.LATEST, new Duration(20000), StorageLevel.MEMORY_AND_DISK_2() ); kinesisStream.print(); ssc.start(); ssc.awaitTermination(); } }
Python代码(以前尝试pprinting和发送到MongoDB):
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream from pyspark import SparkContext, StorageLevel from pyspark.streaming import StreamingContext from sys import argv sc = SparkContext(appName="webassist-test") ssc = StreamingContext(sc, 5) stream = KinesisUtils.createStream(ssc, "appname", "test", "https://kinesis.us-west-1.amazonaws.com", "us-west-1", InitialPositionInStream.LATEST, 5, StorageLevel.MEMORY_AND_DISK_2) stream.pprint() ssc.start() ssc.awaitTermination()
注意:我还尝试用stream.foreachrdd(lambda rdd:rdd.foreachpartition(send_partition))
向MongoDB发送数据,但没有将其粘贴在这里,因为您需要一个MongoDB实例,而且这与问题无关--输入中已经没有记录了。
leaseKey checkpoint leaseCounter leaseOwner ownerSwitchesSinceCheckpoint shardId-000000000000 LATEST 614 localhost:d92516... 8
spark-submit --executor-memory 1024m --master spark://IpAddress:7077 /path/test.py
Input Rate
Receivers: 1 / 1 active
Avg: 0.00 events/sec
KinesisReceiver-0
Avg: 0.00 events/sec
...
Completed Batches (last 76 out of 76)
谢谢你的帮助!
在过去与Kinesis连接时,我曾遇到过Spark Streaming中没有显示记录活动的问题。
我会尝试这些东西来从Spark获得更多的反馈/不同的行为:
>
请确保使用foreachRDD、print、saveas等输出操作强制计算DStream转换操作...
Python示例
另一个选择是在不同的集群中运行Spark Streaming应用程序并进行比较。
附注:我目前在不同的集群中使用Spark Streaming 1.5.2和Kinesis,它按照预期处理记录/显示活动。
你好,我一直在使用Spring Kafka活页夹作为消费者。通过查看日志,我能够连接到主题,尽管我不确定它为什么不处理来自制作人的任何消息。 你知道可能遗漏了什么吗?非常感谢。 聚甲醛 应用程序YML 消费者阶层 侦听器类 日志 从日志中可以看到,它能够连接到主题。虽然我不确定为什么我没有收到来自生产者的任何消息。是因为分区被撤销吗?这与为什么我没有收到任何消息有关吗?生产者来自第三方,他需要做些
我配置了一个基于Web服务的入站消息传递网关。我想记录传入的SOAP消息(信封和里面的所有消息)。最好的方法是什么? 我曾尝试使用带有日志通道适配器的有线抽头,但不知道一个好的表达式值来获取实际的SOAP XML。如果入站网关配置为不提取有效负载,则我将SaajSoapMessage视为有效负载,否则将DOMSource视为有效负载。是否有一个表达式将SaajSoapMessage作为XML字符串
问题内容: 我有一个带有redis的laravel(5.3)应用程序,用于会话(使用predis)。只要我使用单个redis节点(使用 config / database.php中的 默认方法),一切都可以正常工作。一旦我切换到Redis集群,尽管我开始像50%的时间一样出现MOVED错误(基于谷歌搜索,我知道这应该由predis管理,但不是这样)。 我尝试将cluster参数更改为true,但是
我正在使用坎内尔向短信发送短信: > < li> 发送包含一部分的短信( 发送多部分(长)短信息是: A. if:由Kannel拆分,零件被ACKed并交付(作为唯一的SMS)。 B.如果:由 Kannel 拆分,并且部件已 ACK 化,但未被删除。 此外,发送到目的地号码的任何后续短信,只要遇到这种情况[2.B],也会得到确认,但不会发送,即使它是由1部分组成的短信! 有人有过类似的经历吗? 由
我正在尝试创建一个TCP服务器,该服务器在端口5002上接受来自外部程序的消息。但是,它不接收来自外部程序的消息。 为了验证我的TCP服务器是否正常工作,我像这样使用了telnet,程序确实收到了文本“hello”。 设置wireshark时,我可以看到计算机正在端口5002上接收来自外部程序(我期待)的消息。为什么我的程序无法接收这些消息? 关于最终解决方案的最新情况: 由于负载没有停止线,我必
我在实现UDP连接时遇到了麻烦,因为当我在局域网内尝试它时,它是有效的,但是当NAT内部的人试图连接到公共服务器地址时,它会失败,因为从服务器作为响应发送的数据包永远不会到达客户端。 我的协议如下: 客户端A向服务器发送一个字节作为连接请求 服务器B为客户端创建一个新的套接字,并从那里向recvfrom()调用中报告的客户端端口响应一个字节。永远不会联系到客户 我也试过: 执行许多调用,每个调用在
我正在运行一个POC来查看将我们的j2ee应用程序迁移到Logback的影响。我花了一些时间在官方网站上,显然,除了新的JAR之外,唯一的变化就是logback.xml文件。不幸的是,这似乎还不够,部署工作了,日志文件也创建了,但没有记录任何内容(空)。 我的代码有以下语句 pom.xml现在具有以下内容 xml(使用官方网站上的web实用程序创建) 这让我想到了weblogic中的log4j。
我对Kafka connect很陌生。我想把我的信息从Kafka主题推到弹性搜索。在阅读了可用的文档之后,我从发行版tar下载并编译了弹性搜索接收器。拉链(https://github.com/confluentinc/kafka-connect-elasticsearch/releases) 我添加了弹性搜索属性文件,并将上述jar包含在类路径中。当我在独立模式下运行kafka connect时