当前位置: 首页 > 知识库问答 >
问题:

如何在将数据发送到另一个应用程序时实施重试策略?

潘嘉颖
2023-03-14

我正在开发将数据发送到zeromq的应用程序。以下是我的应用程序的功能:

  • 我有一个类SendToZeroMQ,它将数据发送到zeromq

这个想法非常简单,我必须确保我的重试策略工作正常,这样我就不会丢失数据。这是非常罕见的,但如果我们没有收到确认。

我正在考虑构建两种类型的RetryPolicies,但我无法理解如何在此处构建与我的程序相对应的:

  • RetryNTimes:在这种情况下,它将在每次重试之间的特定睡眠中重试N次,之后,它将删除记录。
  • ExponentialBackoffRetry:在这种情况下,它将指数级地继续重试。我们可以设置一些最大重试限制,之后它不会重试并会删除记录。

下面是我的SendToZeroMQ类,该类将数据发送到zeromq,还可以从后台线程每隔30秒重试一次,并启动responseplurrunnable,它将永远运行:

public class SendToZeroMQ {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
  private final Cache<Long, byte[]> retryQueue =
      CacheBuilder
          .newBuilder()
          .maximumSize(10000000)
          .concurrencyLevel(200)
          .removalListener(
              RemovalListeners.asynchronous(new CustomListener(), executorService)).build();

  private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
  }

  public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
  }

  private SendToZeroMQ() {
    executorService.submit(new ResponsePoller());
    // retry every 30 seconds for now
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        for (Entry<Long, byte[]> entry : retryQueue.asMap().entrySet()) {
          sendTo(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  public boolean sendTo(final long address, final byte[] encodedRecords) {
    Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }
    return sendTo(address, encodedRecords, liveSockets.get().getSocket());
  }

  public boolean sendTo(final long address, final byte[] encodedByteArray, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    // adding to retry queue
    retryQueue.put(address, encodedByteArray);
    return sent;
  }

  public void removeFromRetryQueue(final long address) {
    retryQueue.invalidate(address);
  }
}

下面是我的应答器类,它轮询来自zeromq的所有确认。如果我们从zeromq获得确认,那么我们将从重试队列中删除该记录,这样它就不会被重试,否则它将被重试。

public class ResponsePoller implements Runnable {
  private static final Random random = new Random();

  @Override
  public void run() {
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PULL);
    String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
    client.bind("tcp://" + TestUtils.getIpaddress() + ":8076");

    PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};

    while (!Thread.currentThread().isInterrupted()) {
      // Tick once per second, pulling in arriving messages
      for (int centitick = 0; centitick < 100; centitick++) {
        ZMQ.poll(items, 10);
        if (items[0].isReadable()) {
          ZMsg msg = ZMsg.recvMsg(client);
          Iterator<ZFrame> it = msg.iterator();
          while (it.hasNext()) {
            ZFrame frame = it.next();
            try {
                long address = TestUtils.getAddress(frame.getData());
                // remove from retry queue since we got the acknowledgment for this record
                SendToZeroMQ.getInstance().removeFromRetryQueue(address);               
            } catch (Exception ex) {
                // log error
            } finally {
              frame.destroy();
            }
          }
          msg.destroy();
        }
      }
    }
    ctx.destroy();
  }
}

问题:

正如您在上面看到的,我正在使用SendToZeroMQ类将codedRecords发送到zeromq,然后根据我们是否从Response sePoller类返回了acknolWedsion,每30秒重试一次。

对于每个codedRecords,都有一个名为address的唯一密钥,这就是我们将从zeromq中获得的确认。

我如何继续扩展这个示例以构建我上面提到的两个重试策略,然后我可以选择我想在发送数据时使用的重试策略。我想出了下面的接口,但后来我无法理解我应该如何实现这些重试策略并在我上面的代码中使用它。

public interface RetryPolicy {
    /**
     * Called when an operation has failed for some reason. This method should return
     * true to make another attempt.
     */
    public boolean allowRetry(int retryCount, long elapsedTimeMs);
}

我可以在这里使用番石榴重试或故障保护,因为这些库已经有很多重试策略,我可以使用吗?


共有3个答案

鄢晔
2023-03-14

这是对您的环境的一个有效的小模拟,展示了如何做到这一点。请注意,这里的Guava缓存是错误的数据结构,因为您对驱逐不感兴趣(我认为)。所以我使用了并发hashmap:

package experimental;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

class Experimental {
  /** Return the desired backoff delay in millis for the given retry number, which is 1-based. */
  interface RetryStrategy {
    long getDelayMs(int retry);
  }

