当前位置: 首页 > 知识库问答 >
问题:

Akka+Camel+FTP2+localWorkingDirectory工作不可靠

羊舌琛
2023-03-14

其基本思想是监视FTP目录中的文件,然后生成子执行元来单独处理每个文件。Akka被用于管理并发性和可靠性。父使用者执行元使用noop=true轮询目录,因此它不执行任何操作,然后子使用者执行元应该下载文件,并使用“include”Camel选项进行过滤。重要的是,下载是并发的,并且文件不加载到内存中(因此使用了localWorkDirectory)。

我写了一个简单的复制品:

package camelrepro;

import java.io.InputStream;

import org.mockftpserver.core.command.Command;
import org.mockftpserver.core.command.ReplyCodes;
import org.mockftpserver.core.session.Session;
import org.mockftpserver.core.session.SessionKeys;
import org.mockftpserver.fake.FakeFtpServer;
import org.mockftpserver.fake.UserAccount;
import org.mockftpserver.fake.command.AbstractFakeCommandHandler;
import org.mockftpserver.fake.filesystem.FileEntry;
import org.mockftpserver.fake.filesystem.UnixFakeFileSystem;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
import akka.testkit.JavaTestKit;

public class Main {

    public static class ParentActor extends UntypedConsumerActor {

        public ParentActor() {
            System.out.println("Parent started");
        }
        @Override
        public String getEndpointUri() {
            return "ftp://anonymous@localhost:8021?password=password&readLock=changed&initialDelay=0&delay=200&noop=true";
        }

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof CamelMessage) {
                getContext().actorOf(new Props(ChildActor.class), "0");
            } else {
                unhandled(msg);
            }
        }
    }

    public static class ChildActor extends UntypedConsumerActor {

        public ChildActor() {
            System.out.println("Child started");
        }

        @Override
        public String getEndpointUri() {
            return "ftp://anonymous@localhost:8021?password=password&readLock=changed&initialDelay=0&delay=200&include=test.txt&localWorkDirectory=/tmp";
        }

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof CamelMessage) {
                System.out.println("Child got message");
                CamelMessage camelMsg = (CamelMessage) msg;

                InputStream source = camelMsg.getBodyAs(InputStream.class, getCamelContext());
                System.out.println(source.getClass().getName());
                System.exit(0);
            } else {
                unhandled(msg);
            }
        }
    }

    public static void main(String[] args) {

        ActorSystem system = ActorSystem.create("default");

        FakeFtpServer ftpServer = new FakeFtpServer();
        UnixFakeFileSystem ftpFileSystem = new UnixFakeFileSystem();
        ftpServer.setFileSystem(ftpFileSystem);
        ftpServer.addUserAccount(new UserAccount("anonymous", "password", "/"));
        ftpServer.setServerControlPort(8021);

        // fix bug in PWD handling (either Apache FTP client or mock server depending on opinion)
        ftpServer.setCommandHandler("PWD", new AbstractFakeCommandHandler() {
            @Override
            protected void handle(Command command, Session session) {
                String currentDirectory = (String) session.getAttribute(SessionKeys.CURRENT_DIRECTORY);
                this.replyCodeForFileSystemException = ReplyCodes.READ_FILE_ERROR;
                verifyFileSystemCondition(notNullOrEmpty(currentDirectory), currentDirectory, "filesystem.currentDirectoryNotSet");
                int replyCode = ReplyCodes.PWD_OK;
                String replyText = String.format("\"%s\" OK", currentDirectory.replaceAll("\"", "\"\""));
                session.sendReply(replyCode, replyText);
            }
        });
        ftpFileSystem.add(new FileEntry("/test.txt", "hello world"));
        ftpServer.start();

        new JavaTestKit(system) {{
            getSystem().actorOf(new Props(ParentActor.class));
        }};
    }
}

