当前位置: 首页 > 知识库问答 >
问题:

使用Akka流动态压缩列表[source[ByteString,NotUsed]]

越嘉树
2023-03-14

我有一个source[ByteString,NotUsed]列表,它与来自S3 bucket的文件名成对。这些需要压缩在恒定内存中,并在Play2.6中提供服务。

我的变化主要在这里:

override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val src: Source[ByteString, NotUsed] = source()
  val operation = src.runForeach(bytestring => {
    val byteInputStream = new ByteArrayInputStream(bytestring.toArray)
    emitMultiple(out, fileChunks(byteInputStream, buffer))
  })
  operation.onComplete {
    case _ => buffer.endEntry()
  }
  Await.ready(operation, 5.minute)
}

我知道这是阻塞,但我不确定在这种情况下是否允许。

我如何以安全的方式完成这件事?

override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  val stream = source().runWith(StreamConverters.asInputStream(1.minute))
  currentStream = Some(stream)
  emitMultiple(out, fileChunks(stream, buffer), () => buffer.endEntry())
}

下面是我当前图形阶段实现的完整列表:

import java.io.{ByteArrayInputStream, InputStream, OutputStream}

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.{ByteString, ByteStringBuilder}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal

//scalastyle:off
class StreamedZip(bufferSize: Int = 64 * 1024)(implicit ec: ExecutionContext,
                                               mat: ActorMaterializer)
    extends GraphStage[FlowShape[StreamedZip.ZipSource, ByteString]] {

  import StreamedZip._

  val in: Inlet[ZipSource]    = Inlet("StreamedZip.in")
  val out: Outlet[ByteString] = Outlet("StreamedZip.out")
  override val shape          = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      private val buffer                             = new ZipBuffer(bufferSize)
      private var currentStream: Option[InputStream] = None

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit =
            if (isClosed(in)) {
              if (buffer.isEmpty) completeStage()
              else {
                buffer.close
                push(out, buffer.toByteString)
              }
            } else pull(in)

          override def onDownstreamFinish(): Unit = {
            closeInput()
            buffer.close
            super.onDownstreamFinish()
          }
        }
      )

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val (filepath, source: StreamGenerator) = grab(in)
            buffer.startEntry(filepath)
            val stream = source().runWith(StreamConverters.asInputStream(1.minute))
            emitMultiple(out, fileChunks(stream, buffer), () => { buffer.endEntry() })
          }

          override def onUpstreamFinish(): Unit = {
            println("Updstream finish")
            closeInput()
            if (buffer.isEmpty) completeStage()
            else {
              buffer.close()
              if (isAvailable(out)) {
                push(out, buffer.toByteString)
              }
            }
          }
        }
      )

      private def closeInput(): Unit = {
        currentStream.foreach(_.close)
        currentStream = None
      }

      private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
        // This seems like a good trade-off between single-byte
        // read I/O performance and doubling the ZipBuffer size.
        //
        // And it's still a decent defense against DDOS resource
        // limit attacks.
        val readBuffer = new Array[Byte](1024)
        var done       = false

        def result: Stream[ByteString] =
          if (done) Stream.empty
          else {
            try {
              while (!done && buffer.remaining > 0) {
                val bytesToRead = Math.min(readBuffer.length, buffer.remaining)
                val count       = stream.read(readBuffer, 0, bytesToRead)
                if (count == -1) {
                  stream.close
                  done = true
                } else buffer.write(readBuffer, count)
              }
              buffer.toByteString #:: result
            } catch {
              case NonFatal(e) =>
                closeInput()
                throw e
            }
          }

        result.iterator
      }
    }
}

object StreamedZip {
  type ZipFilePath     = String
  type StreamGenerator = () => Source[ByteString, NotUsed]
  type ZipSource       = (ZipFilePath, StreamGenerator)

  def apply()(implicit ec: ExecutionContext, mat: ActorMaterializer) = new StreamedZip()

}

class ZipBuffer(val bufferSize: Int = 64 * 1024) {
  import java.util.zip.{ZipEntry, ZipOutputStream}

  private var builder = new ByteStringBuilder()
  private val zip = new ZipOutputStream(builder.asOutputStream) {
    // this MUST ONLY be used after flush()!
    def setOut(newStream: OutputStream): Unit = out = newStream
  }
  private var inEntry = false
  private var closed  = false

  def close(): Unit = {
    endEntry()
    closed = true
    zip.close()
  }

  def remaining(): Int = bufferSize - builder.length

  def isEmpty(): Boolean = builder.isEmpty

  def startEntry(path: String): Unit =
    if (!closed) {
      endEntry()
      zip.putNextEntry(new ZipEntry(path))
      inEntry = true
    }

  def endEntry(): Unit =
    if (!closed && inEntry) {
      inEntry = false
      zip.closeEntry()
    }

  def write(byte: Int): Unit =
    if (!closed && inEntry) zip.write(byte)

  def write(bytes: Array[Byte], length: Int): Unit =
    if (!closed && inEntry) zip.write(bytes, 0, length)

  def toByteString(): ByteString = {
    zip.flush()
    val result = builder.result
    builder = new ByteStringBuilder()
    // set the underlying output for the zip stream to be the buffer
    // directly, so we don't have to copy the zip'd byte array.
    zip.setOut(builder.asOutputStream)
    result
  }
}

