当前位置: 首页 > 知识库问答 >
问题:

Hazelcast Jet“mapFn”必须是可序列化错误

仲孙经赋
2023-03-14

你好,我在从github到我的项目中实现一些代码源时遇到了问题,所以。。。我正在尝试构建一个管道,然后创建StreamStage来读取源代码,并使用项目中的方法构建一个queuesink,我总是遇到同样的错误“线程中的异常”main“java.lang.IllegalArgumentException:”mapFn“必须可序列化”

我正在阅读关于Hazelcast Jet序列化的文档,一切似乎都很完美,我只是不知道项目内部有什么问题

这是属性和常量:

public class FraudDetectionRun implements Serializable{

private final static ILogger log = Logger.getLogger(FraudDetectionRun.class);

private static final String TXN_QUEUE_ID = PropertiesLoader.TXN_QUEUE_ID;

private static final String ACCOUNT_MAP = PropertiesLoader.ACCOUNT_MAP;
private static final String MERCHANT_MAP = PropertiesLoader.MERCHANT_MAP;
private static final String RULESRESULT_MAP = PropertiesLoader.RULESRESULT_MAP;

private HazelcastInstance clientInstance;
private JetInstance jet;

private static MerchantRuleEngine merchantRuleEngine;
private static HistoricalDataRuleEngine historicalRuleEngine;

private IMap<String, Merchant> merchantMap;
private IMap<String, Account> accountMap;

public static void main(String[] args) {
    new FraudDetectionRun().start();
}

这是主要代码

`私有void start(){init();

    Pipeline p = buildPipeline();
    

    JobConfig jobConfig = new JobConfig();
    jobConfig.setName("Fraud Detection Job");

    
   
    
    jet.newJobIfAbsent(p, jobConfig);
}

private Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();
    
    
   
    StreamStage<Transaction> transaction = p.readFrom(buildQueueSource())
            .withoutTimestamps()
            .map(restValue -> transformToTransaction(restValue))
            .setName("This is where we transfor the RestValue into a Transaction");
    
    StreamStage<Transaction> appliyingMerchantRules = transaction.map(index -> applyMerchantRules(index))
            .setName("We search for the method Apply Merchant to apply the rules to the transaction");
    
    StreamStage<Transaction> rulesIntoTransaction = appliyingMerchantRules.map(index -> applyHistoricalTxnRules(index))
            .setName("Apply Historical transactions rules");
    
    rulesIntoTransaction.writeTo(Sinks.map(RULESRESULT_MAP, Transaction::getTransactionId, Transaction::getRulesResult));
    rulesIntoTransaction.writeTo(buildQueueSink());
    log.info(p.toDotString());
    return p;
}

private String transformResultsToString(Transaction txn) {
    RulesResult result = txn.getRulesResult();
    return "txnID: "+txn.getTransactionId()+" "+result.getMerchantRisk()+" "+result.getTransactionRisk();
}

private Transaction applyHistoricalTxnRules(Transaction txn) {
    log.info("Applying rules on historical data");
    historicalRuleEngine.apply(txn, accountMap.get(txn.getAccountNumber()).getHistoricalTransactions());
    return txn;
}

private Transaction applyMerchantRules(Transaction txn) {
    log.info("Applying merchant rules");
    merchantRuleEngine.apply(txn, merchantMap.get(txn.getMerchantId()));
    return txn;
}

private static Transaction transformToTransaction(RestValue restValue) {
    log.info("Applying transformToTransaction");
    return TransactionBuilderUtil.transformToTransaction(new String(restValue.getValue()));
}

private Sink<? super Transaction> buildQueueSink() {
    
     return SinkBuilder.sinkBuilder("queueSink",
             jet -> jet.jetInstance().getHazelcastInstance().<String>getQueue("sink-queue"))
             .<Transaction>receiveFn( (queue, txn)-> queue.add(transformResultsToString(txn)))
             .build();
}

private StreamSource<RestValue> buildQueueSource() {
    StreamSource<RestValue> source = SourceBuilder.<QueueContext<RestValue>>stream(TXN_QUEUE_ID, c -> new QueueContext<>(c.jetInstance().getHazelcastInstance().getQueue(TXN_QUEUE_ID)))
            .<RestValue>fillBufferFn(QueueContext::fillBuffer)
            .build();

    return source;
}

static class QueueContext<T> extends AbstractCollection<T> {
    static final int MAX_ELEMENTS = 1024;
    IQueue<T> queue;
    SourceBuilder.SourceBuffer<T> buf;
    QueueContext(IQueue<T> queue) {
        this.queue = queue;
    }

    void fillBuffer(SourceBuilder.SourceBuffer<T> buf) {
        this.buf = buf;
        queue.drainTo(this, MAX_ELEMENTS);
    }
    @Override
    public boolean add(T item) {
        buf.add(item);
        return true;
    }
    @Override
    public Iterator<T> iterator() {
        throw new UnsupportedOperationException();
    }
    @Override
    public int size() {
        throw new UnsupportedOperationException();
    }
}`

程序总是抛出这个错误

Exception in thread "main" java.lang.IllegalArgumentException: "mapFn" must be serializable
    at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:188)
    at com.hazelcast.jet.impl.pipeline.ComputeStageImplBase.attachMap(ComputeStageImplBase.java:146)
    at com.hazelcast.jet.impl.pipeline.StreamStageImpl.map(StreamStageImpl.java:87)
    at com.hazelcast.certification.control.FraudDetectionRun.buildPipeline(FraudDetectionRun.java:115)
    at com.hazelcast.certification.control.FraudDetectionRun.start(FraudDetectionRun.java:91)
    at com.hazelcast.certification.control.FraudDetectionRun.main(FraudDetectionRun.java:50)