显示版本的Maven依赖项:

    <dependencies>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.10</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-camel_2.10</artifactId>
            <version>2.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-testkit_2.10</artifactId>
            <version>2.1.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-ftp</artifactId>
            <version>2.10.3</version>
        </dependency>
        <dependency>
            <groupId>org.mockftpserver</groupId>
            <artifactId>MockFtpServer</artifactId>
            <version>2.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-io</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>commons-net</groupId>
            <artifactId>commons-net</artifactId>
            <version>3.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>
[ERROR] [02/15/2013 10:53:32.951] [default-akka.actor.default-dispatcher-7] [akka://default/user/$a/0] Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory)
org.apache.camel.TypeConversionException: Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory)
    at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:162)
uname -a: Linux 3.2.0-37-generic #58-Ubuntu SMP Thu Jan 24 15:28:10 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux
java: 1.7.0_11-b21

共有1个答案

苍恩
2023-03-14

我已经找到了解决上述问题的方法。

事实上,子使用者autoack()返回true(默认情况下会这样做)。在这种情况下,akka-camel将发送camelmessagefire-and-forget,并继续进行清理。同时,在getBodyAs()调用的类型转换器之一打开InputStream之前,子执行元实际上不会获得打开的InputStream。因此,在子执行元通过getBodyAs()打开文件和Camel cleanup在异步发送消息后删除文件之间存在竞争。

因此,修复方法是重写autoack()返回false,并在子消息处理程序的末尾发送ack.getInstance()(或者new status.failure( ) )。

 类似资料:
  • 骆驼ftp和FTP2有什么区别?我想写一个新的ftpendpoint,但我在这两者之间感到困惑。此外,如果可能的话,请说明哪一个比其他的好,以及为什么。

  • 我试图学习Akka集群下面的教程提供了这里 我已经创建了应用程序和回购是在这里。 正如教程中提到的,我已经启动了FrontEndApp 即使我在2551和2552上启动后端应用程序,上述警告消息也会不断重复。 在2551上启动后端参与者的终端日志。 最后一个日志持续重复。 在2552上启动后端参与者的终端日志。 不确定是什么原因群集节点不能检测到彼此和参与者节点与后端。 我会错过任何设置吗?

  • 我已经设置了一个骆驼路由,错误处理程序和重新交付策略配置如下 我期待以下结果 消息 1 - 在第一次失败时,当我收到来自系统的异常时,处理程序按以下顺序启动 第一次重试 - 30 秒 第二次重试 - 1 分钟 第三次重试 - 1 分钟 ... 第 6 次重试 - 1 分钟 请告知我在这里做错了什么。

  • 我有这条骆驼路线: 这个想法是这样的:我想将 JSON 格式的对象交付给 REST endpoint(所有标头都已正确设置,其余endpoint接收 json 格式)/ 要将对象转换为 JSON 格式,我使用 marshal 并且它可以工作。现在,从httpendpoint返回的响应是java.io.InputStream类型,但我不在乎。我关心的是将身体转换回编组之前的原始对象。在封送名为 PA

  • 我对camel-jpa组件中的“maxMessagesPerPoll”的理解是,当我设置“maxMessagesPerPoll=4”时,每个投票将获取4条记录。但不是每个投票相同的4条记录。我应该得到下4套记录。 但是当我把参数设置为4时,我总是得到相同的前4条记录。如何获得每次投票的下一组后续记录。 这背后的想法——我想分批处理REORD,而不是同时处理所有数据。 只是想有可滚动的-有点。 如何

  • 我正在尝试获得骆驼路线JMS- 下面的例子说明了如果REST服务的服务器出现故障而无法交付route时会发生什么情况。 我得到了正确的例外: 但是消息被消费并从队列中删除。我的假设是使用事务/事务骆驼和AMQ可以解决这个问题并将消息移动到ActiveMQ.DLQ. 我已经阅读了《骆驼行动》第一版的第9章,并在谷歌上搜索,但没有找到任何解决我问题的方法。 我知道我可以创建/定义自己的Transact