我有一个 Source[ByteString, NotUsed] 列表,其中每个 Source 都与一个来自 S3 bucket 的文件名配对。
这些文件需要在恒定内存占用下压缩成一个 zip,并通过 Play 2.6 提供下载。
我的变化主要在这里:
// First attempt: copies the whole entry inside onPush and blocks until the
// entry's source has been fully consumed.
// NOTE(review): Await.ready blocks the calling thread for up to 5 minutes, and
// emitMultiple is presumably invoked from the inner stream's callback rather
// than from this stage's own callbacks — confirm before reusing this variant.
override def onPush(): Unit = {
  val (filepath, source: StreamGenerator) = grab(in)
  buffer.startEntry(filepath)
  // Run the entry's source, feeding every incoming chunk through the zip buffer.
  val entryDone = source().runForeach { chunk =>
    emitMultiple(out, fileChunks(new ByteArrayInputStream(chunk.toArray), buffer))
  }
  // Close the zip entry whether the source succeeded or failed.
  entryDone.onComplete(_ => buffer.endEntry())
  Await.ready(entryDone, 5.minute)
}
我知道这里的 Await 是阻塞调用,但我不确定在这种情况下是否允许这样做。
我该如何以安全(非阻塞)的方式实现这一点?
// Second attempt: exposes the entry's source as an InputStream and lets
// emitMultiple pull zipped chunks from it on demand.
override def onPush(): Unit = {
  val (entryPath, openSource: StreamGenerator) = grab(in)
  buffer.startEntry(entryPath)
  val entryInput = openSource().runWith(StreamConverters.asInputStream(1.minute))
  // Remember the stream so it can be closed if the stage stops early.
  currentStream = Some(entryInput)
  // Runs once every chunk of this entry has been emitted.
  val finishEntry: () => Unit = () => buffer.endEntry()
  emitMultiple(out, fileChunks(entryInput, buffer), finishEntry)
}
下面是我当前 GraphStage 实现的完整代码:
import java.io.{ByteArrayInputStream, InputStream, OutputStream}
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.{ByteString, ByteStringBuilder}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
import scala.util.control.NonFatal
//scalastyle:off
/**
  * Graph stage that consumes `(zip entry path, source generator)` pairs and
  * emits the bytes of a zip archive containing one entry per pair.
  *
  * Every pushed pair starts a new entry in a [[ZipBuffer]]. The pair's source is
  * materialized via `StreamConverters.asInputStream`, copied into the buffer, and
  * the buffer content is emitted downstream chunk by chunk.
  *
  * @param bufferSize size handed to the internal [[ZipBuffer]]
  * @param ec         execution context (not referenced inside this class)
  * @param mat        materializer used to run each entry's source
  */