共有1个答案

杜俊爽
2023-03-14

最后我使用了上面的zipbuffer,并使用akka流DSL解决了整个问题。

我的解决方案如下所示:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, SourceShape}
import akka.util.ByteString
import com.company.config.AWS
import org.log4s.getLogger

case class S3StreamingServiceLike(awsConf: AWS, s3Client: S3ClientAlpakka)(
    implicit sys: ActorSystem,
    mat: ActorMaterializer)
    extends S3StreamingService {

  private implicit class ConcatSyntax[T, U](source: Source[T, U]) {
    def ++[TT >: T, NotUsed](that: Source[SourceShape[TT], NotUsed]): Source[Any, U] = //scalastyle:ignore
      source.concat(that)
  }

  private val logger = getLogger

  private sealed trait ZipElement
  private case class FileStart(name: String, index: Int, outOf: Int) extends ZipElement
  private case class FileEnd(name: String, index: Int, outOf: Int)   extends ZipElement
  private case class FilePayload(byteString: ByteString)             extends ZipElement
  private case object EndZip                                         extends ZipElement

  private def payloadSource(filename: String) =
    s3Client.download(awsConf.s3BucketName, filename).map(FilePayload.apply)

  private def fileNameToZipElements(filename: String,
                                    index: Int,
                                    outOf: Int): Source[ZipElement, NotUsed] =
    Source.single(FileStart(filename, index, outOf)) ++
      payloadSource(filename) ++
      Source.single(FileEnd(filename, index, outOf))

  def streamFilesAsZip(filenames: List[String])(forUser: String): Source[ByteString, NotUsed] = {

    val zipBuffer = new ZipBuffer()

    val zipElementSource: Source[ZipElement, NotUsed] =
      Source(filenames.zipWithIndex).flatMapConcat {
        case (filename, index) => fileNameToZipElements(filename, index + 1, filenames.length)
      } ++ Source.single(EndZip)

    zipElementSource
      .map {
        case FileStart(name, index, outOf) =>
          logger.info(s"Zipping file #$index of $outOf with name $name for user $forUser")
          zipBuffer.startEntry(name)
          None
        case FilePayload(byteString) =>
          if (byteString.length > zipBuffer.remaining()) {
            throw new Exception(
              s"Bytestring size exceeded buffer size ${byteString.length} > ${zipBuffer.remaining}")
          }
          zipBuffer.write(byteString.toArray, byteString.length)
          Some(zipBuffer.toByteString())
        case FileEnd(name, index, outOf) =>
          logger.info(s"Finished zipping file #$index of $outOf with $name for user $forUser")
          zipBuffer.endEntry()
          Some(zipBuffer.toByteString())
        case EndZip =>
          zipBuffer.close()
          Some(zipBuffer.toByteString())
      }
      .collect {
        case Some(bytes) if bytes.length > 0 => bytes
      }
  }

}
 类似资料:
  • 我正在尝试使用akka流传输一个文件,在将流的结果提取到Future[String]时遇到了一个小问题: 我得到一个编译错误: 任何人都可以帮助我了解我做错了什么,以及我需要做些什么来提取流的结果?

  • Ziplist 是由一系列特殊编码的内存块构成的列表, 一个 ziplist 可以包含多个节点(entry), 每个节点可以保存一个长度受限的字符数组(不以 \0 结尾的 char 数组)或者整数, 包括: 字符数组 长度小于等于 63 ()字节的字符数组 长度小于等于 16383 () 字节的字符数组 长度小于等于 4294967295 ()字节的字符数组 整数 4 位长,介于 0 至 12 之

  • 问题内容: 伙计们 我正在尝试找到问题的最优雅解决方案,并且想知道python是否为我想做的事情内置了任何东西。 我正在做的是这个。我有一个列表,并且我有一个接受项目并返回列表的函数。我可以使用列表推导来转换所有内容,例如: 但这返回一个列表列表; 我真正想要的是获取扁平化的列表; 现在,其他语言也有它。传统上用函数式编程语言来称呼它,而.Net称它为。python有类似的东西吗?有没有一种巧妙的

  • 在带有lambda b93的JDK8中,b93中有一个类java.util.Stream.streams.zip,可以用来压缩流(这在Dhananjay Nene编写的Exploring Java8 lambda.Part 1教程中进行了说明)。此功能: 然而,在b98中,这种情况消失了。事实上,B98中的java.util.Stream甚至不能访问类。 这个功能已经移动了吗?如果移动了,我如何使

  • 我正在用Java构建一个文件浏览器,并在JTrees中列出文件/文件夹。我现在想做的是,当我到达一个压缩文件夹时,我想列出它的内容,但不首先提取它。 如果有人有想法,请分享。

  • 问题内容: Gzip格式文件(gzip例如,使用程序创建的文件)使用“放气”压缩算法,该压缩算法与zlib使用的压缩算法相同。但是,使用zlib膨胀gzip压缩文件时,该库将返回Z_DATA_ERROR。 如何使用zlib解压缩gzip文件? 问题答案: python zlib库支持: RFC 1950(zlib压缩格式) RFC 1951(deflate压缩格式) RFC 1952(gzip压缩