Потоковая обработка часто встречается при работе с большими файлами или когда данные приходят частями. В Python есть множество инструментов для работы с такими данными. Самый известный - итератор файла по строкам. В веб-приложениях это стандарт для передачи файлов. Далее приведу несколько примеров.
Чтение файлов
with open('huge-file.txt') as file:
for line in file:
process_line(line)
Это позволяет нам читать текстовый файл по строкам не загружая всё в память.
Конечно, если позволяет формат данных. С JSON такое не сработает (ijson может в этом помочь).
Запись файла чанками
with open('file-to-save.txt',
'w') as file:
for line in iter_data():
file.write(line)
Частные случаи есть в разных библиотеках. Например DictWriter и DictReader из модуля csv позволяет работать с конкретным форматом данных а не просто текст.
import csv
with open('data.csv', 'r') as file:
reader = csv.DictReader(file)
for row in reader:
print(row)
with open('data.csv', 'a',
newline='') as f:
writer = csv.DictWriter(f,
fieldnames=['col1', 'col2']
)
for row in iter_objects():
writer.writerow(row)
Отдельно интересен ZipFile, позволяющий "открыть" файл сразу внутри архива и записывать его частями
import zipfile as zf
with zf.ZipFile(
'archive.zip',
'w',
compression=zf.ZIP_DEFLATED) as zf:
with zf.open(
'large_data.bin',
mode='w') as in_file:
with open(
'large_data.bin',
'rb') as source:
for chunk in iter(
lambda: source.read(1024),
b''):
in_file.write(chunk)
Создание хеша для большого файла
import hashlib
sha256 = hashlib.sha256()
with open(
'large-file.bin',
'rb') as f:
for block in iter(
lambda: f.read(1024), b''
):
sha256.update(block)
hash_sum = sha256.hexdigest()
Сжатие данных в файл отдельными чанками
import gzip
with gzip.open('data.gz', 'wb') as f:
for bin_chunk in iter_bin_data():
f.write(bin_chunk)
Чтение с записью в файл
with gzip.open('data.gz', 'rb') as f_in:
with open(
'extracted_data.txt',
'wb') as f_out:
for chunk in iter(
lambda: f_in.read(1024),
b''):
f_out.write(chunk)
Подсчет объектов из стрима. Добавление обновляет счетчики.
from collections import Counter
c = Counter()
for data in iter_objects():
c.update(data)
Это не все доступные примеры, их еще много. Каждый из них позволяет обрабатывать данные из потока не ожидая весь набор и не загружая их в оперативку.
Это очень полезная техника, которую я призываю использовать по назначению!
#tricks#libs