Flink: HBase sink with Kerberos authentication support

壤驷兴朝
2023-12-01

 

1. Introduction

Flink version: 1.10

The project needs Flink to write data to HBase, and the cluster has Kerberos authentication enabled, but the default HBaseUpsertSinkFunction offers no way to configure Kerberos.

The fix is to modify the original HBaseUpsertSinkFunction to add Kerberos authentication; the modified code is shown below. The main changes are in the open method, which checks whether Kerberos authentication is required and, if so, performs the login (jsonObject carries the Kerberos configuration: the keytab file path, the Kerberos principal, and so on), and in the invoke method, which checks whether the Kerberos ticket is about to expire and re-logs in once only about 20% of its lifetime remains.

 

package com.rongan.realtime.sink;

import com.alibaba.fastjson.JSONObject;
import com.rongan.realtime.util.HbaseMutilUtil;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
 * The upsert sink for HBase.
 *
 * <p>This is a copy of Flink's HBaseUpsertSinkFunction extended with Kerberos support:
 * {@code open()} performs the Kerberos login when required, and {@code invoke()} re-logs in
 * when the ticket is close to expiring.</p>
 *
 * <p>This class leverages {@link BufferedMutator} to buffer multiple
 * {@link org.apache.hadoop.hbase.client.Mutation Mutations} before sending the requests to the cluster.
 * The buffering strategy can be configured by {@code bufferFlushMaxSizeInBytes},
 * {@code bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}.</p>
 */
public class HBaseUpsertSinkFunction
        extends RichSinkFunction<Tuple2<Boolean, Row>>
        implements CheckpointedFunction, BufferedMutator.ExceptionListener {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HBaseUpsertSinkFunction.class);

    private final String hTableName;
    private final HBaseTableSchema schema;
    private final byte[] serializedConfig;

    private final long bufferFlushMaxSizeInBytes;
    private final long bufferFlushMaxMutations;
    private final long bufferFlushIntervalMillis;

    private transient HBaseReadWriteHelper helper;

    private transient Connection connection;
    private transient BufferedMutator mutator;

    private transient ScheduledExecutorService executor;
    private transient ScheduledFuture scheduledFuture;
    private transient AtomicLong numPendingRequests;
    private JSONObject jsonObject;

    private transient volatile boolean closed = false;

    /**
     * This is set from inside the {@link BufferedMutator.ExceptionListener} if a {@link Throwable}
     * was thrown.
     *
     * <p>Errors will be checked and rethrown before processing each input element, and when the sink is closed.
     */
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    public HBaseUpsertSinkFunction(
            String hTableName,
            HBaseTableSchema schema,
            org.apache.hadoop.conf.Configuration conf,
            JSONObject jsonObject,
            long bufferFlushMaxSizeInBytes,
            long bufferFlushMaxMutations,
            long bufferFlushIntervalMillis) {
        this.hTableName = hTableName;
        this.schema = schema;
        // Configuration is not serializable
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
        this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
        this.bufferFlushMaxMutations = bufferFlushMaxMutations;
        this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
        this.jsonObject = jsonObject;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.info("start open ...");
        org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
        try {
            this.helper = new HBaseReadWriteHelper(schema);
            this.numPendingRequests = new AtomicLong(0);

            if (null == connection) {
                // perform the Kerberos login if the cluster requires it
                HbaseMutilUtil.loginKerberos(config, jsonObject);
                this.connection = ConnectionFactory.createConnection(config);
            }
            // create a parameter instance, set the table name and custom listener reference.
            BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(hTableName))
                    .listener(this)
                    .writeBufferSize(bufferFlushMaxSizeInBytes);
            this.mutator = connection.getBufferedMutator(params);

            if (bufferFlushIntervalMillis > 0) {
                this.executor = Executors.newScheduledThreadPool(
                        1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
                this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                    if (closed) {
                        return;
                    }
                    try {
                        flush();
                    } catch (Exception e) {
                        // fail the sink and skip the rest of the items
                        // if the failure handler decides to throw an exception
                        failureThrowable.compareAndSet(null, e);
                    }
                }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
            }
        } catch (TableNotFoundException tnfe) {
            LOG.error("The table " + hTableName + " not found ", tnfe);
            throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
        } catch (IOException ioe) {
            LOG.error("Exception while creating connection to HBase.", ioe);
            throw new RuntimeException("Cannot create connection to HBase.", ioe);
        }
        LOG.info("end open.");
    }

    private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
        // create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
        // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
        // user params from client-side have the highest priority
        org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());

        // do validation: check key option(s) in final runtime configuration
        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
            LOG.error("Can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
            throw new IOException("Check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
        }

        return runtimeConfig;
    }

    private void checkErrorAndRethrow() {
        Throwable cause = failureThrowable.get();
        if (cause != null) {
            throw new RuntimeException("An error occurred in HBaseSink.", cause);
        }
    }

    @Override
    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
        checkErrorAndRethrow();
        // check whether HBase requires Kerberos authentication
        if ("true".equals(jsonObject.getString("haveKerberos"))) {
            // if so, re-login when the Kerberos ticket is close to expiring
            HbaseMutilUtil.reLoginKerberos();
        }
        Row row = value.f1;
        Object field = row.getField(0);
        if (field == null || field.toString().isEmpty()) {
            LOG.warn("HBase sink rowkey is empty, skipping record: {}", value);
            return;
        }
        if (value.f0) {
            Put put = helper.createPutMutation(value.f1);
            mutator.mutate(put);
        } else {
            Delete delete = helper.createDeleteMutation(value.f1);
            mutator.mutate(delete);
        }

        // flush when the buffer number of mutations greater than the configured max size.
        if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
            flush();
        }
    }

    private void flush() throws IOException {
        // BufferedMutator is thread-safe
        mutator.flush();
        numPendingRequests.set(0);
        checkErrorAndRethrow();
    }

    @Override
    public void close() throws Exception {
        closed = true;

        if (mutator != null) {
            try {
                mutator.close();
            } catch (IOException e) {
                LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
            }
            this.mutator = null;
        }

        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                LOG.warn("Exception occurs while closing HBase Connection.", e);
            }
            this.connection = null;
        }

        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
            if (executor != null) {
                executor.shutdownNow();
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        while (numPendingRequests.get() != 0) {
            flush();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // nothing to do.
    }

    @Override
    public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
        // fail the sink and skip the rest of the items
        // if the failure handler decides to throw an exception
        failureThrowable.compareAndSet(null, exception);
    }
}
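
For reference, the jsonObject passed into the constructor is expected to carry at least the keys below, which are read by HbaseMutilUtil.loginKerberos (section 3) and by invoke. This is a minimal sketch; all values are placeholders for your own environment.

    import com.alibaba.fastjson.JSONObject

    // Kerberos configuration consumed by HBaseUpsertSinkFunction / HbaseMutilUtil.loginKerberos
    val kerberosConfig = new JSONObject()
    kerberosConfig.put("haveKerberos", "true")
    // client principal and keytab used by UserGroupInformation.loginUserFromKeytab (placeholder values)
    kerberosConfig.put("kerberosPrincipal", "flink_user@EXAMPLE.COM")
    kerberosConfig.put("kerberosKeytabFilePath", "/etc/security/keytabs/flink_user.keytab")
    // server-side principals required by the HBase client (placeholder values)
    kerberosConfig.put("hbaseMasterKerberosPrincipal", "hbase/_HOST@EXAMPLE.COM")
    kerberosConfig.put("hbaseRegionserverKerberosPrincipal", "hbase/_HOST@EXAMPLE.COM")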

2. Using the modified HBaseUpsertSinkFunction

Since only HBaseUpsertSinkFunction has been modified so far, the Kerberos settings cannot be supplied through SQL alone; the modified sink has to be wired in through the DataStream API. In other words, after running the SQL query, call toRetractStream[Row] and then manually addSink(new HBaseUpsertSinkFunction(...)). This is essentially what Flink SQL does under the hood anyway.

Code:

   val deviceInfo: DataStream[tuple.Tuple2[lang.Boolean, Row]] = tableEnv.sqlQuery("select mac as rowkey, ROW(incident_sign ,system_sign,agent_version ,host_name ,ip ,mac ,system_info ,uuid ,date_time) as info from agetDeviceInfo")
      .toRetractStream[Row]
      .map(st => {
        // Convert to org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean, Row]: HBaseUpsertSinkFunction
        // is written in Java and extends RichSinkFunction<Tuple2<Boolean, Row>> with Java types, so the
        // Scala tuple must be mapped to the Java types, otherwise addSink fails with a type mismatch.
        new org.apache.flink.api.java.tuple.Tuple2[lang.Boolean, Row](st._1, st._2)
      })


   // create the HBase table schema
    val schema = new HBaseTableSchema()
    schema.setRowKey("rowkey", classOf[String])
    schema.addColumn("info", "incident_sign", classOf[String])
    schema.addColumn("info", "system_sign", classOf[String])
    schema.addColumn("info", "agent_version", classOf[String])
    schema.addColumn("info", "host_name", classOf[String])
    schema.addColumn("info", "ip", classOf[String])
    schema.addColumn("info", "mac", classOf[String])
    schema.addColumn("info", "system_info", classOf[String])
    schema.addColumn("info", "uuid", classOf[String])
    schema.addColumn("info", "date_time", classOf[String])
   // add the HBase upsert sink
   deviceInfo.addSink(FlinkUtil.getHbaseSink(params, schema, FlinkUtil.initHbaseProperties(params), "t_rsd_base_asset"))

The FlinkUtil.initHbaseProperties code:

  def initHbaseProperties(conf: ParameterTool) = {
    val properties = new Properties()
    properties.setProperty("keytabpath", conf.get(Constants.HBASE_KEYTABPATH))
    properties.setProperty("zookeeper.address", conf.get(Constants.ZOOKEEPER_ADDRESS))
    properties.setProperty("zookeeper.port", conf.get(Constants.ZOOKEEPER_PORT))
    properties.setProperty("zookeeper.node", conf.get(Constants.ZOOKEEPER_NODE))
    properties.setProperty("hbaseRegionserverKerberosPrincipal", conf.get(Constants.HBASE_REGIN_SERVER_KERBEROS_PRINCIPAL))
    properties.setProperty("hbaseMasterKerberosPrincipal", conf.get(Constants.HBASE_MASTER_KERBEROS_PRINCIPAL))
    properties.setProperty("hbase.kerberos.principal", conf.get(Constants.HBASE_KERBEROS_PRINCIPAL))
    properties.setProperty(Constants.HBASE_HAVE_KERBEROS, conf.get(Constants.HBASE_HAVE_KERBEROS))
    properties
  }
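
The FlinkUtil.getHbaseSink helper used above is not shown in the original project. Below is a minimal sketch of what it might look like, assuming it builds the HBase client configuration and the Kerberos JSONObject from the properties above and uses illustrative buffer settings; the property key names (including "haveKerberos") and the default values are assumptions, since the Constants class and the real implementation are not shown.

  import java.util.Properties
  import com.alibaba.fastjson.JSONObject
  import com.rongan.realtime.sink.HBaseUpsertSinkFunction
  import org.apache.flink.addons.hbase.HBaseTableSchema
  import org.apache.flink.api.java.utils.ParameterTool
  import org.apache.hadoop.hbase.HBaseConfiguration

  def getHbaseSink(params: ParameterTool,
                   schema: HBaseTableSchema,
                   props: Properties,
                   tableName: String): HBaseUpsertSinkFunction = {
    // HBase client configuration: quorum, client port and znode parent come from the properties
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", props.getProperty("zookeeper.address"))
    conf.set("hbase.zookeeper.property.clientPort", props.getProperty("zookeeper.port"))
    conf.set("zookeeper.znode.parent", props.getProperty("zookeeper.node"))

    // Kerberos settings packed into the JSONObject read by HbaseMutilUtil.loginKerberos;
    // "haveKerberos" is assumed to be the value behind Constants.HBASE_HAVE_KERBEROS
    val json = new JSONObject()
    json.put("haveKerberos", props.getProperty("haveKerberos", "false"))
    json.put("kerberosPrincipal", props.getProperty("hbase.kerberos.principal"))
    json.put("kerberosKeytabFilePath", props.getProperty("keytabpath"))
    json.put("hbaseMasterKerberosPrincipal", props.getProperty("hbaseMasterKerberosPrincipal"))
    json.put("hbaseRegionserverKerberosPrincipal", props.getProperty("hbaseRegionserverKerberosPrincipal"))

    // buffer settings: 2 MB write buffer, flush every 1000 mutations or every second (illustrative values;
    // they could just as well be read from params)
    new HBaseUpsertSinkFunction(tableName, schema, conf, json,
      2 * 1024 * 1024L, 1000L, 1000L)
  }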

3. Utility code

package com.rongan.realtime.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rongan.realtime.constants.Constants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;

/**
 * HBase utility class: connection/resource helpers plus Kerberos login and relogin.
 */

public class HbaseMutilUtil {

    private static Logger logger = LoggerFactory.getLogger(HbaseMutilUtil.class);

    private HbaseMutilUtil() {
        throw new IllegalStateException("Utility class");
    }

    /**
     * Batch insert data into HBase. Note that the table is also closed once the puts have been submitted.
     */
    public static int batchInsertDataToTable(Table table, List<PutBean> putBeans) throws IOException {
        List<Put> puts = new ArrayList<>();
        for (PutBean putBean : putBeans) {
            Put put = new Put(Bytes.toBytes(putBean.rowkey));
            put.addColumn(Bytes.toBytes(putBean.cf), Bytes.toBytes(putBean.column), Bytes.toBytes(putBean.value));
            puts.add(put);
        }
        table.put(puts);

        table.close();
        return 1;
    }
    
    public static void close(Table table) throws IOException {
        if (table != null) {
            table.close();
        }

    }

    public static void close(ResultScanner scanner) {
        if (scanner != null) {
            scanner.close();
        }
    }


    public static void close(Admin scanner) throws IOException {
        if (scanner != null) {
            scanner.close();
        }
    }

    public static void close(Connection connection, Table table, Admin admin, ResultScanner scanner) throws IOException {
        close(table);
        close(connection);
        close(scanner);
        close(admin);
    }


    public static void close(Connection connection) throws IOException {
        if (connection != null) {
            connection.close();
        }
    }

    public static class PutBean {
        private String rowkey;
        private String cf;
        private String column;
        private String value;

        public PutBean() {
        }

        public PutBean(String rowkey, String cf, String column, String value) {
            this.rowkey = rowkey;
            this.cf = cf;
            this.column = column;
            this.value = value;
        }


        public void setCf(String cf) {
            this.cf = cf;
        }

        public void setColumn(String column) {
            this.column = column;
        }

        public void setValue(String value) {
            this.value = value;
        }


        public String getCf() {
            return cf;
        }

        public String getColumn() {
            return column;
        }

        public String getRowkey() {
            return rowkey;
        }

        public void setRowkey(String rowkey) {
            this.rowkey = rowkey;
        }

        public String getValue() {

            return value;
        }

        @Override
        public String toString() {
            return "PutBean{" +
                    "rowkey='" + rowkey + '\'' +
                    ", cf='" + cf + '\'' +
                    ", column='" + column + '\'' +
                    ", value='" + value + '\'' +
                    '}';
        }
    }

    public static Configuration getHbaseConf(JSONObject json) {
        Configuration conf = HBaseConfiguration.create();
        JSONObject hbaseConfig = json.getJSONObject("hbaseConfig");
        String hbaseZks = hbaseConfig.getString("hbase_zookeeper_quorum");
        String[] splits = hbaseZks.split(",");
        StringBuilder zks = new StringBuilder();
        for (int i = 0; i < splits.length; i++) {
            zks.append(splits[i].split(":")[0]).append(",");
            conf.set("hbase.zookeeper.property.clientPort", splits[i].split(":")[1]);
        }
        String quorum = zks.deleteCharAt(zks.length() - 1).toString();
        conf.set("hbase.zookeeper.quorum", quorum);
        conf.set("hbase.rootdir", hbaseConfig.getString("hbase_rootdir"));
        String zookeeperZnodeParent = json.getString("zookeeperZnodeParent");
        conf.set("zookeeper.znode.parent", zookeeperZnodeParent);
        conf.set("hbase.client.retries.number", "3");
        return conf;
    }



    /**
     * Log in to Kerberos if the configuration requires it.
     *
     * @param hConfiguration the Hadoop/HBase configuration to be updated with the Kerberos settings
     * @param configuration  JSON carrying haveKerberos, the client principal, the keytab path and the HBase server principals
     * @return true if no login is required or the login succeeded, false if the login failed
     */
    public static boolean loginKerberos(org.apache.hadoop.conf.Configuration hConfiguration, JSONObject configuration) {
        String haveKerberos = configuration.getString("haveKerberos");
        if (haveKerberos != null && haveKerberos.equals("true")) {
//            // refresh the previous authentication info first; without refreshing, an error is reported

            String kerberosPrincipal = configuration.getString("kerberosPrincipal");
            String kerberosKeyTablPath = configuration.getString("kerberosKeytabFilePath");

            String hbaseRegionserverKerberosPrincipal = configuration.getString("hbaseRegionserverKerberosPrincipal");
            String hbaseMasterKerberosPrincipal = configuration.getString("hbaseMasterKerberosPrincipal");
            hConfiguration.set("hadoop.security.authentication", "kerberos");
            hConfiguration.set("hbase.security.authentication", "kerberos");
            hConfiguration.set("hbase.master.kerberos.principal", hbaseMasterKerberosPrincipal);
            hConfiguration.set("hbase.regionserver.kerberos.principal", hbaseRegionserverKerberosPrincipal);

            hConfiguration.setBoolean("hadoop.security.authorization", true);
         //   System.setProperty("java.security.krb5.conf", "D:\\kerberos\\krb5.conf");
            UserGroupInformation.setConfiguration(hConfiguration);
            try {
                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeyTablPath);
                return true;
            } catch (IOException e) {
                logger.error(e.getMessage(), e);

            }
        } else {
            return true;
        }
        return false;
    }

    /**
     * Check whether a Kerberos re-login is needed and, if so, re-login from the keytab.
     * checkTGTAndReloginFromKeytab only re-logs in once most of the ticket lifetime has elapsed.
     *
     * @throws IOException if the relogin fails
     */
    public static void reLoginKerberos() throws IOException {
        UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
    }

    /**
     * Reset the Kerberos login state.
     */
    public static void reset(){
        UserGroupInformation.setLoginUser(null);
        HadoopKerberosName.setRules(null);
    }

}
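
For completeness, HbaseMutilUtil.getHbaseConf expects a JSON document shaped as follows. This is a usage sketch; the quorum, root dir and znode parent are placeholders for your own cluster.

    import com.alibaba.fastjson.JSON

    // JSON layout consumed by HbaseMutilUtil.getHbaseConf; hostnames and paths are placeholders
    val hbaseJson = JSON.parseObject(
      """{
        |  "hbaseConfig": {
        |    "hbase_zookeeper_quorum": "zk1:2181,zk2:2181,zk3:2181",
        |    "hbase_rootdir": "hdfs://nameservice1/hbase"
        |  },
        |  "zookeeperZnodeParent": "/hbase"
        |}""".stripMargin)
    val hbaseConf = HbaseMutilUtil.getHbaseConf(hbaseJson)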

 
