事实上,直到现在我还没有成功。您能帮助我,请提供我的详细代码示例,以实现使用Kafka流DSL?
您可以使用处理器API。
您必须实现Transformer接口方法转换:
>
将查找键值,
object SampleApp extends App {
val storeName: String = ???
val builder: StreamsBuilder = new StreamsBuilder()
builder.stream("topicName")(Consumed.`with`(Serdes.String(), Serdes.String()))
.transform[String, String](() => SampleTransformer(storeName))
.to("outputTopic")(Produced.`with`(Serdes.String(), Serdes.String()))
}
case class SampleTransformer(storeName: String)
extends Transformer[String, String, KeyValue[String, String]]
with LazyLogging {
var store: KeyValueStore[String, String] = _
override def init(context: ProcessorContext): Unit = {
super.init(context)
store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, String]]
}
override def transform(key: String, newValue: String): String = {
val valueToPass = Option(store.get(key)).map(oldValue => someComputation(oldValue, newValue))
store.put(key, newValue)
valueToPass.orNull
}
def someComputation(oldValue: String, newValue: String): String = ???
override def close(): Unit = {
// Close
}
}
如何在Spark Streaming 2.3.1中把每条记录写到多个kafka主题?换句话说,我得到了5条记录和两个输出kafka主题,我希望这两个输出主题中都有5条记录。 这里的问题不谈论结构化流媒体案例。我正在寻找特定的结构化流媒体。
有没有办法从Kafka主题中删除单个记录?我知道有一个脚本kafka-delete-records.sh删除指定主题和分区上指定偏移量之前的记录,但是我希望能够删除我指定的偏移量上的记录。有办法做到吗? 这不是在Java而是在裸露的Kafka实例上。
,日志记录将进入一个文件; (路径)/service_name/service_name.log 我想用logback复制这种行为,但在logback.xml配置中获取“logger”名称时遇到了真正的困难。它可以在log encoder.pattern中看到,即“%d%-5level%logger{35}-%msg%n”。
我是Hadoop的新手,我的map-reduce代码可以工作,但它不产生任何输出。这里是Map-Reduce的信息: 下面是启动mapreduce作业的代码: 如果有任何帮助,我们将不胜感激。谢谢你
更新:任何人都知道如何强迫另一个流到麦克风音频源。这需要原生android代码。请在这方面帮助我,请参考这个问题,以获得更多关于路由音频的详细信息
问题内容: 有没有办法一次插入多个记录而不是一次插入? 我有一个非常丑陋的耙子任务,正在做以下事情… 这必须非常低效,并且必须有更好的方法… 问题答案: 该方法也将数组作为参数。 但是,它仍然对每个条目执行一个SQL查询,而不是单个SQL查询。它效率更高,因为它只需要在后台创建一个activerecord对象。 如果要同时从同一客户端插入许多行,请使用带有多个VALUES列表的INSERT语句一次