我们在Spring Boot的基础上开发了一个内部公司框架,我们希望通过Spring Cloud Stream支持Kafka Streams。我们需要自动向所有出站消息注入一些头。我们通过标准的Spring Cloud Stream Kafka Binder注册了一个定制的通道拦截器
,实现了这一点,但这不适用于Kafka Streams,因为它们似乎遵循不同的路径。
对于Spring Cloud Stream Kafka Streams binder,是否有任何等效于Channel Interceptor
?
我找到了这个定制器/配置器:
java prettyprint-override"> @Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
return factoryBean -> {
factoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
@Override
public void configureBuilder(final StreamsBuilder builder) {
}
@Override
public void configureTopology(final Topology topology) {
}
});
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(final KafkaStreams kafkaStreams) {
}
});
};
}
我的最后一个想法是使用配置拓扑
方法自动修改拓扑
,并在最后一个接收节点之前插入一个转换器
,但为了添加这个新节点,我必须指定父节点,所以我需要知道最后一个接收节点的名称,也许所有节点的名称都是由Kafka Streams自动生成的。。。唯一的方法是使用拓扑。descripe()方法,并可能解析字符串输出。。。与简单的通道拦截器相比,这听起来太复杂了。
有什么想法吗?
您可以将一个producerInterceptor
添加到流配置中。
/**
* A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before
* they are published to the Kafka cluster.
* <p>
* This class will get producer config properties via <code>configure()</code> method, including clientId assigned
* by KafkaProducer if not specified in the producer config. The interceptor implementation needs to be aware that it will be
* sharing producer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
* <p>
* Exceptions thrown by ProducerInterceptor methods will be caught, logged, but not propagated further. As a result, if
* the user configures the interceptor with the wrong key and value type parameters, the producer will not throw an exception,
* just log the errors.
* <p>
* ProducerInterceptor callbacks may be called from multiple threads. Interceptor implementation must ensure thread-safety, if needed.
* <p>
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
*/
public interface ProducerInterceptor<K, V> extends Configurable {
您可以在那里修改记录的标题。
(您也可以将此技术用于消息通道绑定器,而不是通道拦截器
)。
问题内容: 我知道(其中e是一个异常)会打印发生的异常,但是,我试图找到与Java等效的python,它可以将异常确切地跟踪到发生的那一行,并打印出整个轨迹。 谁能告诉我Python 的等效功能吗? 问题答案: 在块内执行此操作时,它将自动使用当前异常。有关更多信息,请参见http://docs.python.org/library/traceback.html。
问题内容: 我正在使用Java进行编译器设计项目。进行了词法分析(使用jflex),我想知道哪种yacc类工具最适合(最有效,最易用等)进行语法分析,为什么这样做。 问题答案: 如果您特别想要类似YACC的行为(表驱动),那么我所知道的唯一一个就是CUP。 在Java世界中,似乎有更多的人倾向于ANTLR或JavaCC之类的递归下降解析器。 而且效率很少是选择解析器生成器的原因。
问题内容: 具有向量化if / else的语义(类似于Apache Spark的/ DataFrame方法)。我知道我可以在pandas上使用,但通常会定义自己的API来代替原始函数使用,通常使用/更为方便。 果然,我发现了。但是,乍一看,它具有完全不同的语义。我找不到一种方法来重写使用pandas的最基本的示例: 我是否缺少明显的东西?还是将熊猫命名为一个完全不同的用例,尽管名称与相同? 问题答
问题内容: Javascript中有与Java 方法等效的方法吗? 更新1 那么,零延迟会和完全一样吗? 问题答案: 如果要异步运行某些内容( 稍后 ),请尝试 JavaScript是单线程的。如果要在事件处理程序之外运行一些耗时(CPU密集型)的任务,则可以使用上面的技术来执行此操作,但是它仍然会占用事件处理线程(导致UI冻结)。 在浏览器中运行CPU密集型任务通常是一个坏主意(网络工作者可能会
问题内容: 我有一行用于SQL Server的代码,它采用的日期列为“ YYYYMMDD”,其中DD为00,并将00转换为01,以便它与datetime一起使用。我希望能够使用MySQL 当前适用于SQL Server的代码: 但是isdate在MySQL中无效,该怎么解决? 问题答案: 您可以尝试使用STR_TO_DATE函数。如果表达式不是日期,时间或日期时间,则返回。
问题内容: 我有这个装饰器: 该代码仅能在linux上执行任何操作,就像在Windows上一样。在Windows中也可以使用此代码的最简单方法是什么? 问题答案: 它不是很漂亮,但是我不得不以跨平台的方式做类似的事情,于是我想到了使用单独的线程。基于信号的系统无法在所有平台上可靠地工作。 此类的使用可以包装在装饰器中,也可以制成上下文处理程序。 YMMV。
问题内容: 我们曾经声明要在类之间传递数据,如下所示: 现在没有类,如何在类之间传递数据? 问题答案: Swift不区分属性和实例变量(即属性的基础存储)。要定义属性,只需在类的上下文中声明一个变量。 swift类只是ClassName.swift文件。 您将一个类和属性声明为 您可以通过点表示法访问属性值。作为Xcode6测试4的,也有访问修饰符(,并在SWIFT)。默认情况下,每个属性都是。有
问题内容: 我正在将一些旧的PHP代码从mysql移植到MySQLi,但遇到了一个小问题。 有没有等效的旧功能? 当您处理多于1行时,我知道它比其他函数要慢,但是很多时候我只有1个结果和1个字段。使用它,我可以将4行压缩为1行。 旧代码: 所需代码: 但是没有这样的事情。:( 有什么我想念的吗?还是我必须吸收它并制作所有东西: 问题答案: PHP 5.4现在支持函数数组解引用,这意味着您可以执行以