class StreamedZip(bufferSize: Int = 64 * 1024)(implicit ec: ExecutionContext,
                                                mat: ActorMaterializer)
    extends GraphStage[FlowShape[StreamedZip.ZipSource, ByteString]] {
  import StreamedZip._

  val in: Inlet[ZipSource]    = Inlet("StreamedZip.in")
  val out: Outlet[ByteString] = Outlet("StreamedZip.out")

  override val shape: FlowShape[ZipSource, ByteString] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      private val buffer = new ZipBuffer(bufferSize)

      // Never assigned a Some(...) anywhere in this class, so closeInput()
      // currently has nothing to close.
      private var currentStream: Option[InputStream] = None

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit =
            if (isClosed(in)) {
              // Input is exhausted: flush what is left in the buffer; once the
              // buffer is empty the stage completes.
              if (buffer.isEmpty) completeStage()
              else {
                buffer.close()
                push(out, buffer.toByteString)
              }
            } else pull(in)

          override def onDownstreamFinish(): Unit = {
            closeInput()
            buffer.close()
            super.onDownstreamFinish()
          }
        }
      )

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val (filepath, source: StreamGenerator) = grab(in)
            buffer.startEntry(filepath)
            val stream = source().runWith(StreamConverters.asInputStream(1.minute))
            // The entry is closed only after all of its chunks have been emitted.
            emitMultiple(out, fileChunks(stream, buffer), () => { buffer.endEntry() })
          }

          // NOTE(review): if upstream finishes while emitMultiple is still
          // draining an entry, this closes the buffer (or completes the stage)
          // mid-entry — confirm against GraphStageLogic's emit semantics.
          override def onUpstreamFinish(): Unit = {
            // Use the stage logger instead of printing to stdout.
            log.debug("Upstream finish")
            closeInput()
            if (buffer.isEmpty) completeStage()
            else {
              buffer.close()
              // When there is no demand yet, the remaining bytes are pushed by
              // onPull, which sees the closed inlet.
              if (isAvailable(out)) {
                push(out, buffer.toByteString)
              }
            }
          }
        }
      )

      /** Closes the tracked input stream, if any, and forgets it. */
      private def closeInput(): Unit = {
        currentStream.foreach(_.close())
        currentStream = None
      }

      /**
        * Lazily copies `stream` into `buffer` and yields the buffer content each
        * time the buffer has no room left or the stream is exhausted.
        *
        * @param stream input to read; closed when end of stream is reached
        * @param buffer zip buffer the bytes are written to
        * @return iterator over the zipped chunks; reading happens on demand
        */
      private def fileChunks(stream: InputStream, buffer: ZipBuffer): Iterator[ByteString] = {
        // This seems like a good trade-off between single-byte
        // read I/O performance and doubling the ZipBuffer size.
        //
        // And it's still a decent defense against DDOS resource
        // limit attacks.
        val readBuffer = new Array[Byte](1024)
        var done       = false

        def result: Stream[ByteString] =
          if (done) Stream.empty
          else {
            try {
              // Fill the buffer until it is full or the input ends.
              while (!done && buffer.remaining > 0) {
                val bytesToRead = Math.min(readBuffer.length, buffer.remaining)
                val count       = stream.read(readBuffer, 0, bytesToRead)
                if (count == -1) {
                  stream.close()
                  done = true
                } else buffer.write(readBuffer, count)
              }
              // The tail is lazy, so the next chunk is only read when requested.
              buffer.toByteString #:: result
            } catch {
              case NonFatal(e) =>
                closeInput()
                throw e
            }
          }

        result.iterator
      }
    }
}
/** Type aliases and factory for [[StreamedZip]]. */
object StreamedZip {

  /** Path of an entry inside the zip archive. */
  type ZipFilePath = String

  /** Function that produces the byte source of one entry when called. */
  type StreamGenerator = () => Source[ByteString, NotUsed]

  /** One zip entry: its path plus the generator of its content. */
  type ZipSource = (ZipFilePath, StreamGenerator)

  /** Creates a [[StreamedZip]] stage with the default buffer size. */
  def apply()(implicit ec: ExecutionContext, mat: ActorMaterializer): StreamedZip =
    new StreamedZip()
}
/**
  * In-memory staging area in front of a `ZipOutputStream`: zipped bytes are
  * collected in a `ByteStringBuilder` and handed out with [[toByteString]].
  *
  * @param bufferSize size against which [[remaining]] is computed
  */
class ZipBuffer(val bufferSize: Int = 64 * 1024) {
  import java.util.zip.{ZipEntry, ZipOutputStream}

  /**
    * `ZipOutputStream` whose underlying sink can be swapped. A named class is
    * used so that `setOut` is an ordinary member call and not a call through a
    * structural type.
    */
  private final class RetargetableZipStream(initial: OutputStream)
      extends ZipOutputStream(initial) {
    // this MUST ONLY be used after flush()!
    def setOut(newStream: OutputStream): Unit = {
      out = newStream
    }
  }

  private var builder = new ByteStringBuilder()
  private val zip     = new RetargetableZipStream(builder.asOutputStream)
  private var inEntry = false
  private var closed  = false

