我试图用以下链接中提供的信息将Cassandra作为Flink中的数据来源:
AsyncWaitOperator
异常跟踪-->
02:21:00.164 [AsyncIO-Emitter-Thread (Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1))] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1) (2809cef511194e612b2cc65510f78c64) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
... 2 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader)
contextClassLoader (java.lang.Thread)
threads (java.lang.ThreadGroup)
groups (java.lang.ThreadGroup)
threadGroup (io.netty.util.concurrent.DefaultThreadFactory)
val$backingThreadFactory (com.google.common.util.concurrent.ThreadFactoryBuilder$1)
threadFactory (java.util.concurrent.ThreadPoolExecutor)
delegate (com.google.common.util.concurrent.MoreExecutors$ListeningDecorator)
blockingExecutor (com.datastax.driver.core.Cluster$Manager)
manager (com.datastax.driver.core.Host)
triedHosts (com.datastax.driver.core.ExecutionInfo)
info (com.datastax.driver.core.ArrayBackedResultSet$SinglePage)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:82) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505) ~[kryo-2.24.0.jar:na]
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:182) ~[flink-core-1.4.2.jar:1.4.2]
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
... 10 common frames omitted
Caused by: java.util.ConcurrentModificationException: null
at java.util.Vector$Itr.checkForComodification(Vector.java:1184) ~[na:1.8.0_60]
at java.util.Vector$Itr.next(Vector.java:1137) ~[na:1.8.0_60]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
... 68 common frames omitted
下面的代码应该适用于从Cassandra读取,以便在Flink中进行批处理。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ClusterBuilder clusterBuilder = new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint(<cassandraHost>))
.withPort(9042)
.withCredentials(<cassandraUserName>,<cassandraPassword>)
.build();
}
};
DataSet<Tuple3<String,String,String>> inputRecords = env
.createInput
(new CassandraInputFormat<Tuple3<String,String,String>>(<select query>,clusterBuilder)
,TupleTypeInfo.of(new TypeHint<Tuple3<String,String,String>>() {}));
DataSet的数据类型(在示例中由三个字符串组成的Tuple3)将根据您的select查询返回的字段的类型和数量而变化。
命令用于从Cassandra表中读取数据。 您可以使用此命令读取整个表,单个列,特定单元格等等。 语法 示例: 下面举个例子来演示如何从Cassandra表中读取数据。 我们有一个名为“”的表和以下列(,,)。 使用SELECT命令读整个表 执行结果如下 - 读取特定列 - 该示例将从表中只读和列的数据。 使用WHERE子句 WHERE子句与SELECT命令一起使用,以指定必须满足获取数据的确切条
尝试使用Apache Flink从Cassandra获取数据,引用本文,我可以读取数据,但我不知道如何将其加载到DataStream对象中。代码如下: 我试过了 将变量中的数据加载到数据流中
问题内容: 嗨,我习惯了SQL,但是我需要从HBase表读取数据。任何帮助都会很棒。一本书,或者只是一些示例代码,可以从表中读取。有人说使用扫描仪可以解决问题,但我不知道如何使用。 问题答案: 从网站:
我试图从Firebase数据库中读取数据,我已经到处阅读和查找,但我已经走到了死胡同。 这就是我所做的一切。 依赖项: 实现'com.google.firebase: Firebase存储: 9.2.1' 实现'com。谷歌。firebase:firebase数据库:9.2。1' 实现'com。谷歌。firebase:firebase授权:9.2。1' 实现'com。谷歌。火基:火基核心:9.2。
问题很简单:我在gDrive上有一些数据,例如在上。 我也有一个简单的gColab笔记本。 所以,我想做这样的事情: 不幸的是,所有的例子(像这样-https://colab.research.google.com/notebook#fileId=/v2/external/notebooks/io.ipynb,例如)建议只将所有必要的数据主要加载到笔记本。 但是,如果我有很多数据,它可能会非常复杂
我有一个Cassandra节点集群,每个节点机器上都有Spark worker。对于通信,我使用Datastax Spark-Cassasndra连接器。Datastax连接器是否对同一台机器中的工作人员从Cassandra节点读取数据进行了优化,或者在机器之间存在一些数据流?