当前位置: 首页 > 知识库问答 >
问题:

Kafka producer自动故障切换/故障回复

从建明
2023-03-14

我想知道是否可以在Kafka制作程序中配置2个不同的Kafka集群。

目前我正试图让我的制片人

我正在使用Apache Kafka 2.8和Python 3.7的confluent_kafka==1.8.2包。

生产商代码下方:

from time import sleep

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'clusterA:32531, clusterB:30804'})


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:

        print(f'Message delivered to {msg.offset()}')


with open('test_data.csv', 'r') as read_obj:
    csv_reader = reader(read_obj)
    header = next(csv_reader)
    # Check file as empty
    if header is not None:
        # Iterate over each row after the header in the csv
        for row in csv_reader:
            sleep(0.02)
            p.produce(topic='demo', key=row[5], value=str(row), callback=delivery_report)
p.flush()

当我杀死clusterB时,我得到了以下错误消息。

%4|1643837239.074|CLUSTERID|rdkafka#producer-1| [thrd:main]: Broker clusterA:32531/bootstrap reports different ClusterId "MLWCRsVXSxOf2YGPRIivjA" than previously known "6ZtcQCRPQ5msgeD3r7I11w": a client must not be simultaneously connected to multiple clusters
%3|1643837240.995|FAIL|rdkafka#producer-1| [thrd:clusterB:30804/bootstrap]: 172.27.176.222:30804/bootstrap: Connect to ipv4#clusterB:30804 failed: Unknown error (after 2044ms in state CONNECT)

共有1个答案

后安民
2023-03-14

目前,您必须手动将引导信息更新到辅助集群,这将需要重新启动客户端以进行故障切换。

从编程角度来看,为了连接到单独的集群,您必须停止当前producer实例,并使用新的引导服务器配置启动一个新实例。然而,这可能会变得相当复杂。

其他选择是,

  • 您使用LB或VIP配置kafka(不推荐,因为本质上需要从客户端到代理的直接连接)
  • 配置一个共享存储(memcache或redis),用于存储引导服务器配置。您的客户端将在引导过程中获取引导服务器。在失败期间,您更改存储中的值并重新启动您的客户端。(这使得操作非常容易)
 类似资料:
  • 我正在尝试用6台机器实现一个Redis集群。我有一个由六台机器组成的流浪集群: 运行redis服务器 我编辑了上述所有服务器的/etc/redis/redis.conf文件,添加了这个 然后我在六台机器中的一台上运行了这个程序; Redis集群已启动并运行。我通过在一台机器上设置值手动检查它显示在其他机器上。 我的问题是,当我关闭或停止任何一台主机上的redis server时,整个集群都会停止运

  • 蓝鲸故障自愈,是腾讯蓝鲸推出的一款SaaS服务,目前可以支持和open-falcon无缝对接了,通过接入蓝鲸故障自愈系统,可以帮助使用open-falcon的用户,做到告警无人值守。 具体的配置非常简单: open-falcon接入蓝鲸 腾讯蓝鲸故障自愈的使用案例参考:蓝鲸故障自愈案例 那些年我们想做的无人值守

  • 自动故障剔除会自动监控 RPC 调用的情况,对故障节点进行权重降级,并在节点恢复健康时进行权重恢复。目前支持 bolt 协议。 在 SOFABoot 中,只需要配置自动故障剔除的参数到 application.properties 即可。可以不完全配置,只配置自己关心的参数,其余参数会取默认值。需要注意的是 com.alipay.sofa.rpc.aft.regulation.effective

  • 集群中通常一个服务有多个服务提供者。其中部分服务提供者可能由于网络,配置,长时间 fullgc ,线程池满,硬件故障等导致长连接还存活但是程序已经无法正常响应。单机故障剔除功能会将这部分异常的服务提供者进行降级,使得客户端的请求更多地指向健康节点。当异常节点的表现正常后,单机故障剔除功能会对该节点进行恢复,使得客户端请求逐渐将流量分发到该节点。单机故障剔除功能解决了服务故障持续影响业务的问题,避免

  • 问题内容: 在简单情况下,如果3台服务器具有1个主服务器和2个从属服务器而没有分片。是否有使用Java和Jedis的经过验证的解决方案,该解决方案没有单点故障,并且将自动处理单个服务器(无论是主服务器还是从服务器)(自动故障转移)。例如,提升主机并在故障后重置,而不会丢失任何数据。 在我看来,这似乎应该是一个已解决的问题,但是我找不到关于它的任何代码,而仅是对实现此方法的高级描述。 谁实际覆盖并在

  • Webpack 的配置比较复杂,很容出现错误,下面是一些通常的故障处理手段。 一般情况下,webpack 如果出问题,会打印一些简单的错误信息,比如模块没有找到。我们还可以通过参数 --display-error-details 来打印错误详情。 $ webpack --display-error-details Hash: a40fbc6d852c51fceadb Version: webpa