Принимайте и комбинируйте данные программно, используя очень гибкие входные таблицы.
Девин Смит
Динамические данные поступают из различных источников. Работа с этими данными в режиме реального времени открывает множество возможностей для аналитики и приложений. В современных системах важны не только хорошо известные протоколы, такие как Kafka, но и ваши пользовательские источники. Во многих случаях вам может понадобиться передавать данные из приложений без подключения Kafka.
Deepphaven — это механизм обработки данных, который просто работает с потоковыми таблицами — по сути, таблицами, которые могут обновляться время от времени или быстро меняться в режиме реального времени. В продукте deepphaven-core мы недавно выпустили строительный блок под названием таблица ввода. Он предназначен для размещения большого количества настраиваемых источников данных, которые вас интересуют. Например, вы сможете принимать в режиме реального времени:
- Сторонний канал данных ✓
- Устройство IOT, производящее данные о погоде ✓
- Выводы пользовательского интерфейса, который затем может взаимодействовать с вашим бэкэндом ✓
Это просто несколько возможностей.
Входные таблицы Deephaven предлагают мощный, гибкий и простой API для добавления ваших собственных источников динамических данных. API входных таблиц предоставляется через gRPC.
В этом блоге демонстрируется использование этой функции в удаленном Java-клиенте, но входные таблицы также можно включить непосредственно с сервера (через Режим приложения) или через сеанс скрипта в веб-IDE. Приведенные ниже примеры можно легко адаптировать для работы с клиентом JS или (в будущем) с клиентом Python или C++ (или любым другим языком с поддержкой gRPC).
После того, как вы добавили интеграцию с входной таблицей, вы можете использовать все мощные функции, которые наследуют таблицы Deephaven: объединение, фильтрация, манипулирование, группировка, встраивание функций Python и многое другое. Они становятся просто еще одной исходной таблицей в направленном ациклическом графе вашего живого, обновляющегося запроса.
Типы
Входная таблица — это живая, обновляемая таблица, в которую данные можно добавлять программно. Концептуально это похоже на оператор SQL INSERT или MERGE.
В настоящее время существует два типа входных таблиц:
- Вводная таблица только для добавления. Эти входные таблицы добавляют все новые строки в конец таблицы. Они подходят для случаев использования, когда необходима полная история обновлений. Например, журнал аудита на основе событий, поток событий Интернета вещей или лента социальных сетей.
- Таблица ввода с поддержкой ключей. Эти таблицы ввода содержат ключевые столбцы (ноль или более). Новая строка добавляется, если ключ новый, и существующая строка обновляется, если ключ уже существует. (Также можно удалять строки на основе ключа.) Они подходят для случаев использования, когда (i) есть важный ключ, связанный с данными, и (ii) полная история обновлений не требуется. Наиболее релевантными примерами являются текущая ценовая лента с указанием названия символа, текущая погода с указанием названия города или текущий счет бейсбольного матча с указанием названия команды. Также возможен пустой ключ (ноль столбцов); например, состояние одного выключателя света.
Запрос на создание входной таблицы
Удаленный клиент может создать входную таблицу с помощью RPC запроса на создание входной таблицы. Наиболее важной частью является определение, которое может быть указано с помощью Схемы стрелок (описывающей имена и типы столбцов) или путем ссылки на определение существующей таблицы. Для таблицы ввода с поддержкой ключей также важны ключевые столбцы.
Java-клиент имеет классы InMemoryAppendOnlyInputTable и InMemoryKeyBackedInputTable для представления каждого типа входной таблицы.
Вот пример создания входной таблицы только для добавления с именем audit_log
из клиента Java:
// Create an append-only input table with a timestamp column "Timestamp", a string column "Type", and a string column "Log"
TableHeader header = TableHeader.of(
ColumnHeader.ofInstant("Timestamp"),
ColumnHeader.ofString("Type"),
ColumnHeader.ofString("Log"));
try (TableHandle handle = session.execute(InMemoryAppendOnlyInputTable.of(header))) {
// Publish `audit_log` so that the table can be referenced in the query scope by other sessions
session.publish("audit_log", handle);
}
Вот пример создания входной таблицы с ключом с именем city_weather
из клиента Java:
// Create a key-backed input table with a string key column "City", a timestamp column "Timestamp", and a double column "Degrees"
TableHeader header = TableHeader.of(
ColumnHeader.ofString("City"),
ColumnHeader.ofInstant("Timestamp"),
ColumnHeader.ofDouble("Degrees")), Collections.singletonList("City"));
try (TableHandle handle = session.execute(InMemoryKeyBackedInputTable.of(header))) {
// Publish `city_weather` so that the table can be referenced in the query scope by other sessions
session.publish("city_weather", handle);
}
После выполнения вышеуказанного вы сможете увидеть (пустые) таблицы в раскрывающемся списке Панели веб-интерфейса, обычно доступном по адресу http://localhost:10000/ide/.
Полный исполняемый исходный код для этих примеров доступен в Репозитории входных таблиц примеров Deepphaven.
Добавить запрос таблицы
Создав входную таблицу, вы, вероятно, захотите добавить в нее данные. Удаленный клиент может добавлять данные во входную таблицу с помощью RPC запроса на добавление таблицы. Данные таблицы экспортируются на сервер через Arrow Flight DoPut RPC, а затем добавляются в созданную ранее входную таблицу.
Например, чтобы добавить новые строки во входную таблицу audit_log
из клиента Java, вы можете выполнить:
// Create a table in the builder-style pattern using the 'audit_log' column types NewTable table = ofInstant("Timestamp") .header(ofString("Type")) .header(ofString("Log")) .row(Instant.now(), "NEW_USER", "Added new user 'devin'") .row(Instant.now(), "PASSWORD_RESET", "Password reset for user 'devin'") .newTable();
// Add table into the input table 'audit_log' session.addToInputTable(new ScopeId("audit_log"), table, allocator);
Чтобы добавить или обновить строки во входной таблице city_weather
из клиента Java, вы можете выполнить:
// Create a table in the builder-style pattern using the 'city_weather' column types NewTable table = ofString("City") .header(ofInstant("Timestamp")) .header(ofDouble("Degrees")) .row("Minneapolis, MN", Instant.now(), 35.4) .row("Seattle, WA", Instant.now(), 53.7) .newTable();
// Add table into the input table 'city_weather' session.addToInputTable(new ScopeId("city_weather"), table, allocator);
После выполнения вышеуказанного таблицы в веб-интерфейсе покажут, что данные были успешно добавлены.
Интеграции
Приведенные выше примеры намеренно минимальны, чтобы изолировать основные концепции (i) создания входной таблицы, а затем (ii) добавления к ней. Конечно, природа API такова, что их можно интегрировать с другими системами. Этот простой пример вызова со статическими примерными данными можно легко расширить до реальной интеграции, получая данные программным путем, потенциально даже с высокой скоростью. audit_log
можно интегрировать в качестве приемника для журнала аудита реального приложения; city_weather
можно интегрировать с реальным API погоды.
API-интерфейсы входных таблиц Deephaven представляют собой низкий барьер для входа в потоковую экосистему Deephaven.
Я в восторге от множества вариантов использования для интеграции входных таблиц:
- Устройства IOT, производящие данные, могут использовать входные таблицы и Deephaven для управления домашней автоматизацией.
- Таблицы ввода могут служить коммуникационным уровнем между графическими интерфейсами и внутренними процессами приложений.
- Спортивные инсайдеры могут создавать новые возможности для клиентов, используя входные таблицы и Deephaven, чтобы манипулировать детальными деталями каждой игры, чтобы радовать болельщиков в режиме реального времени.
- Данные блокчейна могут управлять (не блокчейновыми) приложениями для облегчения аналитики и опыта.
- Быстрые POC могут быть разработаны для телеметрии в здравоохранении.
Родственные понятия
Входные таблицы — не единственный способ представления динамических данных в Deephaven. Среди других методов потоки Kafka часто используются для получения данных в реальном времени в живые таблицы Deephaven. Они особенно полезны для систем, в которых уже установлена инфраструктура Kafka. В таком случае сервер Deephaven действует как потребитель для сервера Kafka, и либо сервер Deephaven, либо удаленные клиенты выступают в качестве производителя для сервера Kafka с использованием API Kafka.
Исходный код
Исходный код приведенных выше примеров доступен по адресу deepphaven-examples/input-tables. Он содержит полные исполняемые примеры, которые обеспечивают настройку и контекст для создания и добавления входных таблиц с удаленного клиента. У него также есть другие примеры, кроме тех, что были продемонстрированы выше.