当前位置: 首页 > 知识库问答 >
问题:

无法通过JDBC源连接器触发自定义生成器拦截器

穆招
2023-03-14

我已经创建了一个自定义的生产者拦截器(Audit病人拦截器),它接受一些自定义配置(application_id,类型等)。)。我已经从奥迪生产者拦截器项目生成了一个罐子,并将罐子放在Kafka连接中 /usr/share/java/monitoring-interceptors.当我尝试发布具有以下配置的JDBC-Source连接器时,我的审计拦截器没有被触发。

{
"name": "jdbc-source-xx-xxxx-xxx-xxx",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:sqlserver://{{ip}}:1433;databaseName=XX;useNTLMv2=true",
    "connection.user": "SA",
    "connection.password": "Admin1234",
    "producer.interceptor.classes": "com.optum.payer.common.kafka.audit.interceptor.AuditProducerInterceptor",
    "topic.prefix": "MyTestTopic",
    "query": "SELECT ID, chart_id, request_id, UpdatedDate FROM xxx.xxx WITH (NOLOCK)",
    "mode": "timestamp",
    "timestamp.column.name": "UpdatedDate",
    "producer.audit.application.id": "HelloApplication",
    "producer.audit.type": "test type",
    "poll.interval.ms": "10",
    "tasks.max": "1",
    "batch.max.rows": "100",
    "validate.non.null": "false",
    "numeric.mapping":"best_fit",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://{{ip}}:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://{{ip}}:8081"
    
}}

正如您在配置中看到的,我在连接器配置中添加了以下道具来触发自定义拦截器。但我在Kafka Connect中没有看到任何与AuditProducerInterceptor相关的日志。

"producer.interceptor.classes": "com.optum.payer.common.kafka.audit.interceptor.AuditProducerInterceptor"
"producer.audit.application.id": "HelloApplication",
"producer.audit.type": "test type"

我尝试在 kafka-connect 配置中添加这三个配置,我能够触发拦截器。但是我想通过 JDBC 源连接器触发拦截器,以便我可以通过连接器传递自定义 props(application_id、类型等)。请帮我解决这个问题

共有1个答案

龚弘业
2023-03-14

如果在Connect worker中允许了客户端覆盖(默认情况下启用),您将需要使用< code>producer.override前缀

来自文档

从2.3.0开始,可以通过分别使用Kafka源或Kafka接收器的前缀producer.override.consumer.override.为每个连接器单独配置客户端配置覆盖。

 类似资料:
  • null 我尝试将@priority(interceptor.priority.platform_beform)和@prematching也放入我的过滤器中,但即使是在OIDC启动后也会调用。 另外,是否有任何方法支持扩展quarkus oidc逻辑以包括自定义代码? 我无法获得oidc和keycloak-auth拦截器的优先级(知道这些可以帮助我决定过滤器的优先级)。请帮忙。

  • 本文向大家介绍SpringBoot定义过滤器、监听器、拦截器的方法,包括了SpringBoot定义过滤器、监听器、拦截器的方法的使用技巧和注意事项,需要的朋友参考一下 一、自定义过滤器 创建一个过滤器,实现javax.servlet.Filter接口,并重写其中的init、doFilter、destory方法。 二、自定义监听器 创建一个过滤器,实现ServletContextListener接口

  • 问题内容: 我正在使用Java EE 6和Jboss AS7.1,并尝试使用拦截器绑定(来自jboss网站的示例)。 我有一个InterceptorBinding注解: 拦截器: 还有一个豆: 但是拦截器没有被称为。。。 在编写此代码时将调用拦截器: 谢谢你的帮助。 问题答案: 您是否按照参考示例中的说明启用了拦截器? 缺省情况下,bean档案没有通过拦截器绑定绑定的已启用拦截器。必须通过将侦听器

  • 我正在尝试使用spring rest模板POST w/自定义拦截器将一个大文件从一个微服务发布到另一个微服务,如下所示: (我是否使用SimpleClientHttpask estFactory或HttpComponentsClientHttpask estFactory没有区别) 添加拦截器会在调用getRequestFactory时创建一个新的侦听ClientHttPrequestFactor

  • 我希望向sbt添加一个自定义源生成器,并将其与scalapb(Scala协议缓冲区生成器)一起使用。每一个都独立工作。然而,当两者结合在一起时,项目在清理后第一次编译失败。如果我再次运行compile,它就会成功。 错误消息: 要再现此错误,您需要在src/main/protobuf中至少有一个proto文件。 我的自定义任务和scalapb这两个源生成器会发生冲突,这让我感到困惑。它们不应该都写

  • 我有一个Flume组件在监听Syslog流。我做了一个自定义的拦截器来修改调用,但它不起作用。我做错了什么?谢谢你,Andrea 拦截器是一个编译良好的JAR文件,位于@FLUME_HOME/bin目录中 系统将事件记录在文件中而不修改它们,这是相关的DEBUG日志: