当前位置: 首页 > 知识库问答 >
问题:

Apache beam 2.34.0 SQSIO非法变异异常

董琦
2023-03-14

我正在尝试以批处理模式从sqs队列读取数据,并使用Apache beam 2.34.0和AWS beam SDK v1写入本地文件,这会引发非法的变异异常。

public class SqsReader {

    public void run(String[] args) {

        SqsReaderOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().
                as(SqsReaderOptions.class);
        Pipeline p = this.getPipeline(args);

        p.apply(SqsIO.read().withQueueUrl(options.getSourceQueueUrl())
                        .withMaxNumRecords(options.getNumberOfRecords()))
                .apply(ParDo.of(new SqsMessageToJson()))
                .apply(TextIO.write()
                        .to(options.getLocalOutputLocation())
                        .withNumShards(options.getNumShards()));

        p.run().waitUntilFinish();
    }

    public static void main(String[] args) throws IOException {
        new SqsReader().run(args);
    }

    public static class SqsMessageToJson extends DoFn<Message, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String message = Objects.requireNonNull(c.element()).getBody();
            c.output(message);
        }
    }
}

我得到了以下例外

Jan 10, 2022 11:37:05 AM org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector verifyUnmodifiedThrowingCheckedExceptions
WARNING: Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c, maxNumRecords=1, maxReadTime=null}
Coder of type class org.apache.beam.sdk.coders.SerializableCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element Shard{source=org.apache.beam.sdk.io.aws.sqs.SqsUnboundedSource@5f19451c, maxNumRecords=1, maxReadTime=null}

Exception in thread "main" org.apache.beam.sdk.util.IllegalMutationException: PTransform SqsIO.Read/Read(SqsUnboundedSource)/Read/ParMultiDo(Read) mutated value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2FXnTVQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} after it was output (new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: DeVRF8vQATm1f+rHIvR3eaejlRHksL1R7WE4zDT7lSwdIs9gJCYKXFXnTVQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}). Values must not be mutated in any way after being output.
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:137)
    at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:231)
    at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
    at org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:292)
    at org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:194)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:131)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj2KQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} mutated illegally, new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQ==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}. Encoding was rO.
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.illegalMutation(MutationDetectors.java:158)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodifiedThrowingCheckedExceptions(MutationDetectors.java:153)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.verifyUnmodified(MutationDetectors.java:128)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:127)
    ... 10 more
Caused by: org.apache.beam.sdk.util.IllegalMutationException: Value ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQEBj=,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}} mutated illegally, new value was ValueWithRecordId{id=[98, 55, 50, 51, 56, 51, 102, 57, 45, 97, 52, 100, 56, 45, 52, 99, 100, 50, 45, 97, 49, 55, 49, 45, 48, 57, 100, 48, 100, 53, 50, 51, 99, 50, 54, 51], value={MessageId: b72383f9-a4d8-4cd2-a171-09d0d523c263,ReceiptHandle: AQE==,MD5OfBody: 38db8cbd101e4c1cfbf47e31c2aaab75,Body: {"test-key": "test-value"},Attributes: {SentTimestamp=1641794775474},MessageAttributes: {requestTimeMsSinceEpoch={StringValue: 1641794824800,StringListValues: [],BinaryListValues: [],}}}}. Encoding was rO2Mw.

其中相同的代码在apache束2.31.0中工作没有任何问题。我在这里错过了什么?

共有2个答案

施彦
2023-03-14

Beam 2.34.0中的输入/输出比Beam 2.31.0复杂得多。对于Beam 2.34.0,deleteBatch逻辑根据飞行状态过滤要删除的消息。但是,在扩展逻辑中有一些假设,其中飞行状态被修改为排除假定已过期或即将过期的消息。这些消息不是由I/O明确请求从sqs中删除,也不是由I/O本身删除(I/O可能正在处理一条本应过期等待重新发送的消息)。