Caused by: java.io.NotSerializableException: com.hazelcast.client.impl.proxy.ClientMapProxy
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at com.hazelcast.jet.impl.util.Util.checkSerializable(Util.java:186)
    ... 5 more

看来问题就出在这里

StreamStage<Transaction> appliyingMerchantRules = transaction.map(index -> applyMerchantRules(index))

出于某种原因,你能告诉我我做错了什么吗?

共有1个答案

袁开宇
2023-03-14

您的appyMerchant规则方法是非静态的,因此lambda捕获了封闭的this实例。似乎您已将FraudDetsionRun序列化以尝试修复此问题,但您更应该将appyMerchant规则设为静态。

 类似资料:
  • 我正在分析SonarQube 5.1版的Java SE 7项目。 然后,我在下面的代码中面对squid:S1948。 “可序列化”类中的字段应该是瞬态的或可序列化的 Serializable类中的字段本身必须是可序列化的或瞬态的,即使该类从未显式序列化或反序列化。这是因为在负载下,大多数J2EE应用程序框架都会将对象刷新到磁盘,并且具有非瞬态、不可序列化数据成员的所谓可序列化对象可能会导致程序崩溃

  • 问题内容: Java Bean是否必须实现接口? 问题答案: 这是Javabeans规范中描述的“典型”功能之一。 这是第 2.1 章的摘录 什么是bean? 各个Java Bean支持的功能会有所不同,但是区分Java Bean的典型统一功能是: 支持“自省”,以便构建器工具可以分析bean的工作方式 支持“自定义”,以便在使用应用程序构建器时,用户可以自定义Bean的外观和行为。 支持“事件”

  • 我正在尝试让一个客户与电子邮件和电话有一对多的关系。 这是我的连载器 我无法指出为什么这篇文章不会继续,因为它一直说客户是必需的。 虽然我通过添加read_only=True使其工作,但即使我在ClientSerializer中定义了一个def create(self,validated_数据),我也无法在json数据中获得电子邮件和电话数组。 这就是我在views.py中调用序列化程序的方式 这

  • 问题内容: 如何在PostgreSQL中定义一列,以使每个值都必须按顺序排列,而不是使用类型时获得的顺序,而不能使值2插入,除非该列中已存在值1才能插入该值? 问题答案: 从理论上讲,您可以使用像这样工作的约束。(但是实际上它是行不通的。) 计算行数。 评估。 比较结果。 在创建CHECK约束之前,您可能必须插入一行。如果您不这样做,则max(column)将返回NULL。一排 计算行数(1)。

  • 问题内容: var firstName: String = “John Appleseed” if let name = firstName { print (“Hello, (name)”) } 第二行出现字符串错误:条件绑定的初始化程序必须具有可选类型,而不是’String’ 如何决定使用可选变量还是非可选变量? 问题答案: 首先,让我们考虑一下构造的含义。当你写 你告诉斯威夫特你想 尝试展开

  • 问题内容: 为什么Kotlin对此抱怨: 编译器抱怨在Line中由处理程序再次发布。这在纯Java中确实有效: 问题答案: Kotlin认为一个属性在其初始化程序结束之前尚未初始化,因此即使在lambda中也无法在其自己的初始化程序中使用该属性。这种语义类似于其初始化程序内部局部变量使用的限制。 有几种解决方法: 使用对象表达式可以引用已声明的对象: } 这仅适用于接口作为lambda的替代品,并