I am currently learning Scala.
The consumer should be able to handle the following tasks:
I found a very good document for creating this consumer in Java (https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example).
Does anyone have sample Scala code for creating this SimpleConsumer, or if you could point me to some documentation that sets me in the right direction, I would greatly appreciate it.
I built a simple Kafka consumer and producer using Scala.
Consumer:
package com.kafka
import java.util.concurrent._
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConversions._
class Consumer(val brokers: String,
               val groupId: String,
               val topic: String) {
val props = createConsumerConfig(brokers, groupId)
val consumer = new KafkaConsumer[String, String](props)
var executor: ExecutorService = null
def shutdown() = {
if (consumer != null)
consumer.close()
if (executor != null)
executor.shutdown()
}
def createConsumerConfig(brokers: String, groupId: String): Properties = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props
}
def run() = {
consumer.subscribe(Collections.singletonList(this.topic))
// keep a handle to the executor so shutdown() can actually stop the polling thread
executor = Executors.newSingleThreadExecutor()
executor.execute(new Runnable {
override def run(): Unit = {
while (true) {
val records = consumer.poll(1000)
for (record <- records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset())
}
}
}
})
}
}
object Consumer extends App{
val newArgs = Array("localhost:9092", "2","test")
val example = new Consumer(newArgs(0), newArgs(1), newArgs(2))
example.run()
}
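Both the consumer above and the producer below only need the kafka-clients artifact with the new consumer API. A minimal build.sbt sketch, assuming Scala 2.11 and a 0.10.x client (the exact versions are assumptions, adjust them to your broker):
// build.sbt - assumed versions, adjust to your environment
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"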
Producer:
package com.kafka
import java.util.{Date, Properties}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
object Producer extends App{
val newArgs = Array("20","test","localhost:9092")
val events = newArgs(0).toInt
val topic = newArgs(1)
val brokers = newArgs(2)
val props = new Properties()
props.put("bootstrap.servers", brokers)
props.put("client.id", "producer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val t = System.currentTimeMillis()
for (nEvents <- Range(0, events)) {
val key = "messageKey " + nEvents.toString
val msg = "test message"
val data = new ProducerRecord[String, String](topic, key, msg)
//async: on Scala 2.11 this lambda will not SAM-convert to a Callback (see the explicit Callback sketch after this block)
//producer.send(data, (m, e) => {})
//the plain send below is fire-and-forget; block on the returned Future with .get() for a synchronous send
producer.send(data)
}
System.out.println("sent per second: " + events * 1000 / (System.currentTimeMillis() - t))
producer.close()
}
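The commented-out async line above only compiles on Scala 2.12+, where the lambda is SAM-converted to a Callback. A minimal sketch with an explicit org.apache.kafka.clients.producer.Callback (the sendAsync helper name is mine, not part of the answer):
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

// hypothetical helper: asynchronous send with an explicit Callback
def sendAsync(producer: KafkaProducer[String, String], topic: String, key: String, msg: String): Unit = {
  val data = new ProducerRecord[String, String](topic, key, msg)
  producer.send(data, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      if (exception != null)
        exception.printStackTrace() // delivery failed
      else
        println("acked: " + metadata.topic() + "-" + metadata.partition() + " @ " + metadata.offset())
    }
  })
}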
Below is sample code for a simple Kafka consumer written in Scala. After a bit of trial and error, it started working.
package com.Kafka.Consumer
import kafka.api.FetchRequest
import kafka.api.FetchRequestBuilder
import kafka.api.PartitionOffsetRequestInfo
import kafka.common.ErrorMapping
import kafka.common.TopicAndPartition
import kafka.javaapi._
import kafka.javaapi.consumer.SimpleConsumer
import kafka.message.MessageAndOffset
import java.nio.ByteBuffer
import java.util.ArrayList
import java.util.Collections
import java.util.HashMap
import java.util.List
import java.util.Map
import SimpleExample._
//remove if not needed
import scala.collection.JavaConversions._
object SimpleExample {
def main(args: Array[String]) {
val example = new SimpleExample()
val maxReads = java.lang.Integer.parseInt(args(0))
val topic = args(1)
val partition = java.lang.Integer.parseInt(args(2))
val seeds = new ArrayList[String]()
seeds.add(args(3))
val port = java.lang.Integer.parseInt(args(4))
try {
example.run(maxReads, topic, partition, seeds, port)
} catch {
case e: Exception => {
println("Oops:" + e)
e.printStackTrace()
}
}
}
def getLastOffset(consumer: SimpleConsumer,
topic: String,
partition: Int,
whichTime: Long,
clientName: String): Long = {
val topicAndPartition = new TopicAndPartition(topic, partition)
val requestInfo = new HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1))
val request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)
val response = consumer.getOffsetsBefore(request)
if (response.hasError) {
println("Error fetching data Offset Data the Broker. Reason: " +
response.errorCode(topic, partition))
return 0
}
val offsets = response.offsets(topic, partition)
offsets(0)
}
}
class SimpleExample {
private var m_replicaBrokers: List[String] = new ArrayList[String]()
def run(a_maxReads: Int,
a_topic: String,
a_partition: Int,
a_seedBrokers: List[String],
a_port: Int) {
val metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition)
if (metadata == null) {
println("Can't find metadata for Topic and Partition. Exiting")
return
}
if (metadata.leader == null) {
println("Can't find Leader for Topic and Partition. Exiting")
return
}
var leadBroker = metadata.leader.host
val clientName = "Client_" + a_topic + "_" + a_partition
var consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName)
var readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime, clientName)
var numErrors = 0
//while (a_maxReads > 0) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName)
}
val req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset,
100000)
.build()
val fetchResponse = consumer.fetch(req)
if (fetchResponse.hasError) {
numErrors += 1
val code = fetchResponse.errorCode(a_topic, a_partition)
println("Error fetching data from the Broker:" + leadBroker +
" Reason: " +
code)
if (numErrors > 5) //break
if (code == ErrorMapping.OffsetOutOfRangeCode) {
readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime, clientName)
//continue
}
consumer.close()
consumer = null
leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port)
//continue
}
numErrors = 0
var numRead = 0
for (messageAndOffset <- fetchResponse.messageSet(a_topic, a_partition)) {
val currentOffset = messageAndOffset.offset
if (currentOffset < readOffset) {
println("Found an old offset: " + currentOffset + " Expecting: " +
readOffset)
//continue
}
readOffset = messageAndOffset.nextOffset
val payload = messageAndOffset.message.payload
val bytes = Array.ofDim[Byte](payload.limit())
payload.get(bytes)
println(String.valueOf(messageAndOffset.offset) + ": " + new String(bytes, "UTF-8"))
numRead += 1
// a_maxReads -= 1
}
if (numRead == 0) {
try {
Thread.sleep(1000)
} catch {
case ie: InterruptedException =>
}
}
//}
if (consumer != null) consumer.close()
}
private def findNewLeader(a_oldLeader: String,
a_topic: String,
a_partition: Int,
a_port: Int): String = {
for (i <- 0 until 3) {
var goToSleep = false
val metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition)
if (metadata == null) {
goToSleep = true
} else if (metadata.leader == null) {
goToSleep = true
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader.host) && i == 0) {
goToSleep = true
} else {
return metadata.leader.host
}
if (goToSleep) {
try {
Thread.sleep(1000)
} catch {
case ie: InterruptedException =>
}
}
}
println("Unable to find new leader after Broker failure. Exiting")
throw new Exception("Unable to find new leader after Broker failure. Exiting")
}
private def findLeader(a_seedBrokers: List[String],
a_port: Int,
a_topic: String,
a_partition: Int): PartitionMetadata = {
var returnMetaData: PartitionMetadata = null
for (seed <- a_seedBrokers) {
var consumer: SimpleConsumer = null
try {
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup")
val topics = Collections.singletonList(a_topic)
val req = new TopicMetadataRequest(topics)
val resp = consumer.send(req)
val metaData = resp.topicsMetadata
for (item <- metaData; part <- item.partitionsMetadata){
if (part.partitionId == a_partition) {
returnMetaData = part
//break
}
}
} catch {
case e: Exception => println("Error communicating with Broker [" + seed + "] to find Leader for [" +
a_topic +
", " +
a_partition +
"] Reason: " +
e)
} finally {
if (consumer != null) consumer.close()
}
}
if (returnMetaData != null) {
m_replicaBrokers.clear()
for (replica <- returnMetaData.replicas) {
m_replicaBrokers.add(replica.host)
}
}
returnMetaData
}
}
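Note that this second example uses the old kafka.javaapi / SimpleConsumer API, so it compiles against the Scala core artifact (e.g. "org.apache.kafka" %% "kafka" % "0.8.2.2") rather than kafka-clients. Its main expects five positional arguments: maxReads, topic, partition, a seed broker host, and the broker port, so a local run would look roughly like this (the broker host, port, and topic name are assumptions):
// hypothetical invocation against a local 0.8.x broker
SimpleExample.main(Array(
  "10",        // maxReads (only parsed; the read loop above is commented out)
  "test",      // topic
  "0",         // partition
  "localhost", // seed broker host
  "9092"       // broker port
))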