当前位置: 首页 > 知识库问答 >
问题:

Cassandra接收器连接器:尝试创建/查找主题“_confluent-command”时出错

金成济
2023-03-14

无法使用Ksqldb创建Kafka->Cassandra接收器连接器:

创建接收器连接器cassandra(“CONNECTOR.class”=“io.confluent.connect.cassandra.CassandrasinkConnector”,“tasks.max”=“1”,“topics”=“tst”,“cassandra.contact.points”=“cassandra”,“cassandra.keyspace”=“test”,“cassandra.write.mode”=“update”,“confluent.topic.bootstrap.servers”=“kafka:9092');

ERROR [CASS|worker] WorkerConnector{id=CASS} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:118)
    org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) '_confluent-command'
            at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:262)
            at io.confluent.license.LicenseStore$1.run(LicenseStore.java:161)
            at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:128)
            at io.confluent.license.LicenseStore.start(LicenseStore.java:190)
            at io.confluent.license.LicenseManager.<init>(LicenseManager.java:155)
            at io.confluent.license.LicenseManager.<init>(LicenseManager.java:140)
            at io.confluent.connect.utils.licensing.ConnectLicenseManager$Builder.lambda$build$0(ConnectLicenseManager.java:210)
            at io.confluent.connect.utils.licensing.ConnectLicenseManager.registerOrValidateLicense(ConnectLicenseManager.java:255)
            at io.confluent.connect.cassandra.CassandraSinkConnector.doStart(CassandraSinkConnector.java:50)
            at io.confluent.connect.cassandra.CassandraSinkConnector.start(CassandraSinkConnector.java:45)
            at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:110)
            at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:135)
            at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
            at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:257)
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1190)
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:126)
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1206)
            at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:1202)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
            at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
            at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
            at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
            at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
            at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:229)
            ... 21 more
    Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

共有1个答案

吴欣悦
2023-03-14

汇合Cassandra接收器连接器复制因子的默认值为3。修改连接器配置中的默认值就解决了这个问题!

"confluent.topic.replication.factor" = '1',
 类似资料:
  • **dataframe2:从另一个来源获得的键的Dataframe(这些键是上表中ID列的分区键)-此表中不同键的数量约为0.15万** 现在,此代码总是导致“com.datastax.oss.driver.api.core.servererrors.ReadFailureException:在一致性LOCAL_ONE读取查询期间Cassandra失败(需要1个响应,但只有0个副本响应,1个失败)

  • [2017-08-31 10:15:20715]警告配置“内部”。钥匙已提供“转换器”,但不是已知的配置。(org.apache.kafka.clients.admin.AdminClientConfig:231)[2017-08-31 10:15:20715]警告配置的状态。存储复制。提供了“因子”,但不是已知的配置。(org.apache.kafka.clients.admin.AdminCl

  • Presto与Cassandra/ScylLadb的默认连接数是多少?如何设置此属性?谢谢

  • 我使用自己的自定义Sink插件运行Kafka Connect集群(本地有1个工人Docker Compose)。我想在连接器中使用几个主题:topicA、topicB、topicC,每个主题都有一个分区。 我的连接器启动时的配置子集如下: 使用此配置,我希望Kafka Connect为每个接收器任务分配一个主题,但遗憾的是,这不是我看到的。实践中发生的情况是,为分配了所有主题的每个任务调用Sink

  • 我已经启动了spark-thrift服务器,并使用Beeline连接到thrift服务器。当尝试查询时,创建一个表在hive转移,我得到以下错误。 cassandra不是有效的Spark SQL数据源。 0:jdbc:hive2:/localhost:10000>select*from traveldata.employee_details;

  • 我有一个微服务,它使用 OracleDB 在表中发布系统更改。表包含一个,其中包含事件类型的名称。 JDBC Source Kafka Connect可能会接受表更改,并在KAFKA-TOPIC中使用列的值发布它们? 这是我的源代码kafka连接器配置: