python, пример параллельного обработки данных
Содержание
- Задача
- Настройка окружения
- Последовательный обработчик
- Параллельный обработчик
- Добавляем консольные команды
Задача содержание
У меня есть справочник, по различным технологиям. Исходники на github, написанный с использованием такого инструмента как sphinx.
Т.е. пишу текст в формате .rst и конвертирую их в .html страницы следующей командой.
make html
И каждая папка в проекте обрабатывается последовательно и все складывается в одну папку как один большой проект.
Я же хочу обработать каждую папку по отдельности, как самодостаточный проект, соответственно указанную команду необходимо выполнить в каждой папке.
Настройка окружения содержание
В данном случае у нас уже гит репозиторий есть и сторонние библиотеки нам не понадобятся, обойдемся стандартными модулями python'a.
Создаем скрипт
touch buld_docs.py
Последовательный обработчик содержание
Объявим глобальные переменные:
- папку, куда будем выгружать скомпилированные в html файлы
- список папок, которые будем обрабатывать
# импортируем модуль для работы с файлами и путями
import os
# путь до папки текущего скрипта
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
# путь до папки, куда складываем сконвертированные файлы
BUILD_DIR = os.path.join(BASE_DIR, '_build_html')
# список папок, которые обрабатываем
DIR_NAMES = [
'android',
'python',
'sql',
]
Напишем функцию, которая будет подготавливать папку для выгрузки, т.е. если папки не существует, то мы её создадим.
def prepare_build_dir():
"""функция подготовки папки для выгрузки
"""
if not os.path.exists(BUILD_DIR):
# указанного пути не существует
# значит необходимо создать папку выгрузки
os.makedirs(BUILD_DIR)
Напишем функцию, которая выполняет команду конвертации внутри конкретной папки
# импортируем билдер из самого sphinx'a
from sphinx.cmd import build
def build_html_serial(dir_name: str):
"""функция последовательной конвертации папки
:param dir_name: название обрабатываемой папки
"""
# параметры сборки
build_params = [
# укажем папку с конфигом по умолчанию
'-c', BASE_DIR,
# укажем формат кнвертируемых данных
'-b', 'html',
# укажем папку обработки
os.path.join(BASE_DIR, dir_name),
# укажем папку выгрузки
os.path.join(BUILD_DIR, dir_name),
]
# запускаем сборку
build.main(build_params)
Запускаем
if __name__ == '__main__':
prepare_build_dir()
for dir_name in DIR_NAMES:
build_html_serial(dir_name)
Теперь запустив скрипт в консоли, мы получим последовательную обработку всех указанных в списке папок.
python build_docs.py
Параллельный обработчик содержание
Т.к. папок у меня много, в данном примере мы рассматриваем только три, то мне нужно выполнить обработку всех папок параллельно. Будем улучшать скрипт. Параллельную обработку реализую через модуль multiprocessing
Напишем новый цикл обработки папки, используем multiprocessing
from time import time
# импортируем Pool, пулл процессов для обработки
# который создает по умолчанию столько процессов, сколько ядер на машине
from multiprocessing import Pool
if __name__ == '__main__':
# создадим список аргументов для функции параллельной обработки
jobs_args = [('html', dir_name) for dir_name in DIR_NAMES]
# создаем пулл и запускаем обработку всех параметров задач
# jobs_result - сохраним все что вернут процессы обработки
jobs_results = Pool().map(build_html_parallel, jobs_args)
# тут мы сохраним результаты работы в плоский список
jobs_result_list = []
# соберем в плоский список результаты, для последующей записи в лог файл
for job_result in jobs_results:
# job_result - tupe, то что возвращает функция build_html_parallel
# это stdout, stderr
for std in job_result:
for line in std.split('\n'):
# отфильтруем лишние строки
if (
line.startswith('reading sources...') or
line.startswith('writing output...')
):
continue
jobs_result_list.append(line)
current_time = int(time())
with open(os.path.join(BASE_DIR, f'{current_time}.log'), 'w') as f:
f.write('\n'.join(jobs_result_list))
Напишем новую функцию, которая будет запускаться через Poll.map параллельно.
Для того чтобы вернуть stdout и stderr в основной процесс, мы подменим их на свои через StringIO
# импортируем StringIO, файлоподобный объект, который ведет себя как файл,
# т.е. в него можно писать как в файл, и читать как из файла
# реализует интерфейс файла
from io import StringIO
def build_html_parallel(args):
"""функция конвертации для указанной папки, которая запускается параллельно
:param args: параметры сборки
"""
# распаковываем параметры, они сюда придут кортежом
mode, dir_name = args
# для визуального контроля выведем информацию о процессе
print('start', mode, dir_name)
# подменные объекты для stout, stderr
stdout = StringIO()
stderr = StringIO()
# сохраняем дефолтные объекты
stdout_def = sys.stdout
stderr_def = sys.stderr
# подменяем на свои объекты
sys.stdout = stdout
sys.stderr = stderr
# параметры сборки
build_params = [
# укажем папку с конфигом по умолчанию
'-c', BASE_DIR,
# укажем формат конвертируемых данных
'-b', mode,
# укажем папку обработки
os.path.join(BASE_DIR, dir_name),
# укажем папку выгрузки
os.path.join(BUILD_DIR, dir_name),
]
# запускаем сборку
build.main(build_params)
# обратно подменяем дефолтные объекты, на всякий случай
sys.stdout = stdout_def
sys.stderr = stderr_def
# для визуального контроля выведем информацию о процессе
print('done', mode, dir_name)
# возвращаем результата
return stdout.getvalue(), stderr.getvalue()
Добавляем консольные команды содержание
Добавим несколько консольных команд, используем argparser
- добавим параметр -с, для полной конвертации, т.е. с удалением папки билда и полной конвертацией всех папок
- добавим параметр, который будет указывать конкретные папки для конвертации
# импортируем объект, парсер командной строки
from argparse import ArgumentParser
arg_parser = ArgumentParser()
arg_parser.add_argument(
'-c',
'--clean',
help='Полная конвертация, с удалением папки выгрузки',
action='store_true',
dest='clean',
)
arg_parser.add_argument(
'-d',
'--dirs',
help='Конвертировать указанные папки, по умолчанию все',
default='all',
dest='process_dirs',
nargs='*',
)
app_args = arg_parser.parse_args()
Доработаем функцию подготовки выбора папки, добавим предварительную очистку папки, если необходимо
# импортируем модуль для работы с файловой системой
import shutil
def prepare_build_dir(app_args: ArgumentParser):
"""функция подготовки папки для выгрузки
"""
if app_args.clean and os.path.exists(build_dir):
# удаляем папку рекурсивно
shutil.rmtree(BUILD_DIR)
if not os.path.exists(BUILD_DIR):
# указанного пути не существует
# значит необходимо создать папку выгрузки
os.makedirs(BUILD_DIR)
Доработаем создание пула процессов обработки, добавив проверку если указаны конкретные папки
if __name__ == '__main__':
jobs_args = [
('html', dir_name)
for dir_name in DIR_NAMES
# папки из списка попадут тогда,
# когда мы конвертируем либо все
# или когда указанная папка есть в списке указанных
if 'all' in app_args.process_dirs or dir_name in app_args.process_dirs
]
Итоговый скрипт
import os
import shutil
import sys
from argparse import ArgumentParser
from io import StringIO
from time import time
from multiprocessing import Pool
from sphinx.cmd import build
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
# путь до папки, куда складываем сконвертированные файлы
BUILD_DIR = os.path.join(BASE_DIR, '_build_html')
# список папок, которые обрабатываем
DIR_NAMES = [
'android',
'arduino',
'cplus',
'css',
'docker',
'git',
'html',
'java',
'js',
'kotlin',
'linux',
'nginx',
'puppet',
'python',
're',
'sql',
'svg',
]
def prepare_build_dir(app_args: ArgumentParser):
"""функция подготовки папки для выгрузки
"""
if app_args.clean and os.path.exists(BUILD_DIR):
shutil.rmtree(BUILD_DIR)
if not os.path.exists(BUILD_DIR):
os.makedirs(BUILD_DIR)
def build_html_serial(dir_name: str):
"""функция последовательной конвертации папки
:param dir_name: название обрабатываемой папки
"""
build_params = [
# укажем папку с конфигом по умолчанию
'-c', BASE_DIR,
# укажем формат кнвертируемых данных
'-b', 'html',
# укажем папку обработки
os.path.join(BASE_DIR, dir_name),
# укажем папку выгрузки
os.path.join(BUILD_DIR, dir_name),
]
build.main(build_params)
def build_html_parallel(args):
"""функция конвертации для указанной папки, которая запускается параллельно
:param args: параметры сборки
"""
mode, dir_name = args
print('start', mode, dir_name)
stdout = StringIO()
stderr = StringIO()
stdout_def = sys.stdout
stderr_def = sys.stderr
sys.stdout = stdout
sys.stderr = stderr
build_params = [
# укажем папку с конфигом по умолчанию
'-c', BASE_DIR,
# укажем формат конвертируемых данных
'-b', mode,
# укажем папку обработки
os.path.join(BASE_DIR, dir_name),
# укажем папку выгрузки
os.path.join(BUILD_DIR, dir_name),
]
build.main(build_params)
sys.stdout = stdout_def
sys.stderr = stderr_def
print('done', mode, dir_name)
# возвращаем результата
return stdout.getvalue(), stderr.getvalue()
if __name__ == '__main__':
arg_parser = ArgumentParser()
arg_parser.add_argument(
'-c',
'--clean',
help='Полная конвертация, с удалением папки выгрузки',
action='store_true',
dest='clean',
)
arg_parser.add_argument(
'-d',
'--dirs',
help='Конвертировать указанные папки, по умолчанию все',
default='all',
dest='process_dirs',
nargs='*',
)
app_args = arg_parser.parse_args()
prepare_build_dir(app_args)
# создадим список аргументов для функции параллельной обработки
jobs_args = [
('html', dir_name)
for dir_name in DIR_NAMES
# папки из списка попадут тогда,
# когда мы конвертируем либо все
# или когда указанная папка есть в списке указанных
if 'all' in app_args.process_dirs or dir_name in app_args.process_dirs
]
# создаем пулл и запускаем обработку всех параметров задач
# jobs_result - сохраним все что вернут процессы обработки
jobs_results = Pool().map(build_html_parallel, jobs_args)
# тут мы сохраним результаты работы в плоский список
jobs_result_list = []
# соберем в плоский список результаты, для последующей записи в лог файл
for job_result in jobs_results:
# job_result - tupe, то что возвращает функция build_html_parallel
# это stdout, stderr
for std in job_result:
for line in std.split('\n'):
# отфильтруем лишние строки
if (
line.startswith('reading sources...') or
line.startswith('writing output...')
):
continue
jobs_result_list.append(line)
current_time = int(time())
with open(os.path.join(BASE_DIR, f'{current_time}.log'), 'w') as f:
f.write('\n'.join(jobs_result_list))
Комментарии