Гайфутдинов Ильнур \ Блог

python, пример параллельного обработки данных

Содержание

Задача содержание

У меня есть справочник, по различным технологиям. Исходники на github, написанный с использованием такого инструмента как sphinx.

Т.е. пишу текст в формате .rst и конвертирую их в .html страницы следующей командой.

make html

И каждая папка в проекте обрабатывается последовательно и все складывается в одну папку как один большой проект.

Я же хочу обработать каждую папку по отдельности, как самодостаточный проект, соответственно указанную команду необходимо выполнить в каждой папке.

Настройка окружения содержание

В данном случае у нас уже гит репозиторий есть и сторонние библиотеки нам не понадобятся, обойдемся стандартными модулями python'a.

Создаем скрипт

touch buld_docs.py

Последовательный обработчик содержание

Объявим глобальные переменные:

# импортируем модуль для работы с файлами и путями
import os

# путь до папки текущего скрипта
BASE_DIR = os.path.abspath(os.path.dirname(__file__))

# путь до папки, куда складываем сконвертированные файлы
BUILD_DIR = os.path.join(BASE_DIR, '_build_html')

# список папок, которые обрабатываем
DIR_NAMES = [
    'android',
    'python',
    'sql',
]

Напишем функцию, которая будет подготавливать папку для выгрузки, т.е. если папки не существует, то мы её создадим.

def prepare_build_dir():
    """функция подготовки папки для выгрузки
    """

    if not os.path.exists(BUILD_DIR):
        # указанного пути не существует
        # значит необходимо создать папку выгрузки
        os.makedirs(BUILD_DIR)

Напишем функцию, которая выполняет команду конвертации внутри конкретной папки

# импортируем билдер из самого sphinx'a
from sphinx.cmd import build


def build_html_serial(dir_name: str):
    """функция последовательной конвертации папки
    :param dir_name: название обрабатываемой папки
    """

    # параметры сборки
    build_params = [
        # укажем папку с конфигом по умолчанию
        '-c', BASE_DIR,
        # укажем формат кнвертируемых данных
        '-b', 'html',
        # укажем папку обработки
        os.path.join(BASE_DIR, dir_name),
        # укажем папку выгрузки
        os.path.join(BUILD_DIR, dir_name),
    ]

    # запускаем сборку
    build.main(build_params)

Запускаем

if __name__ == '__main__':
    prepare_build_dir()
    for dir_name in DIR_NAMES:
        build_html_serial(dir_name)

Теперь запустив скрипт в консоли, мы получим последовательную обработку всех указанных в списке папок.

python build_docs.py

Параллельный обработчик содержание

Т.к. папок у меня много, в данном примере мы рассматриваем только три, то мне нужно выполнить обработку всех папок параллельно. Будем улучшать скрипт. Параллельную обработку реализую через модуль multiprocessing

Напишем новый цикл обработки папки, используем multiprocessing

from time import time

# импортируем Pool, пулл процессов для обработки
# который создает по умолчанию столько процессов, сколько ядер на машине
from multiprocessing import Pool

if __name__ == '__main__':

    # создадим список аргументов для функции параллельной обработки        
    jobs_args = [('html', dir_name) for dir_name in DIR_NAMES]

    # создаем пулл и запускаем обработку всех параметров задач
    # jobs_result - сохраним все что вернут процессы обработки
    jobs_results = Pool().map(build_html_parallel, jobs_args)

    # тут мы сохраним результаты работы в плоский список
    jobs_result_list = []

    # соберем в плоский список результаты, для последующей записи в лог файл
    for job_result in jobs_results:

        # job_result - tupe, то что возвращает функция build_html_parallel
        # это stdout, stderr
        for std in job_result:
            for line in std.split('\n'):
                # отфильтруем лишние строки
                if (
                        line.startswith('reading sources...') or
                        line.startswith('writing output...')
                ):
                    continue

                jobs_result_list.append(line)

    current_time = int(time())
    with open(os.path.join(BASE_DIR, f'{current_time}.log'), 'w') as f:
        f.write('\n'.join(jobs_result_list))

Напишем новую функцию, которая будет запускаться через Poll.map параллельно.

Для того чтобы вернуть stdout и stderr в основной процесс, мы подменим их на свои через StringIO

# импортируем StringIO, файлоподобный объект, который ведет себя как файл, 
# т.е. в него можно писать как в файл, и читать как из файла
# реализует интерфейс файла
from io import StringIO


def build_html_parallel(args):
    """функция конвертации для указанной папки, которая запускается параллельно
    :param args: параметры сборки
    """

    # распаковываем параметры, они сюда придут кортежом
    mode, dir_name = args

    # для визуального контроля выведем информацию о процессе
    print('start', mode, dir_name)

    # подменные объекты для stout, stderr
    stdout = StringIO()
    stderr = StringIO()

    # сохраняем дефолтные объекты
    stdout_def = sys.stdout
    stderr_def = sys.stderr

    # подменяем на свои объекты
    sys.stdout = stdout
    sys.stderr = stderr

    # параметры сборки
    build_params = [
        # укажем папку с конфигом по умолчанию
        '-c', BASE_DIR,
        # укажем формат конвертируемых данных
        '-b', mode,
        # укажем папку обработки
        os.path.join(BASE_DIR, dir_name),
        # укажем папку выгрузки
        os.path.join(BUILD_DIR, dir_name),
    ]

    # запускаем сборку
    build.main(build_params)

    # обратно подменяем дефолтные объекты, на всякий случай
    sys.stdout = stdout_def
    sys.stderr = stderr_def

    # для визуального контроля выведем информацию о процессе
    print('done', mode, dir_name)

    # возвращаем результата
    return stdout.getvalue(), stderr.getvalue()

Добавляем консольные команды содержание

