当前位置: 首页 > 工具软件 > Tephra > 使用案例 >

Apache Tephra -使用

龚振濂
2023-12-01

 

一.     下载源代码并编译打包:

git clone https://git-wip-us.apache.org/repos/asf/incubator-tephra.git

cd incubator-tephra

mvn clean package

二.     编译完成后,在tephra-distribution/target/ directory包下拿到 tephra-<version>.tar.gz 安装到你的系统中。

三.     Maven pom文件添加:

<dependency>
   <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-api</artifactId>
    <version>${tephra.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-core</artifactId>
    <version>${tephra.version}</version>
  </dependency>

由于HBase api在不同版本之间发生了变化,所以您需要选择合适的HBase兼容性库:

 HBase 0.96.x:

<dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-0.96</artifactId>
    <version>${tephra.version}</version>
  </dependency>

HBase0.98.x:

<dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-0.98</artifactId>
    <version>${tephra.version}</version>
  </dependency>

HBase1.0.x:

<dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-1.0</artifactId>
    <version>${tephra.version}</version>
  </dependency>

如果您运行的是CDH 5.45.55.6版本的HBase 1.0.x(这个版本与Apache HBase 1.0.xAPI不兼容):

<dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-1.0-cdh</artifactId>
    <version>${tephra.version}</version>
  </dependency>

HBase1.1.x (除了 1.1.5):

<dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-1.1</artifactId>
    <version>${tephra.version}</version>
  </dependency>

 HBase 1.2.x (除了 1.2.2):

<dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-1.2</artifactId>
    <version>${tephra.version}</version>
  </dependency>

如果你运行的是CDH 5.7,或者是5.8版本的HBase 1.2.x(这个版本与Apache HBase 1.2.xAPI不兼容):

<dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-1.2-cdh</artifactId>
    <version>${tephra.version}</version>
  </dependency>

四.          配置部署

Tephra使用了一个中央事务服务器来为数据修改分配独特的事务id,并执行冲突检测。只有单个事务服务器可以一次主动地处理客户端请求,但是,可以同时运行其他事务服务器实例,提供自动故障转移如果active的服务器出现问题时。

事务服务器配置:

Tephra事务服务器可以部署在运行HBase HMaster进程的相同集群节点上。事务服务器要求在服务器的Java类路径中提供HBase库。

事务服务器支持以下配置属性。所有的配置属性都可以添加到服务器classpath下的hbase -site.xml文件中。

要运行事务服务器,在您的Tephra安装目录中执行以下命令:

        ./bin/tephra start

事务客户端配置:

因为Tephra的客户需要与HBase进行沟通HBase客户端库和HBase集群配置必须在客户机的Java类路径中可用。客户端API的使用在客户端API部分中进行了描述。

 

客户端支持以下配置属性。所有的配置属性都可以添加到客户端classpath下的hbase -site.xml文件中。

HBase Coprocessor配置:

      除了事务服务器之外,Tephra还需要在所有的表上安装一个HBase coprocessor,在那里执行事务读和写操作。

要在所有HBase表上配置coprocessor,可以将以下内容添加到hbase-site.xml:

<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>org.apache.tephra.hbase.coprocessor.TransactionProcessor</value>
  </property>

可以将TransactionProcessor配置为仅在HBase表上加载,而这些表将用于事务读取和写入。但是,您必须确保在所有受影响的表上都可以使用coprocessor,以便Tephra能够正常工作。

现有的HBase表事务化

Tephra使用事务id覆盖HBase单元时间戳,并使用这些事务id来过滤比TTL(生存时间)更大的单元格。事务id的级别比单元时间戳要高。当一个有现有数据的常规的HBase表被转换为事务表时,在读取时可能会过滤现有的数据。为了允许从事务表读取现有数据,需要设置属性data.tx.read.pre在表的表描述符中设置为rue

即使data.tx.read.pre没有设置为true,已存在的数据也不会被删除,仅仅实在读取过程中不会被返回。

指标报告

通过使用Dropwizard度量库,Tephra内置支持通过JMX和日志文件来报告度量标准。

要使JMX报告用于度量,您需要在Java运行时参数中启用JMX。编辑bin / tephra-env.sh脚本并取消下面的行,对端口使用、SSLJMX身份验证进行任何想要的更改:

export JMX_OPTS="-Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=13001"
  export OPTS="$OPTS$JMX_OPTS"

 

要支持基于文件的报告,修改conf/logback.xml取消注释部分,将文件路径占位符替换为本地文件系统上的有效目录:

<appendername="METRICS"class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>/FILE-PATH/metrics.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>metrics.log.%d{yyyy-MM-dd}</fileNamePattern>
      <maxHistory>30</maxHistory>
    </rollingPolicy>
    <encoder>
      <pattern>%d{ISO8601} %msg%n</pattern>
    </encoder>
  </appender>
  <logger name="tephra-metrics"level="TRACE"additivity="false">
    <appender-ref ref="METRICS" />
  </logger>

通过设置data.tx.metrics,可以配置度量报告的频率。周期配置属性以秒为报告频率。

 

TransactionAwareHTable 类实现 HBase HTableInterface,从而提供相同的 Api 该实例提供标准 HBase HTable。只有某些操作是事务性支持。这些都是:

Methods Supported In Transactions

exists(Get get)

exists(List<Get> gets)

get(Get get)

get(List<Get> gets)

batch(List<? extends Row> actions, Object[] results)

batch(List<? extends Row> actions)

batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) [0.96]

batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) [0.96]

getScanner(byte[] family)

getScanner(byte[] family, byte[] qualifier)

put(Put put)

put(List<Put> puts)

delete(Delete delete)

delete(List<Delete> deletes)

不支持事务其他操作,调用时将抛出UnsupportedOperationException,如果要允许调用这些方法,可以设置setAllowNonTransactional(true).下面是不支持事务的方法:

Methods Supported Outside of Transactions

getRowOrBefore(byte[] row, byte[], family)

checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)

checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete)

mutateRow(RowMutations rm)

append(Append append)

increment(Increment increment)

incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)

incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)

incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)

Note that for batch operations, only certain supportedoperations (getput, and delete) are applied transactionally.

注意,对于批处理操作,只有某些受支持的操作(getputdelete)是应用事务的。

Usage

 要使用TransactionalAwareHTable,需要一个TransactionContext的实例。TransactionContext为客户端使用事务提供了基本的契约。在事务生命周期的每个点上,它都提供了与Tephra事务服务器的必要交互,以便启动、提交和回滚事务。

TransactionContext的用法:

 

  TransactionContext context = new TransactionContext(client, transactionAwareHTable);
  try {
    context.start();
    transactionAwareHTable.put(new Put(Bytes.toBytes("row"));
    // ...
    context.finish();
  } catch (TransactionFailureException e) {
    context.abort();
  }

1.    首先,使用TransactionContext.start()启动一个新的事务。

2.    接下来,任何数据操作都是在事务上下文中执行的。

3.    数据操作完成后,TransactionContext.finish()提交事务。

4.    如果出现异常,则可以调用transactioncontext.中止()来回滚事务

TransactionAwareHTable处理执行数据操作的细节,并实现必要的钩子,以便提交和回滚数据更改(请参阅事务感知)

Example

为了演示如何使用TransactionAwareHTables,下面是一个二级索引表的基本实现。这个类封装了TransactionContext的用法,并为用户提供了一个简单的接口:

 

/**
   * A Transactional SecondaryIndexTable.
   */
  public class SecondaryIndexTable {
    private byte[] secondaryIndex;
    private TransactionAwareHTable transactionAwareHTable;
    private TransactionAwareHTable secondaryIndexTable;
    private TransactionContext transactionContext;
    private final TableName secondaryIndexTableName;
    private static final byte[] secondaryIndexFamily =
      Bytes.toBytes("secondaryIndexFamily");
    private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
    private static final byte[] DELIMITER  = new byte[] {0};
 
    public SecondaryIndexTable(TransactionServiceClient transactionServiceClient,
      HTable hTable, byte[] secondaryIndex) {
        secondaryIndexTableName =TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
      HTable secondaryIndexHTable = null;
      HBaseAdmin hBaseAdmin = null;
      try {
        hBaseAdmin = new HBaseAdmin(hTable.getConfiguration());
        if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
          hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName));
        }
        secondaryIndexHTable = new HTable(hTable.getConfiguration(),secondaryIndexTableName);
      } catch (Exception e) {
        Throwables.propagate(e);
      } finally {
        try {
          hBaseAdmin.close();
        } catch (Exception e) {
          Throwables.propagate(e);
        }
      }
 
      this.secondaryIndex = secondaryIndex;
      this.transactionAwareHTable = new TransactionAwareHTable(hTable);
      this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable);
      this.transactionContext = new TransactionContext(transactionServiceClient, transactionAwareHTable, secondaryIndexTable);
    }
    public Result get(Get get) throws IOException {
      return get(Collections.singletonList(get))[0];
    }
    public Result[] get(List<Get> gets) throws IOException {
      try {
        transactionContext.start();
        Result[] result = transactionAwareHTable.get(gets);
        transactionContext.finish();
        return result;
      } catch (Exception e) {
        try {
          transactionContext.abort();
        } catch (TransactionFailureException e1) {
          throw new IOException("Could not rollback transaction", e1);
        }
      }
      return null;
    }
 
    public Result[] getByIndex(byte[] value) throws IOException {
      try {
        transactionContext.start();
        Scan scan = new Scan(value, Bytes.add(value, new byte[0]));
        scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
        ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);
 
        ArrayList<Get> gets = new ArrayList<Get>();
        for (Result result : indexScanner) {
          for (Cell cell : result.listCells()) {
            gets.add(new Get(cell.getValue()));
          }
        }
        Result[] results = transactionAwareHTable.get(gets);
        transactionContext.finish();
        return results;
      } catch (Exception e) {
        try {
          transactionContext.abort();
        } catch (TransactionFailureException e1) {
          throw new IOException("Could not rollback transaction", e1);
        }
      }
      return null;
    }
 
    public void put(Put put) throws IOException {
      put(Collections.singletonList(put));
    }
    public void put(List<Put> puts) throws IOException {
      try {
        transactionContext.start();
        ArrayList<Put> secondaryIndexPuts = new ArrayList<Put>();
        for (Put put : puts) {
          List<Put> indexPuts = new ArrayList<Put>();
          Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet();
          for (Map.Entry<byte [], List<KeyValue>> family : familyMap) {
            for (KeyValue value : family.getValue()) {
              if (value.getQualifier().equals(secondaryIndex)) {
                byte[] secondaryRow = Bytes.add(value.getQualifier(),DELIMITER, Bytes.add(value.getValue(),DELIMITER, value.getRow()));
                Put indexPut = new Put(secondaryRow);
                indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow());
                indexPuts.add(indexPut);
              }
            }
          }
          secondaryIndexPuts.addAll(indexPuts);
        }
        transactionAwareHTable.put(puts);
        secondaryIndexTable.put(secondaryIndexPuts);
        transactionContext.finish();
      } catch (Exception e) {
        try {
          transactionContext.abort();
        } catch (TransactionFailureException e1) {
          throw new IOException("Could not rollback transaction", e1);
        }
      }
    }
  }

 

已知问题和局限性:

1.      当前,列家族删除操作是通过编写一个空限定符(空字节)和空值(空字节)来实现的。这是在本地的HBase Delete操作的基础上完成的,因此在事务失败的情况下,删除标记可以回滚——正常的HBase Delete操作不能被撤销。但是,这意味着在带有空限定符的列中存储数据的应用程序将不能存储空值,并且不能对该列进行事务删除。

2.      列删除操作是通过向列写入空值(空字节)来实现的。这意味着应用程序将不能将空值存储到列中。

3.      无效的事务不会从排除列表中自动清除。当一个事务失效时,无论是由于超时或由于失败而导致客户失效,它的事务ID都会被添加到一个被排除的事务列表中。在HBase区域刷新和压缩操作的事务处理器coprocessor将会删除无效事务的数据。然而,目前,事务id只能通过使用org.apache.tephra TransactionAdmin工具从被排除的事务id列表中手动删除。

 类似资料: