我试图使用下面的API与Kafka火花流。我必须用spark流avro序列化数据,数据位于Kafka中。
static <K,V,KD extends kafka.serializer.Decoder<K>,VD extends kafka.serializer.Decoder<V>,R>
JavaInputDStream<R> createDirectStream(JavaStreamingContext jssc, Class<K> keyClass, Class<V> valueClass, Class<KD> keyDecoderClass, Class<VD> valueDecoderClass, Class<R> recordClass, java.util.Map<String,String> kafkaParams, java.util.Map<kafka.common.TopicAndPartition,Long> fromOffsets, Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler)
:: Experimental :: Create an input stream that directly pulls messages from Kafka Brokers without using any receiver.
JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, topicMap,
(Function<MessageAndMetadata<String, String>, String>) MessageAndMetadata::message);
错误:未解决的编译问题:KafkaUtils类型中的方法createDirectStream(JavaStreamingContext,Class,Class,Class,Class,Class,Class,Class,Class,Class,Map,Map,Function,r>)不适用于参数(JavaStreamingContext,Class,Class,Class,Class,Class,Class,Class,Class,Class,Class,Class,Class,Class,Class,Class,Class,Class,Class,Class,
试试这个。
JavaInputDStream<byte[]> directKafkaStream = KafkaUtils.createDirectStream(jssc, String.class, byte[].class,
StringDecoder.class, DefaultDecoder.class, byte[].class, kafkaParams, fromOffset,
(Function<MessageAndMetadata<String, byte[]>, byte[]>) MessageAndMetadata::message);
这里有一篇为Kafka,阿夫罗和Spark写的文章。
问题内容: 该符号在PHP中是什么意思? 问题答案: PHP提供了一些控制结构的替代语法。即,是否,for,foreach和switch。在每种情况下,替代语法的基本形式都是将左大括号更改为冒号(:),将右大括号更改为endif;,endwhile;,endfor;,endforeach;或endswitch;。
本文向大家介绍JSP中的scriptlet是什么,其语法是什么?,包括了JSP中的scriptlet是什么,其语法是什么?的使用技巧和注意事项,需要的朋友参考一下 脚本可以包含任意数量的JAVA语言语句,变量或方法声明或在页面脚本语言中有效的表达式。 以下是Scriptlet的语法- 您可以编写与上述语法等效的XML,如下所示- 您编写的任何文本,HTML标记或JSP元素都必须在scriptlet
问题内容: 我正在研究Swing程序中文本组件的结构。 据我了解,本质上分为视图和模型。该模型是实现的类的实例,该类包含所有文本并提供操作文本的方法,而View则以可视方式呈现文本。 但是我不知道使用an的确切位置,方式和原因。我不确定是否封装(“拥有”)模型(),或者文档是否封装了模型。而且不确定所有这些视图在哪里适合。 有两个问题: 1- 请描述视图之间的关系和在。什么封装了什么,什么与什么相
我几天前听说了Docker的事,想过去看看。 但事实上,我不知道这个“容器”的用途是什么? 什么是容器? 它能取代一个专门用于开发的虚拟机吗? 简单地说,在公司中使用Docker的目的是什么?主要的优势?
问题内容: LogCat在Eclipse中做什么? 如何使用?我以前从未在Eclipse中使用过log cat,所以我不明白。 问题答案: 这是一种从设备和开发计算机上的应用程序交换信息的好方法。 要使用Logcat,请首先导入您的项目。现在,您可以从项目中调用静态类Log来开始记录。如下所示,Logcat具有不同级别的日志记录。调试时,我们仅使用Debug(D)记录进度。当然,当您想记录实际错误
问题内容: 除了让编译器检查超类是否具有该方法之外,是否有任何其他理由来注释方法? 问题答案: 如您所描述的,@ Override创建一个编译时检查,以确保方法被覆盖。这对于确保尝试覆盖时不会出现愚蠢的签名问题非常有用。 例如,我看到以下错误: 此类按编写方式进行编译,但是将@Override标记添加到equals方法将导致编译错误,因为它不会覆盖Object上的equals方法。这是一个简单的错