  /** Ends the open entry, if any, marks the buffer closed and closes the zip stream. */
  def close(): Unit = {
    endEntry()
    closed = true
    zip.close()
  }

  /** Difference between `bufferSize` and the number of bytes currently held by the builder. */
  def remaining(): Int = bufferSize - builder.length

  /** True when the builder currently holds no bytes. */
  def isEmpty(): Boolean = builder.isEmpty

  /**
    * Ends the open entry, if any, and starts a new one.
    * Does nothing once the buffer has been closed.
    *
    * @param path name of the new zip entry
    */
  def startEntry(path: String): Unit =
    if (!closed) {
      endEntry()
      zip.putNextEntry(new ZipEntry(path))
      inEntry = true
    }

  /** Closes the current entry; does nothing when closed or when no entry is open. */
  def endEntry(): Unit =
    if (!closed && inEntry) {
      inEntry = false
      zip.closeEntry()
    }

  /** Writes one byte to the current entry; ignored when closed or when no entry is open. */
  def write(byte: Int): Unit =
    if (!closed && inEntry) zip.write(byte)

  /**
    * Writes the first `length` bytes of `bytes` to the current entry;
    * ignored when closed or when no entry is open.
    */
  def write(bytes: Array[Byte], length: Int): Unit =
    if (!closed && inEntry) zip.write(bytes, 0, length)

  /**
    * Flushes the zip stream and returns everything collected so far, leaving
    * the buffer empty.
    *
    * @return the bytes collected since the previous call
    */
  def toByteString(): ByteString = {
    zip.flush()
    val result = builder.result()
    builder = new ByteStringBuilder()
    // set the underlying output for the zip stream to be the buffer
    // directly, so we don't have to copy the zip'd byte array.
    zip.setOut(builder.asOutputStream)
    result
  }
}
最后,我复用了上面的 ZipBuffer,并改用 Akka Streams DSL 解决了整个问题。
我的解决方案如下所示:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, SourceShape}
import akka.util.ByteString
import com.company.config.AWS
import org.log4s.getLogger
/**
  * Streams files from an S3 bucket as a single zip archive.
  *
  * @param awsConf  configuration providing `s3BucketName`
  * @param s3Client client whose `download` yields the bytes of one file
  */
