我有以下应用程序(我对这个框架很陌生),我想看到缓存大小(增加),因为它从队列中读取消息,但它一直保持为0。
KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();
Ignition.setClientMode(true);
Ignite ignite = Ignition.start();
Properties settings = new Properties();
// Set a few key parameters
settings.put("bootstrap.servers", "localhost:9092");
settings.put("group.id", "test");
settings.put("zookeeper.connect", "localhost:2181");
settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Create an instance of StreamsConfig from the Properties instance
kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);
IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");
IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");
// allow overwriting cache data
stmr.allowOverwrite(true);
kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);
// set the topic
kafkaStreamer.setTopic("test");
// set the number of threads to process Kafka streams
kafkaStreamer.setThreads(1);
// set Kafka consumer configurations
kafkaStreamer.setConsumerConfig(config);
// set decoders
StringDecoder keyDecoder = new StringDecoder(null);
StringDecoder valueDecoder = new StringDecoder(null);
kafkaStreamer.setKeyDecoder(keyDecoder);
kafkaStreamer.setValueDecoder(valueDecoder);
kafkaStreamer.start();
while (true) {
System.out.println(cache.metrics().getSize());
Thread.sleep(200);
}
有人能告诉我缺失了什么/错了什么吗?
谢谢!
由于性能原因,默认情况下禁用指标。您可以使用配置文件中的< code > cache configuration . setstatistics enabled(true)或< code>statisticsEnabled属性来启用度量。
可能您没有使用足够的条目来填充IgniteDataStreamer
缓冲区。尝试设置刷新超时:
stmr.autoFlushFrequency(1000);
我使用以下方法将csv文件读入Spark: df=spark.read.format(file_type).options(header='true',quote='\"',ignoreleadingwhitespace='true',inferschema='true').load(file_location) 这是正常行为还是读错了? 更新:我将标记问题作为回答,因为下面的提示是有用的。然而,
我发现其他人也有同样的问题,他们的问题通过在InputStreamReader构造函数中指定UTF-8来解决: 以UTF-8形式读取InputStream 这对我不起作用,我也不知道为什么。无论我尝试什么,我总是得到转义的unicode值(斜杠-U+十六进制),而不是实际的语言字符。我在这里做错了什么?提前道谢! 请注意:这不是字体问题。我之所以知道这一点,是因为如果我对同一个文件使用Resour
我想以UTF-8快速地逐行读取大的csv文件(大约~1GB)。我已经为它创建了一个类,但它不能正常工作。UTF-8从2字节解码西里尔符号。我使用字节缓冲区来读取它,例如,它有10个字节的长度。因此,如果文件中的符号由10和11字节组成,它将无法正常解码:(
似乎可以从Android(本机)访问;但我仍在努力让这一切顺利进行。 在我的React本机应用程序中,我有如下内容: 百货商店 用户减速器 从商店获取用户 现在的问题开始时,我试图让相同的用户从Android,我所有的挫折尝试这是我有: 由于用户认为它是一个对象,我不知道这是否是获取的正确方法,我尝试获取,但没有成功。 我开始认为我应该使用react原生sqlite存储,在那里我可以知道我的数据结
我有以下代码来读取java文件,并打印出行。我通过两种方式实现了它: 使用流: 使用循环: 我被告知这是错误的,使用缓冲读取器是错误地使用了语言的特性。有没有更好的方法,我想知道使用语言功能的正确方法。
问题内容: 我现在使用的代码: 似乎工作正常,但我不确定在将ByteBuffer返回池之前是否需要ByteBuffer。我什至不确定要使用。文档中没有太多关于它的内容。 问题答案: 读取请求正文的一种更简单的方法是将其分派到一个工作线程,该工作线程可以使用。 有两种方法:使用或文档中所示的调度模式。这是使用的示例: 在基本上没有派遣你。