Question:

Storm java.io.NotSerializableException when running a topology

陆洲
2023-03-14

I finally thought I had a working topology that writes to a Redis database. I have one bolt that prints and another bolt that inserts into Redis. But when I try to launch the topology, I get the following error:

...5333 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
5376 [main] INFO  b.s.d.supervisor - Starting supervisor with id 1917ef54-0f16-47b8-86ea-b6722aa33c68 at host amnor-A88XPLUS
5405 [main] ERROR o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Thread Thread[main,5,main] died
java.lang.RuntimeException: java.io.NotSerializableException: Storm.practice.Storm.Prova.ProvaTopology
    at backtype.storm.utils.Utils.javaSerialize(Utils.java:91) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:107) ~[storm-core-0.10.0.jar:0.10.0]
    at Storm.practice.Storm.Prova.ProvaTopology.main(ProvaTopology.java:383) ~[classes/:?]
Caused by: java.io.NotSerializableException: Storm.practice.Storm.Prova.ProvaTopology
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[?:1.7.0_91]
    at backtype.storm.utils.Utils.javaSerialize(Utils.java:87) ~[storm-core-0.10.0.jar:0.10.0]
    ... 2 more

My topology code:
package Storm.practice.Storm.Prova;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.RedisState;
import org.apache.storm.redis.trident.state.RedisStateQuerier;
import org.apache.storm.redis.trident.state.RedisStateUpdater;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
//import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;

/**
 * This is a basic example of a Storm topology.
 */
public class ProvaTopology {

  public static class ProvaBolt extends BaseRichBolt {
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "  :-)"));
      _collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("Morts"));
    }


  }
  public class ProvaSpout extends BaseRichSpout {
      SpoutOutputCollector _collector;
      //Random _rand;
      private String fileName;
      //private SpoutOutputCollector _collector;
      private BufferedReader reader;
      private AtomicLong linesRead = new AtomicLong(0); // initialized so nextTuple() can count lines

      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        try {
            fileName = "/home/prova.txt";
            reader = new BufferedReader(new FileReader(fileName));
            // read and ignore the header if one exists
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
       // _rand = new Random();
      }

      public void nextTuple() {
        Utils.sleep(100);


      try {
            String line = reader.readLine();
            if (line != null) {
              long id = linesRead.incrementAndGet();
              System.out.println("Finished reading line, " + line);
              _collector.emit(new Values((String)line));
            } else {
              System.out.println("Finished reading file, " + linesRead.get() + " lines read");
              Thread.sleep(10000);
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
      }

      public void ack(Object id) {
      }

      public void fail(Object id) {
      }

      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("Morts"));
      }

    }

  public class RedisBolt implements IRichBolt {

    protected String channel = "Somriures";
    //    protected String configChannel;
    protected OutputCollector collector;
    //    protected Tuple currentTuple;
    //    protected Logger log;
    protected JedisPool pool;
    //    protected ConfigListenerThread configListenerThread;

    public RedisBolt() {}

    public RedisBolt(String channel) {
      this.channel = channel;
      //  log = Logger.getLogger(getClass().getName());
      //  setupNonSerializableAttributes();
    }

    public void prepare(Map stormConf, TopologyContext context,
        OutputCollector collector) {
      this.collector = collector;
      pool = new JedisPool("localhost");
    }

    public void execute(Tuple tuple) {
      String current = tuple.getString(0);
      if (current != null) {
        //      for(Object obj: result) {
        publish(current);
        collector.emit(tuple, new Values(current));
        //      }
        collector.ack(tuple);
      }
    }

    public void cleanup() {
      if (pool != null) {
        pool.destroy();
      }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields(channel));
    }

    public void publish(String msg) {
      Jedis jedis = pool.getResource();
      jedis.publish(channel, msg);
      pool.returnResource(jedis);
    }

    protected void setupNonSerializableAttributes() {

    }

    public Map getComponentConfiguration() {
      return null;
    }
  }



  public class PrinterBolt extends BaseBasicBolt {

      public void execute(Tuple tuple, BasicOutputCollector collector) {
          System.out.println(tuple);
      }

      public void declareOutputFields(OutputFieldsDeclarer ofd) {
      }

  }


    public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    ProvaTopology Pt = new ProvaTopology();
    JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost("127.0.0.1").setPort(666).build();



    builder.setSpout("Morts", Pt.new ProvaSpout(), 10); // emisorTestWordSpout
    builder.setBolt("happy", new ProvaBolt(), 3).shuffleGrouping("Morts"); // where does it read from?
    builder.setBolt("meal", new ProvaBolt(), 2).shuffleGrouping("happy"); // where does it read from?
    builder.setBolt("bd", Pt.new RedisBolt(), 2).shuffleGrouping("meal"); // where does it read from?
    builder.setBolt("print", Pt.new PrinterBolt(), 2).shuffleGrouping("meal");
   // builder.setBolt("StoreM", (storeMapperS));
    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(5);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                                   //WithProgressBar
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}

1 answer in total

长孙昀
2023-03-14

The exception here is quite clear. If you just look at the documentation for java.io.NotSerializableException, you will see that the message it prints is the name of the class that is not serializable. To fix this, simply make your topology class implement Serializable:

public class ProvaTopology implements Serializable {
    ...
}

This is required so that Storm can serialize your topology and send it to Nimbus for execution. Since your bolts and spout extend or implement classes or interfaces provided by Storm, you don't have to worry about marking them as serializable; those parent classes and interfaces already take care of that.
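For context, the reason the enclosing class ends up in the serialized object graph at all is that ProvaSpout, RedisBolt, and PrinterBolt are declared as non-static inner classes, so every instance carries an implicit reference to its ProvaTopology (ProvaBolt, being static, does not). Below is a minimal, self-contained sketch of the two ways out; SerializableDemo, InnerBolt, and NestedBolt are illustrative names, not classes from the code above:

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableDemo implements Serializable {

    // Option 1: the enclosing class implements Serializable, so the
    // implicit SerializableDemo.this reference inside InnerBolt can be
    // written out along with the inner object.
    public class InnerBolt implements Serializable {
    }

    // Option 2: a static nested class keeps no reference to the
    // enclosing instance, so the outer class never needs to be
    // serializable in the first place.
    public static class NestedBolt implements Serializable {
    }

    public static void main(String[] args) throws Exception {
        ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream());
        // Succeeds only because SerializableDemo is Serializable:
        out.writeObject(new SerializableDemo().new InnerBolt());
        // Succeeds regardless of the enclosing class:
        out.writeObject(new NestedBolt());
    }
}

Applied to the original code, implementing Serializable on ProvaTopology is the smaller change; making the spout and bolts static nested classes, like ProvaBolt already is, would avoid serializing the topology object entirely.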
