Каналы — это конструкция, упрощающая параллельное выполнение и конвейерную обработку данных, и ее часто называют одним из главных преимуществ Go. Но знаете ли вы, что .NET также имеет встроенную поддержку каналов?

Если вы создаете приложения, которые обрабатывают большие объемы данных или вам необходимо взаимодействовать с несколькими API-интерфейсами для некоторого задания обработки при агрегировании результатов, использование каналов может повысить общую пропускную способность и скорость отклика приложения, позволяя выполнять эти вызовы API одновременно, в то время как объединение результатов и/или выполнение постобработки в виде потока.

Давайте посмотрим, как мы можем использовать каналы в C# 10 с .NET 6 и насколько легко реализовать параллельную обработку с помощью встроенного System.Threading.Channels.

Если вы хотите следить за репозиторием, проверьте его здесь:



Вариант использования

Для начала представьте, что мы создаем приложение для согласования календаря. У пользователя есть два или более календаря (например, Google, Outlook и Календарь iCloud), из которых мы хотели бы прочитать и найти конфликтующие события во всех календарях пользователя.

Один из способов сделать это — просто перебрать каждый календарь и собрать все события:

# Pseudo code:

# Use an interval tree to hold our events and detect conflicts
var interval_tree = new IntervalTree()

do {
  # API calls to get the google events; 3s
} while (has_more_google_events)

do {
  # API calls to get the outlook events; 4s
} while (has_more_outlook_events)

do {
  # API calls to get the iCloud events; 3s
} while (has_more_icloud_events)
# Requires 10s if executed sequentially!

Мы можем использовать структуру данных IntervalTree в качестве механизма для представления событий в виде интервалов, чтобы мы могли легко запрашивать, где у нас есть конфликты. Согласно документам:

Запросы требуют O(log n + m) времени, где n — это общее количество интервалов, а m — количество полученных результатов. Строительство требует O(n log n) времени, а хранение требует O(n) места.

Это дает нам довольно эффективный способ проверки совпадений после того, как мы получили все события из разных календарей.

Проблема с этим последовательным подходом к извлечению заключается в том, что каждый из вызовов API для получения событий от поставщиков привязан к вводу-выводу; другими словами, большая часть времени будет потрачена на то, чтобы сеть выполняла вызов API, и выполнение этого последовательно означает, что наша программа должна тратить много времени на ожидание в сети.

Если вызовы занимают в среднем [3s, 4s, 3s], то общее время последовательной обработки этой операции составляет 10 с. Но если бы мы могли делать это одновременно, наше общее время работы было бы ближе к 4 с.

Почему это важно? В бессерверном мире выставление счетов обычно осуществляется по показателю вычислений/времени, например по количеству виртуальных ЦП/секунд. Таким образом, если можно выполнить одну и ту же задачу за меньшее время — особенно задачи, связанные с вводом-выводом, такие как вызовы HTTP API, которые не нагружают ЦП, — тогда эксплуатационные расходы будут сэкономлены за счет повышения пропускная способность (не говоря уже о большей отзывчивости для взаимодействия с конечным пользователем).

.NET предоставляет множество различных вариантов синхронизации этого параллельного выполнения, таких как ConcurrentQueue или прямое использование примитива блокировки (например, мьютекса или семафора) для управления доступом к общему ресурсу (дереву интервалов), но сегодня давайте взглянем на абстракция производителя/потребителя более высокого уровня с использованием System.Threading.Channels, которая обеспечивает простую в использовании парадигму для управления синхронизацией параллельных потоков выполнения.

Насмешка над API календаря

Чтобы имитировать это, мы собираемся создать набор простых фиктивных провайдеров вместо реальных вызовов API, которые возвращают страницы событий из Календаря Google, Outlook и Apple iCloud Calendar (в этом случае никаких реальных вызовов API, поскольку это потребует настройки токены OAuth!).

Если вы не знакомы с C#, Task<> эквивалентно Promise<> в TypeScript (или просто Promise в JavaScript); если вы хотите узнать больше о том, насколько похожи JavaScript, TypeScript и C#, ознакомьтесь с этим репозиторием:



