当前位置: 首页 > 知识库问答 >
问题:

Java Watch服务的可靠替代方案

孔阎宝
2023-03-14

我正在使用Java nio的WatchService,但我发现它在以下用例中非常不可靠:

>

  • 当一个很大的文件(

    但是,当数千个小型(约2Kb)文件被复制到监视目录时,其中80%不会调用ENTRY_CREATE或ENTRY_MODIFY!

    还有其他人面对过吗?还有更好更可靠的lib吗?或者我应该简单地切换到阻塞队列实现,文件复制器将文件名添加到队列中,消费者线程处理文件?

    WatchService实现的代码:

    
                WatchService watchService = FileSystems.getDefault().newWatchService();
                Path path = Paths.get( coreProperties.getStagingLocation() );
                path.register( watchService,
                        new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
                        SensitivityWatchEventModifier.MEDIUM
                );
    
                WatchKey key;
                while ( ( key = watchService.take() ) != null ) {
                    for ( WatchEvent<?> event : key.pollEvents() ) {
                        log.info( "Event kind: {} . File affected: {}.", event.kind(), event.context() );
                // Processing the file..
                    }
                    key.reset();
                }
    
     
    
  • 共有1个答案

    严扬
    2023-03-14

    根据DuncG和Jim Garrison的有益评论,我意识到Watch Service对每个通知的处理时间很敏感。我正在将6416个文件复制到它正在查看的文件夹中,如果我除了记录ENTRY_XX事件之外还做了什么,那么它在许多更新中都丢失了。

    以下是对我有用的:

    1. 在处理ENTRY_XX事件时,我将其写入一个LMAX中断器,该中断器的环形缓冲区大小高于批量中预期的最大文件数(我将其设置为2^19,即524288个插槽,并且它足以在不阻塞的情况下处理50k或更多文件更新,假设文件将有10个监视服务通知)

    [PS:由于线程同步的延迟,写入一个简单的ExecutorService队列没有帮助。我只得到了6416中的1273个文件名!]。

    示例代码:

    // publishing file names from watch service event to Disrupto ring buffer
     
        private void watchStagingFolder() {
            try {
                WatchService watchService = FileSystems.getDefault().newWatchService();
                Path path = Paths.get( coreProperties.getStagingLocation() );
                path.register( watchService,
                        new WatchEvent.Kind[] { ENTRY_MODIFY, ENTRY_CREATE },
                        SensitivityWatchEventModifier.HIGH
                );
    
                WatchKey key;
                while ( ( key = watchService.take() ) != null ) {
                    log.info( "key found: {}", key );
                    for ( WatchEvent<?> event : key.pollEvents() ) {
                        String eventKindStr = event.kind().name();
                        log.info( "Event kind: {} . File affected: {}", eventKindStr, event.context() );
                        if ( event.kind().equals( ENTRY_CREATE ) || event.kind().equals( ENTRY_MODIFY ) ) {
                            String fileName = event.context().toString();
                            log.info( "File to be processed: {}", fileName );
                            fileProcessorDisruptorEventProducer.send( fileName );
                        } else {
                            log.info( "Ignoring event kind {}", event.kind() );
                        }
                    }
                    key.reset();
                }
            } catch ( Exception e ) {
                log.error( "Found error while watching the staging directory.", e );
            }
        }
    
    // ensuring Disruptor ring buffer is warmed up
    @Component
    @RequiredArgsConstructor
    @Slf4j
    public class DisruptorWarmer {
        public static final String TEST_FILE_NAME = "TEST_FILE_NAME";
        private final CoreProperties coreProperties;
        private final FileProcessorDisruptorEventProducer fileProcessorDisruptorEventProducer;
    
        @PostConstruct
        public void init() {
            int bufferSize = coreProperties.getDisruptor().getBufferSize();
            for ( int i = 0; i < bufferSize; i++ ) {
                fileProcessorDisruptorEventProducer.send( TEST_FILE_NAME );
            }
            log.info( "Warmed up disruptor with {} test messages.", bufferSize );
        }
    }
    
    // processing files in the Disruptor consumer/handler
        @Override
        public void onEvent( Msg msg, long l, boolean b ) {
            try {
                if ( count < bufferSize ) {
                    log.debug( "Disruptor warming up. Count: {}. Ignoring msg: {}", count, msg.getPayload() );
                    count++;
                } else if ( count == bufferSize ) {
                    log.info( "Disruptor warmed up now with {} test messages.", count + 1 );
                    newSingleThreadExecutor.submit( () ->
                            applicationEventPublisher.publishEvent( new FileProcessorDisruptorReadyEvent( this, "Disruptor warmed up." ) )
                    );
                    count++;
                } else {
                    log.debug( "File: {}", msg.getPayload() );
    ############ 
    // no longer worried about slow processing impacting watch service
                    processFile( ( String ) msg.getPayload() );
    ############
                }
            } catch ( RuntimeException rte ) {
                log.error( "Found error html" target="_blank">while processing msg: [{}]. Skipping to next message.", msg, rte );
            }
        }
    
     类似资料:
    • 问题内容: 如何确保我的文件服务可靠且可扩展?它可以处理多少个并行请求? 我正在考虑超出硬件功能和带宽。 我正在关注在JavaServlet中流式传输大文件 问题答案: 如果这些是静态文件,则 直接 链接到它。所有不错的servlet容器/应用服务器都有完善的。如果这些是静态文件,这些静态文件位于您要链接到的Web应用程序 之外 ,那么您也可以仅将这些文件的根文件夹添加为另一个上下文。目前还不清楚

    • 我们继续看到Azure服务总线的不稳定性,并正在寻找替代方案。理想情况下,我们希望能够在Windows 2012 R2域中本地运行,并拥有许多与Azure SB相同的功能集。我们已经查看了Windows 1.1的服务总线,但是该产品已经有一段时间没有更新了,我们也不确定它的未来。我们使用C#,因此理想情况下会有一个客户端API/包装器,使与现有应用程序的集成相对容易。完全可以接受开源和自由软件。:

    • https://kubernetes.io/docs/concepts/services-networking/Ingress/中的入口示例显示,为了指定bancked入口,我们必须指定servicePort和serviceName,例如: 我希望创建具有随机端口的服务,并避免在入口定义中指定它们。有没有ServicePort的替代方案?也许使用服务中分配的端口名称或targetPort?

    • 来自环境端点的默认JSON格式对于Spring应用程序的消费是完美的,因为它直接映射到Environment抽象。如果您喜欢,可以通过向资源路径(“.yml”,“.yaml”或“.properties”)添加后缀来使用与YAML或Java属性相同的数据。这对于不关心JSON端点的结构的应用程序或其提供的额外的元数据的应用程序来说可能是有用的,例如,不使用Spring的应用程序可能会受益于此方法的简

    • 如果服务帐户对我没有帮助,我有什么选择来存储2TB或更多的数据?我应该另找方法存储这些文件吗? 我想继续使用谷歌。如果没有任何选择使用谷歌驱动器API,请,为这个场景提出任何其他建议。

    • 问题内容: 我知道比较两个时会有一些问题。看来这是一个更好的方法。好吧,我正在进行JUnit测试,我倾向于使用。这是断言两个字符串包含相同内容的可靠方法吗?我会使用,但是那样的话您就看不到预期的和实际的失败值。 在相关说明中,是否有人链接到页面或线程,这些链接清楚地说明了问题所在? 问题答案: 在Java中进行比较时应 始终 使用。 JUnit调用该方法以确定方法中的相等性。 因此,您绝对可以放心