已存档https://issues.apache.org/jira/browse/BEAM-13627.

虽然我不确定在同一捆绑包中使用新的接收句柄再次拉取同一条消息是否会导致突变检测问题,因为接收句柄是消息哈希代码的一部分,除非突变检测器中存在哈希冲突。

TL;DR:调试过程

突变是在SqsUnboundedSource中检测到的,不是由管道中的任何其他代码引起的。

此处是报告警告和引发异常的代码。

唯一更改的字段是收据句柄。此处记录了:

如果您多次收到消息,则每次收到消息时,您都会得到不同的接收句柄。请求删除邮件时,必须提供最近收到的回执句柄(否则,邮件可能不会被删除)。

Beam 2.31.0和Beam 2.34.0之间没有aws\U java\U sdk\u版本更改。所以AWS SDK不应该是罪魁祸首。

SqsUnboundedReader的波束2.31.0和波束2.34.0之间有显著变化。

要多次接收消息,该消息必须在第一次接收后未被删除。删除逻辑在SqsCheckpoint Mark中调用。

李洋
2023-03-14

此问题似乎是由不确定的编码器(SerializableCoder.of(Message.class))以及在批处理模式下使用SQS阅读器引起的。批处理模式是使用BoundedReadFromUnboundedSource实现的,这已知会导致问题。不鼓励使用它。

您可以按照BEAM-13631了解修复SQS消息编码器的进度。

目前,我无法告诉您2.31和2.34之间的哪些更改触发了该问题。但可能不是SQS IO本身的更改。我将继续进一步调查并希望稍后提供更新。

现在,我建议尝试几件事:

>

  • 首先,尽量避免使用批处理模式(因此既不设置maxNumRecords也不设置maxReadTime)。我很有信心这解决了您的问题。

    自从最近版本的Beam以来,AWSSDKv2beam-sdks-java-io-amazon-web-services2有一个单独的模块(因此我上面的问题)。它使用自定义消息类进行传输,而不是AWSSDK,编码应该是确定性的。但是,当我最近开始研究SDKv2 IO时,我注意到它上的其他一些错误:重试无效的收据句柄,SQS客户端关闭得太早。

    如果其中任何一个有帮助,请告诉我。

  •  类似资料:
    • 我在代码上收到一条错误消息,以查找支付200美元佣金的员工的总工资。一旦输入了所有员工的总销售额,就应该打印出属于每个不同薪酬类别的员工销售额。下面是代码: 这是我收到的确切错误消息: 我相信这与双重转换有关,但我不确定这有什么问题?有没有人能帮我搞清楚哪里出问题了(它编译没有错误)?我也尝试过只有双精度(包括数组),但这并没有解决问题。

    • 我正在处理一个非常简单的point类,但我得到了一个错误,我无法确定字符串/双值问题发生的位置或如何修复它。 编辑 我忘记添加我正在接收的错误:

    • 问题内容: 我正在Ubuntu 16.04上使用最新版本的Elasticsearch,但在将数据放到上面时遇到了一个小问题。 这是我的json文档(相关部分) 这是当我尝试“ PUT http:// localhost:9200 / aws ” 时从ES返回的响应 在我看来,ES认为“ clockSpeed”是某种设置…?我希望使用动态映射来加快此过程,而不是先映射所有文档,然后将其导入ES。 有

    • 在Java jar上运行JUnit5测试并加载依赖项时,会出现警告 当我去看dumpstream时,它充满了评论,比如: 解决了依赖项加载问题,但未解决损坏的流。

    • 我可能还不够清楚--情况是,我的现有代码不支持异步,我希望使用新的库,如System.net.http和只支持异步方法的AWS SDK。因此,我需要弥补这一差距,并能够拥有可以同步调用的代码,然后可以在其他地方调用异步方法。 我读了很多书,有很多次有人问这个问题,也有人回答这个问题。 从非异步方法调用异步方法