嗨,我是新来的斯托姆和Kafka。我使用的是storm 1.0.1和kafka 0.10.0,我们有一个kafkaspout可以接收来自kafka主题的java bean。我花了几个小时来寻找正确的方法。发现很少文章是有用的,但没有一个方法为我工作到目前为止。
public class StormTopology {
public static void main(String[] args) throws Exception {
//Topo test /zkroot test
if (args.length == 4) {
System.out.println("started");
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig kafkaConf1 = new SpoutConfig(hosts, args[1], args[2],
args[3]);
kafkaConf1.zkRoot = args[2];
kafkaConf1.useStartOffsetTimeIfOffsetOutOfRange = true;
kafkaConf1.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
kafkaConf1.scheme = new SchemeAsMultiScheme(new KryoScheme());
KafkaSpout kafkaSpout1 = new KafkaSpout(kafkaConf1);
System.out.println("started");
ShuffleBolt shuffleBolt = new ShuffleBolt(args[1]);
AnalysisBolt analysisBolt = new AnalysisBolt(args[1]);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("kafkaspout", kafkaSpout1, 1);
//builder.setBolt("counterbolt2", countbolt2, 3).shuffleGrouping("kafkaspout");
//This is for field grouping in bolt we need two bolt for field grouping or it wont work
topologyBuilder.setBolt("shuffleBolt", shuffleBolt, 3).shuffleGrouping("kafkaspout");
topologyBuilder.setBolt("analysisBolt", analysisBolt, 5).fieldsGrouping("shuffleBolt", new Fields("trip"));
Config config = new Config();
config.registerSerialization(VehicleTrip.class, VehicleTripKyroSerializer.class);
config.setDebug(true);
config.setNumWorkers(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(args[0], config, topologyBuilder.createTopology());
// StormSubmitter.submitTopology(args[0], config,
// builder.createTopology());
} else {
System.out
.println("Insufficent Arguements - topologyName kafkaTopic ZKRoot ID");
}
}
KafKaProducer:
public class StreamKafkaProducer {
private static Producer producer;
private final Properties props = new Properties();
private static final StreamKafkaProducer KAFKA_PRODUCER = new StreamKafkaProducer();
private StreamKafkaProducer(){
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.abc.serializer.MySerializer");
producer = new org.apache.kafka.clients.producer.KafkaProducer(props);
}
public static StreamKafkaProducer getStreamKafkaProducer(){
return KAFKA_PRODUCER;
}
public void produce(String topic, VehicleTrip vehicleTrip){
ProducerRecord<String,VehicleTrip> producerRecord = new ProducerRecord<>(topic,vehicleTrip);
producer.send(producerRecord);
//producer.close();
}
public static void closeProducer(){
producer.close();
}
}
Kyro串行器:
public class DataKyroSerializer extends Serializer<Data> implements Serializable {
@Override
public void write(Kryo kryo, Output output, VehicleTrip vehicleTrip) {
output.writeLong(data.getStartedOn().getTime());
output.writeLong(data.getEndedOn().getTime());
}
@Override
public Data read(Kryo kryo, Input input, Class<VehicleTrip> aClass) {
Data data = new Data();
data.setStartedOn(new Date(input.readLong()));
data.setEndedOn(new Date(input.readLong()));
return data;
}
public class KryoScheme implements Scheme {
private ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
protected Kryo initialValue() {
Kryo kryo = new Kryo();
kryo.addDefaultSerializer(Data.class, new DataKyroSerializer());
return kryo;
};
};
@Override
public List<Object> deserialize(ByteBuffer ser) {
return Utils.tuple(kryos.get().readObject(new ByteBufferInput(ser.array()), Data.class));
}
@Override
public Fields getOutputFields( ) {
return new Fields( "data" );
}
}
public class AnalysisBolt implements IBasicBolt {
/**
*
*/
private static final long serialVersionUID = 1L;
private String topicname = null;
public AnalysisBolt(String topicname) {
this.topicname = topicname;
}
public void prepare(Map stormConf, TopologyContext topologyContext) {
System.out.println("prepare");
}
public void execute(Tuple input, BasicOutputCollector collector) {
System.out.println("execute");
Fields fields = input.getFields();
try {
JSONObject eventJson = (JSONObject) JSONSerializer.toJSON((String) input
.getValueByField(fields.get(1)));
String StartTime = (String) eventJson.get("startedOn");
String EndTime = (String) eventJson.get("endedOn");
String Oid = (String) eventJson.get("_id");
int V_id = (Integer) eventJson.get("vehicleId");
//call method getEventForVehicleWithinTime(Long vehicleId, Date startTime, Date endTime)
System.out.println("==========="+Oid+"| "+V_id+"| "+StartTime+"| "+EndTime);
} catch (Exception e) {
e.printStackTrace();
}
}
java.lang.IllegalStateException: Spout 'kafkaspout' contains a
non-serializable field of type com.abc.topology.KryoScheme$1, which
was instantiated prior to topology creation.
com.minda.iconnect.topology.KryoScheme$1 should be instantiated within
the prepare method of 'kafkaspout at the earliest.
您的ThreadLocal不可序列化。最好的解决方案是使序列化程序既可序列化又可线程安全。如果这是不可能的,那么我看到两个备选方案,因为没有准备方法,因为你会得到一个螺栓。
我试图将一个角度函数转换为可观察模式,因为它的当前实现与它有一些异步性。为了讨论这个问题,我们举一个简单的例子。 可以通过以下方式将其转换为使用可观察对象: 我所面临的问题(据我所知)是针对无法访问内部选择语句的情况。 如果使用常规主题,订阅函数肯定不会得到任何值,因为事件的顺序是: 函数被调用 主题已创建 值已设置 调用函数订阅,因此仅在此事件发生后获取值 如果使用了BehaviorSubjec
我在gke(gcp)上部署了一个nodejs docker图像应用程序。这个应用程序只是一个等待通知的消息订阅者。我创建了一个主题(pubsub)。iam试图添加订阅,以便通知我的应用程序。但我没能做到。我试图在google credentials(API&Services/credentials)中添加ip(域),但这个过程真的很可疑。为什么要验证部署在gcp中的应用程序??有什么办法吗?谢谢
我会尽量详细解释“请帮忙。我打开了一个新项目。在新项目中,“主题”部分打开了两次“一亮一暗”。我在应用程序中添加了黑色文字。文字看起来是白色的,因为我的手机是黑色主题。这很好,但令人费解。我只有一种颜色。xml文件(不适用于夜间版本)文本是如何变为白色的?这很好,但我添加的图标(矢量“xml”)并将其设置为灰色,但现在它无法理解。图标的颜色在黑暗中从灰色变为黑色。我想让他换成白人。我还打开了颜色。
正在启动lib\main。IA Emulator上AOSP上的dart处于调试模式。。。正在运行Gradle任务“assembleDebug”。。。警告:插件路径_provider_android需要android SDK版本31。警告:插件共享的_首选项_android需要android SDK版本31。一个或多个插件需要更高的Android SDK版本。通过将以下内容添加到D:\App Dev
我试图查找一个JMS TopicConnectionFactory使用JNDI在WebSphere应用服务器。我已经在服务器中正确地完成了jndi设置。我已经在服务器中部署了我的应用程序。 我为发送方/接收方/监听器设计了3个java类。在服务器中部署WAR之后,我首先尝试将接收器代码作为独立的java应用程序运行。然而,它失败了,错误如下: 线程“main”javax中出现异常。命名。NoIni
JVM堆分为两个空间,旧一代的空间和年轻一代的空间。在主要GC之后,经过压缩/扫描过程,旧一代会有空闲空间,我想知道我们在主要GC期间获得的空闲空间是否仍然属于旧一代空间,或者旧一代的空闲空间可以移动到年轻一代的空间? 换言之,我想问的是,老一代的空间和年轻一代的空间是否有固定的大小/边界。 提前谢谢你,林
这是我的身份验证服务和部署。 那是我的入口文件。 当我运行Commnand:时,我收到的消息是: StartHost失败,但将重试:设置:获取ssh主机端口:获取“minikube”的端口22:docker container inspect-f“'{(index(index.NetworkSettings.Ports“22/tcp”)0).HostPort}}'”minikube:退出状态1 <
更新 这是一个专门针对几个if项目的问题,因此,如果需要,请询问更多细节,我将尽力提供。 问题:我使用的WordPress主题称为白石(http://themeforest.net/item/white-rock-restaurant-winery-theme/3317744)。此主题与WooCommerce没有内置兼容性。然而,我已经设法让99%的事情与一些代码更改和其他插件一起工作。当您浏览到