public class Supervisor extends UntypedActor {
private static SupervisorStrategy strategy =
new OneForOneStrategy(3, Duration.create("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof Exception) {
return restart();
}else if (t instanceof IllegalArgumentException) {
return stop();
} else {
return escalate();
}
}
});
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
public void onReceive(Object o) {
if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o), getSelf());
} else {
unhandled(o);
}
}
}
public class Child extends UntypedActor {
public void onReceive(Object o) throws Exception {
if (o instanceof String) {
Object response = someFunction( (String) message);//this function returns either successfull messgae as string or exception
if(response instanceOf Exception) {
throw (Exception) response;
}
else
getSender().tell(response, getSelf())
}else {
unhandled(o);
}
}
}
创作演员:
Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
child.tell("serVice_url", ActorRef.noSender());
对于service_url,如果出现故障,我希望重复这个过程,但它没有发生。如果在creatng actor中将下一行写为child.tell(“service_url_2”,actorref.nosender());
,那么这一行将被删除,但是我希望在特定的时间(假设3次)内处理相同的操作(发生失败),并且在它们之间有一个定义的间隔。请引导我实现这一点。
我想我已经开发了一个方法。尽管我仍然需要在生产级别上进行测试。我将答案写在下面,因为它可能对试图实现相同目标的人有所帮助。如果有人找到了更好的方法,那么他/她是受欢迎的。这里要提到的是,通过这种方法,Supervisor在一个时间范围内处理特定次数(假设3次)的相同操作(带有发生失败的消息)。我无法定义它们之间的间隔。这是代码。主管班。
public class MyUntypedActor extends UntypedActor {
//here I have given Max no retrilas as 10.I will controll this number from logic as per my own requirements.But user given number of retrials can not exceed 10.
private static SupervisorStrategy strategy = new AllForOneStrategy(10, Duration.create(5, TimeUnit.MINUTES),
new Function<Throwable, SupervisorStrategy.Directive>() {
@Override
public SupervisorStrategy.Directive apply(Throwable t) {
if (t instanceof Exception) {
//System.out.println("exception" + "*****" + t.getMessage() + "***" + t.getLocalizedMessage());
return restart();
} else if (t instanceof NullPointerException) {
return restart();
} else if (t instanceof IllegalArgumentException) {
return stop();
} else {
return escalate();
}
}
});
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
public void onReceive(Object o) {
if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o), getSelf());
} else {
unhandled(o);
}
}
}
我们将在其中编写逻辑的子类。
public class Child extends UntypedActor {
//Through preRestart it will push the message for which exception occured before the restart of the child
@Override
public void preRestart(final Throwable reason, final scala.Option<Object> message) throws Exception {
System.out.println("reStarting :::" + message.get());
SetRules.setRemainingTrials(SetRules.remainingTrials + 1);
getSelf().tell(message.get(), getSender());
};
public void onReceive(Object o) throws Exception {
if (o instanceof Exception) {
throw (Exception) o;
} else if (o instanceof Integer) {
} else if (o.equals("get")) {
getSender().tell("get", getSelf());
} else if (o instanceof String) {
try {
// here either we can write our logic directly or for a better
// approach can call a function where the logic will be excuted.
getSender().tell("{\"meggase\":\"Succesfull after " + SetRules.remainingTrials + " retrials\"}",
getSelf());
} catch (Exception ex) {
if (SetRules.remainingTrials == SetRules.noOfRetries) {
getSender().tell("{\"meggase\":\"Failed to connect after " + SetRules.noOfRetries + " retrials\"}",
getSelf());
} else {
Exception value1 = ex;
throw (Exception) value1;
}
}
} else {
unhandled(o);
}
}
}
SetRules类具有关于用户的信息,它提供noOfRetrials,并且还存储关于通过remainingTrials重试的每个状态下的检索次数的信息
public class SetRules {
public static int noOfRetries;
public static int remainingTrials;
public SetRules(int noOfRetries, int remainingTrials) {
super();
SetRules.noOfRetries = noOfRetries;
SetRules.remainingTrials = remainingTrials;
}
public int getRemainingTrials() {
return remainingTrials;
}
public static void setRemainingTrials(int remainingTrials) {
SetRules.remainingTrials = remainingTrials;
}
}
Props superprops = Props.create(MyUntypedActor.class);
SetRules setRules=new SetRules(3,0);
ActorSystem system = ActorSystem.create("helloakka");
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), Duration.create(5, "minutes"));
Future<Object> future = Patterns.ask(child, service_Url, new Timeout(Duration.create(5, "minutes")));
Object result = Await.result(future, Duration.create(5, "minutes"));
System.out.println(result);
在我的JAXWSWeb服务中,当发生异常时,我需要将特定消息发送回客户端,而不是带有异常描述的标准Fault消息。 如何做到这一点? 我使用的是jaxws-rt版本2.1.3 我已经尝试过使用这样的异常映射器,但是没有成功(有些不被调用,这也可能是由于配置错误造成的): 我们使用的服务器是JBoss EAP 6.4。 编辑: 异常映射器方法不适合我的JAXWS Web服务,因为这是用于JAX-RS
问题内容: 我有一个这样的dataFrame,我想每60分钟进行一次分组,然后从06:30开始分组。 我在用: 我得到这个分组: 但我正在寻找这个结果: 我如何告诉该功能以6小时30分开始以一小时为间隔进行分组? 如果 .groupby(pd.TimeGrouper(freq =‘60Min’)) 无法完成此 操作 ,最好的方法是怎么做? 致敬并非常感谢 问题答案: 使用会同中的参数。 指定将使时
至少一次语义:如果生产者从Kafka代理接收到确认(ack),并且acks=all,则表示消息已准确写入Kafka主题一次。但是,如果生产者确认超时或收到错误,它可能会在假定消息未写入Kafka主题的情况下重试发送消息。如果代理在发送ack之前失败,但在消息成功写入Kafka主题之后失败,则此重试会导致消息被写入两次,并因此多次传递给最终使用者。 我知道时间戳是根据消息从生产者发送的时间设置的。如
是否使用@kafkaListener在间隔基中轮询()Kafka消息?如何在特定时间内阻止KafkaListenerEndpointRegistry轮询消息
我正在使用 kafka模板向 kafka 主题发送消息。我遇到了一个要求,如果将消息发送到 kafka 主题时出现故障,那么我应该重试在具有相同偏移量的同一分区上发送消息。请帮助如何使用Kafka模板实现这一点?