Driver.java

package com.cloudera.examples.hbase.bulkimport;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * HBase bulk import example
 * <p>
 * Data preparation MapReduce job driver:
 *   - args[0]: HDFS input path
 *   - args[1]: HDFS output path
 *   - args[2]: HBase table name
 */
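Only the imports and the Javadoc of the driver are shown above; the class body is not included. For orientation, here is a minimal sketch of what a driver for this mapper typically looks like. The class name, job name, and main() body are illustrative rather than the original code; the "hbase.table.name" property is the one the mapper reads in its setup() method, and HFileOutputFormat.configureIncrementalLoad() is the standard way to wire up the TotalOrderPartitioner from the table's region boundaries in this HBase version.

public class Driver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // The mapper reads this property in setup() to learn the target table name
    conf.set("hbase.table.name", args[2]);

    Job job = new Job(conf, "HBase bulk import");
    job.setJarByClass(Driver.class);

    job.setMapperClass(HBaseKVMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
    job.setInputFormatClass(TextInputFormat.class);

    // Connect to the (already created, ideally pre-split) table so that
    // HFileOutputFormat can configure the partitioner and reducer for us
    HTable hTable = new HTable(conf, args[2]);
    HFileOutputFormat.configureIncrementalLoad(job, hTable);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
  }
}

Note that after such a job finishes, the HFiles written under args[1] still have to be handed to HBase (for example with the completebulkload tool / LoadIncrementalHFiles) before the rows become visible in the table.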
HColumnEnum.java

package com.cloudera.examples.hbase.bulkimport;

/**
 * HBase table columns for the 'srv' column family
 */
public enum HColumnEnum {
  SRV_COL_employeeid ("employeeid".getBytes()),
  SRV_COL_eventdesc ("eventdesc".getBytes()),
  SRV_COL_eventdate ("eventdate".getBytes()),
  SRV_COL_objectname ("objectname".getBytes()),
  SRV_COL_objectfolder ("objectfolder".getBytes()),
  SRV_COL_ipaddress ("ipaddress".getBytes());

  private final byte[] columnName;

  HColumnEnum (byte[] column) {
    this.columnName = column;
  }

  public byte[] getColumnName() {
    return this.columnName;
  }
}

HBaseKVMapper.java
package com.cloudera.examples.hbase.bulkimport;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import au.com.bytecode.opencsv.CSVParser;

/**
 * HBase bulk import example
 * <p>
 * Parses Facebook and Twitter messages from CSV files and outputs
 * <ImmutableBytesWritable, KeyValue>.
 * <p>
 * The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
 * into the correct HBase table region.
 * <p>
 * The KeyValue value holds the HBase mutation information (column family,
 * column, and value)
 */
public class HBaseKVMapper extends
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

  final static byte[] SRV_COL_FAM = "srv".getBytes();
  final static int NUM_FIELDS = 6;

  CSVParser csvParser = new CSVParser();
  int tipOffSeconds = 0;
  String tableName = "";

  // DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
  //     .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));

  ImmutableBytesWritable hKey = new ImmutableBytesWritable();
  KeyValue kv;

  /** {@inheritDoc} */
  @Override
  protected void setup(Context context) throws IOException,
      InterruptedException {
    Configuration c = context.getConfiguration();

    // tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
    tableName = c.get("hbase.table.name");
  }

  /** {@inheritDoc} */
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    /*if (value.find("Service,Term,") > -1) {
      // Skip header
      return;
    }*/

    String[] fields = null;

    try {
      fields = value.toString().split(","); // csvParser.parseLine(value.toString());
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
      return;
    }

    if (fields.length != NUM_FIELDS) {
      context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
      return;
    }

    // Get game offset in seconds from tip-off
    /* DateTime dt = null;
    try {
      dt = p.parseDateTime(fields[9]);
    } catch (Exception ex) {
      context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
      return;
    }

    int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
    String offsetForKey = String.format("%04d", gameOffset);

    String username = fields[2];
    if (username.equals("")) {
      username = fields[3];
    }*/

    // Key: e.g. "1200:twitter:jrkinley"
    hKey.set(String.format("%s|%s|%s|%s|%s|%s", fields[0], fields[1],
        fields[2], fields[3], fields[4], fields[5]).getBytes());

    // Service columns
    if (!fields[0].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_employeeid.getColumnName(), fields[0].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[1].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_eventdesc.getColumnName(), fields[1].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[2].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_eventdate.getColumnName(), fields[2].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[3].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_objectname.getColumnName(), fields[3].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[4].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_objectfolder.getColumnName(), fields[4].getBytes());
      context.write(hKey, kv);
    }

    if (!fields[5].equals("")) {
      kv = new KeyValue(hKey.get(), SRV_COL_FAM,
          HColumnEnum.SRV_COL_ipaddress.getColumnName(), fields[5].getBytes());
      context.write(hKey, kv);
    }

    context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);

    /*
     * Output number of messages per quarter and before/after game. This should
     * correspond to the number of messages per region in HBase
     */
    /* if (gameOffset < 0) {
      context.getCounter("QStats", "BEFORE_GAME").increment(1);
    } else if (gameOffset < 900) {
      context.getCounter("QStats", "Q1").increment(1);
    } else if (gameOffset < 1800) {
      context.getCounter("QStats", "Q2").increment(1);
    } else if (gameOffset < 2700) {
      context.getCounter("QStats", "Q3").increment(1);
    } else if (gameOffset < 3600) {
      context.getCounter("QStats", "Q4").increment(1);
    } else {
      context.getCounter("QStats", "AFTER_GAME").increment(1);
    }*/
  }
}
mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>namenode:54311</value>
  </property>
  <property>
    <name>mapred.reduce.parallel.copies</name>
    <value>20</value>
  </property>
  <property>
    <name>tasktracker.http.threads</name>
    <value>50</value>
  </property>
  <property>
    <name>mapred.job.shuffle.input.buffer.percent</name>
    <value>0.70</value>
  </property>
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>4</value>
  </property>
  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>4</value>
  </property>
  <property>
    <name>mapred.map.tasks</name>
    <value>4</value>
  </property>
  <property>
    <name>reduce.map.tasks</name>
    <value>4</value>
  </property>
  <property>
    <name>mapred.job.shuffle.merge.percent</name>
    <value>0.65</value>
  </property>
  <property>
    <name>mapred.task.timeout</name>
    <value>1200000</value>
  </property>
  <property>
    <name>mapred.child.java.opts</name>
    <value>-Xms1024M -Xmx2048M</value>
  </property>
  <property>
    <name>mapred.job.reuse.jvm.num.tasks</name>
    <value>-1</value>
  </property>
  <property>
    <name>mapred.compress.map.output</name>
    <value>true</value>
  </property>
  <property>
    <name>mapred.map.output.compression.codec</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
  </property>
  <property>
    <name>io.sort.mb</name>
    <value>800</value>
  </property>
  <property>
    <name>mapred.child.ulimit</name>
    <value>unlimited</value>
  </property>
  <property>
    <name>io.sort.factor</name>
    <value>100</value>
    <description>More streams merged at once while sorting files.</description>
  </property>
  <property>
    <name>mapreduce.admin.map.child.java.opts</name>
    <value>-Djava.net.preferIPv4Stack=true</value>
  </property>
  <property>
    <name>mapreduce.admin.reduce.child.java.opts</name>
    <value>-Djava.net.preferIPv4Stack=true</value>
  </property>
  <property>
    <name>mapred.min.split.size</name>
    <value>0</value>
  </property>
  <property>
    <name>mapred.job.map.memory.mb</name>
    <value>-1</value>
  </property>
  <property>
    <name>mapred.jobtracker.maxtasks.per.job</name>
    <value>-1</value>
  </property>
</configuration>
hbase-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://namenode:54310/hbase</value>
    <description>The directory shared by RegionServers.</description>
  </property>
  <property>
    <name>hbase.master</name>
    <value>slave:60000</value>
    <description>The host and port that the HBase master runs at.
      A value of 'local' runs the master and a regionserver in a single process.
    </description>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
    <description>The mode the cluster will be in. Possible values are
      false: standalone and pseudo-distributed setups with managed Zookeeper
      true: fully-distributed with unmanaged Zookeeper Quorum (see hbase-env.sh)
    </description>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>slave</value>
    <description>Comma separated list of servers in the ZooKeeper Quorum.
      For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
      By default this is set to localhost for local and pseudo-distributed modes
      of operation. For a fully-distributed setup, this should be set to a full
      list of ZooKeeper quorum servers. If HBASE_MANAGES_ZK is set in hbase-env.sh
      this is the list of servers which we will start/stop ZooKeeper on.
    </description>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hduser/work/zoo_data</value>
    <description>Property from ZooKeeper's config zoo.cfg.
      The directory where the snapshot is stored.
    </description>
  </property>
</configuration>
Please help me so that I can improve the performance.
First of all, why do you need a MapReduce job at all to load such a small file (1 GB) into HBase?

In my experience, I have processed 5 GB of JSON with Jackson streaming (I did not want to hold the whole JSON in memory) and, using a batching technique, saved it into HBase in 8 minutes.

I used HBase Put objects in a batched list of 100,000 records.
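For reference, "streaming" here means using Jackson's incremental parser instead of binding the whole document into memory. A minimal sketch of that style of read, assuming Jackson 2.x and an input file that is one large JSON array of flat objects (the field handling is illustrative; converting each record into a Put is left as a comment):

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

public class JsonStreamingSketch {
  public static void main(String[] args) throws Exception {
    JsonFactory factory = new JsonFactory();
    JsonParser parser = factory.createParser(new File(args[0]));

    if (parser.nextToken() != JsonToken.START_ARRAY) {
      throw new IllegalStateException("expected a JSON array");
    }

    while (parser.nextToken() == JsonToken.START_OBJECT) {
      // Read one object at a time; only the current record is ever held in memory
      Map<String, String> record = new HashMap<String, String>();
      while (parser.nextToken() != JsonToken.END_OBJECT) {
        String field = parser.getCurrentName();
        parser.nextToken();                     // advance to the value token
        record.put(field, parser.getText());
      }
      // ...turn 'record' into a Put and add it to the current batch here...
    }

    parser.close();
  }
}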
You will probably need to call this method in two places (a caller sketch follows the method below):

1) When a batch of 100,000 records has accumulated.

2) For the remainder of the records, when fewer than 100,000 are left.
public void addRecord(final ArrayList<Put> puts, final String tableName) throws Exception {
    HTable table = null;
    try {
        // One batched put for the whole list instead of one RPC round trip per record
        table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName));
        table.put(puts);
        LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK.");
    } catch (final Throwable e) {
        e.printStackTrace();
    } finally {
        LOG.info("Processed ---> " + puts.size());
        if (table != null) {
            table.close(); // release the resources held by the HTable (missing in the original snippet)
        }
        puts.clear();      // empty the buffer so the caller can reuse it for the next batch
    }
}
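To make the two call sites concrete, here is a small sketch of a caller. loadAll and BATCH_SIZE are made-up names, addRecord is the method above, and the Iterator stands in for however the Puts are produced (for example from the streaming parse sketched earlier); the usual java.util and org.apache.hadoop.hbase.client.Put imports are assumed.

private static final int BATCH_SIZE = 100000;

public void loadAll(final Iterator<Put> source, final String tableName) throws Exception {
    // addRecord() clears the list in its finally block, so one buffer can be reused for every batch
    final ArrayList<Put> batch = new ArrayList<Put>(BATCH_SIZE);

    while (source.hasNext()) {
        batch.add(source.next());
        if (batch.size() >= BATCH_SIZE) {
            addRecord(batch, tableName);   // call site 1: a full batch of 100,000 records
        }
    }

    if (!batch.isEmpty()) {
        addRecord(batch, tableName);       // call site 2: the remainder, fewer than 100,000 records
    }
}

Each flush hands the whole list to a single table.put(List) call rather than issuing one put per record, which is where the time saving described above comes from.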