我正在通过Kafka快速入门:
http://kafka.apache.org/07/quickstart.html
和基本的消费者群体示例:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer组示例
我已经将消费者和消费者线程池编码如下:
import kafka.consumer.KafkaStream;
import kafka.consumer.ConsumerIterator;
public class Consumer implements Runnable {
private KafkaStream m_stream;
private Integer m_threadNumber;
public Consumer(KafkaStream a_stream, Integer a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext()) {
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
}
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
其他几个方面:我正在使用Spring来管理我的动物园管理员:
import javax.inject.Named;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan("com.truecar.inventory.worker.core")
public class AppConfig {
@Bean
@Named("consumerConfig")
private static ConsumerConfig createConsumerConfig() {
String zookeeperAddress = "127.0.0.1:2181";
String groupId = "inventory";
Properties props = new Properties();
props.put("zookeeper.connect", zookeeperAddress);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
}
我正在用Maven和OneJar Maven插件进行编译。但是,我编译然后运行产生的一个jar,我得到以下错误:
Aug 26, 2013 6:15:41 PM org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider registerDefaultFilters
INFO: JSR-330 'javax.inject.Named' annotation found and supported for component scanning
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: scala/ScalaObject
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
at com.simontuffs.onejar.JarClassLoader.defineClass(JarClassLoader.java:803)
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:710)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521)
at java.lang.Class.getDeclaredMethods(Class.java:1845)
at org.springframework.core.type.StandardAnnotationMetadata.getAnnotatedMethods(StandardAnnotationMetadata.java:180)
at org.springframework.context.annotation.ConfigurationClassParser.doProcessConfigurationClass(ConfigurationClassParser.java:222)
at org.springframework.context.annotation.ConfigurationClassParser.processConfigurationClass(ConfigurationClassParser.java:165)
at org.springframework.context.annotation.ConfigurationClassParser.parse(ConfigurationClassParser.java:140)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:282)
at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:223)
at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:630)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:461)
at org.springframework.context.annotation.AnnotationConfigApplicationContext.<init>(AnnotationConfigApplicationContext.java:73)
at com.truecar.inventory.worker.core.consumer.ConsumerThreadPool.<clinit>(ConsumerThreadPool.java:31)
at com.truecar.inventory.worker.core.application.Starter.main(Starter.java:20)
... 6 more
Caused by: java.lang.ClassNotFoundException: scala.ScalaObject
at com.simontuffs.onejar.JarClassLoader.findClass(JarClassLoader.java:713)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at com.simontuffs.onejar.JarClassLoader.loadClass(JarClassLoader.java:630)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 27 more
现在,我对Kafka知之甚少,对斯卡拉更是一无所知。我该如何解决这个问题?接下来应该尝试什么?这是已知问题吗?我需要其他依赖吗?以下是我的pom.xml中的Kafka版本:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.0-beta1</version>
</dependency>
更新:我联系了Kafka开发人员邮件列表,他们让我知道scala依赖项的一些特定版本要求。然而,还有一个未记录的log4j依赖项,这会导致另一个运行时(而不是编译时)异常。
Exception in thread "main" java.lang.reflect.InvocationTargetException
Caused by: java.lang.NoSuchMethodError: ch.qos.logback.classic.Logger.filterAndLog(Ljava/lang/String;Lorg/slf4j/Marker;Lch/qos/logback/classic/Level;Ljava/lang/String;[Ljava/lang/Object;Ljava/lang/Throwable;)V
at org.apache.log4j.Category.log(Category.java:333)
at org.apache.commons.logging.impl.Log4JLogger.debug(Log4JLogger.java:177)
另一个更新:
我找到了正确的 log4j 依赖项:
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
但是现在我遇到了一个更加神秘的运行时异常:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
在这一点上,我有了WTF的感觉。所以我添加了另一个依赖项:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
但这暴露了另一个运行时异常:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.simontuffs.onejar.Boot.run(Boot.java:340)
at com.simontuffs.onejar.Boot.main(Boot.java:166)
Caused by: java.lang.NoClassDefFoundError: com/yammer/metrics/core/Gauge
at kafka.consumer.ZookeeperConsumerConnector.createFetcher(ZookeeperConsumerConnector.scala:146)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:113)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:64)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
我希望能够启动并运行这个婴儿示例,但也许这是使用beta产品要付出的代价?也许我应该切换到阿帕奇主动MQ。但这听起来不那么有趣。我错过了什么吗?
这似乎行得通:
$ git clone https://github.com/buildlackey/cep
$ cd cep/kafka-0.8.x
$ mvn package
$ mvn exec:java -Dexec.mainClass=TestKafkaProducer
(via哪里可以找到kafka的maven资源库?)
我发现依赖项的配置是有效的:
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>3.2.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>3.2.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.0-beta1</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
问题是kafka beta的构建方式是使用jar生成的pom无效,maven无法识别它并正确解析,从而获取传递依赖项。我们已经设法通过在pom定义中包含该pom(scala、zk等)中的所有依赖项来缓解这个问题。我们正在等待kafka的下一个beta版本,其中的问题将得到修复。
完整的依赖项列表如下。请注意,您必须根据kafka工件的后缀相应地更改scala版本依赖项。
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.15</version>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.jopt-simple</groupId>
<artifactId>jopt-simple</artifactId>
<version>3.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.4</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.3</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-annotation</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<version>3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest</artifactId>
<version>1.2</version>
<scope>test</scope>
</dependency>
至于
也许我应该换成Apache Active MQ。但这听起来没那么有趣。我错过了什么吗?
嗯,难道你忘了这是测试版吗?的确,一些不好的事情正在发生,但目前我们正在运行kafka 0.7,没有任何努力。
问题内容: 在这里,它说,“注:意思是‘我不关心这个值’”,但是从JavaScript来了,我不明白是什么意思。 我可以打印这些功能的唯一方法是在参数前使用下划线: 没有下划线,我必须这样写,以避免出现任何错误: 我不理解此下划线用法。我何时,如何以及为什么使用这些下划线? 问题答案: 不同的用例有一些细微差别,但是通常下划线表示“忽略此”。 在声明一个新函数时,下划线告诉Swift调用时该参数不
quickstart 本教程假设你从头开始,没有Kafka和ZooKeeper历史数据。 quickstart_download 下载 0.10.0.0 的正式版本并解压。 > tar -xzf kafka_2.11-0.10.0.0.tgz > cd kafka_2.11-0.10.0.0 quickstart_startserver Kafka依赖ZooKeeper因此你首先启动一个ZooKe
没接触过php,公司上来就要用这东西
环境搭建 创建工程 目录规范 开发调试 页面开发 打包Bundle
Yii 提供了一整套用来简化实现 RESTful 风格的 Web Service 服务的 API。 特别是,Yii 支持以下关于 RESTful 风格的 API: 支持 Active Record 类的通用 API 的快速原型; 涉及的响应格式(在默认情况下支持 JSON 和 XML); 支持可选输出字段的定制对象序列化; 适当的格式的数据采集和验证错误; 集合分页,过滤和排序; 支持 HATEO
快速入门 如果CasperJS已经在你的电脑上安装好,你就可以开始编写你的第一个脚本啦!你可以使用原生JavaScript(或PhantomJS2.0之前版本支持的 CoffeeScript)来编写脚本。 提示 如果你还不习惯使用JavaScript,请阅读这个FAQ条目。 最简单的CasperJS脚本 打开你最爱的编辑器,创建一个名为sample.js的文件,写入以下代码: var casper