当前位置: 首页 > 知识库问答 >
问题:

从Apache Storm bolt实现HBase中值的插入和删除

高宏峻
2023-03-14
o.a.z.ClientCnxn [INFO] Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
o.a.z.ClientCnxn [INFO] Socket connection established to localhost/127.0.0.1:2181, initiating session
o.a.z.ClientCnxn [INFO] Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
o.a.h.h.z.RecoverableZooKeeper [WARN] Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid

我的想法是通过在prepare方法中为螺栓打开连接的每个实例创建一个连接,并在cleanup时关闭它,从而减少到HBase的连接数量。但是,根据文档,不能保证在分布式模式下调用cleanup

在这之后,我找到了Storm使用hbase-storm-Hbase工作的框架。不幸的是,几乎没有关于它的信息,只有在它的github Repo上的自述。

  1. 那么我的第一个问题是,使用storm-hbase进行storm-hbase集成是否是一个好的解决方案?最好的方法是什么?

共有1个答案

寇夜洛
2023-03-14

哦,孩子,我该发光了!我已经做了大量的优化写到HBase从Storm,所以希望这将有助于您。

如果你刚刚开始storm-hbase是一个很好的方式开始流数据到HBase。您可以只克隆项目,进行maven安装,然后在拓扑中引用它。

但是,如果您开始获得更复杂的逻辑,那么创建自己的类来与HBase对话可能是可行的方法。这就是我在这里的回答中要展示的。

我假设您使用的是maven和maven-shade插件。您需要引用hbase-client:

<dependency>
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase-client</artifactId>
   <version>${hbase.version}</version>
</dependency>

还要确保在拓扑JAR中打包hbase-site.xml。您可以从集群下载该文件,并将其放在src/main/resources中。我还有一个名为hbase-site.dev.xml的dev测试工具。然后只需使用shade插件将其移动到JAR的根部。

