A Download the package
B Edit jafka.conf: set JAFKA_HOME to the absolute install path
set.JAFKA_HOME=/opt/apps/jafka-1.2.1
I found that it also works without changing this.
C Run jafkamq
sh ./bin/server.sh conf/server.properties
D Start and stop
./run.sh start
…../ console
./run.sh stop
E Java code: getting the port right is very important
The port the broker listens on is the one set in conf/server.properties.
By default it should be 9002.
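Because the clients below must connect to whatever port the broker actually uses, it can help to read the value from conf/server.properties rather than hard-coding it. The following is a minimal sketch, assuming the file stores the port under a key named port (check your own file) and falling back to the 9002 mentioned above. The class name ServerPort and the sample text are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ServerPort {
    // Fallback taken from the note above; adjust it if your broker differs.
    static final int FALLBACK_PORT = 9002;

    // Parses properties-format text and returns the value of the assumed "port" key.
    static int readPort(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("port");
        return value == null ? FALLBACK_PORT : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        // In real use, pass the contents of conf/server.properties instead of this sample.
        String sample = "# sample content, not a real Jafka file\nport=9002\n";
        System.out.println("broker port: " + readPort(sample));
    }
}
```

The value returned here is the one that should appear in broker.list for the producer and in the SimpleConsumer constructor.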
package com;

import java.util.Properties;

import com.sohu.jafka.producer.Producer;
import com.sohu.jafka.producer.ProducerConfig;
import com.sohu.jafka.producer.StringProducerData;
import com.sohu.jafka.producer.serializer.StringEncoder;

public class JJProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The port here must match the one in conf/server.properties
        props.put("broker.list", "0:10.16.xxx.xx:9002");
        props.put("serializer.class", StringEncoder.class.getName());
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        // Build one batch of 20 messages for the topic "demo"
        StringProducerData data = new StringProducerData("demo");
        for (int i = 0; i < 20; i++) {
            data.add("Hello world #" + i);
        }

        try {
            long start = System.currentTimeMillis();
            // Send the same batch 20 times
            for (int i = 0; i < 20; i++) {
                producer.send(data);
            }
            long cost = System.currentTimeMillis() - start;
            System.out.println("send 20 message cost: " + cost + " ms");
        } finally {
            // Always release the connection, even if a send fails
            producer.close();
        }
    }
}
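The broker.list value used by the producer packs several fields into one string. Judging only from the example value "0:10.16.xxx.xx:9002", the layout appears to be brokerId:host:port. The sketch below splits such an entry under that assumption, so the port can be compared with conf/server.properties before sending. BrokerAddress is a hypothetical helper, not part of Jafka, and the address in main is a placeholder.

```java
final class BrokerAddress {
    final int id;
    final String host;
    final int port;

    BrokerAddress(int id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }

    // Parses one "id:host:port" entry; this layout is assumed from the example value.
    static BrokerAddress parse(String entry) {
        String[] parts = entry.trim().split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected id:host:port but got: " + entry);
        }
        return new BrokerAddress(
                Integer.parseInt(parts[0]), parts[1], Integer.parseInt(parts[2]));
    }

    public static void main(String[] args) {
        // Placeholder address for illustration only.
        BrokerAddress broker = parse("0:127.0.0.1:9002");
        System.out.println(broker.id + " -> " + broker.host + ":" + broker.port);
    }
}
```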
package com;

import com.sohu.jafka.api.FetchRequest;
import com.sohu.jafka.consumer.SimpleConsumer;
import com.sohu.jafka.message.ByteBufferMessageSet;
import com.sohu.jafka.message.MessageAndOffset;
import com.sohu.jafka.utils.Utils;

public class JJConsumer {
    public static void main(String[] args) throws Exception {
        // Same host and port as the broker (see conf/server.properties)
        SimpleConsumer consumer = new SimpleConsumer("10.16.xxx.xx", 9002);
        // Start from offset 0 of partition 0
        long offset = 0;
        while (true) {
            FetchRequest request = new FetchRequest("demo", 0, offset);
            ByteBufferMessageSet set = consumer.fetch(request);
            // L.l is a logger that is not defined in this listing
            L.l.info("set size" + set);
            for (MessageAndOffset msg : set) {
                System.out.println(Utils.toString(msg.message.payload(), "UTF-8"));
                // Remember the offset so the next fetch continues from here
                offset = msg.offset;
                L.l.info("message:" + Utils.toString(msg.message.payload(), "UTF-8"));
            }
        }
    }
}
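Everything else is the same.
ZooKeeper has to be started first.
B: In practice this startup command fails, and it is not needed at all; you can send and receive directly.
ZooKeeper must already be set up beforehand.
$ bin/zookeeper-server.sh conf/zookeeper.properties
C: Send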
bin/producer-console.sh --zookeeper 1.1.1.1:2184 --topic demo
D: Receive
sh ./bin/consumer-console.sh --zookeeper 1.1.1.1:2184 --topic demo --from-beginning
3 Kafka now supports streams: the input is a topic and so is the output, and it is packaged as a jar, somewhat like other streaming frameworks
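4 Kafka Connect gives a quick way to move data in and out; without it you would have to rely on other components
5 The AdminClient API is supported:
The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects.
A reference for creating and deleting topics: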
https://blog.csdn.net/lisi1129/article/details/54744018
6 Configuration properties can be set programmatically or written in a configuration file:
http://kafka.apache.org/documentation/#configuration
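The two styles can also be combined: load defaults from a properties file, then override selected keys in code. The following is a minimal sketch using only java.util.Properties. bootstrap.servers and acks are real Kafka client settings, but the values, the file text, and the ClientSettings class are invented for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ClientSettings {
    // Loads file-style settings, then applies programmatic overrides on top.
    static Properties merge(String fileText, Properties overrides) {
        Properties merged = new Properties();
        try {
            merged.load(new StringReader(fileText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        merged.putAll(overrides); // values set in code win over the file
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for the contents of a configuration file.
        String fileText = "bootstrap.servers=localhost:9092\nacks=1\n";
        Properties overrides = new Properties();
        overrides.setProperty("acks", "all");
        System.out.println(merge(fileText, overrides));
    }
}
```

The merged Properties object is the kind of value a Kafka client constructor accepts, so the same file can serve several programs while each one adjusts only the keys it cares about.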
7 Kafka monitoring:
https://github.com/smartloli/kafka-eagle
For source code analysis, see https://www.cnblogs.com/smartloli/p/6817565.html
8 To do: when there is time, work through the common concepts