我想有一个客户端应用程序与请求/响应语义学调用另一个应用程序,这是一个Kafka流应用程序。
我的客户端应用程序基于此示例(基本上没有变化)。我需要从客户端接收消息的应用程序是Kafka Streams应用程序。但是包含相关id的消息头丢失。
Kafka Streams应用程序是一个简单的拓扑结构,用于测试此。。。
@Bean
public KafkaStreams stream(KafkaStreamsConfiguration kafkaStreamsConfiguration) {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(REQUEST_TOPIC_NAME)
.groupByKey()
.count()
.toStream()
.mapValues((ValueMapper<Long, String>)String::valueOf)
.to(REPLY_TOPIC_NAME);
return new KafkaStreams(builder.build(), kafkaStreamsConfiguration.asProperties());
}
对于这个POC,我保持它的简单性,让客户机和服务器“同意”主题名称(kRequests
和kReplies
)。所以在这一点上,我只想得到要识别并返回的相关id。
我现在看到的是
2019-10-01 10:55:38.792 WARN 76830 --- [TaskScheduler-1] o.s.k.r.ReplyingKafkaTemplate : Reply timed out for: ProducerRecord(topic=kRequests, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [107, 82, 101, 112, 108, 105, 101, 115]), RecordHeader(key = kafka_correlationId, value = [101, -4, -35, 41, -127, -66, 69, 37, -117, -127, -95, -92, 38, 79, 73, 127])], isReadOnly = true), key=null, value=foo21074, timestamp=null) with correlationId: [135564972083657938538225367552235620735]
2019-10-01 10:55:38.792 ERROR 76830 --- [TaskScheduler-1] org.KRequestingApplication : Reply timed out
org.springframework.kafka.KafkaException: Reply timed out
at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$0(ReplyingKafkaTemplate.java:257) ~[spring-kafka-2.2.8.RELEASE.jar:2.2.8.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_211]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_211]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_211]
在超时时间内,回复主题上没有具有匹配相关性id的消息。似乎至少使用Kafka Streams DSL无法支持回复KafkaTemplate
。
你的场景没有意义;您的KStream正在对多个输入进行分组;请求/回复为1请求1回复。
这个很好用。。。
@SpringBootApplication
@EnableKafkaStreams
public class So58193901Application {
private static final String REQUEST_TOPIC_NAME = "requests";
private static final String REPLY_TOPIC_NAME = "replies";
public static void main(String[] args) {
SpringApplication.run(So58193901Application.class, args);
}
@Bean
public KStream<byte[], byte[]> stream(StreamsBuilder builder) {
KStream<byte[], byte[]> stream = builder.stream(REQUEST_TOPIC_NAME);
stream
.mapValues(val -> new String(val).toUpperCase().getBytes())
.to(REPLY_TOPIC_NAME);
return stream;
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name(REQUEST_TOPIC_NAME).partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name(REPLY_TOPIC_NAME).partitions(1).replicas(1).build();
}
@Bean
public ReplyingKafkaTemplate<String, String, String> template(ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer = factory.createContainer(REPLY_TOPIC_NAME);
return new ReplyingKafkaTemplate<>(pf, replyContainer);
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
System.out.println(template.sendAndReceive(new ProducerRecord<>(REQUEST_TOPIC_NAME, "bar", "foo"))
.get(10, TimeUnit.SECONDS).value());
};
}
}
英文原文:http://emberjs.com/guides/application/the-application-template/ 应用模板是应用启动的时候默认渲染的模板。 你应该把你的header、footer和其他装饰性的内容放在应用模板里面。另外,应用模版中至少需要一个{{outlet}}占位符,以便路由能根据当前的URL将适当的模版渲染进来。 下面是一个应用模板的例子: 1 2 3
介绍 TensorFlow template application表示通用的TensorFlow应用代码,用户可以直接使用这些模板而不需要编写TensorFlow应用代码。 用户训练数据一般都是稠密的CSV格式,或稀疏的LIBSVM格式或图片,这些数据都可以转成TFRecords,模型本身则可以使用代码生成,通过不同的超参数组合可以实现通过的TensorFlow应用,用户直接下载模板或者使用Xi
问题内容: 我了解C 中模板的方面与Java和C#中的泛型不同。C#是一种形式,Java使用类型擦除,C 使用鸭子类型,等等。C 模板可以做很多事情,而Java和C#泛型则做不到(例如,模板专业化)。但是 Java泛型可以做很多事情,而C#和C ++则做不到(例如,使泛型族的有界类型参数成为现实 ),而 C#泛型可以做的很多事情Java和C 不能做(例如运行时通用反射)。 [编辑:显然Java泛型
问题内容: 是否有Flask或Jinja2配置标记/扩展名,可在呈现模板后自动最小化HTML输出? 问题答案: 找到了一种更好的方法。你可以使用以下方法缩小所有页面:
我的机器使用Rails 5。因为一些业务,我必须创建新的rails 4应用程序。我使用以下语法生成rails 4应用程序: 然而,新的应用程序结构是为rails 5构建的。我的问题是:我如何使用rails 5来生成rails 4结构应用程序。 谢谢
模板提供了一种简便的方式,将展现逻辑从控制器和业务逻辑中分离出来。 一般来说,模板包含应用程序的 HTML 代码,但也可以使用其他的格式,例如 XML 。 模板通常也被称为「视图」, 而它是 模型-视图-控制器 (MVC) 软件架构模式第二个元素的 一部份 。