Принимайте и комбинируйте данные программно, используя очень гибкие входные таблицы.

Девин Смит

Динамические данные поступают из различных источников. Работа с этими данными в режиме реального времени открывает множество возможностей для аналитики и приложений. В современных системах важны не только хорошо известные протоколы, такие как Kafka, но и ваши пользовательские источники. Во многих случаях вам может понадобиться передавать данные из приложений без подключения Kafka.

Deepphaven — это механизм обработки данных, который просто работает с потоковыми таблицами — по сути, таблицами, которые могут обновляться время от времени или быстро меняться в режиме реального времени. В продукте deepphaven-core мы недавно выпустили строительный блок под названием таблица ввода. Он предназначен для размещения большого количества настраиваемых источников данных, которые вас интересуют. Например, вы сможете принимать в режиме реального времени:

  • Сторонний канал данных ✓
  • Устройство IOT, производящее данные о погоде ✓
  • Выводы пользовательского интерфейса, который затем может взаимодействовать с вашим бэкэндом ✓

Это просто несколько возможностей.

Входные таблицы Deephaven предлагают мощный, гибкий и простой API для добавления ваших собственных источников динамических данных. API входных таблиц предоставляется через gRPC.
В этом блоге демонстрируется использование этой функции в удаленном Java-клиенте, но входные таблицы также можно включить непосредственно с сервера (через Режим приложения) или через сеанс скрипта в веб-IDE. Приведенные ниже примеры можно легко адаптировать для работы с клиентом JS или (в будущем) с клиентом Python или C++ (или любым другим языком с поддержкой gRPC).

После того, как вы добавили интеграцию с входной таблицей, вы можете использовать все мощные функции, которые наследуют таблицы Deephaven: объединение, фильтрация, манипулирование, группировка, встраивание функций Python и многое другое. Они становятся просто еще одной исходной таблицей в направленном ациклическом графе вашего живого, обновляющегося запроса.

Типы

Входная таблица — это живая, обновляемая таблица, в которую данные можно добавлять программно. Концептуально это похоже на оператор SQL INSERT или MERGE.

В настоящее время существует два типа входных таблиц:

  1. Вводная таблица только для добавления. Эти входные таблицы добавляют все новые строки в конец таблицы. Они подходят для случаев использования, когда необходима полная история обновлений. Например, журнал аудита на основе событий, поток событий Интернета вещей или лента социальных сетей.
  2. Таблица ввода с поддержкой ключей. Эти таблицы ввода содержат ключевые столбцы (ноль или более). Новая строка добавляется, если ключ новый, и существующая строка обновляется, если ключ уже существует. (Также можно удалять строки на основе ключа.) Они подходят для случаев использования, когда (i) есть важный ключ, связанный с данными, и (ii) полная история обновлений не требуется. Наиболее релевантными примерами являются текущая ценовая лента с указанием названия символа, текущая погода с указанием названия города или текущий счет бейсбольного матча с указанием названия команды. Также возможен пустой ключ (ноль столбцов); например, состояние одного выключателя света.

Запрос на создание входной таблицы

Удаленный клиент может создать входную таблицу с помощью RPC запроса на создание входной таблицы. Наиболее важной частью является определение, которое может быть указано с помощью Схемы стрелок (описывающей имена и типы столбцов) или путем ссылки на определение существующей таблицы. Для таблицы ввода с поддержкой ключей также важны ключевые столбцы.

Java-клиент имеет классы InMemoryAppendOnlyInputTable и InMemoryKeyBackedInputTable для представления каждого типа входной таблицы.

Вот пример создания входной таблицы только для добавления с именем audit_log из клиента Java:

// Create an append-only input table with a timestamp column "Timestamp", a string column "Type", and a string column "Log"
TableHeader header = TableHeader.of(
        ColumnHeader.ofInstant("Timestamp"),
        ColumnHeader.ofString("Type"),
        ColumnHeader.ofString("Log"));
try (TableHandle handle = session.execute(InMemoryAppendOnlyInputTable.of(header))) {
    // Publish `audit_log` so that the table can be referenced in the query scope by other sessions
    session.publish("audit_log", handle);
}

Вот пример создания входной таблицы с ключом с именем city_weather из клиента Java:

// Create a key-backed input table with a string key column "City", a timestamp column "Timestamp", and a double column "Degrees"
TableHeader header = TableHeader.of(
        ColumnHeader.ofString("City"),
        ColumnHeader.ofInstant("Timestamp"),
        ColumnHeader.ofDouble("Degrees")), Collections.singletonList("City"));
