Monitoring S3 Uploads for Real-Time Data

淳于涛
2023-12-01

If you are working on big data and its latest technologies, such as Hadoop, the first thing you need is a dataset. This data can be reviews, blogs, news, social media data (Twitter, Facebook, etc.), domain-specific data, research data, forums, groups, feeds, firehose data, and so on. Usually, companies approach data vendors to get this kind of data.

Usually, these data vendors dump the data into a shared-server kind of environment. To use this data for processing with MapReduce, we first move it to S3 for storage and process it afterwards. Assuming the data belongs to social media such as Twitter or Facebook, it may be dumped into date-formatted directories. Most of the time, that is the practice.

Also assume that 140-150 GB of data is dumped every day in a hierarchy such as 2013/04/15, i.e. the yyyy/mm/dd format (a sample listing appears below), with the data streaming in. How do you:

  • Upload them to a given bucket on S3, in the same hierarchy?
  • Monitor the new incoming files and upload them?
  • Save disk space efficiently?
  • Ensure the reliability of the uploads to S3?
  • Enable logging, and clean up the logs?
  • Retry the failed uploads?

These questions came to my mind when I wanted to automate the uploads to S3. Also, I wanted human intervention to be zero, or at least minimal!
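
For concreteness, a day's dump under this convention might look like the listing below (the file names are hypothetical):

/home/ubuntu/data/
└── 2013/
    └── 04/
        └── 15/
            ├── feed-0001.json
            └── feed-0002.json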

So, I came up with:

  • s3sync/s3cmd
  • Watcher – thanks a ton! This helped me with the monitoring part, and it works great!
  • Some of my own scripts.

What are the ingredients?

  • Install s3sync. Here I used only the s3cmd script from it, rather than s3sync itself. Maybe in the future – so I have it around.
Install Ruby from the repository
$ sudo apt-get install ruby libopenssl-ruby
Confirm with the version
$ ruby -v
 
Download and unzip s3sync
$ wget http://s3.amazonaws.com/ServEdge_pub/s3sync/s3sync.tar.gz
$ tar -xvzf s3sync.tar.gz
 
Install the certificates.
$ sudo apt-get install ca-certificates
$ cd s3sync/
 
Add the credentials to s3config.yml for s3sync to connect to S3.
$ cd s3sync/
$ sudo vi s3config.yml
aws_access_key_id: ABCDEFGHIJKLMNOPQRST
aws_secret_access_key: hkajhsg/knscscns19mksnmcns
ssl_cert_dir: /etc/ssl/certs
 
Edit aws_access_key_id and aws_secret_access_key to your own credentials.
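
A quick sanity check that the credentials work is worthwhile here; assuming the listbuckets command in the s3cmd.rb that ships with s3sync, something like this should list your buckets:
$ cd ~/s3sync
$ ruby s3cmd.rb --ssl listbuckets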
  • Install Watcher.
  • Go to https://github.com/greggoryhz/Watcher
    Copy https://github.com/greggoryhz/Watcher.git to your clipboard
    Install git if you have not already:
    $ sudo apt-get install git
    Clone the Watcher
    $ git clone https://github.com/greggoryhz/Watcher.git
    $ cd Watcher/
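
    Watcher is built on inotify and reads its jobs from a YAML file, so it needs the pyinotify and PyYAML Python packages; if they are not already there, something like this should pull them in (package names assumed for Ubuntu of that era):
    $ sudo apt-get install python-pyinotify python-yaml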
  • My own wrapper scripts.
  • Cron
  • Next, having the environment ready, let's make some common "assumptions".

    • The dumped data will land in /home/ubuntu/data/ – under which it could be, for example, 2013/04/15.
    • s3sync resides at /home/ubuntu.
    • The Watcher repository resides at /home/ubuntu.

    Getting our hands dirty...

    • Go to Watcher and set the directory to be watched and the corresponding action to be performed.
    $ cd Watcher/
    Start the script,
    $ sudo python watcher.py start
    This will create a .watcher directory at /home/ubuntu
    Now,
    $ sudo python watcher.py stop
    
    Go to the .watcher directory created and
    set the directory to be watched and the action to be taken
    in jobs.yml, i.e. watch: and command:
    
    # Copyright (c) 2010 Greggory Hernandez
    
    # Permission is hereby granted, free of charge, to any person obtaining a copy
    # of this software and associated documentation files (the "Software"), to deal
    # in the Software without restriction, including without limitation the rights
    # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    # copies of the Software, and to permit persons to whom the Software is
    # furnished to do so, subject to the following conditions:
    
    # The above copyright notice and this permission notice shall be included in
    # all copies or substantial portions of the Software.
    
    # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
    # THE SOFTWARE.
    
    # ---------------------------END COPYRIGHT--------------------------------------
    
    # This is a sample jobs file. Yours should go in ~/.watcher/jobs.yml
    # if you run watcher.py start, this file and folder will be created
    
    job1:
      # a generic label for a job. Currently not used; make it whatever you want
      label: Watch /home/ubuntu/data for added or removed files
    
      # directory or file to watch.  Probably should be abs path.
      watch: /home/ubuntu/data
    
      # list of events to watch for.
      # supported events:
      # 'access' - File was accessed (read) (*)
      # 'atrribute_change' - Metadata changed (permissions, timestamps, extended attributes, etc.) (*)
      # 'write_close' - File opened for writing was closed (*)
      # 'nowrite_close' - File not opened for writing was closed (*)
      # 'create' - File/directory created in watched directory (*)
      # 'delete' - File/directory deleted from watched directory (*)
      # 'self_delete' - Watched file/directory was itself deleted
      # 'modify' - File was modified (*)
      # 'self_move' - Watched file/directory was itself moved
      # 'move_from' - File moved out of watched directory (*)
      # 'move_to' - File moved into watched directory (*)
      # 'open' - File was opened (*)
      # 'all' - Any of the above events are fired
      # 'move' - A combination of 'move_from' and 'move_to'
      # 'close' - A combination of 'write_close' and 'nowrite_close'
      #
      # When monitoring a directory, the events marked with an asterisk (*) above
      # can occur for files in the directory, in which case the name field in the
      # returned event data identifies the name of the file within the directory.
      events: ['create', 'move_from', 'move_to']
    
      # TODO:
      # this currently isn't implemented, but this is where support will be added for:
      # IN_DONT_FOLLOW, IN_ONESHOT, IN_ONLYDIR and IN_NO_LOOP
      # There will be further documentation on these once they are implemented
      options: []
    
      # if true, watcher will monitor directories recursively for changes
      recursive: true
      
      # the command to run. Can be any command. It's run as whatever user started watcher.
      # The following wildcards may be used inside command specification:
      # $$ dollar sign
      # $watched watched filesystem path (see above)
      # $filename event-related file name
      # $tflags event flags (textually)
      # $nflags event flags (numerically)
      # $dest_file this will manage recursion better if included as the dest (especially when copying or similar)
      #     if $dest_file was left out of the command below, Watcher won't properly
      #     handle newly created directories when watching recursively. It's fine
      #     to leave out when recursive is false or you won't be creating new
      #     directories.
      # $src_path is only used in move_to and is the corresponding path from move_from
      # $src_rel_path [needs doc]
      command: sudo sh /home/ubuntu/s3sync/monitor.sh $filename
  • Create a script named monitor.sh that uploads to S3, placed in the s3sync directory, as below.
    • The variable you may want to change is the S3 bucket path in 's3path' in monitor.sh.
    • The script uploads the new incoming files detected by the watcher script in Reduced Redundancy Storage format. (You can remove the header if you don't want the files stored in RRS format.)
    • The script calls the s3cmd ruby script to upload recursively, maintaining the hierarchy, i.e. the files *.* in the yyyy/mm/dd format.
    • It deletes the files that were successfully uploaded to S3 from the local path – to save disk space.
    • The script does not delete the directory; that is handled by another script, re-upload.sh, which acts as a backup for failed uploads and uploads them to S3 again.
    Go to the s3sync directory
    $ cd ~/s3sync
    $ sudo vim monitor.sh
    
    #!/bin/bash
    ##...........................................................##
    ## script to upload to S3BUCKET, once the change is detected ##
    ##...........................................................##
    
    
    ## AWS Credentials required for s3sync ##
    export AWS_ACCESS_KEY_ID=ABCDEFGHSGJBKHKDAKS
    export AWS_SECRET_ACCESS_KEY=jhhvftGFHVgs/bagFVAdbsga+vtpmefLOd
    export SSL_CERT_DIR=/etc/ssl/certs
    
    #echo "Running monitor.sh!"
    echo "[INFO] File or directory modified = $1 "
    
    ## Read arguments
    PASSED=$1
    
    # Declare the watch path and  S3 destination path
    watchPath='/home/ubuntu/data'
    s3path='bucket-data:'
    
    # Trim watch path from PASSED
    out=${PASSED#$watchPath}
    outPath=${out#"/"}
    
    echo "[INFO] ${PASSED} will be uploaded to the S3PATH : $s3path$outPath"
    
    if   [ -d "${PASSED}" ]
    then  echo "[SAFEMODE ON] Directory created will not be uploaded, unless a file exists!"
    elif [ -f "${PASSED}" ]
    then ruby /home/ubuntu/s3sync/s3cmd.rb --ssl put $s3path$outPath ${PASSED}  x-amz-storage-class:REDUCED_REDUNDANCY; #USE s3cmd : File
    else echo "[ERROR] ${PASSED} is not a valid type!!";
         exit 1
    fi
    
    RETVAL=$?
    [ $RETVAL -eq 0 ] && echo "[SUCCESS] Upload successful! " &&
    if   [ -d "${PASSED}" ]
    then echo "[SAFEMODE ON] ${PASSED} is a directory and its not deleted!";
    elif [ -f "${PASSED}" ]
    then sudo rm -rf ${PASSED}; echo "[SUCCESS] Sync and Deletion successful!";
    fi
    
    [ $RETVAL -ne 0 ] && echo "[ERROR] Synchronization failed!!"
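
    The script can also be exercised by hand, before handing it over to Watcher, against a single file (the path here is hypothetical):
    $ sudo sh /home/ubuntu/s3sync/monitor.sh /home/ubuntu/data/2013/04/15/feed-0001.json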
  • Create a script named re-upload.sh, which will upload the failed files.
    • This script makes sure that the files left behind by monitor.sh (failed uploads – the chances are slim; maybe 2-4 files a day, for various reasons) are uploaded to S3 again, in the same hierarchy, in RRS format.
    • On a successful upload, it deletes the file, and it deletes the directory if it is empty.
    Go to the s3sync directory.
    $ cd s3sync
    $ sudo vim re-upload.sh
     
    #!/bin/bash
    ##.........................................................##
    ## script to detect failed uploads of other date directories
    ## and re-try ##
    ##.........................................................##
     
    ## AWS Credentials required for s3sync ##
    export AWS_ACCESS_KEY_ID=ABHJGDVABU5236DVBJD
    export AWS_SECRET_ACCESS_KEY=hgvgvjhgGYTfs/I5sdn+fsbfsgLKjs
    export SSL_CERT_DIR=/etc/ssl/certs
     
    # Get the previous date
    today_date=$(date -d "1 days ago" +%Y%m%d)
    year=$(date -d "1 days ago" +%Y%m%d|head -c 4|tail -c 4)
    month=$(date -d "1 days ago" +%Y%m%d|head -c 6|tail -c 2)
    yday=$(date -d "1 days ago" +%Y%m%d|head -c 8|tail -c 2)
     
    # Set the path of data
    basePath="/home/ubuntu/data"
    datePath="$year/$month/$yday"
    fullPath="$basePath/$datePath"
    echo "Path checked for: $fullPath"
     
    # Declare the watch path and S3 destination path
    watchPath='/home/ubuntu/data'
    s3path='bucket-data:'
     
     
    # check for left files (failed uploads)
    if [ "$(ls -A $fullPath)" ]; then
        for i in `ls -a $fullPath/*.*`
        do
            echo "Left over file: $i";
            if [ -f "$i" ]
            then out=${i#$watchPath};
                 outPath=${out#"/"};
                 echo "Uploading to $s3path$outPath";
                 ruby /home/ubuntu/s3sync/s3cmd.rb --ssl put $s3path$outPath $i x-amz-storage-class:REDUCED_REDUNDANCY; #USE s3cmd : File
                 RETVAL=$?
                 [ $RETVAL -eq 0 ] && echo "[SUCCESS] Upload successful!" &&
                 sudo rm -rf $i &&
                 echo "[SUCCESS] Deletion successful!"
                 [ $RETVAL -ne 0 ] && echo "[ERROR] Upload failed!!"
            else echo "[CLEAN] no files exist!!";
                 exit 1
            fi
        done
    else
        echo "$fullPath is empty";
        sudo rm -rf $fullPath;
        echo "Successfully deleted $fullPath"
        exit 1
    fi
     
    # post failed uploads -- delete empty dirs
    if [ "$(ls -A $fullPath)" ]; then
        echo "Man!! Somethingz FISHY! All (failed)uploaded files will be deleted. Are there files yet!??";
        echo "Man!! I cannot delete it then! Please go check $fullPath";
    else
        echo "$fullPath is empty after uploads";
        sudo rm -rf $fullPath;
        echo "Successfully deleted $fullPath"
    fi
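
    Like monitor.sh, this one can be tried by hand; it only inspects the previous day's directory, so running it right after a clean day should simply report the path as empty and remove it:
    $ sudo sh /home/ubuntu/s3sync/re-upload.sh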
  • Now, the dirtiest job of them all – logging and cleaning the logs.
    • Every 'echo' produced by monitor.sh can be found in ~/.watcher/watcher.log while watcher.py is running.
    • This log helps us initially, and may later help us trace back errors.
    • A watchman on duty – to clean the log. For that, we can run a script with cron at some hour. I was interested in running it every Saturday at 8:00 AM.
    • Create a script to clean the log, as 'clean_log.sh', in /home/ubuntu/s3sync.
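
    clean_log.sh itself can be tiny; a minimal sketch, assuming all we want is to empty ~/.watcher/watcher.log so it doesn't grow without bound, could be:
    $ cd ~/s3sync
    $ sudo vim clean_log.sh
     
    #!/bin/bash
    ##...........................................##
    ## script to clean the watcher log (weekly)  ##
    ##...........................................##
     
    # Path of the log written by watcher.py (see above)
    logFile="/home/ubuntu/.watcher/watcher.log"
     
    # Truncate in place rather than delete, so the running
    # watcher.py keeps writing to the same file (assuming it
    # holds the log open)
    cat /dev/null > "$logFile"
    echo "$(date) [INFO] watcher.log cleaned"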
  • Time for cron!
    $ crontab -e
     
    Add the following lines at the end and save.
     
    # EVERY SATURDAY 8:00AM clean watcher log
    0 8 * * 6 sudo sh /home/ubuntu/s3sync/clean_log.sh
    # EVERYDAY at 10:00AM check failed uploads of previous day
    0 10 * * * sudo sh /home/ubuntu/s3sync/re-upload.sh
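
    A quick listing confirms that both entries are registered:
    $ crontab -l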
  • Done! The log cleaning happens every Saturday at 8:00 AM, and the re-upload script runs every day for the previous day, checking whether files exist and cleaning up accordingly.
    • Let's start the script
    Go to the Watcher repository
    $ cd ~/Watcher
    $ sudo python watcher.py start
     
    This will create the ~/.watcher directory, with watcher.log in it,
    when started.
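
    Once it's running, dropping a test file into the watched tree should show up in the log, and shortly after on S3 (the file name is just an example):
    $ touch /home/ubuntu/data/2013/04/15/test.txt
    $ tail ~/.watcher/watcher.log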

    Thus, this ensures successful uploads to S3.

    Reference: Monitoring S3 uploads for real-time data, from our JCG partner Swathi V's *Techie(S)pArK* blog.

    Translated from: https://www.javacodegeeks.com/2013/04/monitoring-s3-uploads-for-a-real-time-data.html