<plugin>
   <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-shade-plugin</artifactId>
   <version>2.4</version>
   <configuration>
      <createDependencyReducedPom>true</createDependencyReducedPom>
      <artifactSet>
         <excludes>
            <exclude>classworlds:classworlds</exclude>
            <exclude>junit:junit</exclude>
            <exclude>jmock:*</exclude>
            <exclude>*:xml-apis</exclude>
            <exclude>org.apache.maven:lib:tests</exclude>
            <exclude>log4j:log4j:jar:</exclude>
            <exclude>org.testng:testng</exclude>
         </excludes>
      </artifactSet>
   </configuration>
   <executions>
      <execution>
         <phase>package</phase>
         <goals>
            <goal>shade</goal>
         </goals>
         <configuration>
            <transformers>
               <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                       <resource>core-site.xml</resource>
                       <file>src/main/resources/core-site.xml</file>
                   </transformer>
                   <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                       <resource>hbase-site.xml</resource>
                       <file>src/main/resources/hbase-site.xml</file>
                   </transformer>
                   <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                       <resource>hdfs-site.xml</resource>
                       <file>src/main/resources/hdfs-site.xml</file>
                   </transformer>
               <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
               <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass></mainClass>
               </transformer>
            </transformers>
            <filters>
               <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                     <exclude>META-INF/*.SF</exclude>
                     <exclude>META-INF/*.DSA</exclude>
                     <exclude>META-INF/*.RSA</exclude>
                     <exclude>junit/*</exclude>
                     <exclude>webapps/</exclude>
                     <exclude>testng*</exclude>
                     <exclude>*.js</exclude>
                     <exclude>*.png</exclude>
                     <exclude>*.css</exclude>
                     <exclude>*.json</exclude>
                     <exclude>*.csv</exclude>
                  </excludes>
               </filter>
            </filters>
         </configuration>
      </execution>
   </executions>
</plugin>
Configuration config = HBaseConfiguration.create();
connection = HConnectionManager.createConnection(config);
// single put method
private HConnection connection;

@SuppressWarnings("rawtypes")
@Override
public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
   Configuration config = HBaseConfiguration.create();
   connection = HConnectionManager.createConnection(config);
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
   try {
      // do stuff
      // call putFruit
   } catch (Exception e) {
      LOG.error("bolt error", e);
      collector.reportError(e);
   }
}

// example put method you'd call from within execute somewhere
private void putFruit(String key, FruitResult data) throws IOException {
   HTableInterface table = connection.getTable(Constants.TABLE_FRUIT);
   try {
     Put p = new Put(key.getBytes());
        long ts = data.getTimestamp();
        p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
        p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
        p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
        table.put(p);
   } finally {
      try {
         table.close();
      } finally {
         // nothing
      }
   }
}

注意,我在这里重新使用了连接。我建议从这里开始,因为这样更容易工作和调试。最终,由于您试图通过网络发送的请求的数量,这将无法扩展,您将需要开始将多个放入批处理在一起。

为了批处理PUTs,您需要使用HConnection打开一个表,并保持其打开状态。您还需要将自动刷新设置为false。这意味着表将自动缓冲请求,直到它达到“hbase.client.write.buffer”大小(默认值为2097152)。

// batch put method
private static boolean AUTO_FLUSH = false;
private static boolean CLEAR_BUFFER_ON_FAIL = false;
private HConnection connection;
private HTableInterface fruitTable;

@SuppressWarnings("rawtypes")
@Override
public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
   Configuration config = HBaseConfiguration.create();
   connection = HConnectionManager.createConnection(config);
   fruitTable = connection.getTable(Constants.TABLE_FRUIT);
   fruitTable.setAutoFlush(AUTO_FLUSH, CLEAR_BUFFER_ON_FAIL);
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
   try {
      // do stuff
      // call putFruit
   } catch (Exception e) {
      LOG.error("bolt error", e);
      collector.reportError(e);
   }
}

// example put method you'd call from within execute somewhere
private void putFruit(String key, FruitResult data) throws IOException {
   Put p = new Put(key.getBytes());
   long ts = data.getTimestamp();
   p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
   p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
   p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
   fruitTable.put(p);
}

在这两种方法中,最好还是在cleanup中尝试关闭HBase连接。请注意,在你的工人被杀之前,它可能不会被调用。

    null
 类似资料:
  • 我希望能够从分布式(而不是本地)Storm拓扑编写新的条目到HBase。有几个GitHub项目提供HBase映射器或预制的Storm bolts来将元组写入HBase。这些项目提供了在LocalCluster上执行其示例的说明。 我在这两个项目中遇到的问题,以及从bolt直接访问HBase API,是它们都需要在类路径中包含hbase-site.xml文件。使用直接API方法,或许也使用GitHu

  • 我正在研究一种算法,它将一小部分对象存储为一大部分对象的子列表。对象本身是有序的,因此需要一个有序列表。 最常见的操作将按频率顺序进行: 从列表中检索第n个元素(对于任意的n) 在列表的开头或结尾插入单曲 从列表中删除第一个或最后一个n个元素(对于一些任意的n) 从中间移除和插入永远不会完成,所以没有必要考虑其效率。 我的问题是,在Java(如LinkedList、ArrayList、Vector

  • 感谢您的时间和关注

  • 我的任务是创建函数来添加和删除链表中的节点,输入数据为int,字符为with函数调用。我不确定我做错了什么。我得到的唯一错误是:退出时返回代码为-11(SIGSEGV)。还有一个编译器方法:main。cpp:在函数“void listInsertValue(ListNode)”中* 感谢任何帮助。谢谢!

  • 本文向大家介绍Oracle + Mybatis实现批量插入、更新和删除示例代码,包括了Oracle + Mybatis实现批量插入、更新和删除示例代码的使用技巧和注意事项,需要的朋友参考一下 前言 Mybatis是web工程开发中非常常用的数据持久化的框架,通过该框架,我们非常容易的进行数据库的增删改查。数据库连接进行事务提交的时候,需要耗费的资源比较多,如果需要插入更新的数据比较多,而且每次事务

  • 本文向大家介绍JSP中使用JavaScript动态插入删除输入框实现代码,包括了JSP中使用JavaScript动态插入删除输入框实现代码的使用技巧和注意事项,需要的朋友参考一下 JavaScript代码: Jsp页面中的关键代码 设置table的id以便JavaScript中能识别该table