Добавим несколько консольных команд, используем argparser

# импортируем объект, парсер командной строки
from argparse import ArgumentParser

arg_parser = ArgumentParser()

arg_parser.add_argument(
    '-c',
    '--clean',
    help='Полная конвертация, с удалением папки выгрузки',
    action='store_true',
    dest='clean',
)

arg_parser.add_argument(
    '-d',
    '--dirs',
    help='Конвертировать указанные папки, по умолчанию все',
    default='all',
    dest='process_dirs',
    nargs='*',
)

app_args = arg_parser.parse_args()

Доработаем функцию подготовки выбора папки, добавим предварительную очистку папки, если необходимо

# импортируем модуль для работы с файловой системой
import shutil

def prepare_build_dir(app_args: ArgumentParser):
    """функция подготовки папки для выгрузки
    """

    if app_args.clean and os.path.exists(build_dir):
        # удаляем папку рекурсивно
        shutil.rmtree(BUILD_DIR)

    if not os.path.exists(BUILD_DIR):
        # указанного пути не существует
        # значит необходимо создать папку выгрузки
        os.makedirs(BUILD_DIR)

Доработаем создание пула процессов обработки, добавив проверку если указаны конкретные папки

if __name__ == '__main__':

    jobs_args = [
        ('html', dir_name) 
        for dir_name in DIR_NAMES

        # папки из списка попадут тогда,
        # когда мы конвертируем либо все
        # или когда указанная папка есть в списке указанных
        if 'all' in app_args.process_dirs or dir_name in app_args.process_dirs
    ]
Итоговый скрипт
import os
import shutil
import sys

from argparse import ArgumentParser
from io import StringIO
from time import time
from multiprocessing import Pool

from sphinx.cmd import build

BASE_DIR = os.path.abspath(os.path.dirname(__file__))

# путь до папки, куда складываем сконвертированные файлы
BUILD_DIR = os.path.join(BASE_DIR, '_build_html')

# список папок, которые обрабатываем
DIR_NAMES = [
    'android',
    'arduino',
    'cplus',
    'css',
    'docker',
    'git',
    'html',
    'java',
    'js',
    'kotlin',
    'linux',
    'nginx',
    'puppet',
    'python',
    're',
    'sql',
    'svg',
]


def prepare_build_dir(app_args: ArgumentParser):
    """функция подготовки папки для выгрузки
    """

    if app_args.clean and os.path.exists(BUILD_DIR):
        shutil.rmtree(BUILD_DIR)

    if not os.path.exists(BUILD_DIR):
        os.makedirs(BUILD_DIR)


def build_html_serial(dir_name: str):
    """функция последовательной конвертации папки
    :param dir_name: название обрабатываемой папки
    """

    build_params = [
        # укажем папку с конфигом по умолчанию
        '-c', BASE_DIR,
        # укажем формат кнвертируемых данных
        '-b', 'html',
        # укажем папку обработки
        os.path.join(BASE_DIR, dir_name),
        # укажем папку выгрузки
        os.path.join(BUILD_DIR, dir_name),
    ]

    build.main(build_params)


def build_html_parallel(args):
    """функция конвертации для указанной папки, которая запускается параллельно
    :param args: параметры сборки
    """

    mode, dir_name = args

    print('start', mode, dir_name)

    stdout = StringIO()
    stderr = StringIO()

    stdout_def = sys.stdout
    stderr_def = sys.stderr

    sys.stdout = stdout
    sys.stderr = stderr

    build_params = [
        # укажем папку с конфигом по умолчанию
        '-c', BASE_DIR,
        # укажем формат конвертируемых данных
        '-b', mode,
        # укажем папку обработки
        os.path.join(BASE_DIR, dir_name),
        # укажем папку выгрузки
        os.path.join(BUILD_DIR, dir_name),
    ]

    build.main(build_params)

    sys.stdout = stdout_def
    sys.stderr = stderr_def

    print('done', mode, dir_name)

    # возвращаем результата
    return stdout.getvalue(), stderr.getvalue()


if __name__ == '__main__':

    arg_parser = ArgumentParser()

    arg_parser.add_argument(
        '-c',
        '--clean',
        help='Полная конвертация, с удалением папки выгрузки',
        action='store_true',
        dest='clean',
    )

    arg_parser.add_argument(
        '-d',
        '--dirs',
        help='Конвертировать указанные папки, по умолчанию все',
        default='all',
        dest='process_dirs',
        nargs='*',
    )

    app_args = arg_parser.parse_args()

    prepare_build_dir(app_args)

    # создадим список аргументов для функции параллельной обработки        
    jobs_args = [
        ('html', dir_name) 
        for dir_name in DIR_NAMES

        # папки из списка попадут тогда,
        # когда мы конвертируем либо все
        # или когда указанная папка есть в списке указанных
        if 'all' in app_args.process_dirs or dir_name in app_args.process_dirs
    ]

    # создаем пулл и запускаем обработку всех параметров задач
    # jobs_result - сохраним все что вернут процессы обработки
    jobs_results = Pool().map(build_html_parallel, jobs_args)

    # тут мы сохраним результаты работы в плоский список
    jobs_result_list = []

    # соберем в плоский список результаты, для последующей записи в лог файл
    for job_result in jobs_results:

        # job_result - tupe, то что возвращает функция build_html_parallel
        # это stdout, stderr
        for std in job_result:
            for line in std.split('\n'):
                # отфильтруем лишние строки
                if (
                        line.startswith('reading sources...') or
                        line.startswith('writing output...')
                ):
                    continue

                jobs_result_list.append(line)

    current_time = int(time())
    with open(os.path.join(BASE_DIR, f'{current_time}.log'), 'w') as f:
        f.write('\n'.join(jobs_result_list))

Комментарии