case class S3StreamingServiceLike(awsConf: AWS, s3Client: S3ClientAlpakka)(
    implicit sys: ActorSystem,
    mat: ActorMaterializer)
    extends S3StreamingService {

  private val logger = getLogger

  // Events that drive the zip buffer, in the order they occur in the archive.
  private sealed trait ZipElement
  private case class FileStart(name: String, index: Int, outOf: Int) extends ZipElement
  private case class FileEnd(name: String, index: Int, outOf: Int)   extends ZipElement
  private case class FilePayload(byteString: ByteString)             extends ZipElement
  private case object EndZip                                         extends ZipElement

  /** Downloads `filename` from the configured bucket, wrapping each chunk as a payload event. */
  private def payloadSource(filename: String) =
    s3Client.download(awsConf.s3BucketName, filename).map(FilePayload.apply)

  /** Start marker, payload chunks and end marker for one file. */
  private def fileNameToZipElements(filename: String,
                                    index: Int,
                                    outOf: Int): Source[ZipElement, NotUsed] =
    Source
      .single[ZipElement](FileStart(filename, index, outOf))
      .concat(payloadSource(filename))
      .concat(Source.single(FileEnd(filename, index, outOf)))

  /**
    * Builds a source that emits the given files as the bytes of one zip archive.
    *
    * The zip buffer is created per materialization, so the returned source can
    * be run more than once.
    *
    * @param filenames names of the files to include, in archive order
    * @param forUser   user name, used in log messages only
    * @return the zipped bytes; empty chunks are not emitted
    * @throws Exception (failing the stream) when a downloaded chunk is larger
    *                   than the space remaining in the zip buffer
    */
  def streamFilesAsZip(filenames: List[String])(forUser: String): Source[ByteString, NotUsed] = {
    val total = filenames.length

    val zipElementSource: Source[ZipElement, NotUsed] =
      Source(filenames.zipWithIndex)
        .flatMapConcat {
          case (filename, index) => fileNameToZipElements(filename, index + 1, total)
        }
        .concat(Source.single(EndZip))

    zipElementSource.statefulMapConcat[ByteString] { () =>
      // Mutable state lives here so that every run of the stream gets its own buffer.
      val zipBuffer = new ZipBuffer()

      // Takes what the buffer has collected; nothing is emitted for an empty chunk.
      def drain(): List[ByteString] = {
        val bytes = zipBuffer.toByteString()
        if (bytes.length > 0) bytes :: Nil else Nil
      }

      val handle: ZipElement => List[ByteString] = {
        case FileStart(name, index, outOf) =>
          logger.info(s"Zipping file #$index of $outOf with name $name for user $forUser")
          zipBuffer.startEntry(name)
          // Bytes written by startEntry stay in the buffer until the next drain.
          Nil
        case FilePayload(byteString) =>
          if (byteString.length > zipBuffer.remaining()) {
            throw new Exception(
              s"Bytestring size exceeded buffer size ${byteString.length} > ${zipBuffer.remaining}")
          }
          zipBuffer.write(byteString.toArray, byteString.length)
          drain()
        case FileEnd(name, index, outOf) =>
          logger.info(s"Finished zipping file #$index of $outOf with $name for user $forUser")
          zipBuffer.endEntry()
          drain()
        case EndZip =>
          zipBuffer.close()
          drain()
      }

      handle
    }
  }
}
我正在尝试使用akka流传输一个文件,在将流的结果提取到Future[String]时遇到了一个小问题: 我得到一个编译错误: 任何人都可以帮助我了解我做错了什么,以及我需要做些什么来提取流的结果?
Ziplist 是由一系列特殊编码的内存块构成的列表, 一个 ziplist 可以包含多个节点(entry), 每个节点可以保存一个长度受限的字符数组(不以 \0 结尾的 char 数组)或者整数, 包括: 字符数组 长度小于等于 63 ($2^{6}-1$)字节的字符数组 长度小于等于 16383 ($2^{14}-1$) 字节的字符数组 长度小于等于 4294967295 ($2^{32}-1$)字节的字符数组 整数 4 位长,介于 0 至 12 之
问题内容: 伙计们 我正在尝试找到问题的最优雅解决方案,并且想知道python是否为我想做的事情内置了任何东西。 我正在做的是这个。我有一个列表,并且我有一个接受项目并返回列表的函数。我可以使用列表推导来转换所有内容,例如: 但这返回一个列表列表; 我真正想要的是获取扁平化的列表; 现在,其他语言也有它。传统上用函数式编程语言来称呼它,而.Net称它为。python有类似的东西吗?有没有一种巧妙的
在带有lambda b93的JDK8中,b93中有一个类java.util.Stream.streams.zip,可以用来压缩流(这在Dhananjay Nene编写的Exploring Java8 lambda.Part 1教程中进行了说明)。此功能: 然而,在b98中,这种情况消失了。事实上,B98中的java.util.Stream甚至不能访问类。 这个功能已经移动了吗?如果移动了,我如何使
我正在用Java构建一个文件浏览器,并在JTrees中列出文件/文件夹。我现在想做的是,当我到达一个压缩文件夹时,我想列出它的内容,但不首先提取它。 如果有人有想法,请分享。
问题内容: Gzip格式文件(gzip例如,使用程序创建的文件)使用“放气”压缩算法,该压缩算法与zlib使用的压缩算法相同。但是,使用zlib膨胀gzip压缩文件时,该库将返回Z_DATA_ERROR。 如何使用zlib解压缩gzip文件? 问题答案: python zlib库支持: RFC 1950(zlib压缩格式) RFC 1951(deflate压缩格式) RFC 1952(gzip压缩