使用事务 TridentTopology 持久化数据到 MySQL

1、构建拓扑 JDBCTopology 类

package storm.trident.mysql;

import java.util.Arrays;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.trident.state.StateType;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Transactional Trident-MySQL topology.
 * @author mengyao
**/@SuppressWarnings("all")public classJDBCTopology {public static voidmain(String[] args) {
TridentTopology topology= newTridentTopology();//Spout数据源
FixedBatchSpout spout = new FixedBatchSpout(new Fields("tels"), 7,new Values("189111 3"),new Values("135111 7"),new Values("189111 2"),new Values("158111 5"),new Values("159111 6"),new Values("159111 3"),new Values("158111 5")
);
spout.setCycle(false);//State持久化配置属性
JDBCStateConfig config = newJDBCStateConfig();
config.setDriver("com.mysql.jdbc.Driver");
config.setUrl("jdbc:mysql://localhost:3306/test");
config.setUsername("root");
config.setPassword("123456");
config.setBatchSize(10);
config.setCacheSize(10);
config.setType(StateType.TRANSACTIONAL);
config.setCols("tel");
config.setColVals("sum");
config.setTable("tbl_tel");
topology.newStream("spout", spout)
.each(new Fields("tels"), new KeyValueFun(), new Fields("tel", "money"))
.groupBy(new Fields("tel"))
.persistentAggregate(JDBCState.getFactory(config),new Fields("money"), new SumCombinerAgg(), new Fields("sum"));
LocalCluster cluster= newLocalCluster();
cluster.submitTopology("test1", newConfig(), topology.build());
}
}
@SuppressWarnings("all")class KeyValueFun extendsBaseFunction {
@Overridepublic voidexecute(TridentTuple tuple, TridentCollector collector) {
String record= tuple.getString(0);
collector.emit(new Values(record.split("\t")[0], record.split("\t")[1]));
}
}
/**
 * Combiner aggregator that adds up {@code Long} values.
 *
 * <p>The type argument on {@code CombinerAggregator<Long>} is required: against the raw
 * interface, {@code combine(Long, Long)} does not override {@code combine(Object, Object)}
 * and the class does not compile.
 */
@SuppressWarnings("all")
class SumCombinerAgg implements CombinerAggregator<Long> {

    /**
     * Parses field 0 of the tuple as a long.
     *
     * @param tuple tuple whose field 0 is a decimal string
     * @return the parsed value
     * @throws NumberFormatException if the string is not a valid long
     */
    @Override
    public Long init(TridentTuple tuple) {
        return Long.parseLong(tuple.getString(0));
    }

    /**
     * Adds two partial sums and prints the result to standard output.
     *
     * @return {@code val1 + val2}
     */
    @Override
    public Long combine(Long val1, Long val2) {
        Long val = val1 + val2;
        System.out.println(val);
        return val;
    }

    /** @return the identity value for the sum, {@code 0} */
    @Override
    public Long zero() {
        return 0L;
    }
}

2、构建基于IBackingMap的JDBCState类

package storm.trident.mysql;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.OpaqueValue;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateType;
import org.apache.storm.trident.state.TransactionalValue;
import org.apache.storm.trident.state.map.CachedMap;
import org.apache.storm.trident.state.map.IBackingMap;
import org.apache.storm.trident.state.map.NonTransactionalMap;
import org.apache.storm.trident.state.map.OpaqueMap;
import org.apache.storm.trident.state.map.TransactionalMap;
/**
 * {@link IBackingMap} that reads and writes Trident state through {@code JDBCUtil},
 * using the table and column names from a {@code JDBCStateConfig}.
 *
 * <p>Only the first element of each key list is used as the key. The stored columns are
 * the key column, the value column, {@code txid} and {@code time}.
 *
 * @param <T> element type handed to and returned from the map; which wrapper is used
 *            ({@code TransactionalValue}, {@code OpaqueValue} or the bare value) follows
 *            {@code config.getType()}
 */
@SuppressWarnings("all")
public class JDBCState<T> implements IBackingMap<T> {

    // NOTE(review): this field is static, so every JDBCState in the JVM shares the last
    // config assigned. Kept static to preserve existing behaviour; consider an instance
    // field once JDBCStateConfig is confirmed to be serializable.
    private static JDBCStateConfig config;

    JDBCState(JDBCStateConfig config) {
        JDBCState.config = config;
    }

    /**
     * Looks up each key with one SELECT per key.
     *
     * @param keys key lists; element 0 of each list is the key value
     * @return one entry per key, in order. A key with no row yields value 0 and txid 0,
     *         wrapped according to the configured state type.
     */
    @Override
    public List<T> multiGet(List<List<Object>> keys) {
        String selectPrefix = new StringBuilder("SELECT ").append(config.getCols())
                .append("," + config.getColVals())
                .append(",txid")
                .append(" FROM " + config.getTable())
                .append(" WHERE ")
                .append(config.getCols())
                .append("='")
                .toString();
        JDBCUtil jdbcUtil = newJdbcUtil();
        List<Object> result = new ArrayList<Object>(keys.size());
        for (List<Object> list : keys) {
            Object key = list.get(0);
            // The key is escaped before being embedded in the quoted literal.
            String sql = selectPrefix + escape(key) + "'";
            Map<?, ?> map = jdbcUtil.queryForMap(sql);
            System.out.println(sql + " 【" + map);
            // The map is keyed by the string read from the database, so look up by string form.
            Bean itemBean = (Bean) map.get(String.valueOf(key));
            long txid = 0L;
            long val = 0L;
            if (itemBean != null) {
                val = itemBean.getSum();
                txid = itemBean.getTxid();
            }
            if (config.getType() == StateType.OPAQUE) {
                result.add(new OpaqueValue(txid, val));
            } else if (config.getType() == StateType.TRANSACTIONAL) {
                result.add(new TransactionalValue(txid, val));
            } else {
                result.add(val);
            }
        }
        return (List<T>) result;
    }

    /**
     * Writes all key/value pairs with a single multi-row INSERT.
     *
     * <p>Changes from the previous implementation:
     * <ul>
     *   <li>An empty key list returns immediately instead of executing a truncated
     *       statement.</li>
     *   <li>All three state types produce a row; previously only TRANSACTIONAL did, and
     *       any other type resulted in invalid SQL.</li>
     *   <li>The key is written as a quoted, escaped string literal.</li>
     *   <li>{@code ON DUPLICATE KEY UPDATE} is appended so an existing key is overwritten.
     *       This is MySQL syntax and only has an effect when the key column carries a
     *       unique index; without one the statement behaves like the plain INSERT.</li>
     * </ul>
     *
     * @param keys key lists; element 0 of each list is the key value
     * @param vals values to store, parallel to {@code keys}
     */
    @Override
    public void multiPut(List<List<Object>> keys, List<T> vals) {
        if (keys == null || keys.isEmpty()) {
            return;
        }
        // Build the insert statement.
        StringBuilder sqlBuilder = new StringBuilder("INSERT INTO ").append(config.getTable())
                .append("(" + config.getCols())
                .append("," + config.getColVals())
                .append(",txid")
                .append(",time")
                .append(") VALUES ");
        for (int i = 0; i < keys.size(); i++) {
            Object key = keys.get(i).get(0);
            T stored = vals.get(i);
            Object value;
            Object txid;
            if (config.getType() == StateType.TRANSACTIONAL) {
                TransactionalValue val = (TransactionalValue) stored;
                value = val.getVal();
                txid = val.getTxid();
            } else if (config.getType() == StateType.OPAQUE) {
                // NOTE(review): the table has no column for the previous value, so only the
                // current value and txid are stored; multiGet rebuilds OpaqueValue without it.
                OpaqueValue val = (OpaqueValue) stored;
                value = val.getCurr();
                txid = val.getCurrTxid();
            } else {
                // Non-transactional state carries no txid; multiGet ignores the column.
                value = stored;
                txid = 0L;
            }
            if (i > 0) {
                sqlBuilder.append(",");
            }
            sqlBuilder.append("('").append(escape(key)).append("',")
                    .append(literal(value)).append(",")
                    .append(txid)
                    .append(",NOW())");
        }
        sqlBuilder.append(" ON DUPLICATE KEY UPDATE ")
                .append(config.getColVals()).append("=VALUES(").append(config.getColVals()).append("),")
                .append("txid=VALUES(txid),")
                .append("time=VALUES(time)");
        System.out.println(sqlBuilder.toString());
        // Execute the insert.
        newJdbcUtil().insert(sqlBuilder.toString());
    }

