Question:

Exception when writing a ByteBuffer field to Cassandra

郁隐水
2023-03-14

We have an Apache Beam Dataflow job that reads data from BigQuery and converts it into POJOs before writing them to Cassandra with the DataStax driver. I recently added a new blob column to the table and a corresponding ByteBuffer field to the POJO.

This is how I create the ByteBuffer:

String str = objectMapper.writeValueAsString(installSkuAttributes);
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
pojo.setInstallAttributes(ByteBuffer.wrap(bytes));

Here is the pipeline snippet:

public void executePipeline() throws Exception {

    Pipeline pipeline =
        Pipeline.create(jobMetaDataBean.getDataflowPipelineOptions());

    ...
    writeDataToCassandra(installSkuData);

    pipeline.run();
}

After making the necessary changes to the DAO writer, I get the following exception when running the job. I am using the DataStax driver.

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Forbidden IOException when writing to OutputStream
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed.executePipeline(InstallSkusFullFeed.java:216)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed.main(InstallSkusFullFeed.java:221)
Caused by: java.lang.IllegalArgumentException: Forbidden IOException when writing to OutputStream
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:88)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:117)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:242)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite.processElement(InstallSkusFullFeed.java:160)
Caused by: java.io.NotSerializableException: java.nio.HeapByteBuffer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:166)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:52)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:99)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:148)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:117)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:242)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite.processElement(InstallSkusFullFeed.java:160)
    at com.homedepot.productassortment.fullfeed.dataflow.InstallSkusFullFeed$PrepareDataForCassandraWrite$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:185)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:149)
    at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:78)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:189)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:55)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:161)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:125)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1 Answer

全丰
2023-03-14

The real problem is here:

java.io.NotSerializableException: java.nio.HeapByteBuffer

This happens because ByteBuffer is not Serializable, so it cannot be used for distributed work. You can avoid this either by using a byte[] directly as the property, or by implementing a Serializable wrapper around the ByteBuffer.
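A minimal sketch of the second option, assuming your POJO can hold this wrapper instead of a raw ByteBuffer (the class name `SerializableBytes` and its methods are hypothetical, not from any library): store the payload as a byte[], which Java serialization (and thus Beam's SerializableCoder) can handle, and only build a ByteBuffer view at the point where the DataStax driver needs one.

```java
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical wrapper: byte[] is serializable, ByteBuffer is not,
// so we keep the bytes and expose a ByteBuffer view on demand.
final class SerializableBytes implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] data;

    SerializableBytes(byte[] data) {
        // Defensive copy, so later mutation of the caller's array
        // cannot trip Beam's mutation detector.
        this.data = Arrays.copyOf(data, data.length);
    }

    static SerializableBytes fromString(String s) {
        return new SerializableBytes(s.getBytes(StandardCharsets.UTF_8));
    }

    // Build the ByteBuffer only when binding the Cassandra statement,
    // e.g. inside the DoFn that performs the write, instead of
    // storing a ByteBuffer on the POJO that flows through the pipeline.
    ByteBuffer toByteBuffer() {
        return ByteBuffer.wrap(Arrays.copyOf(data, data.length));
    }
}
```

The simpler first option works the same way: declare the POJO field as `private byte[] installAttributes;` and call `ByteBuffer.wrap(...)` only in the write step, so nothing non-serializable is ever carried between pipeline stages.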