  enum ConstantBackoff implements RetryStrategy {
    INSTANCE;
    @Override
    public long getDelayMs(int retry) {
      return 1000L;
    }
  }

  enum ExponentialBackoff implements RetryStrategy {
    INSTANCE;
    @Override
    public long getDelayMs(int retry) {
      return 100 + (1L << retry);
    }
  }

  static class Sender {
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
    private final ConcurrentMap<Long, Retrier> pending = new ConcurrentHashMap<>();

    /** Send the given data with given address on the given socket. */
    void sendTo(long addr, byte[] data, int socket) {
      System.err.println("Sending " + Arrays.toString(data) + "@" + addr + " on " + socket);
    }

    private class Retrier implements Runnable {
      private final RetryStrategy retryStrategy;
      private final long addr;
      private final byte[] data;
      private final int socket;
      private int retry;
      private Future<?> future; 

      Retrier(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
        this.retryStrategy = retryStrategy;
        this.addr = addr;
        this.data = data;
        this.socket = socket;
        this.retry = 0;
      }

      synchronized void start() {
        if (future == null) {
          future = executorService.submit(this);
          pending.put(addr, this);
        }
      }

      synchronized void cancel() {
        if (future != null) {
          future.cancel(true);
          future = null;
        }
      }

      private synchronized void reschedule() {
        if (future != null) {
          future = executorService.schedule(this, retryStrategy.getDelayMs(++retry), MILLISECONDS);
        }
      }

      @Override
      synchronized public void run() {
        sendTo(addr, data, socket);
        reschedule();
      }
    }

    long getVerifiedAddr() {
      System.err.println("Pending messages: " + pending.size());
      Iterator<Long> i = pending.keySet().iterator();
      long addr = i.hasNext() ? i.next() : 0;
      return addr;
    }

    class CancellationPoller implements Runnable {
      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ex) { 
            Thread.currentThread().interrupt();
          }
          long addr = getVerifiedAddr();
          if (addr == 0) {
            continue;
          }
          System.err.println("Verified message (to be cancelled) " + addr);
          Retrier retrier = pending.remove(addr);
          if (retrier != null) {
            retrier.cancel();
          }
        }
      }
    }

    Sender initialize() {
      executorService.submit(new CancellationPoller());
      return this;
    }

    void sendWithRetriesTo(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
      new Retrier(retryStrategy, addr, data, socket).start();
    }
  }

  public static void main(String[] args) {
    Sender sender = new Sender().initialize();
    for (long i = 1; i <= 10; i++) {
      sender.sendWithRetriesTo(ConstantBackoff.INSTANCE, i, null, 42);
    }
    for (long i = -1; i >= -10; i--) {
      sender.sendWithRetriesTo(ExponentialBackoff.INSTANCE, i, null, 37);
    }
  }
}
傅阳
2023-03-14

这不是一个完美的方法,但也可以通过下面的方法来实现。

public interface RetryPolicy {
public boolean allowRetry();
public void decreaseRetryCount();

}

创建两个实现。重复

public class RetryNTimes implements RetryPolicy {

private int maxRetryCount;
public RetryNTimes(int maxRetryCount) {
    this.maxRetryCount = maxRetryCount;
}

public boolean allowRetry() {
    return maxRetryCount > 0;
}

public void decreaseRetryCount()
{
    maxRetryCount = maxRetryCount-1;
}}

对于指数备份重试

public class ExponentialBackoffRetry implements RetryPolicy {

private int maxRetryCount;
private final Date retryUpto;

public ExponentialBackoffRetry(int maxRetryCount, Date retryUpto) {
    this.maxRetryCount = maxRetryCount;
    this.retryUpto = retryUpto;
}

public boolean allowRetry() {
    Date date = new Date();
    if(maxRetryCount <= 0 || date.compareTo(retryUpto)>=0)
    {
        return false;
    }
    return true;
}

public void decreaseRetryCount() {
    maxRetryCount = maxRetryCount-1;
}}

您需要在SendToZeroMQ类中进行一些更改

public class SendToZeroMQ {

private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
private final Cache<Long,RetryMessage> retryQueue =
        CacheBuilder
                .newBuilder()
                .maximumSize(10000000)
                .concurrencyLevel(200)
                .removalListener(
                        RemovalListeners.asynchronous(new CustomListener(), executorService)).build();

private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
}

public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
}

