当前位置: 首页 > 知识库问答 >
问题:

使用python io从缓冲流组成行读取器

卢才艺
2023-03-14

我使用python boto与s3交互。我在s3上的文件是CSV的,我想从s3中读取行,使用缓冲区来绑定内存使用。

我想知道是否有人有任何方法可以编写python的io类来实现这一点?我们的目标是有某种抽象,能够包装boto键,并在键上提供一个readline或迭代器接口(只提供read(size=0)调用)。其复杂性在于,由于它存储为CSV,所以每一行的长度都是可变的。

目标是有一个抽象,我能够包装python boto key,然后实现迭代器协议,这样我就可以把它传递给csv阅读器,我最终实现了自己。

它看起来像pythonio真的有所有的部分来完成这个BufferedReaderTextIOWrapper,我天真地试图将botoKey传递给它,但是BufferedReader需要一个IOBase对象。

然后,我围绕密钥实现了IOBase协议,但得到了Unicode错误,只是一般不确定我在做什么。

有人知道python io是否可以做类似于上面描述的事情吗??

技术规格:

s3上有一个1-100个CSV文件的目录。它们都具有相同的格式,但行数可变。我试图实现一个函数,它需要一个botoKeys的迭代器。

Key提供了一个读取(num_bytes)方法。

def yield_lines(keys_iterator):
   # had to custom implement this
   # any way using io??
   # yield each CSV row across keys that only provide `read()` method

我最初的尝试是尝试让botoKey遵守IOBase。我将用缓冲读取器编写它,然后尝试使用TextIOWrapper读取其中的行,但遇到了readinto的编码问题。

class IOCompatibleKey(object):

   def __init__(self, s3_key):
      self.s3_key = s3_key

   def readable(self):
      return True

   def writeable(self):
      return False

   def read(num_bytes):
      return self.s3_key.read(num_bytes)

   def readinto(n):
      # .... ?????

buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
text_reader = TextIOWrapper(buffered_reader)
for line in text_reader: # <- IS THIS POSSIBLE????
    print(line)

共有1个答案

柳威
2023-03-14

在Python 2中,您希望避免使用TextIOWrapper对象,因为csv.reader()对象需要一个bytestring。它无法处理unicode对象TextIOWrapper提供的。

提供IOBase实现在其他方面非常简单:

class IOCompatibleKey(object):    
    def __init__(self, s3_key):
        self.s3_key = s3_key

    def readable(self):
        return True

    def writeable(self):
        return False

    @property
    def closed(self):
        return self.s3_key.closed

    def close(self):
        self.s3_key.close()

    def read(self, num_bytes):
        return self.s3_key.read(num_bytes)

    def readinto(self, n):
        chunk = self.s3_key.read(len(n))
        read = len(chunk)
        n[:read] = chunk
        return read

并且在使用Python 2时仅使用BufferedReader

buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
csv_reader = csv.reader(buffered_reader)
for row in csv_reader:
    print(row)

在Python3上,只需在BufferedReader()的顶部添加一个TextIOWrapper()

Python 2中的演示,使用模拟键:

>>> import random, csv
>>> from io import BufferedReader
>>> class Key(object):
...     closed = False
...     def read(self, bytes=1024):
...         if random.random() < 0.2:
...             bytes = random.randrange(bytes)
...         return ''.join([random.choice('abcdefghijklmnopqrstuvwxyz \n,') for _ in range(bytes)])
...
>>> s3_key = Key()
>>> buffered_reader = BufferedReader(IOCompatibleKey(s3_key))
>>> next(buffered_reader)   # produces a single \n terminated line
'nffdahuitmdaktibxjsdgyhlyfm gurfyo,nt\n'
>>> reader = csv.reader(buffered_reader)  # which satisfies csv.reader
>>> next(reader)
['bi iydribq', 'u']
>>> next(reader)
['qzxtbhkk se', 'v', 'b', 'nunyjemtkxaphuqmvgfrfjdloxwohqamdtvfqgddfna cjuzpaotccenxhhhgnvrbey']
 类似资料:
  • 本文向大家介绍Java缓冲读取器,包括了Java缓冲读取器的使用技巧和注意事项,需要的朋友参考一下 示例 介绍 该BufferedReader班是以外的包装Reader是有两个主要目的类: ABufferedReader为包装的提供缓冲Reader。这允许应用程序一次读取一个字符,而不会产生过多的I / O开销。 一个BufferedReader用于一次读取文本行提供的功能。 使用Buffered

  • 我在读这样一个文件: 文件如下所示: 我是这样分析的: 这里没有显示与每一行相关的代码。对于某些行,我想跳回到某个行号,并再次继续阅读文件。最好的方法是什么?我必须创建另一个读取器并跳过行,直到我感兴趣的特定行吗?我正在沿着这篇文章的思路看,但我想继续阅读文件的其余部分。

  • 问题内容: 为了寻求帮助,我目前已编写了HTTP服务器。目前,它可以很好地处理GET请求。但是,在使用POST时,缓冲的读取器似乎挂起。当请求停止时,其余输入流将通过缓冲的读取器读取。我在Google上找到了一些东西。我尝试将CRLF和协议版本从1.1更改为1.0(浏览器会自动将请求发送为1.1),任何想法或帮助将不胜感激。谢谢 问题答案: 这不安全!但是显示了如何在初始HTTP标头之后的输入流中

  • 我正在阅读有关流的信息,发现我们可以使用setvbuf()函数来控制流......它写的是在行缓冲模式中,当遇到换行符时流将数据发送到文件中,在无缓冲状态下没有缓冲......所以我写了以下代码...... 所以我认为,因为这些是无缓冲流,所以输入应该在我写入屏幕后立即发送到标准输出。。。但程序在写入每一行后等待我按enter键,然后屏幕上只显示输出(由于fwrite)。。。我的问题是,当这些是无

  • 问题内容: 我有一个线程可以从缓冲读取器读取字符(该读取器是从套接字创建的,如下所示): 此代码只能运行一次。例如,如果客户端连接并发送此消息:“这是一个测试”和“这是另一个测试”,则主机输出为: 请注意该程序不会收到“这是另一个测试”,因为它停留在读取流上。有什么办法在不减小缓冲区大小的情况下解决这个问题?这是线程的代码: 客户/发送者的代码(不是我的代码): 问题答案: 是一个 阻塞 调用,这

  • 我的目标是用扩展名解析协议缓冲区文件。pb。一串在Mac上使用自制软件下载Protobuff。运行protoc--版本,并具有libprotoc 3.1.0版本。 但当我运行Python时,它会说找不到模块。我改变了主意。pb文件名到\u pb2。py并在Python脚本中导入模块。 我正在使用谷歌文档,但仍然没有任何运气。我在编译Protobuf时也遇到了问题。so文件通过Python。我只是无