当前位置: 首页 > 知识库问答 >
问题:

反序列化 Json 文件并使用水槽沉入 HDFS

乐正远
2023-03-14

我有一个假脱机目录,所有json文件都在其中,每秒钟都会有传入的文件被添加到这个目录中,我必须反序列化传入的json文件,获取requires字段并将其附加到HDFS目录中。

我所做的是我创建了一个 html" target="_blank">flume conf 文件,其中将假脱机目录中的文件作为源,并使用 1 个接收器将 json 文件直接放入 HDFS 中。

我必须在Sink之前将这个json转换成结构化格式,并将其放入HDFS。最重要的是,这不是一个twitter数据。我必须实现纯水槽。

我使用下面的水槽配置来完成工作:

agent_slave_1.channels.fileChannel1_1.type = file 
agent_slave_1.channels.fileChannel1_1.capacity = 200000
agent_slave_1.channels.fileChannel1_1.transactionCapacity = 1000
agent_slave_1.sources.source1_1.type = spooldir

agent_slave_1.sources.source1_1.spoolDir = /home/cloudera/runs/
agent_slave_1.sources.source1_1.fileHeader = false
agent_slave_1.sources.source1_1.fileSuffix = .COMPLETED
agent_slave_1.sinks.hdfs-sink1_1.type = hdfs
agent_slave_1.sinks.hdfs-sink1_1.hdfs.path =hdfs://localhost.localdomain:8020/user/cloudera/runs_scored/
agent_slave_1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollSize = 268435456
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollCount = 50000000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text

agent_slave_1.sinks.hdfs-sink1_1.hdfsfileType = DataStream
agent_slave_1.sources.source1_1.channels = fileChannel1_1
agent_slave_1.sinks.hdfs-sink1_1.channel = fileChannel1_1

agent_slave_1.sinks =  hdfs-sink1_1
agent_slave_1.sources = source1_1
agent_slave_1.channels = fileChannel1_1

但我不知道如何使用反序列化器。

有人能帮我想一下如何反序列化传入的Json文件吗?如果我需要用java写任何代码请帮助我,我需要使用什么接口?如果可能的话,给点提示。

共有1个答案

施越彬
2023-03-14

最好的猜测是编写一个自定义拦截器,将您的JSON转换为所需的HDFS格式。它还具有填充标头的好处,这些标头可以在您的 hdfs 路径中使用。

下面是配置拦截器的方法:

agent_slave_1.sources.source1_1.interceptors = my_intercptor
agent_slave_1.sources.source1_1.interceptors.my_intercptor.type = com.mycompany.MyInteceptor

该类将如下所示:

public class MyInteceptor implements Interceptor, Interceptor.Builder {

    private MyInteceptor interceptor;

    @Override
    public void initialize() {


    }

    @Override
    public Event intercept(Event event) {
        String bjson = event.getBody()));
        // decode your json, e.g. Jackson
        MyDecodedJsonObject record; // pseudo class
        event.getHeaders().put("timestamp", record.getTimestamp().toString());
        String newBody = record.getA() + "\t" + record.getB();
        event.setBody(newBody.getBytes())
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Iterator<Event> iterator = events.iterator(); iterator.hasNext();) {
            Event next = intercept(iterator.next());
            if (next == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {


    }

    @Override
    public Interceptor build() {
        return interceptor;
    }

    @Override
    public void configure(Context context) {

        interceptor = new MyInteceptor();
    }

}

别忘了把这个类打包在一个jar中,并把它放到水槽的lib目录中。

 类似资料:
  • 问题内容: 我将反序列化的json文件的结构如下所示; 我创建了一个类,该类具有文件名作为JavaScriptSerializer的属性。我将用于反序列化json的代码如下; ``` using (var reader = new StreamReader(twitpicResponse.GetResponseStream())) { ``` 最好的情况是什么? 问题答案: 您需要创建一个包含用户

  • 问题内容: 我是C ++的新手。使用序列化和反序列化类型数据的最简单方法是什么。我发现了一些使用示例,但它们对我来说是晦涩的。 问题答案: 请注意,将键解释为路径,例如,将对“ ab” =“ z”放置将创建{“ a”:{“ b”:“ z”}} JSON,而不是{“ ab”:“ z”} 。否则,使用是微不足道的。这是一个小例子。

  • 1) json: 2) 代码: 错误消息为: 我检查了json文件,它是有效的。 我不知道发生了什么事。

  • 问题内容: 我有一个WCF服务定义如下: 所述类的定义为: ILayoutService的实现如下: 我实现了,所以我可以得到将被(反)序列化的原义JSON。我正在使用它来测试通过jQuery调用此服务: 返回的布局是VB代码中调用返回的确切JSON 。出于某种原因,当我调用Web服务,输入参数在什么都不是。反序列化一定会失败。知道为什么吗? 问题答案: 仅凭您提供的细节很难分辨。 这是我的建议:

  • 问题内容: 我在反序列化以下json数组时遇到麻烦(对不起,大小): 如果将其粘贴到json-viewer中,则会得到以下结构: 现在,包含具有坐标的数组的数组具有可变大小。所以我想在Java中,整个对象应该是一个数组,其中包含数组的集合,每个数组都包含一个。就像是 但是gson不接受这一点。我收到以下错误消息: 这似乎很奇怪,因为对我来说好像不像一个数组。但这可能使我感到困惑,或多或少地迷路了…

  • 我收到来自第3方服务提供商的JSON响应,其中包含一系列对象。当我尝试使用Jackson api反序列化JSON时。我收到以下异常 我的回答是 我的POJO课是这样的 我正在尝试使用以下代码反序列化JSON 如果我试着去做 它在BEGIN_对象本身失败。 如何使用数组读取和反序列化JSON。我应该编写自己的反序列化器吗? 编辑如果我使用JSON字符串而不是流,那么我就能够取回所有Java对象。但为