当前位置: 首页 > 知识库问答 >
问题:

Spring integration DSL在Java1.7中创建JMS MessageDriver通道适配器

高飞翮
2023-03-14
String brokerUrl = "tcp://101.11.102.125:31316";
String topic = "sometpoic"; 
String kafkaBrokerUrl = "101.11.102.125:1012";
String kafkaTopic = "kafka_Topic";

@Bean
public DefaultMessageListenerContainer listenerContainer() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory();
    ActiveMQTopic mqTopic = new ActiveMQTopic(topic);           
    conFactory.setBrokerURL(brokerUrl);
    container.setConnectionFactory(conFactory);
    container.setDestination(mqTopic);
    container.setSessionTransacted(true);
    return container;
}
@Bean
public MessageChannel jmsInChannel() {
     return MessageChannels.publishSubscribe().get();
}

@Bean
public MessageChannel jmsOutChannel() {
     return MessageChannels.publishSubscribe().get();
}

And this is my JMS adapter flow............

@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
    return IntegrationFlows
            .from(Jms.messageDriverChannelAdapter(listenerContainer())                      
                   .autoStartup(true))                     
                   .channel(jmsInChannel())                                              
                   .get();
}
<int:header-enricher input-channel="jmsInChannel" output-channel="jmsOutChannel">
<int:header name="kafkaBrokerUrl" value="${kafka.url}"></int:header>
<int:header name="kafkaTopic" value="${kafka.topic}"></int:header>
<int:service-activator input-channel="jmsOutChannel" ref="KafkaProducer" method="produceToJmsKafka"/>
<bean id="KafkaProducer" class="com.david.jms.JmsKafkaProducer"/>

那么如何将上面的xml代码转换为类似的DSL特定代码呢?

  After getting the compilation error I have tried like this...
 @SuppressWarnings("unchecked")
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
    return IntegrationFlows
            .from(Jms.messageDriverChannelAdapter(listenerContainer())                      
                   .autoStartup(true))                     
                   .channel(jmsInChannel())
                   .enrichHeaders(new MapBuilder()
                                .put("brokerid", brokerid)
                                .put("topic", topic)
                                .put("source", source)
                                .put("fileType", fileType))
                   .handle("KafkaProducer", "produceToJmsKafka")
                   .get();
}

@Bean
public JmsProducer KafkaProducer() {
    return new JmsProducer();
}   

共有1个答案

赏逸春
2023-03-14

那可能是这样的:

@Value("${kafka.url}")
private String kafkaBrokerUrl;

@Value("${kafka.topic}")
private String kafkaTopic;

....
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
    return IntegrationFlows
            .from(Jms.messageDriverChannelAdapter(listenerContainer())                      
                   .autoStartup(true))                     
                   .channel(jmsInChannel()) 
                   .enrichHeaders(new StringStringMapBuilder()
                                            .put("kafkaBrokerUrl", kafkaBrokerUrl)
                                            .put("kafkaTopic", kafkaTopic))
                   .handle("KafkaProducer", "produceToJmsKafka")
                   .get();
}

从这里看,我没有理由使用那些MessageChannelbean,特别是publishsubscribe()

另一方面,从DSL1.1开始,我们提供了Spring Integration Kafka适配器的实现。

 类似资料:
  • 流是Gateway-->RequsetChannel-->Transform-->OutboundMailChannl-->MailSender(MailAdapter)

  • 适配器目录和文件结构 适配器目录和文件结构布局的例子: /application/libraries/Driver_name Driver_name.php drivers Driver_name_subclass_1.php Driver_name_subclass_2.php Driver_name_subclass_3.php 注意: 为了在大小写敏感的文件系统上维持兼容性,这个 Drive

  • 我已经用ImageView和TextView制作了抽屉菜单,它可以run.However,当我想用两个ImageView和一个TextView创建抽屉菜单时,它会用我的第二个ImageView显示错误(“尝试调用虚拟方法”)(我代码中的变量名称是“img”) 我的代码如下: 这是我的日志: 09-16 22:25:38.458:E/AndroidRuntime(14966):致命例外:主 09-1

  • 在 Netty 4.1 中,是否可以创建子通道/管道或嵌套通道/管道? 例如,我有一个管道,其中安装了各种编解码器,用于协议/序列化。在管道结束时,我将一条消息传递给我的应用程序级逻辑。过度简化的示例: 然后,这个应用逻辑的句柄方法可以执行它想要的任何顺序逻辑。这清楚地分离了我的应用程序级别逻辑,但是,我想利用Netty的ChannelPipeline提供的“拦截过滤器模式的高级形式”。我想将管道

  • 我有一个模型对象,它是在多次转换和解析之后填充的。现在,我需要使用spring集成将模型中的消息属性发送给kafka。我可以使用messageKey方法构造键,但如何从m.getPayload()之类的模型中获取实际消息。getMessage()并将其发送给Kafka。