当前位置: 首页 > 知识库问答 >
问题:

如何正确读取点火缓存

柯英奕
2023-03-14

我有以下应用程序(我对这个框架很陌生),我想看到缓存大小(增加),因为它从队列中读取消息,但它一直保持为0。

    KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

    Ignition.setClientMode(true);

    Ignite ignite = Ignition.start();

    Properties settings = new Properties();
    // Set a few key parameters
    settings.put("bootstrap.servers", "localhost:9092");
    settings.put("group.id", "test");
    settings.put("zookeeper.connect", "localhost:2181");
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Create an instance of StreamsConfig from the Properties instance
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");

    IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");
    // allow overwriting cache data
    stmr.allowOverwrite(true);

    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);

    // set the topic
    kafkaStreamer.setTopic("test");

    // set the number of threads to process Kafka streams
    kafkaStreamer.setThreads(1);

    // set Kafka consumer configurations
    kafkaStreamer.setConsumerConfig(config);

    // set decoders
    StringDecoder keyDecoder = new StringDecoder(null);
    StringDecoder valueDecoder = new StringDecoder(null);

    kafkaStreamer.setKeyDecoder(keyDecoder);
    kafkaStreamer.setValueDecoder(valueDecoder);

    kafkaStreamer.start();

    while (true) {

        System.out.println(cache.metrics().getSize());
        Thread.sleep(200);
    }

有人能告诉我缺失了什么/错了什么吗?

谢谢!

共有2个答案

百里骏
2023-03-14

由于性能原因,默认情况下禁用指标。您可以使用配置文件中的< code > cache configuration . setstatistics enabled(true)或< code>statisticsEnabled属性来启用度量。

刘弘新
2023-03-14

可能您没有使用足够的条目来填充IgniteDataStreamer缓冲区。尝试设置刷新超时:

stmr.autoFlushFrequency(1000);
 类似资料:
  • 我使用以下方法将csv文件读入Spark: df=spark.read.format(file_type).options(header='true',quote='\"',ignoreleadingwhitespace='true',inferschema='true').load(file_location) 这是正常行为还是读错了? 更新:我将标记问题作为回答,因为下面的提示是有用的。然而,

  • 我发现其他人也有同样的问题,他们的问题通过在InputStreamReader构造函数中指定UTF-8来解决: 以UTF-8形式读取InputStream 这对我不起作用,我也不知道为什么。无论我尝试什么,我总是得到转义的unicode值(斜杠-U+十六进制),而不是实际的语言字符。我在这里做错了什么?提前道谢! 请注意:这不是字体问题。我之所以知道这一点,是因为如果我对同一个文件使用Resour

  • 我想以UTF-8快速地逐行读取大的csv文件(大约~1GB)。我已经为它创建了一个类,但它不能正常工作。UTF-8从2字节解码西里尔符号。我使用字节缓冲区来读取它,例如,它有10个字节的长度。因此,如果文件中的符号由10和11字节组成,它将无法正常解码:(

  • 似乎可以从Android(本机)访问;但我仍在努力让这一切顺利进行。 在我的React本机应用程序中,我有如下内容: 百货商店 用户减速器 从商店获取用户 现在的问题开始时,我试图让相同的用户从Android,我所有的挫折尝试这是我有: 由于用户认为它是一个对象,我不知道这是否是获取的正确方法,我尝试获取,但没有成功。 我开始认为我应该使用react原生sqlite存储,在那里我可以知道我的数据结

  • 我有以下代码来读取java文件,并打印出行。我通过两种方式实现了它: 使用流: 使用循环: 我被告知这是错误的,使用缓冲读取器是错误地使用了语言的特性。有没有更好的方法,我想知道使用语言功能的正确方法。

  • 问题内容: 我现在使用的代码: 似乎工作正常,但我不确定在将ByteBuffer返回池之前是否需要ByteBuffer。我什至不确定要使用。文档中没有太多关于它的内容。 问题答案: 读取请求正文的一种更简单的方法是将其分派到一个工作线程,该工作线程可以使用。 有两种方法:使用或文档中所示的调度模式。这是使用的示例: 在基本上没有派遣你。