String brokerUrl = "tcp://101.11.102.125:31316";
String topic = "sometpoic";
String kafkaBrokerUrl = "101.11.102.125:1012";
String kafkaTopic = "kafka_Topic";
@Bean
public DefaultMessageListenerContainer listenerContainer() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory();
ActiveMQTopic mqTopic = new ActiveMQTopic(topic);
conFactory.setBrokerURL(brokerUrl);
container.setConnectionFactory(conFactory);
container.setDestination(mqTopic);
container.setSessionTransacted(true);
return container;
}
@Bean
public MessageChannel jmsInChannel() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public MessageChannel jmsOutChannel() {
return MessageChannels.publishSubscribe().get();
}
And this is my JMS adapter flow............
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
return IntegrationFlows
.from(Jms.messageDriverChannelAdapter(listenerContainer())
.autoStartup(true))
.channel(jmsInChannel())
.get();
}
<int:header-enricher input-channel="jmsInChannel" output-channel="jmsOutChannel">
<int:header name="kafkaBrokerUrl" value="${kafka.url}"></int:header>
<int:header name="kafkaTopic" value="${kafka.topic}"></int:header>
<int:service-activator input-channel="jmsOutChannel" ref="KafkaProducer" method="produceToJmsKafka"/>
<bean id="KafkaProducer" class="com.david.jms.JmsKafkaProducer"/>
那么如何将上面的xml代码转换为类似的DSL特定代码呢?
After getting the compilation error I have tried like this...
@SuppressWarnings("unchecked")
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
return IntegrationFlows
.from(Jms.messageDriverChannelAdapter(listenerContainer())
.autoStartup(true))
.channel(jmsInChannel())
.enrichHeaders(new MapBuilder()
.put("brokerid", brokerid)
.put("topic", topic)
.put("source", source)
.put("fileType", fileType))
.handle("KafkaProducer", "produceToJmsKafka")
.get();
}
@Bean
public JmsProducer KafkaProducer() {
return new JmsProducer();
}
那可能是这样的:
@Value("${kafka.url}")
private String kafkaBrokerUrl;
@Value("${kafka.topic}")
private String kafkaTopic;
....
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
return IntegrationFlows
.from(Jms.messageDriverChannelAdapter(listenerContainer())
.autoStartup(true))
.channel(jmsInChannel())
.enrichHeaders(new StringStringMapBuilder()
.put("kafkaBrokerUrl", kafkaBrokerUrl)
.put("kafkaTopic", kafkaTopic))
.handle("KafkaProducer", "produceToJmsKafka")
.get();
}
从这里看,我没有理由使用那些MessageChannel
bean,特别是publishsubscribe()
。
另一方面,从DSL1.1开始,我们提供了Spring Integration Kafka适配器的实现。
流是Gateway-->RequsetChannel-->Transform-->OutboundMailChannl-->MailSender(MailAdapter)
适配器目录和文件结构 适配器目录和文件结构布局的例子: /application/libraries/Driver_name Driver_name.php drivers Driver_name_subclass_1.php Driver_name_subclass_2.php Driver_name_subclass_3.php 注意: 为了在大小写敏感的文件系统上维持兼容性,这个 Drive
我已经用ImageView和TextView制作了抽屉菜单,它可以run.However,当我想用两个ImageView和一个TextView创建抽屉菜单时,它会用我的第二个ImageView显示错误(“尝试调用虚拟方法”)(我代码中的变量名称是“img”) 我的代码如下: 这是我的日志: 09-16 22:25:38.458:E/AndroidRuntime(14966):致命例外:主 09-1
在 Netty 4.1 中,是否可以创建子通道/管道或嵌套通道/管道? 例如,我有一个管道,其中安装了各种编解码器,用于协议/序列化。在管道结束时,我将一条消息传递给我的应用程序级逻辑。过度简化的示例: 然后,这个应用逻辑的句柄方法可以执行它想要的任何顺序逻辑。这清楚地分离了我的应用程序级别逻辑,但是,我想利用Netty的ChannelPipeline提供的“拦截过滤器模式的高级形式”。我想将管道
我有一个模型对象,它是在多次转换和解析之后填充的。现在,我需要使用spring集成将模型中的消息属性发送给kafka。我可以使用messageKey方法构造键,但如何从m.getPayload()之类的模型中获取实际消息。getMessage()并将其发送给Kafka。