我正在尝试返回一个流操作的结果,在本例中为:
其表示为:
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
若要访问我使用的值,请执行以下操作
final AtomicInteger returnValue = new AtomicInteger();
紧随其后的是:
.to(Sink.foreach(x -> {
returnValue.set(x);
System.out.println("got: " + x);
}))
这需要一个阻塞调用来允许流完成,这是不可接受的:
Thread.sleep(2000);
如果我使用:
CompletableFuture<Object> futureValue =
ask(actorRef, Done.done(), Duration.ofMillis(5000)).toCompletableFuture();
System.out.println(futureValue.toCompletableFuture().get().toString());
返回错误:
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://StreamsExamples/system/Materializers/StreamSupervisor-0/$$a-actorRefSource#1663100910]] after [5000 ms]. Message of type [akka.Done$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
在本例中,收件人执行元是源,并在done.done
消息中返回以下内容:
return Optional.of(CompletionStrategy.immediately());
Akka流可以用来返回流中的计算值吗?唯一的替代方法是将计算值存储在DB中,或者当在以下位置计算值时将其发送到Kafka主题:
.to(Sink.foreach(x -> {
?
完成SRC:
import akka.Done;
import akka.actor.ActorRef;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
public class GetStreamValue {
final static akka.actor.ActorSystem system = akka.actor.ActorSystem.create("StreamsExamples");
public static void main(String args[]) throws InterruptedException, ExecutionException {
int bufferSize = 100;
final Source<Integer, ActorRef> source =
Source.actorRef(
elem -> {
// complete stream immediately if we send it Done
if (elem == Done.done()) {
return Optional.of(CompletionStrategy.immediately());
}
else {
return Optional.empty();
}
},
// never fail the stream because of a message
elem -> Optional.empty(),
bufferSize,
OverflowStrategy.dropHead());
final AtomicInteger returnValue = new AtomicInteger();
final ActorRef actorRef = source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
.to(Sink.foreach(x -> {
returnValue.set(x);
System.out.println("got: " + x);
}))
.run(system);
Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
actorRef.tell(Done.done(), ActorRef.noSender());
Thread.sleep(2000);
System.out.println("returnValue is "+returnValue);
}
}
我想你可能缺少的是理解Akka Streams中物化价值的概念。浏览文档的这一部分,特别是关于组合物化值的部分。我也尝试在这里解释这个概念(物化价值搜索)。如果你追求物化的价值,那么也许我在这里写的东西会更有意义。
调用source.actorref(..)
返回source
,其中T是流经流的元素的数据类型(在您的示例中是integer
),而actorref
是该source
的物化值。当对runnablegraph
调用run
时,您可以同步获得物化值,这是to(...)
call返回的值。
actorref
是如何按照source.actorref(...)
语义“驱动”流的。
现在的问题是如何获得通过流的数据。在您的示例中,将所有整数
缩减为一个,因此可以使用sink.head,而不是使用sink.foreach(...)
(这对副作用有好处)。您看到,sink
还可以生成物化值,对于sink.head
来说,它物化为流中第一个元素的completionstage
,在您的情况下,该元素是唯一的元素。让我们试试:
final ActorRef actorRef = source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
.to(Sink.head())
.run(system);
好吧,那没有太大帮助。您仍然只能获得源
的物化值。要获得接收器
的物化值,我们需要显式地请求它:
final Pair<ActorRef, CompletionStage<Integer>> matVals =
source
.fold(0, (aggr, next) -> aggr + next)
.map(x -> x * x)
.map(x -> x * x)
.toMat(Sink.head(), Keep.both())
.run(system);
现在,我们得到了源
和宿
的物化值。您可以像前面一样通过actorref
驱动流:
final ActorRef actorRef = matVals.first();
Arrays.asList(1, 2, 3).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
Arrays.asList(1,2).forEach(i -> actorRef.tell(i, ActorRef.noSender()));
actorRef.tell(Done.done(), ActorRef.noSender());
您还可以使用completablestage
API从流中获取值。像这样说:
Integer folded = matVals.second().toCompletableFuture().join();
是的,这是阻塞,但您需要在流运行到完成之前以某种方式阻止主线程完成。
我正在学习与Akka溪流一起工作,并且真的很喜欢它,但是物化部分对我来说仍然有些神秘。 引用自http://doc.akka.io/docs/akka-stream-and-http-experimental/snapshot/scala/http/client-side/host-level.html#host-level-api ...通过对池客户端流具体化到的HostConnectionPo
使用指南 - 数据报告 - 流量分析 - 访问时长的计算 访问时长指访客每次在网站访问所停留的时长,即从进入第一个页面到离开最后一个页面的时长。 在传统统计工具下,最后一个页面的关闭时间很难得到,百度统计在技术上进行了升级,能够获取到该页面的关闭时间。 然而用户行为具有多样性,当用户快速关闭浏览器、长时间未对页面进行操作或其它网络原因导致的时候,系统会无法获取到页面的关闭信息,从而使最后一个页面的
当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。
准备数据: CREATE TABLE t1 (year YEAR(4), month INT(2) UNSIGNED ZEROFILL, day INT(2) UNSIGNED ZEROFILL); INSERT INTO t1 VALUES(2000,1,1),(2000,1,20),(2000,1,30),(2000,2,2), (2000,2
我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部流式处理所有内容。 有什么想法吗?
问题内容: 我正在为用户帖子构建一个访客计数系统,以显示首页上浏览量最高的帖子。我现在有一个访客计数系统,但是所有页面刷新后都会注册一个视图。我无法使用Google Analytics(分析)。 我需要的是一个访客计数器,该计数器只计算不重复的访客。就我而言,独特意味着一个人一天只能浏览一个帖子?我认为甚至一个星期都可以。你可以在这里写那个php代码吗?如果您也喜欢的话,也可以给我链接一些优秀的教