try (TableHandle handle = session.execute(InMemoryKeyBackedInputTable.of(header))) {
    // Publish `city_weather` so that the table can be referenced in the query scope by other sessions
    session.publish("city_weather", handle);
}

После выполнения вышеуказанного вы сможете увидеть (пустые) таблицы в раскрывающемся списке Панели веб-интерфейса, обычно доступном по адресу http://localhost:10000/ide/.

Полный исполняемый исходный код для этих примеров доступен в Репозитории входных таблиц примеров Deepphaven.

Добавить запрос таблицы

Создав входную таблицу, вы, вероятно, захотите добавить в нее данные. Удаленный клиент может добавлять данные во входную таблицу с помощью RPC запроса на добавление таблицы. Данные таблицы экспортируются на сервер через Arrow Flight DoPut RPC, а затем добавляются в созданную ранее входную таблицу.

Например, чтобы добавить новые строки во входную таблицу audit_log из клиента Java, вы можете выполнить:

// Create a table in the builder-style pattern using the 'audit_log' column types
NewTable table = ofInstant("Timestamp")
        .header(ofString("Type"))
        .header(ofString("Log"))
        .row(Instant.now(), "NEW_USER", "Added new user 'devin'")
        .row(Instant.now(), "PASSWORD_RESET", "Password reset for user 'devin'")
        .newTable();
// Add table into the input table 'audit_log'
session.addToInputTable(new ScopeId("audit_log"), table, allocator);

Чтобы добавить или обновить строки во входной таблице city_weather из клиента Java, вы можете выполнить:

// Create a table in the builder-style pattern using the 'city_weather' column types
NewTable table = ofString("City")
        .header(ofInstant("Timestamp"))
        .header(ofDouble("Degrees"))
        .row("Minneapolis, MN", Instant.now(), 35.4)
        .row("Seattle, WA", Instant.now(), 53.7)
        .newTable();
// Add table into the input table 'city_weather'
session.addToInputTable(new ScopeId("city_weather"), table, allocator);

После выполнения вышеуказанного таблицы в веб-интерфейсе покажут, что данные были успешно добавлены.

Интеграции

Приведенные выше примеры намеренно минимальны, чтобы изолировать основные концепции (i) создания входной таблицы, а затем (ii) добавления к ней. Конечно, природа API такова, что их можно интегрировать с другими системами. Этот простой пример вызова со статическими примерными данными можно легко расширить до реальной интеграции, получая данные программным путем, потенциально даже с высокой скоростью. audit_log можно интегрировать в качестве приемника для журнала аудита реального приложения; city_weather можно интегрировать с реальным API погоды.

API-интерфейсы входных таблиц Deephaven представляют собой низкий барьер для входа в потоковую экосистему Deephaven.

Я в восторге от множества вариантов использования для интеграции входных таблиц:

  • Устройства IOT, производящие данные, могут использовать входные таблицы и Deephaven для управления домашней автоматизацией.
  • Таблицы ввода могут служить коммуникационным уровнем между графическими интерфейсами и внутренними процессами приложений.
  • Спортивные инсайдеры могут создавать новые возможности для клиентов, используя входные таблицы и Deephaven, чтобы манипулировать детальными деталями каждой игры, чтобы радовать болельщиков в режиме реального времени.
  • Данные блокчейна могут управлять (не блокчейновыми) приложениями для облегчения аналитики и опыта.
  • Быстрые POC могут быть разработаны для телеметрии в здравоохранении.

Родственные понятия

Входные таблицы — не единственный способ представления динамических данных в Deephaven. Среди других методов потоки Kafka часто используются для получения данных в реальном времени в живые таблицы Deephaven. Они особенно полезны для систем, в которых уже установлена ​​инфраструктура Kafka. В таком случае сервер Deephaven действует как потребитель для сервера Kafka, и либо сервер Deephaven, либо удаленные клиенты выступают в качестве производителя для сервера Kafka с использованием API Kafka.

Исходный код

Исходный код приведенных выше примеров доступен по адресу deepphaven-examples/input-tables. Он содержит полные исполняемые примеры, которые обеспечивают настройку и контекст для создания и добавления входных таблиц с удаленного клиента. У него также есть другие примеры, кроме тех, что были продемонстрированы выше.