Каждый из наших провайдеров — просто для целей моделирования — создает список событий, которые мы будем листать:

Это приведет к возврату 3 «страниц» данных при вызове GetCalendarEventsAsync.

(Вы можете легко разветвить код и поэкспериментировать с ним, чтобы увидеть, как разные расписания будут генерировать конфликты.)

Параллельное выполнение

Ядром приложения является простая настройка набора одновременных вызовов, которые будут использовать System.Threading.Channel для связи между параллельным выполнением и агрегацией в средстве проверки конфликтов в нашем планировщике.

Начнем с создания нашего канала:

Теперь сразу начинаем наш Scheduler:

И настроим наши параллельные задачи для каждого из трех календарей, проходящих на стороне производителя канала — writer:

Приведенный выше код устанавливает наши одновременные вызовы; каждый поставщик календаря получает ссылку на наш writer.

А теперь ждем, когда закончат все наши провайдеры, а затем сигнализируем нашему каналу, что запись завершена (все события из наших календарей у нас есть):

И, наконец, мы ждем, когда наш планировщик закончит работу:

Выход

В нашем примере мы получим следующий вывод:

Обратите внимание, что хотя мы выполняем всего 12 вызовов (последний набор вызовов возвращает 0 результатов, указывающих на конец набора данных) со случайным ожиданием до 1 с, наше выполнение завершается всего за 2,566 с (в данном случае)! Очень круто и почти нет работы, чтобы сделать это одновременно!

Теперь давайте посмотрим, как мы на самом деле используем канал.

Сторона продюсера

Это сторона, которая пишет в канал. Другими словами, когда мы делаем вызовы API и извлекаем страницы событий, мы хотим передать эти события планировщику через канал.

Это удивительно просто:

Когда мы получаем события, мы просто записываем их в канал, используя конец записи. Наш канал даже строго типизирован:

Потребительская сторона

Это сторона, которая читает из канала. В этом случае, поскольку наши поставщики событий календаря выполняют вызовы API, возвращают результаты и записывают их в канал, мы собираемся использовать Scheduler для чтения из канала и проверки на наличие конфликтов.

Это также удивительно просто:

Вот и все!

Вызов writer.Complete() сигнализирует стороне чтения, что все сообщения были записаны в канал и мы можем выйти из цикла.

Как и сторона записи, сторона чтения также строго типизирована, поэтому мы точно знаем, что получаем от канала.

Дерево интервалов — это приятный бонус, который позволяет невероятно легко обнаруживать перекрытия, просто добавляя событие в дерево, а затем запрашивая дерево, чтобы увидеть, есть ли у нас более 1 события для данного интервала.

Практически без каких-либо дополнительных усилий и сложностей мы написали механизм обнаружения конфликтов планирования календаря, который может выполняться одновременно и извлекать события из нескольких конечных точек для обнаружения конфликтов!

Заворачивать

Мы только поцарапали поверхность с каналами. Можно создавать сложные сценарии с несколькими производителями и несколькими потребителями; конвейеры многоэтапной обработки данных; и реализовывать другие сценарии с высокой пропускной способностью, используя каналы, а не методы синхронизации на основе блокировок.

Майкл Шпильт написал отличную статью о производительности различных методов синхронизации в .NET:

(Следует отметить, что внутренняя реализация параллельного хранения Channel на самом деле является ConcurrentQueue)

System.Threading.Channels — это одна из множества веских причин, по которым следует рассмотреть возможность использования .NET и C# для серверных или ресурсоемких задач. Для задач с интенсивным вводом-выводом, которые могут выполняться одновременно, использование каналов может значительно повысить производительность и пропускную способность.

Если вы заинтересовались, ознакомьтесь с этими прошлыми статьями:

Если вам понравилась эта статья, подпишитесь и подпишитесь на обновления по электронной почте. Отметить меня в Твиттере @chrlschnили LinkedIn; Я хотел бы услышать от вас!