当前位置: 首页 > 知识库问答 >
问题:

CreateDirectStream不接受正确的参数-Spark Streaming+Kafka

狄峰
2023-03-14

我有一个应用程序,它发送序列化的Twitter数据到一个Kafka主题。到目前为止一切都很好。

使用者应用程序应该读取数据并对其进行反序列化。现在,当我调用kafkautils.createDirectStream时,我认为我放入了正确的参数(正如您在抛出的错误中看到的那样),所以我不明白为什么它不能工作。

KafkaUtils类型中的方法createDirectStream(JavaStreamingContext,class-k-,class-v-,class-kd-,class-vd-,Map-String,string-,set-string-)不适用于参数(JavaStreamingContext,class-string-,class-status-,class-string-,class-string,string-,set-string-)

检查Spark Javadoc时,我的params对我来说仍然是正确的。

我的代码是:

Set<String> topics = new HashSet<>();
topics.add("twitter-test");
JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(duration));
Map<String, String> props = new HashMap<>();
//some properties...
JavaPairInputDStream messages =  KafkaUtils.createDirectStream(jssc, String.class, Status.class, org.apache.kafka.common.serialization.StringDeserializer.class, stream_data.StatusDeserializer.class, props, topics);

状态序列化程序代码:

public class StatusSerializer implements Serializer<Status> {

  @Override public byte[] serialize(String s, Status o) {

           try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(baos);
                oos.writeObject(o);
                oos.close();
                byte[] b = baos.toByteArray();
                return b;
            } catch (IOException e) {
                return new byte[0];
            }
        }

      @Override public void close() {

      }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {


    }

}

共有1个答案

苍温文
2023-03-14

看起来问题出在“stream_data.statusDeserializer.class”上。你能请这个自定义反序列化程序类的代码。另外,请您看看这个用Scala for Kafka API0.10编写的Kafka Spark消费者:自定义AVRO反序列化器。

在KafkaParam参数中包括以下内容。

key.deserializer -> classOf[StringDeserializer]
value.deserializer -> classOf[StatusDeserializer]
 类似资料:
  • 问题内容: 这段代码: 产生此错误: OPTable是一个字母数字字符串,它是从另一个数据库查询中构建的,该数据库查询包含我要从中选择的表名。 以下代码在同一脚本中也可以正常工作。 我猜想以这种方式构建sql语句并没有什么大不了的,但是我只是不明白为什么它不接受??参数。我什至在使用?的同一脚本中还有另一个查询。参数化并可以正常工作。但是,使用raw_input函数生成用于工作查询的参数。这两个字

  • 我和这里的用户遇到了同样的问题:log4j类型genericopobjectpool不接受参数,但我一直无法找到这个问题的答案。Log4j2在附加到rollingfile时起作用,但此处列出的方法:https://logging.apache.org/log4j/2.x/manual/appenders.html显示错误“GenericObjectPool不接受参数”

  • 问题内容: 我试图通过Python模块进行控制。 我想为当前的运行时以及永久配置添加一个IP地址到受信任的区域。 以下是dbus界面的文档:http : //manpages.ubuntu.com/manpages/wily/man5/firewalld.dbus.5.html 什么有效:运行时配置 我可以将其添加到运行时配置中: 很简单 什么不起作用:永久配置 事实证明,将其添加到永久配置中比较

  • 我希望为我的2D游戏创造平滑的道路。看着这正是我需要的东西。每一篇文章,甚至在上,都给它一个类型,并用构造函数传递所有控制点和一个。这似乎是过时的,不再接受任何类型参数,没有它,它只能与路径工作。构造函数也不接受控制点列表。 这会产生以下错误:

  • 问题内容: 我在装有Linux和Windows的两台PC上运行带有PyDev v2.6插件的Eclipse SDK v3.6。 我想将元组作为参数传递,例如(仅作为示例): 这可以在Linux上运行并给出结果: (2,1) 在Windows上,它会引发错误: 该如何解决呢? 问题答案: 您可能在Windows上运行Python 3.x,在Linux上运行Python2.x。解包元组参数的功能已在P

  • 我为“MainWindow.fxml”文件构建了一个相当简单的控制器来处理按钮。我遵循了本教程。我已经在fxml文档中正确设置了fx:id。然而,在编译时,我得到以下错误警告 java: typejava.beans.EventHandler不接受参数 对于生产线 我的完整控制器类如下所示... 感谢任何帮助,提前感谢。