
Phoenix Write Optimization

淳于亦
2023-12-01


Phoenix is the SQL layer on top of HBase. It provides JDBC-based access to HBase, so data can be written to HBase through Phoenix SQL. Phoenix offers many features, such as column encoding and compression, but during load testing we found that write throughput was low. Our pipeline consumes data from RabbitMQ and writes it to HBase through Phoenix; under load we could only write about 1,700 rows per second. After the adjustments below, throughput rose to about 7,000 rows per second. The findings are summarized in the following points.

Table metadata caching

Phoenix caches table metadata (the table schema, etc.) on the client. If the table schema rarely changes, set `UPDATE_CACHE_FREQUENCY` (in milliseconds), e.g. `UPDATE_CACHE_FREQUENCY = 900000`, so the client does not have to query the RegionServer for metadata on every insert.
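The property can be set on an existing table with `ALTER TABLE` (or declared in `CREATE TABLE`). A minimal sketch; the ZooKeeper quorum and table name `MY_TABLE` are placeholders for your environment:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SetCacheFrequency {
    public static void main(String[] args) throws Exception {
        // Hypothetical JDBC URL and table name; adjust for your cluster.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:zk-host:2181");
             Statement stmt = conn.createStatement()) {
            // Cache table metadata on the client for 15 minutes (900000 ms),
            // avoiding a metadata round trip on every statement.
            stmt.execute("ALTER TABLE MY_TABLE SET UPDATE_CACHE_FREQUENCY = 900000");
        }
    }
}
```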

Batch commits

Commit multiple rows in a single `commit()` call, similar to a batch insert.
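With auto-commit off, Phoenix buffers upserted rows on the client and flushes them to HBase on `commit()`. A sketch of committing every N rows; the table `MY_TABLE(ID, NAME)` and its columns are assumptions for illustration:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

public class BatchUpsert {
    // Hypothetical schema MY_TABLE(ID, NAME); adapt the SQL and bindings to your table.
    static void writeBatch(Connection conn, List<String[]> rows, int batchSize) throws Exception {
        conn.setAutoCommit(false); // accumulate mutations on the client
        try (PreparedStatement ps = conn.prepareStatement(
                "UPSERT INTO MY_TABLE (ID, NAME) VALUES (?, ?)")) {
            int pending = 0;
            for (String[] row : rows) {
                ps.setString(1, row[0]);
                ps.setString(2, row[1]);
                ps.executeUpdate();      // buffered client-side until commit
                if (++pending >= batchSize) {
                    conn.commit();       // flush the batch to HBase in one round trip
                    pending = 0;
                }
            }
            if (pending > 0) {
                conn.commit();           // flush the remainder
            }
        }
    }
}
```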

Connection pooling

Before tuning, we wrote through Phoenix integrated directly with HikariCP and found performance poor. Phoenix does not recommend generic connection pools, because creating a Phoenix connection on top of the underlying HBase connection is cheap; users are advised to write their own connection-management utility instead. We found a connection pool inside Alibaba's phoenix package, adopted it, and saw a large performance improvement.


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.phoenix.jdbc.PhoenixDriver;

import java.sql.*;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;

/**
 * A simple Phoenix connection pool.
 * Usage:
 * 1. create a pool with a connection string or a {@link PoolConfig} object
 * 2. call {@link #getConnection()} to get/create a connection from the pool and use this connection
 * 3. call {@link Connection#close()} to return the connection back to pool.
 * 4. When you need to shut down, make sure that all the connections retrieved from this pool have been closed,
 * then call {@link #close()} to destroy the pool.
 */
public class PhoenixConnectionPool {
    public static class PoolConfig {
        /**
         * [optional]
         * Pool name. Use unique names if you need multiple pools.
         */
        public String name = "HSQLPool";

        /**
         * [required]
         * JDBC connection string
         */
        public String connectionString = null;

        /**
         * [optional]
         * Per connection configuration
         */
        public Properties props = null;

        /**
         * [optional]
         * There's no upper limit of the connections you can get from the pool. But the pool won't hold all
         * the connections forever, so this is the number of connections that the pool will keep alive. When a
         * connection is returned and current pool size is greater than this, then this connection will be closed.
         */
        public int maxConnectionsToKeepAlive = 64;

        /**
         * [optional]
         * ClassLoader to load all classes of Phoenix JDBC driver.
         * Leave it null if you don't care about it.
         */
        public ClassLoader classLoader = null;

        public PoolConfig(String connectionString) {
            this.connectionString = connectionString;
        }

        public PoolConfig(String connectionString, Properties props) {
            this.connectionString = connectionString;
            this.props = props;
        }

        @Override
        public String toString() {
            StringBuilder str = new StringBuilder();
            str.append("name = [");
            str.append(name);
            str.append("], connString = [");
            str.append(connectionString);
            str.append("], maxConnectionsToKeepAlive = ");
            str.append(maxConnectionsToKeepAlive);
            str.append(", props = [");
            str.append(props);
            str.append("], classLoader = [");
            str.append(classLoader);
            str.append("]");
            return str.toString();
        }

        private void validateConfig() {
            if (connectionString == null || connectionString.isEmpty()) {
                throw new IllegalArgumentException(
                        "Invalid PhoenixConnectionPool config: connectionString must not be null or empty");
            }
            if (maxConnectionsToKeepAlive < 1) {
                throw new IllegalArgumentException(
                        "Invalid PhoenixConnectionPool config: maxConnectionsToKeepAlive must >= 1");
            }
        }
    }

    private static final Log LOG = LogFactory.getLog(PhoenixConnectionPool.class);
    private PoolConfig cfg;
    private ConcurrentLinkedQueue<Connection> pool = new ConcurrentLinkedQueue<Connection>();

    public PhoenixConnectionPool(String connectionString) {
        this(new PoolConfig(connectionString));
    }

    public PhoenixConnectionPool(String connectionString, Properties props) {
        this(new PoolConfig(connectionString, props));
    }

    public PhoenixConnectionPool(PoolConfig cfg) {
        // validate config
        cfg.validateConfig();
        this.cfg = cfg;

        // load JDBC driver class
        try {
            if (cfg.classLoader != null) {
                Class.forName(PhoenixDriver.class.getName(), true, cfg.classLoader);
            } else {
                Class.forName(PhoenixDriver.class.getName());
            }
        } catch (Throwable t) {
            LOG.error("Failed loading Phoenix JDBC driver", t);
            throw new RuntimeException("Failed loading Phoenix JDBC driver", t);
        }

        // dump the configurations to log
        LOG.info("PhoenixConnectionPool created. config [" + cfg.toString() + "]");
    }

    /**
     * Close all connections currently in this pool.
     * Before calling this method, please make sure that all connections retrieved from this pool have been closed.
     */
    public void close() throws SQLException {
        while (true) {
            PDelegateConnection conn = (PDelegateConnection) pool.poll();
            if (conn != null) {
                conn.getDelegate().close();
            } else {
                return;
            }
        }
    }

    /**
     * Get a connection.
     * Client should call {@link Connection#close()} after use and the connection will be automatically returned to pool.
     */
    public Connection getConnection() throws SQLException {
        Connection conn = pool.poll();
        if (conn != null) {
            return conn;
        } else {
            if (cfg.props != null) {
                return new PDelegateConnection(DriverManager.getConnection(cfg.connectionString, cfg.props));
            } else {
                return new PDelegateConnection(DriverManager.getConnection(cfg.connectionString));
            }
        }
    }

    private void putConnection(Connection conn) throws SQLException {
        PDelegateConnection dc = (PDelegateConnection) conn;
        if (pool.size() > cfg.maxConnectionsToKeepAlive) {
            // there's enough connection in pool, close this one
            dc.getDelegate().close();
        } else {
            // put this connection to pool
            pool.add(conn);
        }
    }

    private class PDelegateConnection implements Connection {
        private Connection delegate;

        public PDelegateConnection(Connection delegate) {
            this.delegate = delegate;
        }

        public Connection getDelegate() {
            return this.delegate;
        }

        @Override
        public void close() throws SQLException {
            // don't actually close this connection, just return it back to pool
            putConnection(this);
        }

        @Override
        public Statement createStatement() throws SQLException {
            return delegate.createStatement();
        }

        @Override
        public PreparedStatement prepareStatement(String sql) throws SQLException {
            return delegate.prepareStatement(sql);
        }

        @Override
        public CallableStatement prepareCall(String sql) throws SQLException {
            return delegate.prepareCall(sql);
        }

        @Override
        public String nativeSQL(String sql) throws SQLException {
            return delegate.nativeSQL(sql);
        }

        @Override
        public void setAutoCommit(boolean autoCommit) throws SQLException {
            delegate.setAutoCommit(autoCommit);
        }

        @Override
        public boolean getAutoCommit() throws SQLException {
            return delegate.getAutoCommit();
        }

        @Override
        public void commit() throws SQLException {
            delegate.commit();
        }

        @Override
        public void rollback() throws SQLException {
            delegate.rollback();
        }

        @Override
        public boolean isClosed() throws SQLException {
            return delegate.isClosed();
        }

        @Override
        public DatabaseMetaData getMetaData() throws SQLException {
            return delegate.getMetaData();
        }

        @Override
        public void setReadOnly(boolean readOnly) throws SQLException {
            delegate.setReadOnly(readOnly);
        }

        @Override
        public boolean isReadOnly() throws SQLException {
            return delegate.isReadOnly();
        }

        @Override
        public void setCatalog(String catalog) throws SQLException {
            delegate.setCatalog(catalog);
        }

        @Override
        public String getCatalog() throws SQLException {
            return delegate.getCatalog();
        }

        @Override
        public void setTransactionIsolation(int level) throws SQLException {
            delegate.setTransactionIsolation(level);
        }

        @Override
        public int getTransactionIsolation() throws SQLException {
            return delegate.getTransactionIsolation();
        }

        @Override
        public SQLWarning getWarnings() throws SQLException {
            return delegate.getWarnings();
        }

        @Override
        public void clearWarnings() throws SQLException {
            delegate.clearWarnings();
        }

        @Override
        public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
            return delegate.createStatement(resultSetType, resultSetConcurrency);
        }

        @Override
        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
            return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency);
        }

        @Override
        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
            return delegate.prepareCall(sql, resultSetType, resultSetConcurrency);
        }

        @Override
        public java.util.Map<String, Class<?>> getTypeMap() throws SQLException {
            return delegate.getTypeMap();
        }

        @Override
        public void setTypeMap(java.util.Map<String, Class<?>> map) throws SQLException {
            delegate.setTypeMap(map);
        }

        @Override
        public void setHoldability(int holdability) throws SQLException {
            delegate.setHoldability(holdability);
        }

        @Override
        public int getHoldability() throws SQLException {
            return delegate.getHoldability();
        }

        @Override
        public Savepoint setSavepoint() throws SQLException {
            return delegate.setSavepoint();
        }

        @Override
        public Savepoint setSavepoint(String name) throws SQLException {
            return delegate.setSavepoint(name);
        }

        @Override
        public void rollback(Savepoint savepoint) throws SQLException {
            delegate.rollback(savepoint);
        }

        @Override
        public void releaseSavepoint(Savepoint savepoint) throws SQLException {
            delegate.releaseSavepoint(savepoint);
        }

        @Override
        public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
                throws SQLException {
            return delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
        }

        @Override
        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency,
                                                  int resultSetHoldability) throws SQLException {
            return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
        }

        @Override
        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency,
                                             int resultSetHoldability) throws SQLException {
            return delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
        }

        @Override
        public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
            return delegate.prepareStatement(sql, autoGeneratedKeys);
        }

        @Override
        public PreparedStatement prepareStatement(String sql, int columnIndexes[]) throws SQLException {
            return delegate.prepareStatement(sql, columnIndexes);
        }

        @Override
        public PreparedStatement prepareStatement(String sql, String columnNames[]) throws SQLException {
            return delegate.prepareStatement(sql, columnNames);
        }

        @Override
        public Clob createClob() throws SQLException {
            return delegate.createClob();
        }

        @Override
        public Blob createBlob() throws SQLException {
            return delegate.createBlob();
        }

        @Override
        public NClob createNClob() throws SQLException {
            return delegate.createNClob();
        }

        @Override
        public SQLXML createSQLXML() throws SQLException {
            return delegate.createSQLXML();
        }

        @Override
        public boolean isValid(int timeout) throws SQLException {
            return delegate.isValid(timeout);
        }

        @Override
        public void setClientInfo(String name, String value) throws SQLClientInfoException {
            delegate.setClientInfo(name, value);
        }

        @Override
        public void setClientInfo(Properties properties) throws SQLClientInfoException {
            delegate.setClientInfo(properties);
        }

        @Override
        public String getClientInfo(String name) throws SQLException {
            return delegate.getClientInfo(name);
        }

        @Override
        public Properties getClientInfo() throws SQLException {
            return delegate.getClientInfo();
        }

        @Override
        public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
            return delegate.createArrayOf(typeName, elements);
        }

        @Override
        public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
            return delegate.createStruct(typeName, attributes);
        }

        @Override
        public void setSchema(String schema) throws SQLException {
            delegate.setSchema(schema);
        }

        @Override
        public String getSchema() throws SQLException {
            return delegate.getSchema();
        }

        @Override
        public void abort(Executor executor) throws SQLException {
            delegate.abort(executor);
        }

        @Override
        public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
            delegate.setNetworkTimeout(executor, milliseconds);
        }

        @Override
        public int getNetworkTimeout() throws SQLException {
            return delegate.getNetworkTimeout();
        }

        @Override
        public <T> T unwrap(java.lang.Class<T> iface) throws java.sql.SQLException {
            return delegate.unwrap(iface);
        }

        @Override
        public boolean isWrapperFor(java.lang.Class<?> iface) throws java.sql.SQLException {
            return delegate.isWrapperFor(iface);
        }
    }
}

We use this pool for writes, but still use HikariCP for queries.
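Typical use of the pool above: `getConnection()` hands out a pooled (or fresh) connection, and `close()` on that connection returns it to the pool rather than closing it. A sketch, with the JDBC URL and table as placeholders:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;

public class PoolUsage {
    public static void main(String[] args) throws Exception {
        // Hypothetical ZooKeeper quorum; adjust for your cluster.
        PhoenixConnectionPool pool = new PhoenixConnectionPool("jdbc:phoenix:zk-host:2181");
        try (Connection conn = pool.getConnection()) { // taken from, or created by, the pool
            conn.setAutoCommit(false);
            try (PreparedStatement ps = conn.prepareStatement(
                    "UPSERT INTO MY_TABLE (ID, NAME) VALUES (?, ?)")) {
                ps.setString(1, "1");
                ps.setString(2, "alice");
                ps.executeUpdate();
            }
            conn.commit();
        } // Connection#close() here returns the connection to the pool
        pool.close(); // on shutdown, closes all idle delegate connections
    }
}
```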

References

  • https://stackoverflow.com/questions/35183713/what-is-the-correct-way-to-pool-the-phoenix-query-server-connections