Chronicle Queue的Appender可以多线程,Tailer单线程或者几个线程就行
Maven
<dependency>
<groupId>net.openhft</groupId>
<artifactId>affinity</artifactId>
<version>3.23.2</version>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-queue</artifactId>
<version>5.23.37</version>
</dependency>
Gradle
dependencies {
implementation 'net.openhft:affinity:3.23.2'
implementation "net.openhft:chronicle-queue:5.23.37"
}
String basePath = OS.getTarget() + "/Queue1";
ChronicleQueue queue = ChronicleQueue.singleBuilder(basePath).rollCycle(RollCycles.FIVE_MINUTELY).build();
ChronicleQueue
共享一个实例就行
basePath
队列要保存的目录
RollCycles.FIVE_MINUTELY
队列文件5分钟一滚动
ExcerptAppender appender = queue.acquireAppender();
appender.writeText("Hello World(你好 世界)!--" + i);
ExcerptAppender
不是线程安全的,各个线程里需要创建
针对JSON引用,简单的Text类型最好
ExcerptTailer tailer = queue.createTailer("reader1"); //@wjw_note: 如果是createTailer()方法,没有给定名称,会一直能读到最后的数据而不会移动索引
String readText = null;
while ((readText = tailer.readText()) != null) {
System.out.println("read: " + readText);
}
ExcerptTailer
单线程或者几个线程就行,多了因为锁的争用效率会急剧下降
createTailer
方法必需给一个读取者ID,如果没有给定名称,会一直能读到最后的数据而不会移动索引.这种做法的好处是,可以有任意多个读取者来读取队列的消息而互不干扰,Chronicle Queue创建的队列类似于"扇形队列".
tailer.readText()
多不到数据会返回null
Chronicle Queue 将其数据存储在堆外,因此建议您在使用完 Chronicle Queue 后调用 .close()
以释放资源。
queue.close();
package org.wjw.chronicle.queue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
class TestQueueMain {
ChronicleQueue queue;
@BeforeEach
void setUp() throws Exception {
String basePath = OS.getTarget() + "/Queue1";
queue = ChronicleQueue.singleBuilder(basePath).rollCycle(RollCycles.FIVE_MINUTELY).build();
}
@AfterEach
void tearDown() throws Exception {
queue.close();
}
/**
* 测试最简单的写入字符串
*/
@Test
void testWtite() {
ExcerptAppender appender = queue.acquireAppender();
try {
for (int i = 0; i < 5; i++) {
appender.writeText("Hello World(你好 世界)!--" + i);
}
} finally {
appender.close();
}
}
/**
* 测试最简单的读取字符串
*/
@Test
void testRead() {
ExcerptTailer tailer = queue.createTailer("reader1"); //@wjw_note: 如果是createTailer()方法,没有给定名称,会一直能读到最后的数据而不会移动索引
try {
String readText = null;
while ((readText = tailer.readText()) != null) {
System.out.println("read: " + readText);
}
} finally {
tailer.close();
}
}
}
输出如下:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
read: Hello World(你好 世界)!--0
read: Hello World(你好 世界)!--1
read: Hello World(你好 世界)!--2
read: Hello World(你好 世界)!--3
read: Hello World(你好 世界)!--4
package org.wjw.chronicle.queue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.wjw.chronicle.Person;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.ValueIn;
class TestQueueRawData {
ChronicleQueue queue;
@BeforeEach
void setUp() throws Exception {
String basePath = OS.getTarget() + "/QueueDocument";
queue = ChronicleQueue.singleBuilder(basePath).rollCycle(RollCycles.FIVE_MINUTELY).build();
}
@AfterEach
void tearDown() throws Exception {
queue.close();
}
/**
* 测试读写基本类型
*/
@Test
void testRawData() {
ExcerptAppender appender = queue.acquireAppender();
try {
for (int i = 0; i < 5; i++) {
appender.writeDocument(w -> w
.getValueOut()
.int32(0x123456)
.getValueOut()
.int64(0x999000999000L)
.getValueOut()
.text("Hello World" + System.currentTimeMillis()));
}
} finally {
appender.close();
}
ExcerptTailer tailer = queue.createTailer("reader1"); //@wjw_note: 如果是createTailer()方法,没有给定名称,会一直能读到最后的数据而不会移动索引
try {
boolean read = true;
while (read) {
read = tailer.readDocument(w -> {
ValueIn in = w.getValueIn();
int num = in.int32();
long num2 = in.int64();
String text = in.text();
System.out.printf("num:%d num2:%d text:%s%n", num, num2, text);
});
}
} finally {
tailer.close();
}
}
}
输出如下:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
num:1193046 num2:168843764404224 text:Hello World1668506357443
num:1193046 num2:168843764404224 text:Hello World1668506357443
num:1193046 num2:168843764404224 text:Hello World1668506357443
num:1193046 num2:168843764404224 text:Hello World1668506357443
num:1193046 num2:168843764404224 text:Hello World1668506357443
您还可以选择将对象附加到队列。 在这种情况下,我们建议该类实现 net.openhft.chronicle.wire.Marshallable
并覆盖 toString
方法以实现更高效的序列化。 请参见下面的示例:
可序列化对象:
package org.wjw.chronicle;
import net.openhft.chronicle.wire.Marshallable;
/**
* 该类实现 `net.openhft.chronicle.wire.Marshallable` 并覆盖 `toString` 方法以实现更高效的序列化
*/
public class Person implements Marshallable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public Person() {
super();
}
@Override
public String toString() {
return Marshallable.$toString(this);
}
}
读写测试代码:
package org.wjw.chronicle.queue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.wjw.chronicle.Person;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
class TestQueueObject {
ChronicleQueue queue;
@BeforeEach
void setUp() throws Exception {
String basePath = OS.getTarget() + "/QueueDocument";
queue = ChronicleQueue.singleBuilder(basePath).rollCycle(RollCycles.FIVE_MINUTELY).build();
}
@AfterEach
void tearDown() throws Exception {
queue.close();
}
/**
* 测试读写实现了Marshallable接口的对象
*/
@Test
void testMarshallable() {
ExcerptAppender appender = queue.acquireAppender();
try {
for (int i = 0; i < 5; i++) {
Person person = new Person();
person.setName("Rob");
person.setAge(40 + i);
appender.writeDocument(person);
}
} finally {
appender.close();
}
ExcerptTailer tailer = queue.createTailer("reader1");
try {
Person person2 = new Person();
while (tailer.readDocument(person2)) {
System.out.println(person2);
}
} finally {
appender.close();
}
}
}
输出如下:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
!org.wjw.chronicle.Person {
name: Rob,
age: 40
}
!org.wjw.chronicle.Person {
name: Rob,
age: 41
}
!org.wjw.chronicle.Person {
name: Rob,
age: 42
}
!org.wjw.chronicle.Person {
name: Rob,
age: 43
}
!org.wjw.chronicle.Person {
name: Rob,
age: 44
}
<<<<<<<<<<<< [完] >>>>>>>>>>>>