    /** Creates a JDBCUtil from the connection settings in the current config. */
    private static JDBCUtil newJdbcUtil() {
        return new JDBCUtil(config.getDriver(), config.getUrl(), config.getUsername(), config.getPassword());
    }

    /**
     * Escapes backslashes and single quotes for use inside a single-quoted SQL literal.
     * NOTE(review): JDBCUtil only accepts complete SQL strings; bound parameters would be
     * preferable if it gains a parameterized API.
     */
    private static String escape(Object raw) {
        return String.valueOf(raw).replace("\\", "\\\\").replace("'", "''");
    }

    /** Renders a value as a SQL literal: numbers as-is, null as NULL, anything else quoted. */
    private static String literal(Object value) {
        if (value == null) {
            return "NULL";
        }
        if (value instanceof Number) {
            return String.valueOf(value);
        }
        return "'" + escape(value) + "'";
    }

    /**
     * Creates the state factory for the given config.
     *
     * @param config connection, table and state-type settings
     * @return a factory producing JDBC-backed map states
     */
    public static Factory getFactory(JDBCStateConfig config) {
        return new Factory(config);
    }

    /** State factory that wraps a JDBCState in a CachedMap and the map type matching the config. */
    static class Factory implements StateFactory {

        // NOTE(review): static, like JDBCState.config; it is not an instance field of the
        // factory. Kept as-is to preserve behaviour.
        private static JDBCStateConfig config;

        public Factory(JDBCStateConfig config) {
            Factory.config = config;
        }

        /**
         * Builds the state: JDBCState wrapped in a CachedMap of {@code config.getCacheSize()},
         * then in an OpaqueMap, TransactionalMap or NonTransactionalMap by state type.
         */
        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            final CachedMap map = new CachedMap(new JDBCState(config), config.getCacheSize());
            System.out.println(config);
            if (config.getType() == StateType.OPAQUE) {
                return OpaqueMap.build(map);
            } else if (config.getType() == StateType.TRANSACTIONAL) {
                return TransactionalMap.build(map);
            } else {
                return NonTransactionalMap.build(map);
            }
        }
    }
}

3、构建基于IBackingMap的JDBCStateConfig配置类

package storm.trident.mysql;

import java.util.List;
import org.apache.storm.trident.state.StateType;
/**
 * Settings for the JDBC-backed Trident state: connection parameters, target table,
 * key/value column names, cache size and state type.
 *
 * <p>Defaults: {@code cacheSize} is 100 and {@code type} is {@code StateType.OPAQUE}.
 * Implements {@code Serializable} so the object can be carried by serializable holders.
 */
