Flink version: 1.10
The project needs Flink to write data into HBase, and the cluster has Kerberos authentication enabled, but the default HBaseUpsertSinkFunction does not support configuring Kerberos authentication.
The code below is the original HBaseUpsertSinkFunction modified to add Kerberos support. The main changes: the open method now checks whether Kerberos authentication is required and, if so, performs the login (jsonObject carries the Kerberos configuration: the keytab file, the Kerberos principal, and so on); the invoke method now checks whether the ticket is close to expiry and re-logs in to Kerberos once only about 20% of the ticket lifetime remains.
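For reference, here is a minimal sketch of the Kerberos configuration object passed into the sink. The key names are the ones read by HbaseMutilUtil.loginKerberos further below; the principal and keytab values are placeholders:
import com.alibaba.fastjson.JSONObject;
// Hypothetical example of the jsonObject handed to HBaseUpsertSinkFunction.
// Key names match what HbaseMutilUtil.loginKerberos reads; the values are placeholders.
JSONObject kerberosConf = new JSONObject();
kerberosConf.put("haveKerberos", "true");
kerberosConf.put("kerberosPrincipal", "flink@EXAMPLE.COM");
kerberosConf.put("kerberosKeytabFilePath", "/etc/security/keytabs/flink.keytab");
kerberosConf.put("hbaseMasterKerberosPrincipal", "hbase/_HOST@EXAMPLE.COM");
kerberosConf.put("hbaseRegionserverKerberosPrincipal", "hbase/_HOST@EXAMPLE.COM");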
package com.rongan.realtime.sink;
import com.alibaba.fastjson.JSONObject;
import com.rongan.realtime.util.HbaseMutilUtil;
import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* The upsert sink for HBase.
*
* <p>This class leverages {@link BufferedMutator} to buffer multiple
* {@link org.apache.hadoop.hbase.client.Mutation Mutations} before sending the requests to cluster.
* The buffering strategy can be configured by {@code bufferFlushMaxSizeInBytes},
* {@code bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}.</p>
*/
public class HBaseUpsertSinkFunction
extends RichSinkFunction<Tuple2<Boolean, Row>>
implements CheckpointedFunction, BufferedMutator.ExceptionListener {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(HBaseUpsertSinkFunction.class);
private final String hTableName;
private final HBaseTableSchema schema;
private final byte[] serializedConfig;
private final long bufferFlushMaxSizeInBytes;
private final long bufferFlushMaxMutations;
private final long bufferFlushIntervalMillis;
private transient HBaseReadWriteHelper helper;
private transient Connection connection;
private transient BufferedMutator mutator;
private transient ScheduledExecutorService executor;
private transient ScheduledFuture scheduledFuture;
private transient AtomicLong numPendingRequests;
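// Kerberos settings passed from the client side (haveKerberos flag, principal, keytab path);
// fastjson's JSONObject is Serializable, so it can be shipped as part of the sink.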
private JSONObject jsonObject;
private transient volatile boolean closed = false;
/**
* This is set from inside the {@link BufferedMutator.ExceptionListener} if a {@link Throwable}
* was thrown.
*
* <p>Errors will be checked and rethrown before processing each input element, and when the sink is closed.
*/
private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
public HBaseUpsertSinkFunction(
String hTableName,
HBaseTableSchema schema,
org.apache.hadoop.conf.Configuration conf,
JSONObject jsonObject,
long bufferFlushMaxSizeInBytes,
long bufferFlushMaxMutations,
long bufferFlushIntervalMillis) {
this.hTableName = hTableName;
this.schema = schema;
// Configuration is not serializable
this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
this.bufferFlushMaxMutations = bufferFlushMaxMutations;
this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
this.jsonObject = jsonObject;
}
@Override
public void open(Configuration parameters) throws Exception {
LOG.info("start open ...");
org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
try {
this.helper = new HBaseReadWriteHelper(schema);
this.numPendingRequests = new AtomicLong(0);
if (null == connection) {
// perform the Kerberos login (a no-op when haveKerberos is not "true")
HbaseMutilUtil.loginKerberos(config, jsonObject);
this.connection = ConnectionFactory.createConnection(config);
}
// create a parameter instance, set the table name and custom listener reference.
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(hTableName))
.listener(this)
.writeBufferSize(bufferFlushMaxSizeInBytes);
this.mutator = connection.getBufferedMutator(params);
if (bufferFlushIntervalMillis > 0) {
this.executor = Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
if (closed) {
return;
}
try {
flush();
} catch (Exception e) {
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an exception
failureThrowable.compareAndSet(null, e);
}
}, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
}
} catch (TableNotFoundException tnfe) {
LOG.error("The table " + hTableName + " not found ", tnfe);
throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
} catch (IOException ioe) {
LOG.error("Exception while creating connection to HBase.", ioe);
throw new RuntimeException("Cannot create connection to HBase.", ioe);
}
LOG.info("end open.");
}
private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
// and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
// user params from client-side have the highest priority
org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());
// do validation: check key option(s) in final runtime configuration
if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
LOG.error("Can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
throw new IOException("Check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
}
return runtimeConfig;
}
private void checkErrorAndRethrow() {
Throwable cause = failureThrowable.get();
if (cause != null) {
throw new RuntimeException("An error occurred in HBaseSink.", cause);
}
}
@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
checkErrorAndRethrow();
// check whether HBase requires Kerberos authentication
if ("true".equals(jsonObject.getString("haveKerberos"))) {
    // if so, re-login from the keytab when the ticket is close to expiry
    HbaseMutilUtil.reLoginKerberos();
}
Row row = value.f1;
Object field = row.getField(0);
if (field == null || field.toString().isEmpty()) {
    LOG.warn("HBase sink rowkey is empty, skipping record: {}", value);
    return;
}
if (value.f0) {
Put put = helper.createPutMutation(value.f1);
mutator.mutate(put);
} else {
Delete delete = helper.createDeleteMutation(value.f1);
mutator.mutate(delete);
}
// flush when the buffer number of mutations greater than the configured max size.
if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
flush();
}
}
private void flush() throws IOException {
// BufferedMutator is thread-safe
mutator.flush();
numPendingRequests.set(0);
checkErrorAndRethrow();
}
@Override
public void close() throws Exception {
closed = true;
if (mutator != null) {
try {
mutator.close();
} catch (IOException e) {
LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
}
this.mutator = null;
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
LOG.warn("Exception occurs while closing HBase Connection.", e);
}
this.connection = null;
}
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
if (executor != null) {
executor.shutdownNow();
}
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
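// flush everything still buffered so the checkpoint covers every record handed to this sink (at-least-once)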
while (numPendingRequests.get() != 0) {
flush();
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// nothing to do.
}
@Override
public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an exception
failureThrowable.compareAndSet(null, exception);
}
}
Because only HBaseUpsertSinkFunction has been modified for now, the Kerberos settings cannot be configured through SQL; the modified HBaseUpsertSinkFunction can only be used via the DataStream API. That is, after running the SQL query, call toRetractStream[Row] and then attach the sink manually with addSink(new HBaseUpsertSinkFunction(...)). This is essentially what Flink SQL does under the hood anyway.
Code:
val deviceInfo: DataStream[tuple.Tuple2[lang.Boolean, Row]] = tableEnv.sqlQuery("select mac as rowkey, ROW(incident_sign ,system_sign,agent_version ,host_name ,ip ,mac ,system_info ,uuid ,date_time) as info from agetDeviceInfo")
.toRetractStream[Row]
.map(st => {
  // The extra map converts to Java types: HBaseUpsertSinkFunction is written in Java and
  // extends RichSinkFunction<Tuple2<Boolean, Row>>, so the scala.Boolean must be converted
  // to java.lang.Boolean, otherwise addSink fails with a type mismatch.
  new org.apache.flink.api.java.tuple.Tuple2[lang.Boolean, Row](st._1, st._2)
})
//create the HBase schema
val schema = new HBaseTableSchema()
schema.setRowKey("rowkey", classOf[String])
schema.addColumn("info", "incident_sign", classOf[String])
schema.addColumn("info", "system_sign", classOf[String])
schema.addColumn("info", "agent_version", classOf[String])
schema.addColumn("info", "host_name", classOf[String])
schema.addColumn("info", "ip", classOf[String])
schema.addColumn("info", "mac", classOf[String])
schema.addColumn("info", "system_info", classOf[String])
schema.addColumn("info", "uuid", classOf[String])
schema.addColumn("info", "date_time", classOf[String])
//add the HBase upsert sink
deviceInfo.addSink(FlinkUtil.getHbaseSink(params, schema, FlinkUtil.initHbaseProperties(params), "t_rsd_base_asset"))
def initHbaseProperties(conf: ParameterTool) = {
val properties = new Properties()
properties.setProperty("keytabpath", conf.get(Constants.HBASE_KEYTABPATH))
properties.setProperty("zookeeper.address", conf.get(Constants.ZOOKEEPER_ADDRESS))
properties.setProperty("zookeeper.port", conf.get(Constants.ZOOKEEPER_PORT))
properties.setProperty("zookeeper.node", conf.get(Constants.ZOOKEEPER_NODE))
properties.setProperty("hbaseRegionserverKerberosPrincipal", conf.get(Constants.HBASE_REGIN_SERVER_KERBEROS_PRINCIPAL))
properties.setProperty("hbaseMasterKerberosPrincipal", conf.get(Constants.HBASE_MASTER_KERBEROS_PRINCIPAL))
properties.setProperty("hbase.kerberos.principal", conf.get(Constants.HBASE_KERBEROS_PRINCIPAL))
properties.setProperty(Constants.HBASE_HAVE_KERBEROS, conf.get(Constants.HBASE_HAVE_KERBEROS))
properties
}
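FlinkUtil.getHbaseSink itself is not shown in this post. Below is a minimal sketch of what it might look like, assuming it simply maps the properties built by initHbaseProperties onto the constructor of the modified sink; the buffering thresholds are illustrative defaults, not values from the project:
// Hypothetical sketch of FlinkUtil.getHbaseSink. Property keys are the ones set in
// initHbaseProperties; JSON keys are the ones read by HbaseMutilUtil.loginKerberos.
// The params argument is unused in this sketch but kept to match the call site above.
public static HBaseUpsertSinkFunction getHbaseSink(
        ParameterTool params, HBaseTableSchema schema, Properties props, String tableName) {
    org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", props.getProperty("zookeeper.address"));
    conf.set("hbase.zookeeper.property.clientPort", props.getProperty("zookeeper.port"));
    conf.set("zookeeper.znode.parent", props.getProperty("zookeeper.node"));

    JSONObject kerberosConf = new JSONObject();
    kerberosConf.put("haveKerberos", props.getProperty(Constants.HBASE_HAVE_KERBEROS));
    kerberosConf.put("kerberosPrincipal", props.getProperty("hbase.kerberos.principal"));
    kerberosConf.put("kerberosKeytabFilePath", props.getProperty("keytabpath"));
    kerberosConf.put("hbaseMasterKerberosPrincipal", props.getProperty("hbaseMasterKerberosPrincipal"));
    kerberosConf.put("hbaseRegionserverKerberosPrincipal", props.getProperty("hbaseRegionserverKerberosPrincipal"));

    // Illustrative buffering thresholds: 2 MB write buffer, 1000 mutations, 1 s flush interval.
    return new HBaseUpsertSinkFunction(
            tableName, schema, conf, kerberosConf,
            2 * 1024 * 1024, 1000, 1000);
}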
package com.rongan.realtime.util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rongan.realtime.constants.Constants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
/**
 * Utility class for HBase access and Kerberos login/renewal.
 */
public class HbaseMutilUtil {
private static Logger logger = LoggerFactory.getLogger(HbaseMutilUtil.class);
private HbaseMutilUtil() {
throw new IllegalStateException("Utility class");
}
/**
 * Batch-insert a list of cells into an HBase table. Note that the table is closed afterwards.
 */
public static int batchInsertDataToTable(Table table, List<PutBean> putBeans) throws IOException {
List<Put> puts = new ArrayList<>();
for (PutBean putBean : putBeans) {
Put put = new Put(Bytes.toBytes(putBean.rowkey));
put.addColumn(Bytes.toBytes(putBean.cf), Bytes.toBytes(putBean.column), Bytes.toBytes(putBean.value));
puts.add(put);
}
table.put(puts);
table.close();
return 1;
}
public static void close(Table table) throws IOException {
if (table != null) {
table.close();
}
}
public static void close(ResultScanner scanner) {
if (scanner != null) {
scanner.close();
}
}
public static void close(Admin admin) throws IOException {
    if (admin != null) {
        admin.close();
    }
}
public static void close(Connection connection, Table table, Admin admin, ResultScanner scanner) throws IOException {
    // close in dependency order: scanner, table and admin before the connection they came from
    close(scanner);
    close(table);
    close(admin);
    close(connection);
}
public static void close(Connection connection) throws IOException {
    if (connection != null) {
        connection.close();
    }
}
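/** Simple value holder describing one cell to put: rowkey, column family, qualifier and value. */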
public static class PutBean {
private String rowkey;
private String cf;
private String column;
private String value;
public PutBean() {
}
public PutBean(String rowkey, String cf, String column, String value) {
this.rowkey = rowkey;
this.cf = cf;
this.column = column;
this.value = value;
}
public void setCf(String cf) {
this.cf = cf;
}
public void setColumn(String column) {
this.column = column;
}
public void setValue(String value) {
this.value = value;
}
public String getCf() {
return cf;
}
public String getColumn() {
return column;
}
public String getRowkey() {
return rowkey;
}
public void setRowkey(String rowkey) {
this.rowkey = rowkey;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return "PutBean{" +
"rowkey='" + rowkey + '\'' +
", cf='" + cf + '\'' +
", column='" + column + '\'' +
", value='" + value + '\'' +
'}';
}
}
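/**
 * Build an HBase Configuration from the JSON config: parses a comma-separated
 * "host:port" zookeeper quorum and sets the rootdir and znode parent.
 */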
public static Configuration getHbaseConf(JSONObject json) {
Configuration conf = HBaseConfiguration.create();
JSONObject hbaseConfig = json.getJSONObject("hbaseConfig");
String hbaseZks = hbaseConfig.getString("hbase_zookeeper_quorum");
String[] splits = hbaseZks.split(",");
StringBuilder zks = new StringBuilder();
for (int i = 0; i < splits.length; i++) {
zks.append(splits[i].split(":")[0]).append(",");
conf.set("hbase.zookeeper.property.clientPort", splits[i].split(":")[1]);
}
String quorum = zks.deleteCharAt(zks.length() - 1).toString();
conf.set("hbase.zookeeper.quorum", quorum);
conf.set("hbase.rootdir", hbaseConfig.getString("hbase_rootdir"));
String zookeeperZnodeParent = json.getString("zookeeperZnodeParent");
conf.set("zookeeper.znode.parent", zookeeperZnodeParent);
conf.set("hbase.client.retries.number", "3");
return conf;
}
/**
 * Log in to Kerberos from a keytab if the config says Kerberos is enabled.
 *
 * @param hConfiguration the Hadoop/HBase configuration to enrich with Kerberos settings
 * @param configuration  JSON config: haveKerberos, kerberosPrincipal, kerberosKeytabFilePath,
 *                       hbaseMasterKerberosPrincipal, hbaseRegionserverKerberosPrincipal
 * @return true if the login succeeded or Kerberos is disabled, false if the login failed
 */
public static boolean loginKerberos(org.apache.hadoop.conf.Configuration hConfiguration, JSONObject configuration) {
String haveKerberos = configuration.getString("haveKerberos");
if (haveKerberos != null && haveKerberos.equals("true")) {
// refresh the state of any previous login first; skipping the refresh can cause errors
String kerberosPrincipal = configuration.getString("kerberosPrincipal");
String kerberosKeytabPath = configuration.getString("kerberosKeytabFilePath");
String hbaseRegionserverKerberosPrincipal = configuration.getString("hbaseRegionserverKerberosPrincipal");
String hbaseMasterKerberosPrincipal = configuration.getString("hbaseMasterKerberosPrincipal");
hConfiguration.set("hadoop.security.authentication", "kerberos");
hConfiguration.set("hbase.security.authentication", "kerberos");
hConfiguration.set("hbase.master.kerberos.principal", hbaseMasterKerberosPrincipal);
hConfiguration.set("hbase.regionserver.kerberos.principal", hbaseRegionserverKerberosPrincipal);
hConfiguration.setBoolean("hadoop.security.authorization", true);
// System.setProperty("java.security.krb5.conf", "D:\\kerberos\\krb5.conf");
UserGroupInformation.setConfiguration(hConfiguration);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabPath);
return true;
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
} else {
return true;
}
return false;
}
/**
 * Re-login to Kerberos from the keytab if the TGT is close to expiry.
 * @throws IOException
 */
public static void reLoginKerberos() throws IOException {
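// checkTGTAndReloginFromKeytab only re-logins once the TGT enters its renewal window
// (in Hadoop this is after ~80% of the ticket lifetime has elapsed, matching the 20%
// remaining mentioned above), so calling it for every record is cheap when no renewal is due.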
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
}
/**
 * Reset the Kerberos login state.
 */
public static void reset(){
UserGroupInformation.setLoginUser(null);
HadoopKerberosName.setRules(null);
}
}