Добро пожаловать в третью часть моей серии статей Инженерия данных с помощью Rust и Apache Arrow DataFusion. Доступ к первой части здесь.

В предыдущей статье я разработал простой интерфейс командной строки с помощью Clap. Этот интерфейс командной строки может обрабатывать спецификации команд конечного пользователя для входных/выходных файлов и предоставляет подкоманду для настройки операции фильтрации. Затем эти конфигурации командной строки проверяются и объединяются в Rust структуру.

В этой статье я демонстрирую преобразование данных с помощью оператора Filter, который принимает столбец и значение в качестве параметров CLI. Я реализую часть обработки с помощью Apache Arrow DataFusion.

Apache Arrow DataFusion — это механизм выполнения запросов, который позволяет легко выполнять операции чтения, записи и преобразования данных. Он использует формат памяти Apache Arrow, обеспечивающий быстрый доступ к хранилищу и обработку для анализа данных.

Формат Apache Arrow в памяти стандартизирован и доступен на нескольких языках (например, C++, Rust, Julia, Python и т. д.). Он развертывается в крупных проектах по обработке данных (например, Apache Spark и Apache Parquet) и ускоряет выполнение сложных операций: например, чтение/запись Parquet, преобразование данных и совместное использование данных между процессами локально или по сети.

Проект Apache Arrow DataFusion использует эту экосистему и позволяет быстро проектировать и внедрять конвейеры преобразования данных. Он предоставляет механизм запросов, написанный на Rust, с обширной поддержкой SQL и API DataFrame. Кроме того, эта платформа может использовать многоядерную многопоточность, распараллеливая запросы к файлам CSV или Parquet.

Структура программы

В предыдущей статье я определил следующую структуру программы.

→ Rust «mdata_app/src/main.rs» =
 
use std::fs;
use std::num::ParseFloatError;
use std::path::{Path, PathBuf};
use std::str::ParseBoolError;
 
use clap::{ArgEnum, Parser, Subcommand};
use datafusion::arrow::datatypes::DataType;
use datafusion::error::DataFusionError;
use datafusion::prelude::*;
use env_logger::{Builder, Env};
use log::{debug, info, LevelFilter, trace};
use thiserror::Error;
 
(1)
<<cli-args>>
 
(2)
<<structures>>
 
(3)
<<utilities>>
 
(4)
<<data-processing>>
 
(5)
<<program-main>>
 
(6)
<<unit-testing>>

В следующих разделах я сосредоточусь на реализации блоков кода <<structures>> и <<data-processing>>.

Формат и управление ошибками

Прежде чем погрузиться в код преобразования данных, я определяю две небольшие утилиты перечисления:

  • Перечисление WriteFormat указывает формат файла, обрабатываемый нашим приложением.
  • Перечисление MDataAppError — это наш тип управления ошибками, определяющий случаи ошибок.
→ Rust «structures» =
 
(1)
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, ArgEnum)]
enum WriteFormat {
    Undefined = 0,
    Csv = 1,
    Parquet = 2,
}
 
(2)
#[derive(Error, Debug)]
enum MDataAppError {
    #[error("invalid path encoding {path:?}")]
    PathEncoding { path: PathBuf },
    #[error("invalid input format {path:?}")]
    InputFormat { path: PathBuf },
    #[error("invalid output format {format:?}")]
    OutputFormat { format: WriteFormat },
    #[error("invalid filter value {error_message:?}")]
    FilterValue { error_message: String },
    #[error("transparent")]
    DataFusionOp(#[from] DataFusionError),
}
 
(3)
impl From<ParseBoolError> for MDataAppError {
    fn from(e: ParseBoolError) -> Self {
        MDataAppError::FilterValue { error_message: e.to_string() }
    }
}
 
impl From<ParseFloatError> for MDataAppError {
    fn from(e: ParseFloatError) -> Self {
        MDataAppError::FilterValue { error_message: e.to_string() }
    }
}
  • (1): Три случая для входных/выходных файлов. Undefined используется для кодирования случая, когда наша программа не определяет тип файла. Мы используем это значение для неопределенного формата ввода. Мы выводим входной формат из имени входного файла или формата каталога.
  • (2) : MDataAppError использует корзину thiserror, предоставляющую макрос Error. С помощью этой ошибки мы быстро генерируем тип ошибки с пользовательскими предупреждающими сообщениями. Ошибки, полученные из этого ящика, реализуют трейт std::error::Error.
  • (3): программа может вызвать несколько типов ошибок. Мы определяем правило автоматического преобразования из Parse[Bool/Float]Error в наш пользовательский тип MDataAppError.

Фокус управления ошибками

Язык Rust не реализует механизм исключений. Почему? Системы исключений обеспечивают комплексное управление ошибками, прерывая поток управления в любой точке. Эти исключения могут быть перехвачены и обработаны разными способами. Но при плохом использовании (например, универсальное использование) они часто создают код, который трудно читать и поддерживать (см., например, [1] G. B. de Pádua and W. Shang, Изучение взаимосвязи между практиками обработки исключений). и дефекты после выпуска).

Rust различает неисправимые ошибки с макросом panic! и исправимые ошибки с типом Result. В случае неисправимых ошибок программа останавливается с указанным сообщением об ошибке panic!. В восстанавливаемом случае управление ошибками в программе Rust должно быть реализовано в потоке управления вашей программы с использованием возвращаемых значений.

Как мы можем сделать это? Основной способ — использовать тип Option<T>; в случае неудачи вы можете вернуть вызывающей стороне значение None. Но как понять результат вашей функции? Это ошибка или частный случай, когда эта функция ничего не возвращает? С Option<T> вы не распространяете информацию об ошибке.

Лучший вариант, Rust предоставляет тип std::Result<T, E> для возврата значения функции типа T в случае успеха или ошибки типа E. Затем с помощью оператора ? вы можете распространять ошибку в своих функциях.

Но как управлять функцией, которая может возвращать несколько внутренних типов ошибок и внешних типов ошибок (как в нашем примере с DataFusionError)?

В этом случае мы реализуем перечисление случаев ошибок с помощью этой ошибки ящика (2).

В случае типа DataFusionError следующая строка разрешает прозрачную пересылку ошибки внутри нашего пользовательского типа.

#[error("transparent")]
DataFusionOp ( #[from] DataFusionError ),

Без этой функции из корзины thiserror нам пришлось бы реализовать соответствующее преобразование типа From.

impl From<DataFusionError> for MDataAppError {
    fn from(e: DataFusionError) -> MDataAppError {
        MDataAppError::DataFusionOp { error: e }
    }
}

Мы реализуем эти преобразования только для ошибок синтаксического анализа строк Parse[Bool/Float]Error (3).

Основная функция обработки

Функция обработки данных mdata_app принимает параметры аргумента opts в качестве входных данных и возвращает пустой результат (с потенциальным типом ошибки). Слой обработки включает в себя три шага:

  • Инициализация: создайте контекст выполнения Apache Arrow DataFusion и получите путь ввода и вывода.
  • Сделайте вывод о формате ввода на основе имени файла или содержимого каталога. Для этого шага мы используем пользовательскую служебную функцию infer_file_type. Затем мы регистрируем предполагаемый источник данных в контексте Apache Arrow DataFusion.
  • Примените операции преобразования и запишите полученный файл.
→ Rust «data-processing» =
 
(1)
async fn mdata_app(opts: MDataAppArgs) -> Result<(), MDataAppError> {
    let ctx = SessionContext::new();
    let input_path = get_os_path(&opts.input)?;
    let output_path = get_os_path(&opts.output)?;
 
    (2)
    // the inferred format is returned
    let inferred_input_format =
        match infer_file_type(&opts.input, true) {
            WriteFormat::Csv => {
                (3)
                ctx.register_csv("input", input_path, CsvReadOptions::new())
                    .await?;
                WriteFormat::Csv
            }
            WriteFormat::Parquet => {
                (3)
                ctx.register_parquet("input", input_path,
                                     ParquetReadOptions::default()).await?;
                WriteFormat::Parquet
            }
            WriteFormat::Undefined => {
                return Err(MDataAppError::InputFormat { path: opts.input })
                ;
            }
        };
 
    (3)
    <<data-transformation>>
 
    Ok(())
}
  • (1) : ключевое слово async указывает на контекст асинхронной функции.
  • (2): мы вызываем функцию infer_file_type с нашим аргументом input_path. Предполагаемый тип файла основан на расширении файла или структуре каталогов.
  • (3): мы используем функции Apache Arrow DataFusion register_csv и register_parquet. Эти функции принимают в качестве входных данных имя источника, путь и параметры. Мы регистрируем источник данных в механизме запросов Apache Arrow DataFusion с его именем; в нашем случае "input". Это позволяет нам получить доступ к файлам CSV или Parquet в следующих частях функции, используя общий подход.
  • (4): наконец, мы применяем функции загрузки, преобразования и записи данных в блоке <<data-transformation>>.

Операции Apache Arrow DataFusion

Библиотека Apache Arrow DataFusion предлагает интерфейс, похожий на DataFrame, обеспечивающий простой и стандартный доступ к операторам данных. Мы загружаем нашу начальную зарегистрированную функцию из нашего контекста ctx. Затем мы можем применить к нему преобразования. В этой функции мы используем две операции преобразования limit и filter.

→ Rust «data-transformation» =
 
let df_input = ctx.table("input")?; (1)
 
// get the table schema
(2)
let schema = df_input.schema();
 
// print-it
if opts.schema {
    info!("# Schema\n{:#?}", schema)
}
 
// process user filters
(3)
let df_flt =
    if let Some(Filters::Eq { column: column_name, value }) = &opts.filter {
        (4)
        // get the data type of the filtered column
        let filter_type = schema.field_with_name(None,
                                         column_name)?.data_type();
        (5)
        // parse the filter value based on column type
        let filter_value = match filter_type {
            DataType::Boolean => lit(value.to_string().parse::<bool>()?),
            DataType::Utf8 => lit(value.to_string()),
            x if DataType::is_numeric(x) =>
                lit(value.to_string().parse::<f64>()?),
            _ => return Err(MDataAppError::FilterValue {
                error_message: "invalid filter value".to_string()
            })
        };
 
        (6)
        // filter the current dataframe
        df_input.filter(col(column_name).eq(filter_value))?
    } else {
        df_input
    };
 
// if limit is active, only N first rows are written
let df_lmt = if opts.limit > 0 {
    (7)
    df_flt.limit(opts.limit)?
} else {
    df_flt
};
 
// if the output format is undefined, default to input format
(8)
let output_format = match opts.format {
    WriteFormat::Undefined => {
        if inferred_input_format == WriteFormat::Undefined {
            WriteFormat::Undefined
        } else {
            inferred_input_format
        }
    }
    _ => opts.format.clone()
};
 
(9)
match output_format {
    WriteFormat::Csv => df_lmt.write_csv(output_path).await?,
    WriteFormat::Parquet => df_lmt.write_parquet(output_path, None).await?,
    WriteFormat::Undefined => {
        return Err(
            MDataAppError::OutputFormat {
                format: opts.format.clone(),
            },
        );
    }
}
  • (1): мы связываем объект фрейма данных с ранее сопоставленным файлом "input". С помощью этого объекта мы можем вызывать другие операторы Apache Arrow DataFusion.
  • (2): с помощью оператора schema мы получаем доступ к предполагаемой схеме данных, используемой платформой. Я печатаю его как отладочную информацию, если включено подробное ведение журнала.
  • (3)-(6): если конечный пользователь указал преобразование фильтра, мы загружаем и проверяем параметры (3), получаем тип столбца из схемы информации (4), извлеките и проверьте значение фильтра (5) и, наконец, примените оператор filter с режимом eq (равенство), используя извлеченную спецификацию конечного пользователя (6).
  • (7): мы применяем простой оператор ограничения, ограничивающий количество записываемых строк.
  • (8)–(9): наконец, формат вывода выводится или указывается конечным пользователем (8). Мы пишем окончательные результаты в формате CSV или Parquet (9).

Мы доработали нашу функцию преобразования данных. Некоторые замечания об этом коде:

  • Я определяю все свои операции с данными с помощью следующего кода let df_xxx = <expression>. Это показывает, что Rust в основном является языком, основанным на выражениях: if, if let, match, ... блоков кода возвращают значение, которое мы можем присвоить переменной. В языках C/C++ блок кода if является оператором: он не производит никакого значения.
  • Каждому преобразованию присваивается новая переменная df_xxx. Это необязательно, и вы также можете использовать переменную shadowing in Rust. В этом случае вы определяете множители let df = ... с одинаковым именем переменной. Каждая новая переменная df скрывает предыдущую.

Заворачивать

Вот и все! Мы доработали короткую и эффективную функцию для обработки наших файлов данных. В этой функции у нас есть лишь краткий обзор возможностей Apache Arrow DataFusion.

В этом примере я выбрал DataFrame API, но мы можем легко использовать язык SQL с помощью SQL API.

В последней статье будет показано, как мы можем протестировать эту программу, используя поддельные наборы данных.

Оставайтесь с нами, и спасибо за чтение!

Рекомендации

[1] Г. Б. де Падуа и В. Шанг, «Изучение взаимосвязи между методами обработки исключений и дефектами после выпуска», в Материалы 15-й международной конференции по репозиториям программного обеспечения для майнинга, 2018 г., стр. 564. –575.