当前位置: 首页 > 知识库问答 >
问题:

如何在NiFi ValidateRecord处理器/JsonRecordSetWriter中将时间戳序列化为Json字段

慕皓君
2023-03-14

如何将时间戳序列化为NiFi中的Json字段validateRecordprocessor/JSONRecordSetWriter

在输入时,我有一个CSV文件,其中有一个时间戳列,格式为yyyy-mm-dd hh:mm:ss.sss。在我的NiFi流中,我有一个ValidateRecord处理器,它使用CSVReader进行读取,并使用JSONRecordSetWriter作为编写器。它们都使用Avro模式,时间戳字段定义为

"fields" : [ {
    "name" : "timestamp",
    "type" : {
            "type" : "long",
            "logicalType" : "timestamp-millis"
        },
    "doc" : "Type inferred from '2016/10/08 07:51:00.000'"
    }, {
    ...

当一个字段值为2016-10-08 07:51:0.000的记录出现时,NiFi日志中出现一个异常:

2018-10-18 17:05:59,135 ERROR [Timer-Driven Process Thread-1] o.a.n.processors.standard.ValidateRecord ValidateRecord[id=3d44915d-a52a-3eb0-1ae1-7b0cbe4b1a03] Failed to write MapRecord[{timestamp=2016-10-08 07:51:00.0, ...  ] with schema {"type":"record","name":"redfunnel","doc":"Schema generated by Kite","fields":[{"name":"timestamp","type":{"type":"long","logicalType":"timestamp-millis"},"doc":"Type inferred from '2016/10/08 07:51:00.000'"},{ .... }]} as a JSON Object due to java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp): java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
java.lang.IllegalStateException: No ObjectCodec defined for the generator, can only serialize simple wrapper types (type passed java.sql.Timestamp)
    at org.codehaus.jackson.impl.JsonGeneratorBase._writeSimpleObject(JsonGeneratorBase.java:556)
    at org.codehaus.jackson.impl.JsonGeneratorBase.writeObject(JsonGeneratorBase.java:317)
    at org.apache.nifi.json.WriteJsonResult.writeRawValue(WriteJsonResult.java:267)
    at org.apache.nifi.json.WriteJsonResult.writeRecord(WriteJsonResult.java:201)
    at org.apache.nifi.json.WriteJsonResult.writeRawRecord(WriteJsonResult.java:149)
    at org.apache.nifi.processors.standard.ValidateRecord.onTrigger(ValidateRecord.java:342)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

在我的JSONRecordsetWriter的属性中,我尝试将时间戳的编写格式指定为yyyy-mm-dd hh:mm:ss.sss

但不幸的是没有成功,我仍然在NiFi日志中得到相同的异常。

这是否意味着JSONRecordSetWriter默认情况下不能序列化java.time.timestamp,即使它有Timestamp Format属性来进行配置呢?

是否可以使用开箱即用的NiFi组件根据自定义格式编写时间戳,或者我必须修改JSONRecordsetWriter

共有1个答案

卫嘉言
2023-03-14

似乎我找到了一个在我的情况下工作的配置。

我不得不将模式分成两部分:一个用于输入,另一个用于输出。

因此,schema1将时间戳字段定义为:

{
    "name" : "timestamp",
    "type" : "string",
    "doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}
{
    "name" : "timestamp",
    "type" : {
        "type" : "long",
        "logicalType" : "timestamp-millis"
    },
    "doc" : "Type inferred from '2016/10/08 07:51:00.000'"
}
    null

希望这能在类似的情况下对其他人有所帮助。

 类似资料:
  • 问题内容: 我需要将一些对象序列化为JSON并发送到WebService。如何使用org.json库?否则我将不得不使用另一个?这是我需要序列化的类: 我只放置了类的变量和构造函数,但也包含了getter和setter方法。所以如果有人可以帮忙 问题答案: 没有注释的简单方法是使用Gson库 就那么简单:

  • 我刚刚根据新的(ish)Java8时间包将许多日期转换为LocalDateTime。到目前为止,我一直很喜欢这种转换,直到我开始尝试序列化和反序列化。 如何配置Jackson以支持他们?: LocalDateTime --serialize-- 这里有大量关于转换为格式化字符串的资料,但我似乎找不到utc时间戳的现成解决方案。

  • 在用pandas处理数据时,从数据库中读取某一列为时间戳。 用timestamp.strftime('%Y-%m-%d')将其转化为字符串格式的日期。 但遇到空值会报错,请问该如何高效的实现时间戳转化为字符串,同时对空值进行适当处理。 空值的元素为NaTType 源代码如下:

  • 问题内容: 我需要将一些对象序列化为JSON并发送到WebService。如何使用org.json库?否则我将不得不使用另一个?这是我需要序列化的类: 我只放了类的变量和构造函数,但也有getter和setter方法。所以如果有人可以帮忙 问题答案: 没有注释的简单方法是使用Gson库 就那么简单:

  • 问题内容: 刚刚下载了ServiceStack.Text以在我的ASP.NET中使用它。我有很多属性的类,想将其中的五个(字符串,整数,二进制)序列化为JSON。有人可以发布简单的示例如何从我的课程中创建JSon对象吗? 问题答案: 默认情况下,ServiceStack将反序列化POCO的所有公共属性。 如果只想序列化一些属性,则想用[DataContract],[DataMember]属性装饰类

  • 问题内容: 我正在为我的应用程序实现数据库,并且尝试将其“连接”到我的REST接口。数据以JSON形式传入,并带有新的JSON-Support(自Realm 0.76起),我可以随意使用JSON,它会创建适当的对象和RealmLists。 但是我该如何做相反呢?也就是说,采用RealmObject并将其序列化为JSON?它还应该序列化该对象内的任何RealmList。 问题答案: 来自Realm的