pom.xml (Maven dependency)
<!-- https://mvnrepository.com/artifact/org.msgpack/msgpack -->
<!-- MessagePack library used by the demo code below (imported there as org.msgpack.MessagePack). -->
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack</artifactId>
<version>0.6.12</version>
</dependency>
Demo: MessagePack round-trip tests
/**
 * Round-trips a string containing non-ASCII characters through MessagePack and
 * verifies that the typed read returns exactly the text that was written.
 *
 * @throws Exception if serialization or deserialization fails
 */
@Test
public void test01() throws Exception {
    String msg = "饱食者当常忆袁公 www.2345.com";
    MessagePack messagePack = new MessagePack();

    // Serialize.
    final byte[] bytes = messagePack.write(msg);

    // Deserialize with an explicit target type. Expected output:
    // 饱食者当常忆袁公 www.2345.com
    final String s = messagePack.read(bytes, String.class);

    // Alternatives recorded while experimenting (kept as notes, not executed):
    //  - messagePack.read(bytes).toString() was observed to print the text wrapped
    //    in double quotes: "饱食者当常忆袁公 www.2345.com"
    //  - the same untyped call was also observed to print "???????? www.2345.com";
    //    the workaround that was tried re-decoded the result with
    //    new String(s0.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8)

    // Fail the test on a mismatch instead of relying on someone reading the console.
    if (!msg.equals(s)) {
        throw new AssertionError("round trip mismatch: expected <" + msg + "> but was <" + s + ">");
    }
    System.out.println(s);
}
/**
 * Round-trips a JSON-formatted string through MessagePack, reading it back with
 * the {@code Templates.TString} template, and verifies the text is unchanged.
 *
 * @throws Exception if serialization or deserialization fails
 */
@Test
public void test02() throws Exception {
    String msg = "{\"pId\":9527,\"pName\":\"华安\",\"isMarry\":true}";
    MessagePack messagePack = new MessagePack();

    // Serialize.
    final byte[] bytes = messagePack.write(msg);

    // Deserialize using the string template. Expected output:
    // {"pId":9527,"pName":"华安","isMarry":true}
    String s = messagePack.read(bytes, Templates.TString);

    // Fail the test on a mismatch instead of relying on someone reading the console.
    if (!msg.equals(s)) {
        throw new AssertionError("round trip mismatch: expected <" + msg + "> but was <" + s + ">");
    }
    System.out.println(s);
}
/**
 * Serializes a JSON-formatted string and reads it back without a target type,
 * printing the {@code toString()} form of the value that was read.
 *
 * @throws Exception if serialization or deserialization fails
 */
@Test
public void test03() throws Exception {
    final String json = "{\"pId\":9527,\"pName\":\"华安\",\"isMarry\":true}";
    final MessagePack codec = new MessagePack();

    // Serialize.
    final byte[] packed = codec.write(json);

    // Deserialize untyped. The output noted for this variant is the quoted,
    // escaped form:
    // "{\"pId\":9527,\"pName\":\"华安\",\"isMarry\":true}"
    final String rendered = codec.read(packed).toString();

    System.out.println(rendered);
}
cn.king.kfk01.producer.Producer
package cn.king.kfk01.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.msgpack.MessagePack;
import java.io.IOException;
import java.util.Properties;
/**
 * Demo Kafka producer that publishes MessagePack-serialized strings as raw
 * byte-array record values.
 *
 * @author: wjl@king.cn
 * @time: 2020/11/15 22:50
 * @version: 1.0.0
 * @description: MessagePack serialization
 */
public class Producer {

    /**
     * Serializes a fixed message with MessagePack and sends it ten times to the
     * topic {@code test_topic_2021524}, printing partition and offset for every
     * acknowledged record.
     *
     * @param args unused
     * @throws IOException if MessagePack fails to serialize the message
     */
    public static void main(String[] args) throws IOException {
        // Producer configuration.
        Properties properties = new Properties();
        // Kafka cluster to connect to (broker list).
        properties.put("bootstrap.servers", "aliyun:9092");
        // Key serializer.
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer: values are already MessagePack-encoded bytes.
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        MessagePack messagePack = new MessagePack();

        // try-with-resources closes the producer even when serialization or a
        // send fails part-way through, so the client is never leaked.
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties)) {
            // The payload is identical for every record, so serialize it once.
            String msg = "饱食者当常忆袁公。 www.2345.com";
            final byte[] bytes = messagePack.write(msg);

            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("test_topic_2021524", bytes), (metadata, exception) -> {
                    if (null == exception) {
                        // Send succeeded: print the record's partition and offset.
                        System.out.println(metadata.partition() + "--" + metadata.offset());
                    } else {
                        // Send failed: print the stack trace.
                        exception.printStackTrace();
                    }
                });
            }
        }
    }
}
cn.king.kfk01.consumer.Consumer
package cn.king.kfk01.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.msgpack.MessagePack;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
/**
 * Demo Kafka consumer that reads byte-array record values and decodes them
 * with MessagePack.
 *
 * @author: wjl@king.cn
 * @time: 2020/11/16 0:41
 * @version: 1.0.0
 * @description: MessagePack deserialization
 */
public class Consumer {

    /**
     * Subscribes to the topic {@code test_topic_2021524} and polls forever,
     * printing the decoded form of every record value.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Consumer configuration.
        Properties properties = new Properties();
        // Kafka cluster to connect to.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "aliyun:9092");
        // Enable automatic offset commits.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Auto-commit interval: 1 second.
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // max.poll.interval.ms is deliberately left at the client default. It was
        // previously forced to 100 ms, which is the same as the poll timeout used
        // below, so the gap between two poll() calls routinely exceeded it and the
        // consumer kept being treated as failed and dropped from its group.
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Key / value deserializers.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Consumer group. The Kafka console consumer assigns one automatically, but
        // in code it has to be set explicitly. The timestamp suffix produces a new
        // group id on every run.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kfk_consumer_group_" + System.currentTimeMillis());

        MessagePack messagePack = new MessagePack();

        // try-with-resources closes the consumer if the poll loop is aborted by an
        // unchecked exception; the loop itself never terminates normally.
        // The type arguments are the key/value types of the records read.
        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(properties)) {
            // Subscribe to the topic (no return value). Subscribing to a topic that
            // does not exist is allowed but produces a warning.
            consumer.subscribe(Arrays.asList("test_topic_2021524"));

            // Poll forever.
            for (; ; ) {
                // Fetch records; the argument is the poll timeout in milliseconds.
                ConsumerRecords<String, byte[]> records = consumer.poll(100);
                // Decode and print every record.
                records.forEach(record -> {
                    final byte[] bytes = record.value();
                    if (bytes == null) {
                        // Nothing to decode for a record without a value.
                        return;
                    }
                    try {
                        System.out.println(messagePack.read(bytes).toString());
                    } catch (IOException e) {
                        // Report the undecodable record and continue with the next
                        // one, without printing a misleading "null" line.
                        e.printStackTrace();
                    }
                });
            }
        }
    }
}