我看过其他与我的问题相关的帖子,但没有一个答案帮助我解决了我的问题。
我试图遵循下面的示例:https://github.com/garyrussell/spring-integration-samples/tree/master/medition/tcp-client-server-multiplex
显然,我的理解是错误的,因为我没有看到响应如何返回到网关,而且它不起作用。
我可以看到套接字正在打开,但它在消息发送后立即被关闭,因此反序列化器返回一个EOF:null错误。
>
是否设置了TcpReceivingChannelAdapter错误?
@EnableIntegration
@IntegrationComponentScan
@Configuration
public class TcpMultiPlexConfig implements ApplicationListener<TcpConnectionEvent> {
protected final static Logger LOGGER = LoggerFactory.getLogger(TcpMultiPlexConfig.class);
@Value("${engine.port}")
private int port;// = 55001;
@Value("${engine.address}")
private String ipAddress;// = "192.168.1.1";
@Value("${engine.timeout}")
private int timeout;
@Override
public void onApplicationEvent(TcpConnectionEvent tcpEvent) {
TcpConnection source = (TcpConnection) tcpEvent.getSource();
if (tcpEvent instanceof TcpConnectionOpenEvent) {
LOGGER.info("********* Socket Opened " + source.getConnectionId());
} else if (tcpEvent instanceof TcpConnectionCloseEvent) {
LOGGER.info("*********** Socket Closed " + source.getConnectionId());
}
}
@MessagingGateway(defaultRequestChannel="input")
public interface MultiPlexGateway {
String send(@Payload String in, @Header("CORRELATION_ID") String transactionId);
}
// TODO the request and response are being put together
@Bean
@ServiceActivator(inputChannel = "input")
public BridgeHandler bridge() {
BridgeHandler bridge = new BridgeHandler();
bridge.setOutputChannelName("toAggregatorClient");
bridge.setOrder(1);
return bridge;
}
@Bean
public PublishSubscribeChannel input() {
return new PublishSubscribeChannel();
}
@Bean
public DirectChannel toAggregatorClient() {
return new DirectChannel();
}
@Bean
public DirectChannel noResponseChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel toTransformerClient() {
return new DirectChannel();
}
@Bean
public TcpReceivingChannelAdapter inAdapterClient() {
TcpReceivingChannelAdapter receivingAdapter = new TcpReceivingChannelAdapter();
receivingAdapter.setConnectionFactory(clientConnectionFactory());
receivingAdapter.setOutputChannel(toAggregatorClient());
receivingAdapter.setClientMode(true);
return receivingAdapter;
}
@Bean
@ServiceActivator(inputChannel = "input")
public TcpSendingMessageHandler outAdapterClient() {
TcpSendingMessageHandler outAdapter = new TcpSendingMessageHandler();
outAdapter.setOrder(2);
outAdapter.setConnectionFactory(clientConnectionFactory());
outAdapter.setClientMode(true);
return outAdapter;
}
@Bean(name ="clientCFMP")
public AbstractClientConnectionFactory clientConnectionFactory() {
TcpNetClientConnectionFactory tcp = new TcpNetClientConnectionFactory(this.ipAddress , this.port);
tcp.setSerializer(new DefaultSerializer()); // out
// byte delimeter = "\n".getBytes()[0];
// ElasticByteArrayRawSingleTerminatorSerializer deserializer = new ElasticByteArrayRawSingleTerminatorSerializer(delimeter);
// DefaultDeserializer deserializer = new DefaultDeserializer();
MyDefaultDeserializer deserializer = new MyDefaultDeserializer();
tcp.setDeserializer(deserializer);
tcp.setSoTimeout(timeout);
tcp.setSingleUse(false);
MapMessageConverter mc = new MapMessageConverter();
mc.setHeaderNames("CORRELATION_ID");
tcp.setMapper(new MessageConvertingTcpMessageMapper(mc));
return tcp;
}
@MessageEndpoint
public static class MyConverters {
@Transformer(inputChannel="toTransformerClient", outputChannel = "resultToString")
public byte[] getResponse(MessageGroup payload) {
// byte[] result = null;
List<Message<?>>list = new ArrayList<>(payload.getMessages());
byte[] result = (byte[]) list.get(1).getPayload();
// LOGGER.info(result);
return result;
}
@Transformer(inputChannel="resultToString")
public String convertResult(byte[] bytes) {
String result = new String(bytes);
LOGGER.info("*********** RESULT => " + result);
return result;
}
@ServiceActivator(inputChannel = "noResponseChannel")
public MessageTimeoutException noResponse(String input) {
throw new MessageTimeoutException("****** No response received for => " + input);
}
}
@Bean
@ServiceActivator(inputChannel = "toAggregatorClient", outputChannel = "toTransformerClient")
public FactoryBean<MessageHandler> aggregatorFactoryBean() {
AggregatorFactoryBean afb = new AggregatorFactoryBean ();
afb.setExpireGroupsUponCompletion(true);
afb.setExpireGroupsUponTimeout(true);
afb.setGroupTimeoutExpression(new ValueExpression<>(this.timeout));
afb.setCorrelationStrategy(new HeaderAttributeCorrelationStrategy("CORRELATION_ID"));
afb.setReleaseStrategy(new MessageCountReleaseStrategy(2));
afb.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
afb.setSendPartialResultOnExpiry(false);
afb.setMessageStore(new SimpleMessageStore());
afb.setDiscardChannel(noResponseChannel());
return afb;
}
@Service
public class MultiPlexGatewayTransmission <T extends EngineData> extends AbstractMultiPlexEngineTransmission {
public MultiPlexGatewayTransmission(MultiPlexGateway gateway) {
super(gateway);
}
@Override
public T request(EngineData request, Class<? extends EngineData> clazz) {
String response = gateway.send(JaxbUtils.marshall(request), request.getApi().getMessageId());
gateway.send(JaxbUtils.marshall(request), request.getApi().getMessageId());
if(response == null || response.isEmpty()) {
return null;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("MPGateway response ::: " + response.trim());
}
@SuppressWarnings("unchecked")
T clientResponse = (T) JaxbUtils.unmarshall(response, clazz);
if (LOGGER.isDebugEnabled()) {
// LOGGER.debug("*** Unmarshall response ::: " + clientResponse);
}
return clientResponse;
}
测试用例:
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("test")
public class ITGetClientsTest extends AbstractEngineTest {
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
// @Autowired
// private GatewayTransmission<ClientsResponse> transmission;
@Autowired
private MultiPlexGatewayTransmission<ClientsResponse> transmission;
@Test
public void testGetClients() {
LOGGER.info("Gateway test testGetClients... ");
Api api = new Api();
api.setIp("192.168.1.1");
api.setMessageId(UUID.randomUUID().toString());
api.setVersion("1.0");
api.setUserToken(token);
ClientsRequest request = new ClientsRequest();
request.setApi(api);
ClientsResponse response = (ClientsResponse) transmission.request(request, ClientsResponse.class);
Assert.assertTrue(response != null);
Assert.assertTrue(!response.getClient().isEmpty());
LOGGER.info(Arrays.deepToString(response.getClient().toArray()));
}
}
我没有详细看你的代码;现在已经很晚了,而且是一个周末,但是请参阅下面的答案,了解使用入站/出站连接ID来关联请求/应答的更简单的技术。
如何通过注释而不是常规配置文件配置入站通道适配器?我可以为会话工厂定义bean,如下所示: 如何配置通过注释下给出的入站通道适配器? 我正在寻找的是在应用程序启动时连接所有bean,然后公开一些方法来开始轮询服务器,处理它们,然后从本地删除它们,类似于 其中getPollableChannel()为我提供了用于轮询的bean。
我发现了一个xml配置的入站适配器示例,但我并不完全理解。配置指定REST请求设置请求方法、使用的格式等。 我认为,从Spring集成的角度来看,响应应该更加重要,因为响应实际上是为消息通道提供信息的。我说得对吗? HTTP入站适配器用作消息endpoint(实际上是消息起始点),它调用HTTP请求,例如REST服务的URL。”http://myRest/transfer/next“-向SI消息通
我正在重构一个传统的基于Spring Batch XML的应用程序,以使用注释配置。我想了解如何将以下XML文件转换为基于注释的配置,并保持相同的关注分离。 为了便于讨论,这里有一个简单的例子。 job-config-1.xml job-config-2.xml job-config-3。xml 我想从XML配置转移到Java配置。我想为每个XML创建3个作业配置类。比如说JobConfig1。j
我想根据“配置文件”设置注释的值。 让我举个例子来解释; 在上面的例子中,我们可以看到活动的“配置文件”是PROD,但是假设我们想要使用DEV配置文件,我们将不得不注释来自PROD的@Table注释,并取消注释DEV@Table注释。 如果这只针对一个实体,那不会是一个问题,但我有很多实体都有这种情况,所以我不认为这是处理这种即兴“简介”的方式。 你知道有什么办法可以解决这种情况吗?
问题内容: 为Java 6注释处理器设置Eclipse项目编译器配置的最佳方法是什么? 我的解决方案是手动设置和文件。这有点麻烦: 在factorypath文件中引用处理器jar 配置蚀注解处理器输出目录在属性) 将Eclipse注释处理器输出目录添加为源文件夹 一个问题是,Eclipse生成的源将使用maven进行编译。Only 是可靠的,因为它删除了Eclipse生成的源文件。(Eclipse
我有两个片段:(1)图书馆片段,(2)书片段 图书馆碎片通过RecyclerView显示所有可用的书籍。用户可以在每个RecyclerView项目上设置标签,这将把LiveData设置为相应的图书。同时,书籍片段将被打开,并显示该书的内容。 我在ViewHolder类中设置了一个onClickListener,它位于图书馆片段的RecyclerView. Adapter中。因此,当单击一个项目时,