我正在尝试加入两个不同的CSV fles,它们有一个共同的列,如下所示。
Csv1:(生成流文件)
Emp_Id,Name,Address,Mobile_No
1,Name1,Add1,Mob1 2,Name2,Add2,Mob2
Csv2:(在CSVRecordLookupService配置中作为Lookup CSV给出)
Emp_Id,Salary,Department
1,10k,dev
2,20k,mn
所需输出:
Emp_Id,Name,Address,Mobile_No,Salary,Department
1,Name1,Add1,Mob1,10k,dev
2,Name2,Add2,Mob2,20k,mng
我对LookupRecord处理器的配置是基于这里的描述:https://gist . github . com/ijokarumawak/b 9c 95 a 0d 0 c 86 c 97 FFE aeb 5 ef 95320 b 8 b
但是当我执行流程时,我从日志中看到以下错误:
2020-07-15 19:04:01,603 ERROR [Timer-Driven Process Thread-8] o.a.n.processors.standard.LookupRecord LookupRecord[id=538b171d-0173-1000-fc2e-b228d34dfc53] Failed to process StandardFlowFileRecord[uuid=d32f8354-5849-4c07-b8c3-a00f0bc5abe3,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1594828212922-466, container=default, section=466], offset=214923, length=65],offset=0,name=d32f8354-5849-4c07-b8c3-a00f0bc5abe3,size=65]: org.apache.nifi.processor.exception.ProcessException: Failed to lookup coordinates {key=1} in Lookup Service
org.apache.nifi.processor.exception.ProcessException: Failed to lookup coordinates {key=1} in Lookup Service
at org.apache.nifi.processors.standard.LookupRecord.doResultPathReplacement(LookupRecord.java:395)
at org.apache.nifi.processors.standard.LookupRecord.route(LookupRecord.java:303)
at org.apache.nifi.processors.standard.LookupRecord.route(LookupRecord.java:68)
at org.apache.nifi.processors.standard.AbstractRouteRecord$1.process(AbstractRouteRecord.java:134)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2324)
at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2292)
at org.apache.nifi.processors.standard.AbstractRouteRecord.onTrigger(AbstractRouteRecord.java:121)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
at org.apache.nifi.lookup.CSVRecordLookupService.lookup(CSVRecordLookupService.java:234)
at org.apache.nifi.lookup.LookupService.lookup(LookupService.java:48)
at sun.reflect.GeneratedMethodAccessor613.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:87)
at com.sun.proxy.$Proxy227.lookup(Unknown Source)
at org.apache.nifi.processors.standard.LookupRecord.doResultPathReplacement(LookupRecord.java:393)
... 18 common frames omitted
到目前为止我所尝试的:
> < li>
更新了< code>CSVRecordWriter中使用的Avroschema(向架构中添加了所需的字段)
{type:"记录","命名空间":"nifi","名称":"JoinedEmp","字段": [ { "名称":"Emp_Id","类型":"字符串" }, { "名称":"名称","类型":"字符串" }, { "名称":"地址","类型":"字符串" }, { "名称":"Mobile_No","类型":"字符串" }, { "名称":"薪水","类型":"字符串" }, { "名称":"部门","类型":"字符串" } ] }
更新了/Emp_Id的结果记录路径
两者都不起作用。
查看了Web中的不同位置的LookupRecord配置和示例,但无法设置我在这里看到的错误。
谁能帮我在这里什么配置是我在这里设置错误。
提前致谢。
注意:根据下面收到的建议更新了我的问题(删除了杂乱的内容)
查找服务希望查找键的值是字符串,但您给它的是整数。这有点烦人,但键的值必须转换为字符串,因此请尝试以下方法获取键
属性的值:
toString( /Emp_Id, “UTF-8”)
如何在SL4J中配置日志记录?我的项目有很多类:class1、class2、Class3....我想做两件事:将所有类记录到一个名为FILE1的文件追加器中,并具有警告级别(class1、class2、class3...)将一个名为class1的类记录到具有调试级别的名为FILE2的文件追加器中。 问题是,当我将class1的记录器配置为具有WARN级别的FILE1 appender时,我不知道如
我希望我的Spring批处理应用程序一次从数据库中读取50条记录,然后将这50条记录发送给处理器,然后发送给写入器。 有人可以告诉我如何做到这一点吗? 我尝试使用JdbcPagingItemReader并将pageSize设置为50,这样可以读取50条记录,但是rowMapper、处理器和编写器一次接收一条记录,而不是获得50条记录。 如何使处理器和写入器在dto中获得50条记录,而不是一次接收一
我想加入2个表 表1: 表2: 我正在做的是 我期望得到的是
由此我从NetBeans得到以下错误消息: 所有异常消息都写入控制台,包括用户日志的消息。所以我知道两个伐木工都在工作。记录器附加器写入文件OK。userLog记录器不会创建任何文件,它使用几乎相同的附加器。 看来我还没有正确地命名一些东西来允许log4j2找到记录器。我试过各种命名的组合来弄对它,但我搞不清问题出在哪里。 我必须在每个类的记录器初始化中包含类名吗? 编辑:我需要包含包名。我花了一
我使用的是log4j2,jar文件如下:log4j-api-2.14.0。jarlog4j-core-2.14.0。jarlog4j-slf4j-impl-2.14.0。罐子 执行以下行:LogManager。getLogger(“com.foo.Bar1”); 使用以下VM参数:-Dlog4j。配置=test1。xml 配置文件test1。xml如下所示: 当java文件启动错误状态记录器重新配
我已经用实现了一个Kafka消费者。我的使用者应该使用事件,然后为每个事件向其他服务发送REST请求。只有当REST服务关闭时,我才想重试。否则,我可以忽略失败的事件。 我的容器工厂配置如下: 我使用来设置异常和相应的重试策略。 如果我有机会了解中的哪个记录失败了,那么我将创建的自定义实现,以检查失败的消息是否可重试(通过使用字段)。如果它是不可重试的,那么我将从列表中删除它以重新查找。 关于如何