当前位置: 首页 > 知识库问答 >
问题:

Cassandra:如何使用CQL以良好的性能插入新的宽行

贡俊
2023-03-14

我在评估卡桑德拉。我正在使用datastax驱动程序和CQL。

+-------+-------+-------+-------+-------+-------+
|       | name1 | name2 | name3 | ...   | nameN |
| time  +-------+-------+-------+-------+-------+
|       | val1  | val2  | val3  | ...   | valN  |
+-------+-------+-------+-------|-------+-------+
CREATE TABLE IF NOT EXISTS test.wide (
  time varchar,
  name varchar,
  value varchar,
  PRIMARY KEY (time,name))
  WITH COMPACT STORAGE

我必须能够处理每秒至少1000个宽行插入,使用不同但数量很大(~1000)的名称/值对。

问题是:我编写了一个简单的基准测试,它执行1000个宽行插入,每个插入10000个名称/值对。我使用CQL和datastax驱动程序的性能非常慢,而不使用CQL的版本(使用astyanax)在相同的测试集群上有很好的性能。

我已经读过这个相关的问题,在这个问题的公认答案中,建议您应该能够通过使用批处理准备语句来原子地快速创建一个新的宽行,这些语句在Cassandra2中可用。

>

  • 它实际上是不推荐的,还是只是因为它是较低的级别而不方便使用?

    我是否能够用CQL查询使用thrift api创建的表?

    下面是Scala中的一个自包含代码示例。它只需创建一个batch语句,用于插入具有10000列的宽行,并重复执行插入性能。

    package cassandra
    
    import com.datastax.driver.core._
    
    object CassandraTestMinimized extends App {
    
      val keyspace = "test"
      val table = "wide"
      val tableName = s"$keyspace.$table"
    
      def createKeyspace = s"""
    CREATE KEYSPACE IF NOT EXISTS ${keyspace}
    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
    """
    
      def createWideTable = s"""
    CREATE TABLE IF NOT EXISTS ${tableName} (
    time varchar,
    name varchar,
    value varchar,
    PRIMARY KEY (time,name))
    WITH COMPACT STORAGE
    """
    
      def writeTimeNameValue(time: String) = s"""
    INSERT INTO ${tableName} (time, name, value)
    VALUES ('$time', ?, ?)
    """
    
      val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
      val session = cluster.connect()
    
      session.execute(createKeyspace)
      session.execute(createWideTable)
    
      for(i<-0 until 1000) {
        val entries =
          for {
            i <- 0 until 10000
            name = i.toString
            value = name
          } yield name -> value
        val batchPreparedStatement = writeMap(i, entries)
        val t0 = System.nanoTime()
        session.execute(batchPreparedStatement)
        val dt = System.nanoTime() - t0
        println(i + " " + (dt/1.0e9))
      }
    
      def writeMap(time: Long, update: Seq[(String, String)]) : BatchStatement = {
        val template = session
          .prepare(writeTimeNameValue(time.toString))
          .setConsistencyLevel(ConsistencyLevel.ONE)
        val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
        for ((k, v) <- update)
          batch.add(template.bind(k, v))
        batch
      }
    }
    
    package cassandra;
    
    import java.util.Iterator;
    
    import com.netflix.astyanax.ColumnListMutation;
    import com.netflix.astyanax.serializers.AsciiSerializer;
    import com.netflix.astyanax.serializers.LongSerializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.netflix.astyanax.AstyanaxContext;
    import com.netflix.astyanax.Keyspace;
    import com.netflix.astyanax.MutationBatch;
    import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
    import com.netflix.astyanax.connectionpool.OperationResult;
    import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
    import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
    import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
    import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
    import com.netflix.astyanax.model.Column;
    import com.netflix.astyanax.model.ColumnFamily;
    import com.netflix.astyanax.model.ColumnList;
    import com.netflix.astyanax.thrift.ThriftFamilyFactory;
    
    public class AstClient {
        private static final Logger logger = LoggerFactory.getLogger(AstClient.class);
    
        private AstyanaxContext<Keyspace> context;
        private Keyspace keyspace;
        private ColumnFamily<Long, String> EMP_CF;
        private static final String EMP_CF_NAME = "employees2";
    
        public void init() {
            logger.debug("init()");
    
            context = new AstyanaxContext.Builder()
                    .forCluster("Test Cluster")
                    .forKeyspace("test1")
                    .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                            .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                    )
                    .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
                            .setPort(9160)
                            .setMaxConnsPerHost(1)
                            .setSeeds("127.0.0.1:9160")
                    )
                    .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                            .setCqlVersion("3.0.0")
                            .setTargetCassandraVersion("2.0.5"))
                    .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                    .buildKeyspace(ThriftFamilyFactory.getInstance());
    
            context.start();
            keyspace = context.getClient();
    
            EMP_CF = ColumnFamily.newColumnFamily(
                    EMP_CF_NAME,
                    LongSerializer.get(),
                    AsciiSerializer.get());
        }
    
        public void insert(long time) {
            MutationBatch m = keyspace.prepareMutationBatch();
    
            ColumnListMutation<String> x =
                    m.withRow(EMP_CF, time);
            for(int i=0;i<10000;i++)
                x.putColumn(Integer.toString(i), Integer.toString(i));
    
            try {
                @SuppressWarnings("unused")
                Object result = m.execute();
            } catch (ConnectionException e) {
                logger.error("failed to write data to C*", e);
                throw new RuntimeException("failed to write data to C*", e);
            }
            logger.debug("insert ok");
        }
    
        public void createCF() {
        }
    
        public void read(long time) {
            OperationResult<ColumnList<String>> result;
            try {
                result = keyspace.prepareQuery(EMP_CF)
                        .getKey(time)
                        .execute();
    
                ColumnList<String> cols = result.getResult();
                // process data
    
                // a) iterate over columsn
                for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
                    Column<String> c = i.next();
                    String v = c.getStringValue();
                    System.out.println(c.getName() + " " + v);
                }
    
            } catch (ConnectionException e) {
                logger.error("failed to read from C*", e);
                throw new RuntimeException("failed to read from C*", e);
            }
        }
    
        public static void main(String[] args) {
            AstClient c = new AstClient();
            c.init();
            long t00 = System.nanoTime();
            for(int i=0;i<1000;i++) {
                long t0 = System.nanoTime();
                c.insert(i);
                long dt = System.nanoTime() - t0;
                System.out.println((1.0e9/dt) + " " + i);
            }
            long dtt = System.nanoTime() - t00;
    
            c.read(0);
            System.out.println(dtt / 1e9);
        }
    
    }
    

    更新2:我已经试用了CASSANDRA-6737附带的修补程序,我可以确认这个修补程序完全修复了这个问题。感谢DataStax的Sylvain Lebresne这么快就修复了这个问题!

  • 共有1个答案

    郎鸿雪
    2023-03-14

    您的代码中有一个错误,我认为这解释了您所看到的许多性能问题:对于每个批处理,您都要重新准备语句。准备一个语句并不是非常昂贵的,但是这样做会增加很多延迟。等待该语句准备的时间是不构建批处理的时间,也是Cassandra不处理该批处理的时间。一个准备好的语句只需要准备一次,并且应该重复使用。

    我认为很多性能不好的原因可以解释为延迟问题。瓶颈很可能是您的应用程序代码,而不是Cassandra。即使只准备一次该语句,您仍然会将大部分时间花费在应用程序中的CPU绑定(构建一个大批处理)或什么都不做(等待网络和Cassandra)。

    您可以做两件事:首先,使用CQL驱动程序的异步API,在network和Cassandra忙于您刚刚完成的一个时构建下一个批;其次,尝试运行多个线程来执行相同的操作。您必须试验的线程的确切数量取决于您拥有的核心数,以及在同一台机器上运行一个或三个节点。

    如果使用批处理,就使用压缩。压缩在大多数请求负载中没有什么不同(响应是另一回事),但是当您发送大量的批时,它会产生很大的不同。

    在卡桑德拉中,宽行写没有什么特别的。除了一些例外,模式不会改变处理写所需的时间。我运行的应用程序每秒进行数万次非批处理的混合宽行和非宽行写操作。集群不大,每个只有三到四个M1.xLarge EC2节点。诀窍是永远不要等待一个请求返回后再发送下一个请求(这并不意味着触发和忘记,只是以同样的异步方式处理响应)。延迟是性能杀手。

     类似资料:
    • 谁能告诉我为什么火花连接器要花这么多时间插入?我在代码中做了什么错误吗?或者使用spark-cassandra连接器进行插入操作是否不可取?

    • 我正在编写一些python代码,这些代码将随着时间的推移收集数据。我要把这个存放在卡桑德拉。我花了一整天的时间在这个问题上,但找不到有效的东西。 我可以创建表,但在插入各种数据时遇到了麻烦(time_current timestamp,data blob)。我无法正确格式化它。我计划按小时划分行(在我的用例中,数据大小应该很好),每个数据条目都有列(2-3/分钟)。 2)我的数据是一个小纸条。它将

    • 我必须为每个客户端每秒存储大约250个数值,即每小时大约90万个数字。它可能不会是全天的记录(可能每天5-10个小时),但我会根据客户端ID和读取日期对数据进行分区。最大行长约为22-23M,这仍然是可管理的。无论如何,我的方案看起来像这样: 密钥空间的复制因子为2,仅用于测试,告密者为和。我知道复制因子3更符合生产标准。 接下来,我在公司服务器上创建了一个小型集群,三台裸机虚拟化机器,具有2个C

    • 问题内容: 像Go这样的类型,并且不能存储null值,因此我发现可以为此使用sql.NullInt64和sql.NullString。 但是,当我在Struct中使用它们,并使用json包从Struct生成JSON时,格式与使用常规和类型时不同。 JSON具有附加级别,因为sql.Null ***也是Struct。 有没有很好的解决方法,还是应该在我的SQL数据库中不使用NULL? 问题答案: 像

    • 我正在尝试使用CQL将我的域模型放入卡桑德拉中。假设我有USER_FAVOURITES张桌子。每个收藏夹都有 ID 作为主键。我想按顺序存储最多 10 个多个字段的记录的列表,field_name、field_location等。 对这样的表进行建模是个好主意吗? 并且对象将由匹配索引的列表项构成(例如 我总是在一起查询收藏夹。我可能想添加和项目到某个位置,开始,结束或中间。 这是一个好的做法吗?

    • node-cassandra-cql 是一个 Apache Cassandra CQL3 二进制协议的 Node.js CQL 驱动。CQL 是 Cassandra 的查询语言。该项目提供到多个主机的连接池、查询参数,以及可通过列名获取数值和支持 bigint。 示例代码: // Creating a new connection pool to multiple hosts.var cql =