o.a.z.ClientCnxn [INFO] Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
o.a.z.ClientCnxn [INFO] Socket connection established to localhost/127.0.0.1:2181, initiating session
o.a.z.ClientCnxn [INFO] Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
o.a.h.h.z.RecoverableZooKeeper [WARN] Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
我的想法是通过在prepare
方法中为螺栓打开连接的每个实例创建一个连接,并在cleanup
时关闭它,从而减少到HBase的连接数量。但是,根据文档,不能保证在分布式模式下调用cleanup
。
在这之后,我找到了Storm使用hbase-storm-Hbase工作的框架。不幸的是,几乎没有关于它的信息,只有在它的github Repo上的自述。
哦,孩子,我该发光了!我已经做了大量的优化写到HBase从Storm,所以希望这将有助于您。
如果你刚刚开始storm-hbase是一个很好的方式开始流数据到HBase。您可以只克隆项目,进行maven安装,然后在拓扑中引用它。
但是,如果您开始获得更复杂的逻辑,那么创建自己的类来与HBase对话可能是可行的方法。这就是我在这里的回答中要展示的。
我假设您使用的是maven和maven-shade插件。您需要引用hbase-client:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
还要确保在拓扑JAR中打包hbase-site.xml
。您可以从集群下载该文件,并将其放在src/main/resources
中。我还有一个名为hbase-site.dev.xml
的dev测试工具。然后只需使用shade插件将其移动到JAR的根部。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>classworlds:classworlds</exclude>
<exclude>junit:junit</exclude>
<exclude>jmock:*</exclude>
<exclude>*:xml-apis</exclude>
<exclude>org.apache.maven:lib:tests</exclude>
<exclude>log4j:log4j:jar:</exclude>
<exclude>org.testng:testng</exclude>
</excludes>
</artifactSet>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>core-site.xml</resource>
<file>src/main/resources/core-site.xml</file>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>hbase-site.xml</resource>
<file>src/main/resources/hbase-site.xml</file>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
<resource>hdfs-site.xml</resource>
<file>src/main/resources/hdfs-site.xml</file>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass></mainClass>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>junit/*</exclude>
<exclude>webapps/</exclude>
<exclude>testng*</exclude>
<exclude>*.js</exclude>
<exclude>*.png</exclude>
<exclude>*.css</exclude>
<exclude>*.json</exclude>
<exclude>*.csv</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
Configuration config = HBaseConfiguration.create();
connection = HConnectionManager.createConnection(config);
// single put method
private HConnection connection;
@SuppressWarnings("rawtypes")
@Override
public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
Configuration config = HBaseConfiguration.create();
connection = HConnectionManager.createConnection(config);
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
// do stuff
// call putFruit
} catch (Exception e) {
LOG.error("bolt error", e);
collector.reportError(e);
}
}
// example put method you'd call from within execute somewhere
private void putFruit(String key, FruitResult data) throws IOException {
HTableInterface table = connection.getTable(Constants.TABLE_FRUIT);
try {
Put p = new Put(key.getBytes());
long ts = data.getTimestamp();
p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
table.put(p);
} finally {
try {
table.close();
} finally {
// nothing
}
}
}
注意,我在这里重新使用了连接。我建议从这里开始,因为这样更容易工作和调试。最终,由于您试图通过网络发送的请求的数量,这将无法扩展,您将需要开始将多个放入批处理在一起。
为了批处理PUTs,您需要使用HConnection打开一个表,并保持其打开状态。您还需要将自动刷新设置为false。这意味着表将自动缓冲请求,直到它达到“hbase.client.write.buffer”大小(默认值为2097152)。
// batch put method
private static boolean AUTO_FLUSH = false;
private static boolean CLEAR_BUFFER_ON_FAIL = false;
private HConnection connection;
private HTableInterface fruitTable;
@SuppressWarnings("rawtypes")
@Override
public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
Configuration config = HBaseConfiguration.create();
connection = HConnectionManager.createConnection(config);
fruitTable = connection.getTable(Constants.TABLE_FRUIT);
fruitTable.setAutoFlush(AUTO_FLUSH, CLEAR_BUFFER_ON_FAIL);
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
try {
// do stuff
// call putFruit
} catch (Exception e) {
LOG.error("bolt error", e);
collector.reportError(e);
}
}
// example put method you'd call from within execute somewhere
private void putFruit(String key, FruitResult data) throws IOException {
Put p = new Put(key.getBytes());
long ts = data.getTimestamp();
p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
fruitTable.put(p);
}
在这两种方法中,最好还是在cleanup
中尝试关闭HBase连接。请注意,在你的工人被杀之前,它可能不会被调用。
我希望能够从分布式(而不是本地)Storm拓扑编写新的条目到HBase。有几个GitHub项目提供HBase映射器或预制的Storm bolts来将元组写入HBase。这些项目提供了在LocalCluster上执行其示例的说明。 我在这两个项目中遇到的问题,以及从bolt直接访问HBase API,是它们都需要在类路径中包含hbase-site.xml文件。使用直接API方法,或许也使用GitHu
我正在研究一种算法,它将一小部分对象存储为一大部分对象的子列表。对象本身是有序的,因此需要一个有序列表。 最常见的操作将按频率顺序进行: 从列表中检索第n个元素(对于任意的n) 在列表的开头或结尾插入单曲 从列表中删除第一个或最后一个n个元素(对于一些任意的n) 从中间移除和插入永远不会完成,所以没有必要考虑其效率。 我的问题是,在Java(如LinkedList、ArrayList、Vector
感谢您的时间和关注
我的任务是创建函数来添加和删除链表中的节点,输入数据为int,字符为with函数调用。我不确定我做错了什么。我得到的唯一错误是:退出时返回代码为-11(SIGSEGV)。还有一个编译器方法:main。cpp:在函数“void listInsertValue(ListNode)”中* 感谢任何帮助。谢谢!
本文向大家介绍Oracle + Mybatis实现批量插入、更新和删除示例代码,包括了Oracle + Mybatis实现批量插入、更新和删除示例代码的使用技巧和注意事项,需要的朋友参考一下 前言 Mybatis是web工程开发中非常常用的数据持久化的框架,通过该框架,我们非常容易的进行数据库的增删改查。数据库连接进行事务提交的时候,需要耗费的资源比较多,如果需要插入更新的数据比较多,而且每次事务
本文向大家介绍JSP中使用JavaScript动态插入删除输入框实现代码,包括了JSP中使用JavaScript动态插入删除输入框实现代码的使用技巧和注意事项,需要的朋友参考一下 JavaScript代码: Jsp页面中的关键代码 设置table的id以便JavaScript中能识别该table