代码如下
for (;;) {
byte[] replyBytes = client.connect()
.flatMap(new Func1<ObservableConnection<byte[], byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(final ObservableConnection<byte[], byte[]> connection) {
connection.writeBytesAndFlush(frame.getDownwardBytes());
return connection.getInput();
}
})
.timeout(1, TimeUnit.SECONDS) // 等待一秒
.retry(3) // 超时后重试3次
.onErrorReturn(new Func1<Throwable, byte[]>() {
@Override
public byte[] call(Throwable throwable) {
logger.error("{}", throwable);
return null;
}
}) // 3次后仍然失败则返回null
.take(1)
.toBlocking()
.first();
if (replyBytes != null) {
<span style="white-space:pre"> </span>//获取数据
} else {
}
}