当前位置: 首页 > 工具软件 > Jafka > 使用案例 >

jafka环境搭建步骤--实例可用

巫朝明
2023-12-01

1 搭建实例

A 下载包

B 修改 jafka.conf修改 jafka_home修改绝对路径

set.JAFKA_HOME=/opt/apps/jafka-1.2.1

我发现不改也没事

C 运行jafkamq

sh ./bin/server.sh conf/server.properties

d 运行和停止

./run.sh start 

…../ console

./run.sh stop

E java代码—非常重要端口问题

默认端口放在了conf/server.properties文件下的端口

默认应该都是9002,

package com;

 

import java.util.Properties;

 

import com.sohu.jafka.producer.Producer;

importcom.sohu.jafka.producer.ProducerConfig;

importcom.sohu.jafka.producer.StringProducerData;

importcom.sohu.jafka.producer.serializer.StringEncoder;

 

public class JJProducer {

         publicstatic void main(String[] args) throws Exception {

             Properties props = new Properties();

             props.put("broker.list","0:10.16.xxx.xx:9002");

             props.put("serializer.class",StringEncoder.class.getName());

             ProducerConfig config = newProducerConfig(props);

             Producer<String, String> producer =new Producer<String, String>(config);

             StringProducerData data = newStringProducerData("demo");

             for(int i=0;i<20;i++) {

                 data.add("Hello world #"+i);

             }

             try {

                 long start =System.currentTimeMillis();

                 for (int i = 0; i < 20; i++) {

                     producer.send(data);

                 }

                 long cost = System.currentTimeMillis()- start;

                 System.out.println("send 20message cost: "+cost+" ms");

             } finally {

                 producer.close();

             }

         }

}

 

package com;

 

import com.sohu.jafka.api.FetchRequest;

import com.sohu.jafka.consumer.SimpleConsumer;

importcom.sohu.jafka.message.ByteBufferMessageSet;

importcom.sohu.jafka.message.MessageAndOffset;

import com.sohu.jafka.utils.Utils;

 

public class JJConsumer {

        

         publicstatic void main(String[] args) throws Exception {

 

                   SimpleConsumerconsumer = new SimpleConsumer("10.16.xxx.xx", 9002);

                   //

                   longoffset = 0;

                   while(true) {

                       FetchRequest request = newFetchRequest("demo", 0, offset);

                       ByteBufferMessageSet  set = consumer.fetch(request);

                       L.l.info("set size"+set);

                       for (MessageAndOffset msg : set) {

                          System.out.println(Utils.toString(msg.message.payload(),"UTF-8"));

                           offset = msg.offset;

                          L.l.info("message:"+Utils.toString(msg.message.payload(),"UTF-8"));

                       }

                   }

         }

}

 

 

2 搭建以zookeeper来整合

其他一样,

首先先要启动zookeeper的,

 

B :事实上这句启动会错误,而且根本不需要,直接发送接收

首先要zookeeper已经建立好

$bin/zookeeper-server.sh conf/zookeeper.properties

C:进行发送

bin/producer-console.sh --zookeeper 1.1.1.1:2184 --topic demo

D:进行接收

sh ./bin/consumer-console.sh --zookeeper 1.1.1.1:2184 --topic demo --from-beginning

3 kafka支持流了,就是输入是topic,输出也是是个jar包,和其他流框架有些类似

4 kafka connect快速实现数据进出,否则需要依赖于其他控件

5 支持adminClient api,:::

The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects.,

可以参考的 增删 topic

https://blog.csdn.net/lisi1129/article/details/54744018

6 配置属性可编程和 在配置文件中写:

http://kafka.apache.org/documentation/#configuration

7 kafk 的监控机制:

https://github.com/smartloli/kafka-eagle

源码分析可以参考 https://www.cnblogs.com/smartloli/p/6817565.html

8 问题,有时间要理解常见的概念

 类似资料: