本章介绍如何执行最常见的 Chronicle Queue 操作。 这包括从队列中写入和读取数据的各种方式。 数据可以是多种类型,包括文本、数字或序列化的 blob。 无论数据类型如何,它都存储为一系列字节。
在继续之前,应该注意 Chronicle 将写入行为称为 appending,将读取行为称为 tailing。 根据这个术语,写入队列是使用 appender 完成的,而从队列中读取是使用 tailer 完成的。
这些概念的详细信息在相应的章节中进行了介绍:
还有一个单独的章节描述了可以使用读/写代理管理数据。
使用 appender 将数据附加(写入)到 Chronicle Queue 的末尾。消息始终按照 appender 写入它们的顺序写入。您可以让多个 appender 将消息写入同一队列,在这种情况下,消息由 不同的appenders交错。
从队列实例中获取一个 appender,如下所示:
try (ChronicleQueue queue = ChronicleQueue.singleBuilder("trades").build()) {
final ExcerptAppender appender = queue.acquireAppender();
}
然后可以使用writeText(CharSequence text)
方法编写一条简单的文本消息。
String message = "Hello World!";
appender.writeText(message);
然而,有多种附加数据的方法,从高级抽象(如writeText
和写入代理)到低级 API,甚至直接写入原始内存。这些替代方案在 更多详细信息,请参见以下部分。
writeText()
等高级方法是直接调用 appender.writingDocument()
的便捷方法,但两种方法本质上实现的是同一件事。writeText(CharSequence text)
的实现如下所示:
void writeText(CharSequence text) {
try (DocumentContext dc = writingDocument()) {
dc.wire().bytes().append8bit(text);
}
}
如上所示,可以使用文档上下文直接追加数据:
try (final DocumentContext dc = appender.writingDocument()) {
dc.wire().write().text(“Your text data“);
}
当 try-with-resources 块到达 close 时,数据的长度被写入标头。您还可以使用 DocumentContext
来查找刚刚分配数据的索引(见下文)。索引 稍后可用于移动到/查找此摘录。
try (final DocumentContext dc = appender.writingDocument()) {
dc.wire().write().text(“your text data“);
System.out.println("your data was store to index="+ dc.index());
}
注意: 每个 Chronicle Queue 摘录都有一个唯一的索引。
您还可以编写带有关联标签的文本消息
appender.writeDocument(wire -> wire.write("FirstName").text("Rob"));
appender.writeDocument(wire -> wire.write("LastName").text("Rob"));
您还可以选择编写“自我描述消息”。 此类消息可以支持架构(schema)更改。 在调试或诊断问题时,它们也更容易理解。
appender.writeDocument(w -> w.write("trade").marshallable(
m -> m.write("timestamp").dateTime(now)
.write("symbol").text("EURUSD")
.write("price").float64(1.1101)
.write("quantity").float64(15e6)
.write("side").object(Side.class, Side.Sell)
.write("trader").text("peter")));
您还可以选择将对象附加到队列。 在这种情况下,我们建议该类实现 net.openhft.chronicle.wire.Marshallable
并覆盖 toString
方法以实现更高效的序列化。 请参见下面的示例:
static class Person implements Marshallable {
String name;
int age;
@Override
public String toString() {
return Marshallable.$toString(this);
}
}
然后可以使用 appender.writeDocument()
编写 Person
的一个实例,如下所示:
SingleChronicleQueue queue = SingleChronicleQueueBuilder.builder().path(Files.createTempDirectory("queue").toFile()).build();
ExcerptAppender appender = queue.acquireAppender();
Person person = new Person();
person.name = "Rob";
person.age = 40;
appender.writeDocument(person);
注意: 在 读取对象 中了解如何从队列中读取对象.
如果您想更好地控制对象的序列化方式,您可以选择将“原始数据”附加到队列。
您可以编写自描述的“原始数据”。 类型总是正确的; position 是这些值含义的唯一指示。
appender.writeDocument(w -> w
.getValueOut().int32(0x123456)
.getValueOut().int64(0x999000999000L)
.getValueOut().text("Hello World"));
您可以编写非自描述的“原始数据”。 您的读者必须知道此数据的含义以及所使用的类型。
appender.writeBytes(b -> b
.writeByte((byte) 0x12)
.writeInt(0x345678)
.writeLong(0x999000999000L)
.writeUtf8("Hello World"));
在最低的抽象级别,您会获得原始内存的地址,您可以在其中编写任何您想要的内容。
// Unsafe low level
appender.writeBytes(b -> {
long address = b.address(b.writePosition());
Unsafe unsafe = UnsafeMemory.UNSAFE;
unsafe.putByte(address, (byte) 0x12);
address += 1;
unsafe.putInt(address, 0x345678);
address += 4;
unsafe.putLong(address, 0x999000999000L);
address += 8;
byte[] bytes = "Hello World".getBytes(StandardCharsets.ISO_8859_1);
unsafe.copyMemory(bytes, Jvm.arrayByteBaseOffset(), null, address, bytes.length);
b.writeSkip(1 + 4 + 8 + bytes.length);
});
读取队列遵循与写入相同的模式,除了有可能在您尝试读取它时没有消息。
与其他 Java 队列解决方案不同,消息在使用 tailer 读取时不会丢失。 这在下面的“使用tailer从队列中读取”部分中有更详细的介绍。
开始读取
try (ChronicleQueue queue = ChronicleQueue.singleBuilder(path + "/trades").build()) {
final ExcerptTailer tailer = queue.createTailer();
}
您可以根据消息的内容将每条消息转换为方法调用,并让 Chronicle Queue 自动反序列化方法参数。 调用 reader.readOne()
将自动跳过(过滤掉)任何与您的方法阅读器不匹配的消息。
// reading using method calls
RiskMonitor monitor = System.out::println;
MethodReader reader = tailer.methodReader(monitor);
// read one message
assertTrue(reader.readOne());
SingleChronicleQueue queue = SingleChronicleQueueBuilder.builder().path(Files.createTempDirectory("queue").toFile()).build();
ExcerptAppender appender = queue.acquireAppender();
Person person = new Person();
ExcerptTailer tailer = queue.createTailer();
tailer.readDocument(person);
注意: 字段的名称、类型和顺序不必匹配。
assertTrue(tailer.readDocument(w -> w.read("trade").marshallable(
m -> {
LocalDateTime timestamp = m.read("timestamp").dateTime();
String symbol = m.read("symbol").text();
double price = m.read("price").float64();
double quantity = m.read("quantity").float64();
Side side = m.read("side").object(Side.class);
String trader = m.read("trader").text();
// do something with values.
})));
这将检查类型是否正确,并根据需要进行转换。
assertTrue(tailer.readDocument(w -> {
ValueIn in = w.getValueIn();
int num = in.int32();
long num2 = in.int64();
String text = in.text();
// do something with values
}));
assertTrue(tailer.readBytes(in -> {
int code = in.readByte();
int num = in.readInt();
long num2 = in.readLong();
String text = in.readUtf8();
assertEquals("Hello World", text);
// do something with values
}));
assertTrue(tailer.readBytes(b -> {
long address = b.address(b.readPosition());
Unsafe unsafe = UnsafeMemory.UNSAFE;
int code = unsafe.getByte(address);
address++;
int num = unsafe.getInt(address);
address += 4;
long num2 = unsafe.getLong(address);
address += 8;
int length = unsafe.getByte(address);
address++;
byte[] bytes = new byte[length];
unsafe.copyMemory(null, address, bytes, Jvm.arrayByteBaseOffset(), bytes.length);
String text = new String(bytes, StandardCharsets.UTF_8);
assertEquals("Hello World", text);
// do something with values
}));
注意: 每个tailer都会看到每条消息。
可以添加一个抽象来过滤消息,或者将消息分配给一个消息处理器。 但是,一般来说,对于一个主题,您只需要一个主要的“tailer”,可能还有一些用于监控等的辅助“tailer”。
由于Chronicle Queue没有对其主题进行分区,所以您可以获得该主题中所有消息的总排序。跨主题,不能保证排序;如果您想从一个消耗多个主题的系统中确定地重放,我们建议从该系统的输出中重放。
Chronicle Queue tailers 可以创建File Handlers,每当调用关联的 chronicle 队列的 close()
方法或 Jvm 运行垃圾收集时,File Handlers就会被清理。 如果您正在编写没有 GC 暂停的代码并且您明确想要清理File Handlers,则可以调用以下命令:
((StoreTailer)tailer).releaseResources()
ExcerptTailer.toEnd()
toEnd()
方法。 当tailer方向为FORWARD
(默认情况下,或由ExcerptTailer.direction
方法设置)时,调用toEnd()
会将tailer放置在队列中最后一个现有记录的之后。 在这种情况下,tailer 现在已准备好读取附加到队列的任何新记录。 在任何新消息附加到队列之前,不会有新的 DocumentContext
可供读取:
// 在新消息附加到队列之前,这将是false的
boolean messageAvailable = tailer.toEnd().readingDocument().isPresent();
如果需要从队列的尾部向后读取,那么tailer可以设置为向后读取:
ExcerptTailer tailer = queue.createTailer();
tailer.direction(TailerDirection.BACKWARD).toEnd();
向后读取时,toEnd()
方法会将尾部移动到队列中的最后一条记录。 如果队列不为空,则将有一个 DocumentContext
可供读取:
// 如果队列中至少有一条消息,则为true
boolean messageAvailable = tailer.toEnd().direction(TailerDirection.BACKWARD).
readingDocument().isPresent();
又名 tailers。
当应用程序重新启动时,tailer可以从它原来的位置继续,这可能很有用。
try (ChronicleQueue cq = SingleChronicleQueueBuilder.binary(tmp).build()) {
ExcerptTailer atailer = cq.createTailer("a");
assertEquals("test 0", atailer.readText());
assertEquals("test 1", atailer.readText());
assertEquals("test 2", atailer.readText()); // <1>
ExcerptTailer btailer = cq.createTailer("b");
assertEquals("test 0", btailer.readText()); // <3>
}
try (ChronicleQueue cq = SingleChronicleQueueBuilder.binary(tmp).build()) {
ExcerptTailer atailer = cq.createTailer("a");
assertEquals("test 3", atailer.readText()); // <2>
assertEquals("test 4", atailer.readText());
assertEquals("test 5", atailer.readText());
ExcerptTailer btailer = cq.createTailer("b");
assertEquals("test 1", btailer.readText()); // <4>
}
<1>: Tailer “a” last reads message 2
<2>: Tailer “a” next reads message 3
<3>: Tailer “b” last reads message 0
<4>: Tailer “b” next reads message 1
这来自RestartableTailerTest
,其中有两个 tailer,每个都有一个唯一的名称。 这些 Tailer 将它们的索引存储在 Queue 本身中,并且在 Tailer 使用toStart()
、toEnd()
、moveToIndex()
或读取消息时维护此索引。
注意: direction()
不会在重新启动时保留,只会保留下一个要读取的索引。
注意: 只有当调用DocumentContext.close()
时,tailer 的索引才会更新。如果错误阻止了这一点,则在每次重新启动时将读取相同的消息。
Chronicle Queue 以二进制格式存储其数据,文件扩展名为cq4
:
\�@πheader∂SCQStoreÇE��»wireType∂WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂SCQSRollÇ*���∆length¶ÄÓ6�∆format
ÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•�ÃindexSpacingÀindex2Indexé����ß��������…lastIndexé�
���ß��������fllastAcknowledgedIndexReplicatedé�����ߡˇˇˇˇˇˇˇ»recovery∂TimedStoreRecoveryÇ���…timeStampèèèß
���������������������������������������������������������������������������������������������
���������������������������������������������������������������������������������������������
���������������������������������������������������������������������������������������������
���������������������������������������������������������������������������������������������
�����������������������������������������������������������������
这通常有点难读,所以最好将cq4
文件转储为文本。这还可以帮助您解决生产问题,因为它可以让您看到队列中存储了什么以及以什么顺序存储的内容。
您可以使用net.openhft.chronicle.queue.main.DumpMain
将队列转储到终端。或net.openhft.chronicle.queue.ChronicleReaderMain.DumpMain
执行简单的转储到终端,而ChronicleReaderMain
处理更复杂的操作,例如跟踪队列。它们都可以通过下面描述的多种方式从命令行运行。
如果您有一个包含 Chronicle-Queue 工件的项目 pom 文件,则可以使用以下命令读取cq4
文件:
$ mvn exec:java -Dexec.mainClass="net.openhft.chronicle.queue.main.DumpMain" -Dexec.args="myqueue"
在上面的命令中 myqueue 是包含你的 .cq4 文件的目录
您还可以手动设置任何依赖文件。 这需要 4.5.3 或更高版本的chronicle-queue.jar
,并且所有依赖文件都存在于类路径中。 下面列出了依赖的jar:
$ ls -ltr
total 9920
-rw-r--r-- 1 robaustin staff 112557 28 Jul 14:52 chronicle-queue-5.20.108.jar
-rw-r--r-- 1 robaustin staff 209268 28 Jul 14:53 chronicle-bytes-2.20.104.jar
-rw-r--r-- 1 robaustin staff 136434 28 Jul 14:56 chronicle-core-2.20.114.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-api-1.7.30.jar
-rw-r--r-- 1 robaustin staff 33562 28 Jul 15:03 slf4j-simple-1.7.30.jar
-rw-r--r-- 1 robaustin staff 324302 28 Jul 15:04 chronicle-wire-2.20.105.jar
-rw-r--r-- 1 robaustin staff 35112 28 Jul 15:05 chronicle-threads-2.20.101.jar
-rw-r--r-- 1 robaustin staff 344235 28 Jul 15:05 affinity-3.20.0.jar
-rw-r--r-- 1 robaustin staff 124332 28 Jul 15:05 commons-cli-1.4.jar
-rw-r--r-- 1 robaustin staff 4198400 28 Jul 15:06 19700101-02.cq4
**提示:**要找出包含的 jar 版本,请参阅 chronicle-bom.
一旦依赖项出现在类路径上,您就可以运行:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.main.DumpMain 19700101-02.cq4
这会将19700101-02.cq
”文件转储为文本,如下所示:
!!meta-data #binary
header: !SCQStore {
wireType: !WireType BINARY,
writePosition: 0,
roll: !SCQSRoll {
length: !int 3600000,
format: yyyyMMdd-HH,
epoch: !int 3600000
},
indexing: !SCQSIndexing {
indexCount: !short 4096,
indexSpacing: 4,
index2Index: 0,
lastIndex: 0
},
lastAcknowledgedIndexReplicated: -1,
recovery: !TimedStoreRecovery {
timeStamp: 0
}
}
...
# 4198044 bytes remaining
注意: 上面的示例没有显示任何用户数据,因为没有将用户数据写入此示例文件。
在 Chonicle-Queue/bin
文件夹中还有一个名为 dump_queue.sh
的脚本,它在 shaded jar 中收集了所需的依赖项,并使用它通过 DumpMain
转储队列。 该脚本可以从 Chronicle-Queue
根文件夹运行,如下所示:
$ ./bin/dump_queue.sh <file path>
记录Chronicle queue内容的第二个工具是’ Chronicle ereadermain '(在Chronicle queue项目中)。如前所述,除了将文件内容打印到控制台之外,它还能够执行一些操作。例如,它可以用于跟踪队列,以检测何时添加了新消息(类似于 $tail -f
)。
下面是配置ChronicleReaderMain
的命令行界面:
usage: ChronicleReaderMain
-d <directory> Directory containing chronicle queue files
-e <exclude-regex> Do not display records containing this regular
expression
-f Tail behaviour - wait for new records to arrive
-h Print this help and exit
-i <include-regex> Display records containing this regular expression
-l Squash each output message into a single line
-m <max-history> Show this many records from the end of the data set
-n <from-index> Start reading from this index (e.g. 0x123ABE)
-r <interface> Use when reading from a queue generated using a MethodWriter
-s Display index
-w <wire-type> Control output i.e. JSON
与DumpQueue
一样,您需要在类路径中出现上面示例中的类。 这可以再次通过手动添加它们然后运行来实现:
$ java -cp chronicle-queue-5.20.108.jar net.openhft.chronicle.queue.ChronicleReaderMain -d <directory>
另一种选择是使用 Maven shade 插件创建一个 Uber Jar。 它配置如下:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<includes>
<include>net/openhft/**</include>
<include>software/chronicle/**</include>
</includes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Uber jar 存在后,您可以通过以下方式从命令行运行ChronicleReaderMain
:
java -cp "$UBER_JAR" net.openhft.chronicle.queue.ChronicleReaderMain "19700101-02.cq4"
Lastly, there is a script for running the reader named queue_reader.sh
which again is located in the Chonicle-Queue/bin
-folder. It automatically gathers the needed dependencies in a shaded jar and uses it to run ChronicleReaderMain
. The script can be run from the Chronicle-Queue
root folder like this:
$ ./bin/queue_reader.sh <options>
Chronicle v4.4+ 支持使用代理来写入和读取消息。 您首先定义一个异步“接口”,其中所有方法都具有:
一个简单的异步接口
import net.openhft.chronicle.wire.SelfDescribingMarshallable;
interface MessageListener {
void method1(Message1 message);
void method2(Message2 message);
}
static class Message1 extends SelfDescribingMarshallable {
String text;
public Message1(String text) {
this.text = text;
}
}
static class Message2 extends SelfDescribingMarshallable {
long number;
public Message2(long number) {
this.number = number;
}
}
要写入队列,您可以调用实现此接口的代理。
SingleChronicleQueue queue1 = ChronicleQueue.singleBuilder(path).build();
MessageListener writer1 = queue1.acquireAppender().methodWriter(MessageListener.class);
// call method on the interface to send messages
writer1.method1(new Message1("hello"));
writer1.method2(new Message2(234));
这些调用产生的消息可以按如下方式转储。
# position: 262568, header: 0
--- !!data #binary
method1: {
text: hello
}
# position: 262597, header: 1
--- !!data #binary
method2: {
number: !int 234
}
要读取消息,您可以提供一个读取器,它用您所做的相同调用调用您的实现。
// a proxy which print each method called on it
MessageListener processor = ObjectUtils.printAll(MessageListener.class)
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1.createTailer().methodReader(processor);
assertTrue(reader1.readOne());
assertTrue(reader1.readOne());
assertFalse(reader1.readOne());
运行这个例子输出:
method1 [!Message1 {
text: hello
}
]
method2 [!Message2 {
number: 234
}
]
**注意:**有关更多详细信息,请阅读文章 Using Method Reader/Writers 和 查看 MessageReaderWriterTest。
原文链接: (https://docs.chronicle.software/chronicle-queue/chronicle-queue/queue-operations/queue-operations.html)
<<<<<<<<<<<< [完] >>>>>>>>>>>