import com.hp.hpl.jena.sparql.algebra.Algebra; //导入方法依赖的package包/类
@Override
/*
* (non-Javadoc)
* @see backtype.storm.task.IBolt#execute(backtype.storm.tuple.Tuple)
* We assume that each tuple here is a set of graphs, result of the windowing bolt.
*/
public void execute(Tuple input) {
ArrayList graphList = (ArrayList) input.getValue(0);
//Graph graph = (Graph) input.getValue(0);
QueryIterator queryIter = null;
for (Graph g : graphList) {
queryIter = Algebra.exec(this.opBGP, g);
while (queryIter.hasNext()) {
Binding binding = queryIter.nextBinding();
Values values = new Values();
for (String str : this.outputFields) {
Var aux = Var.alloc(str);
//Var obsValue = Var.alloc("obsValue");
//Var time = Var.alloc("time");
//Var location = Var.alloc("location");
Node auxNode = binding.get(aux);
//Node obsValueNode = binding.get(obsValue);
//Node timeNode = binding.get(time);
//Node locationNode = binding.get(location);
//System.out.println(obsId + ": " + FmtUtils.stringForNode(obsIdNode) + ", " +
//obsValue + ": " + FmtUtils.stringForNode(obsValueNode) + ", " + time + ": " + FmtUtils.stringForNode(timeNode)
//+ ", " + location + ": " + FmtUtils.stringForNode(locationNode));
values.add(FmtUtils.stringForNode(auxNode));
}
collector.emit(values);
//collector.emit(new Values(FmtUtils.stringForNode(obsIdNode), FmtUtils.stringForNode(obsValueNode),
//FmtUtils.stringForNode(timeNode), FmtUtils.stringForNode(locationNode)));
}
}
collector.ack(input);
}