@SuppressWarnings("all")
public class JDBCStateConfig implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private String url;
    private String driver;
    private String username;
    private String password;
    private String table;
    private int batchSize;
    private String cols;
    private String colVals;
    private int cacheSize = 100;
    private StateType type = StateType.OPAQUE;

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getDriver() {
        return driver;
    }

    public void setDriver(String driver) {
        this.driver = driver;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public int getBatchSize() {
        return batchSize;
    }

    public void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    public String getCols() {
        return cols;
    }

    public void setCols(String cols) {
        this.cols = cols;
    }

    public String getColVals() {
        return colVals;
    }

    public void setColVals(String colVals) {
        this.colVals = colVals;
    }

    public int getCacheSize() {
        return cacheSize;
    }

    public void setCacheSize(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    public StateType getType() {
        return type;
    }

    public void setType(StateType type) {
        this.type = type;
    }

    /**
     * Returns a one-line description of the settings. The password is masked because
     * this string is printed to standard output elsewhere in the file.
     */
    @Override
    public String toString() {
        return "JDBCStateConfig [url=" + url + ", driver=" + driver + ", username=" + username + ", password="
                + (password == null ? null : "******") + ", table=" + table + ", batchSize=" + batchSize
                + ", cols=" + cols + ", colVals=" + colVals + ", cacheSize=" + cacheSize + ", type=" + type + "]";
    }
}

4、构建JDBC工具类和实体Bean

package storm.trident.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal JDBC helper that opens a new connection per call, runs one statement and
 * closes everything again.
 *
 * <p>Connection, statement and result set are locals managed by try-with-resources.
 * The previous version kept them in fields and closed them in {@code finally} without
 * null checks, which threw a NullPointerException whenever the connection could not be
 * opened, and it never closed the result set.
 */
public class JDBCUtil {

    private String driver;
    private String url;
    private String username;
    private String password;

    /**
     * Stores the connection settings and attempts to load the driver class.
     *
     * @param driver   fully qualified JDBC driver class name
     * @param url      JDBC URL
     * @param username database user
     * @param password database password
     */
    public JDBCUtil(String driver, String url, String username, String password) {
        this.driver = driver;
        this.url = url;
        this.username = username;
        this.password = password;
        init();
    }

    /** Loads the driver class; a missing class is reported but does not abort construction. */
    void init() {
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Executes an update statement.
     *
     * @param sql complete SQL statement to run
     * @return {@code true} if at least one row was affected
     * @throws IllegalStateException if the statement fails; previously the error was only
     *         printed and {@code false} returned, which let a failed write look committed
     */
    public boolean insert(String sql) {
        try (Connection connection = DriverManager.getConnection(url, username, password);
             PreparedStatement ps = connection.prepareStatement(sql)) {
            return ps.executeUpdate() > 0;
        } catch (SQLException e) {
            throw new IllegalStateException("Failed to execute update: " + sql, e);
        }
    }

    /**
     * Runs a query and maps its first row to a {@link Bean}.
     *
     * <p>NOTE(review): the row is read through the fixed column labels {@code tel},
     * {@code sum} and {@code txid}; queries selecting other column names will fail.
     *
     * @param sql complete SQL query to run
     * @return a map with one entry (the {@code tel} value mapped to its bean), or an empty
     *         map if the query returned no rows
     * @throws IllegalStateException if the query fails; previously the error was only
     *         printed and an empty map returned, indistinguishable from "no row"
     */
    public Map<String, Bean> queryForMap(String sql) {
        Map<String, Bean> result = new HashMap<String, Bean>();
        try (Connection connection = DriverManager.getConnection(url, username, password);
             PreparedStatement ps = connection.prepareStatement(sql);
             ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                String tel = rs.getString("tel");
                Bean iteBean = new Bean(tel, rs.getLong("sum"), rs.getLong("txid"), null);
                result.put(tel, iteBean);
            }
        } catch (SQLException e) {
            throw new IllegalStateException("Failed to execute query: " + sql, e);
        }
        return result;
    }

    public String getDriver() {
        return driver;
    }

    public void setDriver(String driver) {
        this.driver = driver;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

package storm.trident.mysql;

/** Row holder for one state record: key ({@code tel}), value ({@code sum}), {@code txid} and {@code time}. */
public class Bean {

    private String tel;
    private long sum;
    private long txid;
    private String time;

    public Bean() {
    }

    public Bean(String tel, long sum, long txid, String time) {
        super();
        this.tel = tel;
        this.sum = sum;
        this.txid = txid;
        this.time = time;
    }

    public String getTel() {
        return tel;
    }

    public void setTel(String tel) {
        this.tel = tel;
    }

    public long getSum() {
        return sum;
    }

    public void setSum(long sum) {
        this.sum = sum;
    }

    public long getTxid() {
        return txid;
    }

    public void setTxid(long txid) {
        this.txid = txid;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Bean [tel=" + tel + ", sum=" + sum + ", txid=" + txid + ", time=" + time + "]";
    }
}