这是使用注释代码示例的Spring集成执行器通道的后续问题。
我试图通过在“公共频道”中发布一条消息并阅读味精中设置的REPLY_CHANNEL来测试用红色突出显示的框。
“公共通道”是发布-订阅通道。REPLY_通道是一个队列通道。
由于这是一个JUnit测试,我已经模拟了jdbcTemboard、数据源和Impl以忽略任何DB调用。
我的问题是:当我在“公共频道”上发布消息时,我在REPLY\u频道上没有收到任何消息。junit一直在等待响应。
我应该改变什么才能在REPLY_CHANNEL上得到回应?
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(loader = AnnotationConfigContextLoader.class) --------- 1
@ActiveProfiles("test")
public class QueuetoQueueTest {
@Configuration
static class ContextConfiguration { ------------------------------------- 2
@Bean(name = "jdbcTemplate")
public JdbcTemplate jdbcTemplate() {
JdbcTemplate jdbcTemplateMock = Mockito.mock(JdbcTemplate.class);
return jdbcTemplateMock;
}
@Bean(name = "dataSource")
public DataSource dataSource() {
DataSource dataSourceMock = Mockito.mock(DataSource.class);
return dataSourceMock;
}
@Bean(name = "entityManager")
public EntityManager entityManager() {
EntityManager entityManagerMock = Mockito.mock(EntityManager.class);
return entityManagerMock;
}
@Bean(name = "ResponseChannel")
public QueueChannel getReplyQueueChannel() {
return new QueueChannel();
}
//This channel serves as the 'common channel' in the diagram
@Bean(name = "processRequestSubscribableChannel")
public MessageChannel getPublishSubscribeChannel() {
return new PublishSubscribeChannel();
}
}
@Mock
DBStoreDaoImpl dbStoreDaoImpl;
@Test
public void testDBConnectivity() {
Assert.assertTrue(true);
}
@InjectMocks -------------------------------------------------------------- 3
StoretoDBConfig storetoDBConfig = new StoretoDBConfig();
@Autowired
@Qualifier("ResponseChannel")
QueueChannel ResponseChannel;
@Autowired
@Qualifier("processRequestSubscribableChannel")
MessageChannel processRequestSubscribableChannel;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
}
@Test
public void outboundtoQueueTest() {
try {
when(dbStoreDaoImpl.storeToDB(any()))
.thenReturn(1); ----------------------------------------------- 4
//create message
Message message = (Message<String>) MessageBuilder
.withPayload("Hello")
.setHeader(MessageHeaders.REPLY_CHANNEL, ResponseChannel)
.build();
//send message
processRequestSubscribableChannel.send(message);
System.out
.println("Listening on InstructionResponseHandlertoEventProcessorQueue");
//wait for response on reply channel
Message<?> response = ResponseChannel.receive(); ----------------------- 5
System.out.println("***************RECEIVED: "
+ response.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
>
为JUnit加载“ContextConfiguration”,以便不访问数据库。
这是如何在JUnit中按照https://spring.io/blog/2011/06/21/spring-3-1-m2-testing-with-configuration-classes-and-profiles
在config类中,我们模拟jdbcTemplate、dataSource、entityManager,并定义发布请求的“公共通道”和ResponseChannel。
将jdbcTemplate、dataSource mock注入StoretoDBConfig,使数据库不会被击中
模拟DaoImpl类,以便忽略DB调用
测试块在这里,因为在REPLY\u通道上没有响应
更新代码:
Code inside 5 (the class that reads from common channel):
@Configuration
class HandleRequestConfig {
//Common channel - refer diagram
@Autowired
PublishSubscribeChannel processRequestSubscribableChannel;
//Step 9 - This channel is used to send queue to the downstream system
@Autowired
PublishSubscribeChannel forwardToExternalSystemQueue;
public void handle() {
IntegrationFlows.from("processRequestSubscribableChannel") // Read from 'Common channel'
.wireTap(flow->flow.handle(msg -> System.out.println("Msg received on processRequestSubscribableChannel"+ msg.getPayload())))
.handle(RequestProcessor,"validateMessage") // Perform custom business logic - no logic for now, return the msg as is
.wireTap(flow->flow.handle(msg -> System.out.println("Msg received on RequestProcessor"+ msg.getPayload())))
.channel("forwardToExternalSystemQueue"); // Post to 'Channel to another system'
}
}
//Code inside step 8 - 'Custom Business Logic'
@Configuration
class RequestProcessor {
public Message<?> validateMessage(Message<?> msg) {
return msg;
}
}
我试图达到的目标:我有单独的业务逻辑jUnit测试用例。我试图测试当请求发布到“公共通道”时,响应是在“另一个系统的通道”上接收到的。
为什么我不能使用原始的ApplicationContext:因为它连接到DB,我不希望我的JUnit连接到DB或使用嵌入式数据库。我希望忽略对数据库的任何调用。
我已经将回复通道设置为“ResponseChannel”,“自定义业务逻辑”不应该将其响应发送到“ResponseChannel”吗?
如果我必须在不同的频道上收听响应,我愿意这样做。我只想测试我在“公共频道”上发送的消息是否在“其他系统的频道”上收到。
更新2:回答阿特姆的问题。谢谢阿特姆的建议。
测试配置中是否包含“HandlerRequestConfig”我们不能直接调用handle()方法。相反,我认为如果我在“processRequestSubscribableChannel”上发布,将调用HandlerRequestConfig中的handle()方法,因为它侦听同一个通道。这是错的吗?如何测试HandlerRequestConfig。handle()方法?
我在HandleRequestConfig(代码更新)的每个步骤的末尾添加了wiretap。我发现没有打印任何窃听信息。这意味着我发布的消息甚至没有到达输入通道“processRequestSubscribableChannel”。我做错了什么?
注意:我尝试删除配置中的“processRequestSubscribableChannel”bean(以便使用applicationContext中的实际“processRequestSubscribableChannel”)。我收到了一个不满意的依赖项错误-预计至少有1个配置PublishSubscribeChannel的bean。
更新3:已发布请求的详细信息。
@RunWith(SpringRunner.class)
@SpringBootTest
public class QueuetoQueueTest {
// Step 1 - Mocking jdbcTemplate, dataSource, entityManager so that it doesn't connect to the DB
@MockBean
@Qualifier("jdbcTemplate")
JdbcTemplate jdbcTemplate;
@MockBean
@Qualifier("dataSource")
public DataSource dataSource;
@MockBean
@Qualifier("entityManager")
public EntityManager entityManager;
@Bean(name = "ResponseChannel")
public PublishSubscribeChannel getReplyQueueChannel() {
return new PublishSubscribeChannel();
}
//Mocking the DB class
@MockBean
@Qualifier("dbStoreDaoImpl")
DBStoreDaoImpl dbStoreDaoImpl ;
//Inject the mock objects created above into the flow that stores data into the DB.
@InjectMocks
StoretoDBConfig storetoDBConfig = new StoretoDBConfig();
//Step 2 - Injecting MessageChannel used in the actual ApplicationContext
@Autowired
@Qualifier("processRequestSubscribableChannel")
MessageChannel processRequestSubscribableChannel;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
}
@Test
public void outboundtoQueueTest() {
try {
when(dbStoreDaoImpl.storeToDB(any()))
.thenReturn(1);
//create message
Message message = (Message<?>) MessageBuilder
.withPayload("Hello")
.build();
//send message - this channel is the actual channel used in ApplicationContext
processRequestSubscribableChannel.send(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
更新1:StoretoDBConfig中的代码
@Configuration
@EnableIntegration
public class StoretoDBConfig {
@Autowired
DataSource dataSource;
/*
* Below code is irrelevant to our current problem - Including for reference.
*
* storing into DB is delegated to a separate thread.
*
* @Bean
* public TaskExecutor taskExecutor() {
* return new SimpleAsyncTaskExecutor();
* }
*
* @Bean(name="executorChannelToDB")
* public ExecutorChannel outboundRequests() {
* return new ExecutorChannel(taskExecutor());
* }
* @Bean(name = "DBFailureChannel")
* public static MessageChannel getFailureChannel() {
* return new DirectChannel();
* }
* private static final Logger logger = Logger
* .getLogger(InstructionResponseHandlerOutboundtoDBConfig.class);
*/
@Bean
public IntegrationFlow handle() {
/*
* Read from 'common channel' - processRequestSubscribableChannel and send to separate thread that stores into DB.
*
/
return IntegrationFlows
.from("processRequestSubscribableChannel")
.channel("executorChannelToDB").get();
}
}
存储在单独线程上的DB中的代码:
@Repository
public class DBStoreDaoImpl implements DBStoreDao {
private JdbcTemplate jdbcTemplate;
@Autowired
public void setJdbcTemplate(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
@Transactional(rollbackFor = Exception.class)
@ServiceActivator(inputChannel = "executorChannelToDB")
public void storetoDB(Message<?> msg) throws Exception {
String insertQuery ="Insert into DBTable(MESSAGE) VALUES(?)";
jdbcTemplate.update(insertQuery, msg.toString());
}
}
请向我们展示订阅了该公共频道
的内容。您的图表不知何故与您向我们展示的内容无关。您演示的代码未满。
reportyChannel
的真正问题是,某些东西确实必须向它发送消息。如果您的流只是单向的——发送、存储和没有返回,那么您确实不会为此获得任何东西。这就是为什么要显示这些通道适配器。
观察消息过程的最佳方法是打开组织的调试日志记录。springframework。集成类别。
虽然我看到您声明这些通道是在ContextConfiguration中的,并且实际上没有任何订阅者订阅
getRequestChannel。因此,没有人会使用您的消息,当然,也没有人会向您发送回复。
请重新考虑您的测试类,以使用真实的应用程序上下文。否则,如果你真的不测试你的流量,那么你想要实现什么是完全不清楚的。。。
我在我们的项目中引入了spring集成,而不是遗留集成架构。该体系结构支持发送者和累犯。每个发件人可以配置3个目的地。 null Spring integration gateways看起来很合适。我可以使用default-request-channel来表示主流,error-channel来表示失败流。备份流的问题。如何复制网关传入消息并将其放置到备份通道? 更准确地说,这里是一个测试和代码。
但我得到的错误如下: POM: 我按以下方式配置入站网关: 并且,服务激活器: 顺便说一句,只有当我在服务激活器中删除outputChannel="outputChannel"时,它才有效。 这个问题有什么解释吗,我有什么误解吗?
我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方
我想了解Spring集成中如何处理消息:串行或并行。特别是我有一个带有轮询器和HTTP出站网关的入站通道适配器。我猜拆分器、变压器、标头丰富器等不会产生自己的线程。 我可能错过了,但是这些细节在留档的某个地方指定了吗? 还可以通过编程方式获取系统中的所有频道吗?
可以在运行时向spring integration dsl注册MessageSources吗? 在我的例子中,我想创建多个FileReadingMessageSources(基于UI的输入),然后将有效负载发送到特定的通道/jms路由(从元数据或用户输入读取) 另一个问题是,是否可以动态注册IntegrationFlows?
我需要实现一个由多个步骤组成的集成流程,每个步骤都可以由不同数量的处理器(插件)执行。 到目前为止我所拥有的: 预期的行为如下: 通过网关发送第一个请求 一切正常,但结果不是预期的,我只收到2个(随机)项目,而不是4个。 我认为问题在于聚合器仅在两个项目之后触发发布,因为“step/2”通道中的“apply sequence”覆盖了“step/1”中的“apply sequence”。所以问题是: