当前位置: 首页 > 知识库问答 >
问题:

分布式运行模式下Kafka Producer的唯一事务ID

左丘边浩
2023-03-14

我有一个基于过程消费的大数据应用程序-

假设我的应用程序在一台机器上运行,我实例化了2个消费者,他们有自己的生产者,例如生产者1有事务ID -

ExecutorService executorService// responsible for starting my consumers
for (int i = 0; i < 2; i++) {
    prod_trans_id = "consumer-" + str(i) + "-producer"
    Custom_Consumer consumer = new Custom_Consumer(prod_trans_id)
    executorService.submit(consumer)
}

如果我的应用程序在一台机器上工作,这完全可以正常工作,但是,事实并非如此,因为应用程序需要在多台机器上运行,因此当相同的代码在机器2上运行时,由机器2上的消费者实例化的生产者将具有与机器1上相同的事务ID。我希望事务ID的生成方式不会相互冲突,并且它们是可重现的,这意味着如果应用程序崩溃/停止(例如有人服务应用程序停止,然后服务应用程序启动),当它重新联机时,它应该使用以前使用的相同事务ID。我想到了基于UUID的方法,但是,UUID是随机的,当一台机器上的应用程序死亡并重新联机时,UUID将不相同。

共有1个答案

茅桐
2023-03-14
private final static String HOSTNAME_COMMAND = "hostname";

public static String getHostName() {
        BufferedReader inputStreamReader = null;
        BufferedReader errorStreamReader = null;
        try {
            Process process = Runtime.getRuntime().exec(HOSTNAME_COMMAND);
            inputStreamReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
            errorStreamReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
            if (errorStreamReader.readLine() != null) {
                throw new RuntimeException(String.format("Failed to get the hostname, exception message: %s",
                        errorStreamReader.readLine()));
            }
            return inputStreamReader.readLine();
        } catch (IOException e) {
            try {
                if (inputStreamReader != null) {
                    inputStreamReader.close();
                }
                if (errorStreamReader != null) {
                    errorStreamReader.close();
                }
            } catch (IOException e1) {
                LogExceptionTrace.logExceptionStackTrace(e1);
                throw new RuntimeException(e1);
            }
            LogExceptionTrace.logExceptionStackTrace(e);
            throw new RuntimeException(e);
        }
    }

然后使用主机名,如下所示:

final String producerTransactionalID = String.format("%s_producer", this.consumerName);

其中消费者名称设置如下:

for (int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
            String consumerName = String.format("%s-worker-%d", hostName, i);
            Executor executor = new Executor(
                    Configuration, consumerName
            );
            Executors.add(executor);
            futures.add(executorService.submit(executor));
        }
 类似资料:
  • ShardingSphereTransactionManager SPI 名称 详细说明 ShardingSphereTransactionManager 分布式事务管理器 已知实现类 详细说明 XAShardingSphereTransactionManager 基于 XA 的分布式事务管理器 SeataATShardingSphereTransactionManager 基于 Seata 的分

  • ShardingSphere-Proxy 接入的分布式事务 API 同 ShardingSphere-JDBC 保持一致,支持 LOCAL,XA,BASE 类型的事务。 XA 事务 ShardingSphere-Proxy 原生支持 XA 事务,默认的事务管理器为 Atomikos。 可以通过在 ShardingSphere-Proxy 的 conf 目录中添加 jta.properties 来定

  • 通过 Apache ShardingSphere 使用分布式事务,与本地事务并无区别。 除了透明化分布式事务的使用之外,Apache ShardingSphere 还能够在每次数据库访问时切换分布式事务类型。 支持的事务类型包括 本地事务、XA事务 和 柔性事务。可在创建数据库连接之前设置,缺省为 Apache ShardingSphere 启动时的默认事务类型。

  • 背景 数据库事务需要满足 ACID(原子性、一致性、隔离性、持久性)四个特性。 原子性(Atomicity)指事务作为整体来执行,要么全部执行,要么全不执行。 一致性(Consistency)指事务应确保数据从一个一致的状态转变为另一个一致的状态。 隔离性(Isolation)指多个事务并发执行时,一个事务的执行不应影响其他事务的执行。 持久性(Durability)指已提交的事务修改数据会被持久

  • 单文档原子性可满足大多数业务需求 在 MongoDB 中,对单个文档的操作是原子操作。 由于 MongoDB 文档数据模型,一个文档中通过嵌入式的文档和数组来表示传统关系数据库模型中的一对一、一对多关系,而不是通过文档之间的复杂关系来描述业务需求中的一对一、一对多关系。 所以单文档原子性可以满足实际生产中大多数关于事务的需求。 对于需要对多个文档(在单个或多个集合中)进行原子读写的情况,Mongo

  • 我已经在Ubuntu上安装了hadoop 1.2.0。所有的服务namenode,sec namenode,datanode,jobtracker,tasktracker运行良好。 然后我安装了hbase-0-94.8,我希望配置也可以。但是HMaster无法在端口9000上启动。它实际上开始,然后下降。 >2014-05-14 09:28:37,015 INFO org.apache.hadoo