当前位置: 首页 > 知识库问答 >
问题:

在hbase中插入Spark dataframe

山森
2023-03-14
 --------------------
|id | name | address |
|--------------------|
|23 |marry |france   |
|--------------------|
|87 |zied  |italie   |
 --------------------
val tableName = "two"
val conf = HBaseConfiguration.create()
if(!admin.isTableAvailable(tableName)) {
          print("-----------------------------------------------------------------------------------------------------------")
          val tableDesc = new HTableDescriptor(tableName)
          tableDesc.addFamily(new HColumnDescriptor("z1".getBytes()))
          admin.createTable(tableDesc)
        }else{
          print("Table already exists!!--------------------------------------------------------------------------------------")
        }
val myTable = new HTable(conf, tableName)
    for (i <- 0 to 1000) {
      var p = new Put(Bytes.toBytes(""+i))
      p.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(""+(i*5)))
      p.add("z1".getBytes(), "age".getBytes(), Bytes.toBytes("2017-04-20"))
      p.add("z2".getBytes(), "job".getBytes(), Bytes.toBytes(""+i))
      p.add("z2".getBytes(), "salary".getBytes(), Bytes.toBytes(""+i))
      myTable.put(p)
    }
    myTable.flushCommits()

感谢您的时间和关注

共有1个答案

齐英耀
2023-03-14

下面是使用Maven中Hortonworks提供的spark hbase连接器的完整示例。

此示例显示

  • 如何检查HBase表是否存在
  • 如果不存在,则创建HBase表
  • 将DataFrame插入HBase表
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, TableDescriptorBuilder}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

object Main extends App {

  case class Employee(key: String, fName: String, lName: String, mName: String,
                      addressLine: String, city: String, state: String, zipCode: String)

  // as pre-requisites the table 'employee' with column families 'person' and 'address' should exist
  val tableNameString = "default:employee"
  val colFamilyPString = "person"
  val colFamilyAString = "address"
  val tableName = TableName.valueOf(tableNameString)
  val colFamilyP = colFamilyPString.getBytes
  val colFamilyA = colFamilyAString.getBytes

  val hBaseConf = HBaseConfiguration.create()
  val connection = ConnectionFactory.createConnection(hBaseConf);
  val admin = connection.getAdmin();

  println("Check if table 'employee' exists:")
  val tableExistsCheck: Boolean = admin.tableExists(tableName)
  println(s"Table " + tableName.toString + " exists? " + tableExistsCheck)

  if(tableExistsCheck == false) {
    println("Create Table employee with column families 'person' and 'address'")
    val colFamilyBuild1 = ColumnFamilyDescriptorBuilder.newBuilder(colFamilyP).build()
    val colFamilyBuild2 = ColumnFamilyDescriptorBuilder.newBuilder(colFamilyA).build()
    val tableDescriptorBuild = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(colFamilyBuild1)
      .setColumnFamily(colFamilyBuild2)
      .build()
    admin.createTable(tableDescriptorBuild)
  }

  // define schema for the dataframe that should be loaded into HBase
  def catalog =
    s"""{
       |"table":{"namespace":"default","name":"employee"},
       |"rowkey":"key",
       |"columns":{
       |"key":{"cf":"rowkey","col":"key","type":"string"},
       |"fName":{"cf":"person","col":"firstName","type":"string"},
       |"lName":{"cf":"person","col":"lastName","type":"string"},
       |"mName":{"cf":"person","col":"middleName","type":"string"},
       |"addressLine":{"cf":"address","col":"addressLine","type":"string"},
       |"city":{"cf":"address","col":"city","type":"string"},
       |"state":{"cf":"address","col":"state","type":"string"},
       |"zipCode":{"cf":"address","col":"zipCode","type":"string"}
       |}
       |}""".stripMargin

  // define some test data
  val data = Seq(
    Employee("1","Horst","Hans","A","12main","NYC","NY","123"),
    Employee("2","Joe","Bill","B","1337ave","LA","CA","456"),
    Employee("3","Mohammed","Mohammed","C","1Apple","SanFran","CA","678")
  )

  // create SparkSession
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("HBaseConnector")
    .getOrCreate()

  // serialize data
  import spark.implicits._
  val df = spark.sparkContext.parallelize(data).toDF

  // write dataframe into HBase
  df.write.options(
    Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "3")) // create 3 regions
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .save()

}
 类似资料:
  • 我希望能够从分布式(而不是本地)Storm拓扑编写新的条目到HBase。有几个GitHub项目提供HBase映射器或预制的Storm bolts来将元组写入HBase。这些项目提供了在LocalCluster上执行其示例的说明。 我在这两个项目中遇到的问题,以及从bolt直接访问HBase API,是它们都需要在类路径中包含hbase-site.xml文件。使用直接API方法,或许也使用GitHu

  • 我的想法是通过在方法中为螺栓打开连接的每个实例创建一个连接,并在时关闭它,从而减少到HBase的连接数量。但是,根据文档,不能保证在分布式模式下调用。 在这之后,我找到了Storm使用hbase-storm-Hbase工作的框架。不幸的是,几乎没有关于它的信息,只有在它的github Repo上的自述。 那么我的第一个问题是,使用storm-hbase进行storm-hbase集成是否是一个好的解

  • 问题内容: 我已经使用hive在hbase中创建了一个表: 并创建了另一个表来加载数据: 最后将数据插入到hbase表中: 该表在hbase中如下所示: 我可以对JSON文件做同样的事情: 并做: 请帮忙 !:) 问题答案: 您可以使用该函数将数据解析为JSON对象。例如,如果您使用JSON数据创建登台表: 然后使用提取要加载到表中的属性: 有此功能的更全面的讨论在这里。

  • 问题内容: 我有一个具有contenteditable设置的div,并且在按下回车键时使用jquery捕获了按键,以调用preventDefault()。类似于 这个在光标处插入文本的问题,我想直接插入html,为简便起见,我们将其称为br标签。使用上面问题的答案实际上在IE中有效,因为它使用range.pasteHTML方法,但是在其他浏览器中,br标签将显示为纯文本而不是html。如何修改答案

  • 一、概述 HBase是Hadoop的一个子项目,HBase采用了Google BigTable的稀疏的,面向列的数据库实现方式的理论,建立在hadoop的hdfs上,一方面里用了hdfs的高可靠性和可伸缩行,另外一方面里用了BigTable的高效数据组织形式.可以说HBase为海量数据的real-time相应提供了很好的一个开源解决方案.据说在某运营商中使用类似于BigTable(个人猜测应该就是

  • 使用与hive 2.2.0集成的Spark2.3thriftserver。从火花直线运行。尝试将数据插入配置单元hbase表(以hbase作为存储的配置单元表)。插入到配置单元本机表是可以的。当插入到配置单元hbase表时,它会引发以下异常: