Question:

Flink job with CassandraSink fails with "Error writing"

储俊英
2023-03-14

I have two simple Flink streaming jobs that read from Kafka, do some transformations, and write the results to a Cassandra sink. The two jobs read from different Kafka topics and write to different Cassandra tables.
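
Simplified, each job looks roughly like this (a sketch, not the real code: the broker address, topic, group id, and checkpoint interval are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpointing is enabled, see below

Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
kafkaProps.setProperty("group.id", "unique-counts");           // placeholder

DataStream<String> source = env.addSource(
        new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), kafkaProps));

// ... transformations producing the 16-column tuples written by the sink shown below ...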

When I run either of the two jobs on its own, everything works fine: checkpoints are triggered and completed, and data is saved to Cassandra.

I couldn't find much information about this error. It could be caused by any of the following:

  • Flink (v1.10.0-scala_2.12),
  • the Flink Cassandra connector (flink-connector-cassandra_2.11:jar:1.10.2; I also tried flink-connector-cassandra_2.12:jar:1.10.0),
  • the DataStax driver (v3.10.2),
  • Cassandra v4.0 (same behavior with v3.0),
  • the netty transport (v4.1.51.Final).

I also use other packages that might conflict with the ones above.

My ClusterBuilder:
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.exceptions.ConfigurationException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.extras.codecs.jdk8.LocalTimeCodec;

ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        Cluster cluster = null;
        try {
            cluster = builder
                    .addContactPoint("localhost")
                    .withPort(9042)
                    .withClusterName("Test Cluster")
                    .withoutJMXReporting()
                    .withProtocolVersion(ProtocolVersion.V4)
                    .withoutMetrics()
                    .build();

            // register codecs from the DataStax extras module
            cluster.getConfiguration().getCodecRegistry()
                    .register(LocalTimeCodec.instance);
        } catch (ConfigurationException e) {
            e.printStackTrace();
        } catch (NoHostAvailableException nhae) {
            nhae.printStackTrace();
        }

        return cluster;
    }
};

I tried different PoolingOptions and SocketOptions settings, but without success.
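
For example, one variant I tried looked roughly like this (a sketch; the concrete values are illustrative, applied to the same builder inside buildCluster above):

import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.SocketOptions;

// Illustrative tuning values only.
PoolingOptions poolingOptions = new PoolingOptions()
        .setConnectionsPerHost(HostDistance.LOCAL, 1, 4)
        .setMaxRequestsPerConnection(HostDistance.LOCAL, 256);

SocketOptions socketOptions = new SocketOptions()
        .setConnectTimeoutMillis(10_000)
        .setReadTimeoutMillis(20_000)
        .setKeepAlive(true);

cluster = builder
        .addContactPoint("localhost")
        .withPort(9042)
        .withPoolingOptions(poolingOptions)
        .withSocketOptions(socketOptions)
        .build();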

The Cassandra sink:

CassandraSink.addSink(dataRows)
.setQuery("insert into table_name_(16 columns names) " +
        "values (16 placeholders);")
.enableWriteAheadLog()
.setClusterBuilder(builder)
.setFailureHandler(new CassandraFailureHandler() {
    @Override
    public void onFailure(Throwable throwable) {
        LOG.error("A {} occurred.", "Cassandra Failure", throwable);
    }
})
.build()
.setParallelism(1)
.name("Cassandra Sink For Unique Count every N minutes.");

The full stack trace from the Flink job manager log:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
    at com.datastax.driver.core.Cluster.init(Cluster.java:162)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.createSession(CassandraSinkBase.java:143)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:87)
    at org.apache.flink.streaming.connectors.cassandra.AbstractCassandraTupleSink.open(AbstractCassandraTupleSink.java:49)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.base/java.lang.Thread.run(Thread.java:834)

My pom.xml:

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.abcde.ai</groupId>
    <artifactId>analytics-etl</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-configuration</groupId>
            <artifactId>commons-configuration</artifactId>
            <version>1.10</version>
        </dependency>
        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.abcde.analytics.etl.KafkaUniqueCountsStreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

1 Answer

单于皓轩
2023-03-14

I might be wrong, but most likely the issue is caused by a netty client version conflict. The reported error is NoHostAvailableException, but the underlying error is a TransportException with an "Error writing" message. Cassandra itself is almost certainly fine.

There is a similar Stack Overflow case, "Cassandra - Error writing", with very similar symptoms: a single project runs fine, but once several projects are added, an AllNodesFailedException with a TransportException and an "error writing" message is the root cause. The author there resolved it by unifying the netty client version.

In your case I'm not sure why there are so many dependencies, so I would try removing all the extras and keeping only the Flink (v1.10.0-scala_2.12) and Flink Cassandra connector (flink-connector-cassandra_2.12:jar:1.10.0) libraries. They should already include the necessary drivers, netty, etc. All other drivers should be skipped (at least for the initial iteration, to make sure this fixes the problem and that it was a library conflict).
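
To check which netty version actually ends up on the classpath, one option (my own sketch, not something from the linked answer) is to run mvn dependency:tree and look for multiple io.netty entries, or to print netty's self-identification at job startup:

import java.util.Map;

import io.netty.util.Version;

// Prints every netty artifact netty can find on the classpath together with
// its version, e.g. "netty-transport -> 4.1.51.Final". Seeing two different
// versions (or shaded copies pulled in by different jars) confirms a conflict.
for (Map.Entry<String, Version> entry : Version.identify().entrySet()) {
    System.out.println(entry.getKey() + " -> " + entry.getValue().artifactVersion());
}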