private SendToZeroMQ() {
    executorService.submit(new ResponsePoller());
    // retry every 30 seconds for now
    executorService.scheduleAtFixedRate(new Runnable() {
        public void run() {
            for (Map.Entry<Long, RetryMessage> entry : retryQueue.asMap().entrySet()) {
                RetryMessage retryMessage = entry.getValue();
                if(retryMessage.getRetryPolicy().allowRetry())
                {
                    retryMessage.getRetryPolicy().decreaseRetryCount();
                    entry.setValue(retryMessage);
                    sendTo(entry.getKey(), retryMessage.getMessage(),retryMessage);

                }else
                {
                    retryQueue.asMap().remove(entry.getKey());
                }
            }
        }
    }, 0, 30, TimeUnit.SECONDS);
}



public boolean sendTo(final long address, final byte[] encodedRecords, RetryMessage retryMessage) {
    Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
        return false;
    }
    if(null==retryMessage)
    {
        RetryPolicy retryPolicy = new RetryNTimes(10);
        retryMessage = new RetryMessage(retryPolicy,encodedRecords);
        retryQueue.asMap().put(address,retryMessage);
    }
    return sendTo(address, encodedRecords, liveSockets.get().getSocket());
}

public boolean sendTo(final long address, final byte[] encodedByteArray, final ZMQ.Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    return sent;
}

public void removeFromRetryQueue(final long address) {
    retryQueue.invalidate(address);
}}
蓬英逸
2023-03-14

我无法计算出有关如何使用相关API-s的所有细节,但关于算法,您可以尝试:

  • 重试策略需要为每条消息附加某种状态(至少当前消息被重试的次数,可能是当前延迟)。您需要决定RetryPolicy是否应该保留它本身,或者是否要将其存储在消息中。
  • 您可以有一个html" target="_blank">方法来计算下一次重试何时应该发生(以绝对时间或未来的毫秒数),这将是上述状态的函数
  • 重试队列应包含有关何时重试每条消息的信息。
  • 而不是使用schduleAtFixedrate,在重试队列中找到具有最低when_is_next_retry的消息(可能通过对绝对重试时间戳进行排序并选择第一个),并让执行器Service使用调度time_to_next_retry
  • 重新安排自己
  • 对于每次重试,将其从重试队列中拉出来,发送消息,使用RetryPolicy计算下一次重试应该是什么时候(如果要重试)并用when_is_next_retry的新值插入回重试队列中(如果RetryPolicy返回-1,则可能意味着不再重试消息)
 类似资料:
  • 我使用带有单个文件组件的网页包。 我的菜单标题中有一个Vue实例,用于显示购物车下拉列表: 我在同一页面中有另一个Vue实例(产品目录): 我想$emit一个事件从一个实例到另一个实例:当目录更改时,我想调用ShoppingCart中的函数。 我测试eventHub: 所以我在每个实例上导入事件: 目录中: 在购物车: 但这行不通。仅当$on和$emit位于Vue的同一实例中时,它才起作用。 我认

  • 你好,我想把数据从一个html页面发送到另一个html页面 这是我的index.html页面 这是newpage.html:

  • 我需要将数据从片段发送到另一个活动 我在Homeactive下的LoadsFraank中使用此代码 在另一个活动(LoadActivity)中接收数据 但是意图没有附加条件 请看下面的截图

  • 我有一个应用程序,可以接受一个POST请求。为了请求,我需要定义一些标题、边界和连字符。换句话说,我需要制定完整的请求。我成功地使用了HttpURLConnection。现在,我想从我的Spring应用程序中请求应用程序。 比如说,我有三个应用程序A(传感器)、B(Spring)和C(服务器)。 在这种情况下,B将充当网桥,从a接收请求,对其进行身份验证并将其发送给C。 我不想在Spring中再次

  • 我正在开发两个应用程序。让第一个应用程序是APP1,第二个应用程序为APP2。现在在APP1中,我不提供任何用户权限,如INTERNET权限,但它将发送任何http url,如http://www.google.com我的第二个APP2将包含INTERNET等用户权限。Http请求将从APP1发送到APP2,APP2将响应该请求,然后将结果发送回APP1。最后APP1包含一个Web视图以显示结果。

  • 问题内容: 我正在创建一个Web应用程序,并且很好奇如何在其中将数据发送到MySQL数据库。我有一个在用户按下按钮时调用的函数,我希望该函数以某种方式将数据发送到MySQL服务器。有谁知道如何解决这个问题?我尝试了npm MySQL模块,但似乎连接不正确,因为它是客户端。还有其他方法吗?我需要一个主意才能开始。 问候 问题答案: 您将需要一个服务器来处理来自React应用程序的请求并相应地更新数据