Что такое потоковая обработка данных java
Перейти к содержимому

Что такое потоковая обработка данных java

  • автор:

Потоковая обработка с помощью полностью управляемых подсистем обработки данных с открытым кодом

В этой статье представлен пример решения потоковой передачи, использующего полностью управляемые службы данных Azure.

Архитектура

Скачайте файл Visio этой архитектуры.

Рабочий процесс

  1. Компонент Центров событий для Apache Kafka передает события от производителей Kafka.
  2. Apache Spark потребляет события. AKS предоставляет управляемую среду для заданий Apache Spark.
  3. Приложение, использующее Azure Cosmos DB для Apache Cassandra, записывает события в Cassandra. Эта база данных служит в качестве платформы хранения для событий. AKS размещает микрослужбы, которые выполняют запись в Cassandra.
  4. Компонент канала изменений Azure Cosmos DB обрабатывает события в реальном времени.
  5. Запланированные приложения выполняют пакетную обработку для событий в Cassandra.
  6. Хранилища эталонных данных обогащают сведения о событиях. Пакетные приложения записывают обогащенные сведения о событиях в PostgreSQL. К типичным хранилищам эталонных данных относятся следующие:
    • Azure Data Lake Storage, которое может хранить данные в открытых форматах, например Parquet.
    • Реляционные хранилища данных с открытым кодом, например PostgreSQL и MySQL.
  7. Пакетные приложения обрабатывают данные Cassandra. Такое приложение сохраняет обработанные данные в Базе данных Azure для PostgreSQL. Это реляционное хранилище данных предоставляет данные целевым приложениям, нуждающимся в обогащенной информации.
  8. Приложения и средства для создания отчетов анализируют данные в базе данных PostgreSQL. Например, Power BI подключается к базе данных с помощью соединителя Базы данных Azure для PostgreSQL. Эта служба отчетов затем отображает подробные визуальные представления данных.
  9. Кэш Azure для Redis предоставляет кэш в памяти. В этом решении кэш содержит данные о критических событиях. Приложение сохраняет данные в кэше и извлекает их из кэша.
  10. Веб-сайты и другие приложения используют кэшированные данные для улучшения времени отклика. Иногда данные недоступны в кэше. В таких случаях эти приложения используют шаблон программирования отдельно от кэша или подобную стратегию для извлечения данных из Cassandra в Azure Cosmos DB.

Компоненты

  • Центры событий — это полностью управляемая платформа потоковой передачи, которая может обрабатывать миллионы событий в секунду. Центры событий предоставляют конечную точку для Apache Kafka, популярной платформы обработки данных потоковой передачи с открытым кодом. Если организации используют компонент конечной точки, им не нужно создавать и обслуживать кластеры Kafka для обработки данных потоковой передачи. Вместо этого они могут получить все преимущества полностью управляемой реализации Kafka, предлагаемых Центрами событий.
  • Azure Cosmos DB — это полностью управляемая база данных NoSQL и реляционная база данных, которая предлагает репликацию с несколькими master. Azure Cosmos DB поддерживает API с открытым кодом для множества баз данных, языков и платформ. Примеры:
    • Apache Cassandra.
    • Gremlin.
    • MongoDB.

    С помощью Azure Cosmos DB для Apache Cassandra можно получить доступ к данным Azure Cosmos DB с помощью инструментов, языков и драйверов Apache Cassandra. Apache Cassandra — это база данных NoSQL с открытым кодом, которая хорошо подходит для рабочих нагрузок с высокой интенсивностью записи.

    Альтернативные варианты

    Продукты и службы с поддержкой открытого кода в этом решении можно заменить на другие. Дополнительные сведения о службах с открытым кодом, доступных в Azure, см. в статье Открытый код в Azure.

    Сведения о сценарии

    Полностью управляемые службы данных Azure, которые выполняют подсистемы с открытым исходным кодом, составляют это решение потоковой передачи:

    • Центры событий Azure предлагают реализацию Kafka для приема потоковой передачи.
    • Azure Cosmos DB поддерживает хранение событий в Cassandra.
    • Служба Azure Kubernetes (AKS) размещает микрослужбы Kubernetes для обработки потоковой передачи.
    • База данных Azure для PostgreSQL управляет реляционным хранилищем данных в PostgreSQL.
    • Кэш Azure для Redis управляет хранилищами данных Redis в памяти.

    Технологии с открытым кодом предоставляют множество преимуществ. Например, организации могут использовать технологии с открытым для следующего:

    • Перенос существующих рабочих нагрузок.
    • Поддержка широкого сообщества, работающего с открытым кодом.
    • Устранение зависимости от одного поставщика.

    Предоставляя технологии с открытым кодом, средства и службы Azure помогают организациям получить их преимущества и разработать нужные им решения.

    Это решение использует полностью управляемые службы на основе модели PaaS (платформа как услуга). Поэтому за установку исправлений, обслуживание в рамках SLA и другие задачи управления отвечает корпорация Майкрософт. Еще одним преимуществом является встроенная интеграция с инфраструктурой безопасности Azure.

    Потенциальные варианты использования

    Это решение применяется к разным сценариям:

    • Использование служб PaaS Azure для создания современных решений потоковой передачи, которые используют технологии с открытым кодом.
    • Перенос решений обработки потоковой передачи с открытым кодом в Azure.

    Рекомендации

    Эти рекомендации реализуют основные принципы Azure Well-Architected Framework, которая представляет собой набор руководящих принципов, которые можно использовать для повышения качества рабочей нагрузки. Дополнительные сведения см. в статье Microsoft Azure Well-Architected Framework.

    Разрабатывайте и реализуйте каждую службу с применением лучших методик. Рекомендации по каждой службе см. на сайте документации Майкрософт. Кроме того, ознакомьтесь со сведениями в следующих разделах:

    Производительность

    • Реализуйте пулы подключений для Базы данных Azure для PostgreSQL. Вы можете использовать библиотеку пулов подключений в самом приложении. Или же вы можете использовать средство для создания пулов подключений, например PgBouncer или Pgpool. Установка подключения с PostgreSQL — это ресурсоемкая операция. Благодаря пулам подключений вы можете избежать ухудшения производительности приложения. PgBouncer встроен в База данных Azure для PostgreSQL гибкий сервер.
    • Настройте Azure Cosmos DB для Apache Cassandra для оптимальной производительности с помощью соответствующей стратегии секционирования. Решите, будете ли вы использовать первичный ключ с одним полем, составной первичный ключ или составной ключ секции при секционировании таблиц.

    Масштабируемость

    • При выборе уровня Центра событий учитывайте свои требования к потоковой передаче:
      • Если вам требуется пропускная способность на уровне не более 120 Мбит/с, рекомендуем уровень «Премиум». Он позволяет гибко масштабироваться для соответствия требованиям потоковой передачи.
      • Для рабочих нагрузок потоковой передачи высокого уровня с входящим трафиком на уровне гигабайт данных, рекомендуем уровень «Выделенный». Этот уровень является однотенатным предложением с гарантированной емкостью. Масштаб выделенных кластеров можно уменьшать и увеличивать.

      Безопасность

      Безопасность обеспечивает гарантии от преднамеренных атак и злоупотреблений ценными данными и системами. Дополнительные сведения см. в статье Общие сведения о принципах безопасности.

      • Используйте Приватный канал Azure, чтобы включить службы Azure в свою виртуальную сеть. При использовании Приватного канала трафик между службами и вашей сетью передается по магистрали Azure без прохода по общедоступным сетям Интернета. Службы Azure в этом решении поддерживают Приватный канал только для отдельных SKU.
      • Проверьте соответствие политикам безопасности в своей организации. С помощью Azure Cosmos DB для Apache Cassandra ключи предоставляют доступ к таким ресурсам, как пространства ключей и таблицы. Такие ключи хранятся в экземпляре Azure Cosmos DB. Ваши политики безопасности могут требовать передачи таких ключей в службу управления ключами, например Azure Key Vault. Также обеспечьте смену ключей в соответствии с политиками организации.

      Устойчивость

      Используйте зоны доступности для защиты критически важных для бизнеса приложений от сбоев в центре обработки данных. Службы этого решения поддерживают зоны доступности для выбранных SKU в регионах с зонами доступности. Актуальные сведения см. в списке служб с поддержкой зон доступности.

      Оптимизация затрат

      Оптимизация затрат заключается в поиске способов уменьшения ненужных расходов и повышения эффективности работы. Дополнительные сведения см. в разделе Обзор критерия «Оптимизация затрат».

      Для оценки стоимости решения используйте калькулятор цен Azure. Также учитывайте следующее:

      • Центры событий доступны в уровнях «Базовый», «Стандартный», «Премиум» и «Выделенный». Уровни «Премиум» и «Выделенный» оптимальны для крупномасштабных рабочих нагрузок потоковой передачи. Вы можете масштабировать пропускную способность, поэтому мы рекомендуем начать с небольшого объема ресурсов и масштабироваться при повышении спроса.
      • Azure Cosmos DB предлагает две модели:
        • Модель с подготовленной пропускной способностью, которая идеально подходит для требовательных рабочих нагрузок. Эта модель доступна в двух вариантах управления емкостью: стандартный и с автомасштабированием.
        • Бессерверная модель, которая хорошо подходит для выполнения небольших рабочих нагрузок с частыми пиками.
        • Кластеризация
        • Сохраняемость
        • Активная георепликация

        Развертывание этого сценария

        При развертывании решения учитывайте следующее:

        • При развертывании Центров событий для Kafka изучите статью Краткое руководство. Потоковая передача данных с помощью Центров событий с использованием протокола Kafka. Эта статья содержит следующие сведения.
          • Как отправлять и получать сообщения с помощью Kafka в Центрах событий.
          • Пример кода для публикации приложения.
          • Как перенести существующие приложения Kafka в Центры событий для Kafka, внеся изменения в конфигурацию.
          • Сведения о создании базового приложения Spark См. в статье Подключение приложения Apache Spark к Центрам событий Azure.
          • Сведения о размещении приложения Spark в AKS см. в статье Выполнение заданий Apache Spark в AKS.
          • Как использовать предикаты запросов в языке запросов Cassandra (CQL) для отправки запросов к API канала изменений.
          • Пример кода для приложения Java.

          Соавторы

          Эта статья обновляется и поддерживается корпорацией Майкрософт. Первоначально она была написана следующими авторами.

          • Аджит Анантрам | Архитектор облачных решений

          Дальнейшие действия

          • Руководство разработчика Apache Kafka для концентраторов событий Azure
          • Часто задаваемые вопросы об Azure Cosmos DB для Apache Cassandra
          • Рекомендации по созданию приложения с помощью Базы данных Azure для PostgreSQL
          • Вопросы и ответы по кэшу Redis для Azure

          Связанные ресурсы

          Чтобы узнать больше о связанных решениях, см. следующие сведения:

          • Проектирование архитектуры аналитики
          • Выбор хранилища аналитических данных в Azure
          • Выбор технологии для анализа данных в Azure
          • Использование Azure Kubernetes при обработке потока событий
          • Потоковая передача данных с помощью AKS

          Потоки данных. Stream API. Общая информация

          Перед изучением данной темы рекомендуется ознакомиться со следующими темами:

          • Обобщения. Параметризованные типы. Обобщенные классы интерфейсы, методы
          • Лямбда-выражения. Основные понятия. Функциональный интерфейс. Примеры
          • Стандартные (предопределенные) функциональные интерфейсы Java

          Поиск на других ресурсах:

          1. Прикладной потоковый интерфейс Stream API. Характерные особенности

          Начиная с версии JDK 8 в языке Java были введены средства работы с потоками данных, которые получили название прикладной потоковый интерфейс API (Stream Application Programming Interface). Работа со средствами Stream API базируется на использовании лямбда-выражений.

          Характерными особенностями этого интерфейса является использование различных операций над потоками данных. К этим операциям можно отнести:

          • поиск данных;
          • модификация данных;
          • фильтрование потока данных для получения нового потока;
          • сортировка данных;
          • другие разнообразные манипуляции над данными.

          Операции над потоками данных выполняются на основе формирования соответствующих запросов. Интерфейс Stream API хорошо подходит для обработки больших массивов данных с применением механизмов распараллеливания.

          2. Поток данных. Определение

          Под понятием «поток данных» подразумевается канал передачи данных. Для потока данных определяется понятие источника данных. Источниками данных могут быть массив, коллекция, список и тому подобное. Поток данных оперирует этими источниками. Поток данных еще можно определить как последовательность объектов.

          В самом потоке данные не сохраняются, а только перемещаются при их обработке (фильтровании, сортировке и т.д.). При обработке потока данных источник данных не изменяется. Это означает, что при сортировке данных создается новый отсортированный поток данных, а начальный источник остается несортированным.

          3. Интерфейсы из пакета java.util.stream
          3.1. Интерфейс BaseStream . Методы

          Средства работы с потоками данных Stream API являются составляющей пакета java.util.stream . Данный пакет содержит набор потоковых интерфейсов, образующих иерархию.

          Базовым поточным интерфейсом является BaseStream , который имеет следующее объявление

          interface BaseStreamextends BaseStream>
          • T – тип элементов в потоке данных;
          • S – тип потока данных, который расширяет интерфейс BaseStream .

          В интерфейсе BaseStream объявляется ряд методов, перечисленных ниже.

          1. Закрыть поток данных

          void close()

          Метод закрывает вызывающий поток данных. Обязательно нужно закрывать потоки данных, связанные с файлами.

          2. Определить, является ли параллельным поток данных

          boolean isParallel()

          Метод возвращает true , если поток данных есть параллельным.

          3. Получить итератор для потока данных

          Iterator iterator()

          Метод получает итератор для потока данных и возвращает ссылку на него. Метод является конечной операцией.

          4. Задать обработчик события закрытия потока.

          S onClose(Runnable handler)
          • S – тип новосозданного потока данных;
          • handler – метод, содержащий код, который должен выполниться при закрытии потока.

          Метод onClose() возвращает новый поток данных с заданным обработчиком события закрытия. Указанный обработчик вызывается при закрытии потока данных. Метод является промежуточной операцией.

          5. Вернуть паралельный поток данных

          S parallel()
          • S – тип новосозданного параллельного потока данных.

          Метод возвращает параллельный поток данных основываясь на вызывающем потоке данных. Если вызывающий поток данных является параллельным, то он и возвращается. Это промежуточная операция.

          6. Вернуть последовательный поток данных

          S sequential()
          • S – тип новосозданного последовательного потока данных.

          Метод возвращает последовательный поток данных на основе вызывающего потока данных. Если вызывающий поток данных уже является последовательным, то этот поток и возвращается.

          Метод является промежуточной операцией.

          7. Получить итератор-разделитель

          Spliterator spliterator()

          Метод получает итератор-разделитель для потока данных и возвращает ссылку на него. Метод является конечной операцией.

          8. Вернуть неупорядоченный поток данных

          S unordered()
          • S – тип результирующего неупорядоченного потока данных.

          Метод возвращает неупорядоченный поток данных на основе вызывающего потока данных. Если вызывающий поток данных уже есть неупорядоченным, то именно он и возвращается. Метод является промежуточной операцией.

          3.2. Интерфейс Stream . Обзор методов

          От базового интерфейса BaseStream унаследованы несколько интерфейсов. Наиболее употребительным из них является обобщенный интерфейс Stream , который имеет следующее объявление

          interface Stream

          • T – тип элементов в потоке данных.

          В интерфейсе Stream определяется ряд методов, которые можно использовать при обработке потоков данных. Эти методы описываются далее.

          1. Накопить элементы в контейнере

           R collect(Collectorsuper T, A, R> collector)
          • R – тип контейнера, в котором накапливаются элементы;
          • T – тип елемента из вызывающего потока данных;
          • A – внутренний накопительный тип;
          • collector – функция накопления, которая представлена лямбда-выражением. Функция определяет порядок выполнения процесса накопления.

          Метод реализует операцию изменяемого сведения. Метод накапливает элементы в изменяющемся контейнере и возвращает этот контейнер.

          Метод collect() является конечной операцией.

          2. Получить количество элементов в потоке

          long count()

          Метод count() есть конечной операцией.

          3. Выработать новый поток данных по заданному фильтру

          Stream filter(Predicatesuper T> predicate)
          • T – тип элементов потока данных;
          • predicate – условие, по которому формируется новый поток данных.

          Метод filter() есть промежуточной операцией.

          4. Выполнить действие над каждым элементом потока данных

          void forEach(Consumersuper T> action)
          • action — ссылка на стандартный функциональный интерфейс Consumer . В интерфейсе Consumer реализован метод, который выполняет некоторое действие над элементом типа T. Это действие будет применяться к каждому элементу потока данных.

          Метод forEach() есть конечной операцией.

          5. Применить указанную функцию отображения для обобщенного типа R

           Stream map(Functionsuper T, ? extends R> map_function)
          • map_function – функция отображения, которая применяется к элементам из вызывающего потока данных. Результатом работы функции есть новый поток данных, который содержит эти элементы.

          Это промежуточная операция.

          6. Применить функцию отображения для потока типа DoubleStream

          DoubleStream mapToDouble(ToDoubleFunctin super T> map_function)
          • map_function — функция отображения, которая применяется к элементам вызывающего потока данных.

          Результатом работы функции mapToDouble() является новый поток данных типа DoubleStream . Тип элементов в потоке устанавливается Double . Это промежуточная операция.

          7. Применить функцию отображения для потока типа IntStream

          IntStream mapToInt(ToIntFunctin super T> map_function)
          • map_function — функция отображения, которая применяется к элементам вызывающего потока данных.

          Результатом работы функции mapToInt() является новый поток данных типа IntStream . Тип элементов в потоке устанавливается Integer . Это промежуточная операция.

          8. Применить функцию отображения для потока типа LongStream

          LongStream mapToLong(ToLongFunctin super T> map_function)
          • map_function — функция отображения, которая применяется к элементам вызывающего потока данных.

          На основе заданной функции отображения создается новый поток данных типа LongStream содержащий эти элементы. Это промежуточная операция.

          9. Поиск минимального значения в потоке данных типа T

          Optional min(Comparatorsuper T> comparator)
          • comparator — ссылка на метод, в котором описывается код сравнения двух элементов типа T. По коду этого метода определяется элемент с минимальным значением в потоке данных.

          Метод min() является конечной операцией.

          10. Поиск максимального значения в потоке данных типа T

          Optional max(Comparatorsuper T> comparator)
          • comparator — ссылка на метод, в котором описывается код сравнения двух элементов типа T. На основе кода этого метода определяется элемент с максимальным значением в потоке данных.

          Метод max() есть конечной операцией.

          11. Реализовать сведение для элементов в вызывающем потоке данных

          T reduce (T identityVal, BinaryOperator storage)
          • identityVal — значение идентичности, которое используется в сочетании с функцией storage , для получения такого же элемента без изменений;
          • storage — функция, которая оперирует двумя значениями типа T и возвращает результат.

          Метод reduce() является конечной операцией.

          12. Сортировка потока данных

          Stream sorted()

          Метод sorted() предназначен для сортировки потока данных в естественном порядке (по возрастанию элементов). Если нужно изменить порядок сортировки элементов, то нужно реализовать стандартный функциональный интерфейс Comparator и передать лямбда-выражение в данный метод. Это конечная операция.

          13. Создать массив из элементов в вызывающем потоке данных

          Object[] toArray()

          Метод toArray() используется для преобразования потока данных в массив типа Object[] . Метод позволяет оперировать любыми типами ( Integer , Double , Float и т.д.).

          Связанные темы

          • Обобщения. Параметризованные типы. Обобщенные классы интерфейсы, методы
          • Лямбда-выражения. Основные понятия. Функциональный интерфейс. Примеры
          • Стандартные (предопределенные) функциональные интерфейсы Java
          • Понятие конечной и промежуточной операции. Примеры. Отличия. Методы создания потока данных stream() , parallelStream()

          Реактивное программирование на Java: как, зачем и стоит ли? Часть II

          Реактивное программирование — один из самых актуальных трендов современности. Обучение ему — сложный процесс, особенно если нет подходящих материалов. В качестве своеобразного дайджеста может выступить эта статья. На конференции РИТ++ 2020 эксперт и тренер Luxoft Training Владимир Сонькин рассказал о фишках управления асинхронными потоками данных и подходах к ним, а также показал на примерах, в каких ситуациях нужна реактивность, и что она может дать.

          В первой части статьи рассказывалось о том, что привело к появлению реактивного программирования, где оно применяется, и что нам может дать асинхронность. Пришло время рассказать о следующем шаге, позволяющем получить максимум преимуществ от асинхронности, и это — реактивное программирование.

          Reactivity

          Реактивное программирование — это асинхронность, соединенная с потоковой обработкой данных. То есть если в асинхронной обработке нет блокировок потоков, но данные обрабатываются все равно порциями, то реактивность добавляет возможность обрабатывать данные потоком. Помните тот пример, когда начальник поручает задачу Васе, тот должен передать результат Диме, а Дима вернуть начальнику? Но у нас задача — это некая порция, и пока она не будет сделана, дальше передать ее нельзя. Такой подход действительно разгружает начальника, но Дима и Вася периодически простаивают, ведь Диме надо дождаться результатов работы Васи, а Васе — дождаться нового задания.

          Пример
          А теперь представьте, что задачу разбили на множество подзадач. И теперь они плывут непрерывным потоком:
          Пример 2
          Говорят, когда Генри Форд придумал свой конвейер, он повысил производительность труда в четыре раза, благодаря чему ему удалось сделать автомобили доступными. Здесь мы видим то же самое: у нас небольшие порции данных, а конвейер с потоком данных, и каждый обработчик пропускает через себя эти данные, каким-то образом их преобразовывая. В качестве Васи и Димы у нас выступают потоки выполнения (threads), обеспечивая, таким образом, многопоточную обработку данных.

          Схема технологии распараллеливания

          На этой схеме показаны разные технологии распараллеливания, добавлявшиеся в Java в разных версиях. Как мы видим, спецификация Reactive Streams на вершине — она не заменяет всего, что было до нее, но добавляет самый высокий уровень абстракции, а значит ее использование просто и эффективно. Попробуем в этом разобраться.

          Идея реактивности построена на паттерне проектирования Observer.

          Observer

          Давайте вспомним, что это за паттерн. У нас есть подписчики и то, на что мы подписываемся. В качестве примера здесь рассмотрен Твиттер, но подписаться на какое-то сообщество или человека, а потом получать обновления можно в любой соцсети. После подписки, как только появляется новое сообщение, всем подписчикам приходит notify, то есть уведомление. Это базовый паттерн.

          В данной схеме есть:

          • Publisher — тот, кто публикует новые сообщения;
          • Observer — тот, кто на них подписан. В реактивных потоках подписчик обычно называется Subscriber. Термины разные, но по сути это одно и то же. В большинстве сообществ более привычны термины Publisher/Subscriber.

          Это базовая идея, на которой все строится.

          Один из жизненных примеров реактивности — система оповещения при пожаре. Допустим, нам надо сделать систему, включающую тревогу в случае превышения задымленности и температуры.

          Пример реактивности

          У нас есть датчик дыма и градусник. Когда дыма становится много и/или температура растет, на соответствующих датчиках увеличивается значение. Когда значение и температура на датчике дыма оказываются выше пороговых, включается колокольчик и оповещает о тревоге.

          Если бы у нас был традиционный, а не реактивный подход, мы бы писали код, который каждые пять минут опрашивает детектор дыма и датчик температуры, и включает или выключает колокольчик. Однако в реактивном подходе за нас это делает реактивный фреймворк, а мы только прописываем условия: колокольчик активен, когда детектор больше X, а температура больше Y. Это происходит каждый раз, когда приходит новое событие.

          От детектора дыма идет поток данных: например, значение 10, потом 12, и т.д. Температура тоже меняется, это другой поток данных — 20, 25, 15. Каждый раз, когда появляется новое значение, результат пересчитывается, что приводит к включению или выключению системы оповещения. Нам достаточно сформулировать условие, при котором колокольчик должен включиться.

          Если вернуться к паттерну Observer, у нас детектор дыма и термометр — это публикаторы сообщений, то есть источники данных (Publisher), а колокольчик на них подписан, то есть он Subscriber, или наблюдатель (Observer).

          Пример реактивности

          Немного разобравшись с идеей реактивности, давайте углубимся в реактивный подход. Мы поговорим об операторах реактивного программирования. Операторы позволяют каким-либо образом трансформировать потоки данных, меняя данные и создавая новые потоки. Для примера рассмотрим оператор distinctUntilChanged. Он убирает одинаковые значения, идущие друг за другом. Действительно, если значение на детекторе дыма не изменилось — зачем нам на него реагировать и что-то там пересчитывать:

          Оператор distinctUntilChanged

          Reactive approach

          Рассмотрим еще один пример: допустим, мы разрабатываем UI, и нам нужно отслеживать двойные нажатия мышкой. Тройной клик будем считать как двойной.

          Пример реактивного подхода

          Клики здесь — это поток щелчков мышкой (на схеме 1, 2, 1, 3). Нам нужно их сгруппировать. Для этого мы используем оператор throttle. Говорим, что если два события (два клика) произошли в течение 250 мс, их нужно сгруппировать. На второй схеме представлены сгруппированные значения (1, 2, 1, 3). Это поток данных, но уже обработанных — в данном случае сгрупированных.

          Таким образом начальный поток преобразовался в другой. Дальше нужно получить длину списка ( 1, 2, 1, 3). Фильтруем, оставляя только те значения, которые больше или равны 2. На нижней схеме осталось только два элемента (2, 3) — это и были двойные клики. Таким образом, мы преобразовали начальный поток в поток двойных кликов.

          Это и есть реактивное программирование: есть потоки на входе, каким-то образом мы пропускаем их через обработчики, и получаем поток на выходе. При этом вся обработка происходит асинхронно, то есть никто никого не ждет.

          Еще одна хорошая метафора — это система водопровода: есть трубы, одна подключена к другой, есть какие-то вентили, может быть, стоят очистители, нагреватели или охладители (это операторы), трубы разделяются или объединяются. Система работает, вода льется. Так и в реактивном программировании, только в водопроводе течет вода, а у нас — данные.

          Можно придумать потоковое приготовление супа. Например, есть задача максимально эффективно сварить много супа. Обычно берется кастрюля, в нее наливается порция воды, овощи нарезаются и т.д. Это не потоковый, а традиционный подход, когда мы варим суп порциями. Сварили эту кастрюлю, потом нужно ставить следующую, а после — еще одну. Соответственно, надо дождаться, пока в новой кастрюле снова закипит вода, растворится соль, специи и т.д. Все это занимает время.

          Представьте себе такой вариант: в трубе нужного диаметра (достаточного, чтобы заполнялась кастрюля) вода сразу подогревается до нужной температуры, есть нарезанная свекла и другие овощи. На вход они поступают целыми, а выходят уже шинкованными. В какой-то момент все смешивается, вода подсаливается и т.д. Это максимально эффективное приготовление, супоконвейер. И именно в этом идея реактивного подхода.

          Observable example

          Теперь посмотрим на код, в котором мы публикуем события:

          Пример Observable

          Observable.just позволяет положить в поток несколько значений, причем если обычные реактивные потоки содержат значения, растянутые во времени, то тут мы их кладем все сразу — то есть синхронно. В данном случае это названия городов, на которые в дальнейшем можно подписаться (тут для примера взяты города, в которых есть учебный центр Люксофт).

          Девушка (Publisher) опубликовала эти значения, а Observers на них подписываются и печатают значения из потока.

          Это похоже на потоки данных (Stream) в Java 8. И тут, и там синхронные потоки. И здесь, и в Java 8 список значений нам известен сразу. Но если бы использовался обычный для Java 8 поток, мы не могли бы туда что-то докладывать. В стрим ничего нельзя добавить: он синхронный. В нашем примере потоки асинхронные, то есть в любой момент времени в них могут появляться новые события — скажем, если через год откроется учебный центр в новой локации — она может добавиться в поток, и реактивные операторы правильно обработают эту ситуацию. Мы добавили события и сразу же на них подписались:

          Мы можем в любой момент добавить значение, которое через какое-то время выводится. Когда появляется новое значение, мы просим его напечатать, и на выходе получаем список значений:

          Список значений

          При этом есть возможность не только указать, что должно происходить, когда появляются новые значения, но и дополнительно отработать такие сценарии, как возникновение ошибок в потоке данных или завершение потока данных. Да-да, хотя часто потоки данных не завершаются (например, показания термометра или датчика дыма), многие потоки могут завершаться: например, поток данных с сервера или с другого микросервиса. В какой-то момент сервер закрывает соединение, и появляется потребность на это как-то отреагировать.

          Implementing and subscribing to an observer

          В Java 9 нет реализации реактивных потоков — только спецификация. Но есть несколько библиотек — реализаций реактивного подхода. В этом примере используется библиотека RxJava. Мы подписываемся на поток данных, и определяем несколько обработчиков, то есть методы, которые будут запущены в начале обработки потока (onSubscribe), при получении каждого очередного сообщения (onNext), при возникновении ошибки (onError) и при завершении потока (onComplete):

          Библиотека RxJava

          Давайте посмотрим на последнюю строчку.

          locations.map(String::length).filter(l -> l >= 5).subscribe(observer);

          Мы используем операторы map и filter. Если вы работали со стримами Java 8, вам, конечно, знакомы map и filter. Здесь они работают точно так же. Разница в том, что в реактивном программировании эти значения могут появляться постепенно. Каждый раз, когда приходит новое значение, оно проходит через все преобразования. Так, String::length заменит строчки на длину в каждой из строк.

          В данном случае получится 5 (Minsk), 6 (Krakow), 6 (Moscow), 4 (Kiev), 5 (Sofia). Фильтруем, оставляя только те, что больше 5. У нас получится список длин строк, которые больше 5 (Киев отсеется). Подписываемся на итоговый поток, после этого вызывается Observer и реагирует на значения в этом итоговом потоке. При каждом следующем значении он будет выводить длину:

          public void onNext(Integer value) System.out.println(«Length: » + value);

          То есть сначала появится Length 5, потом — Length 6. Когда наш поток завершится, будет вызван onComplete, а в конце появится надпись «Done.»:

          public void onComplete() System.out.println(«Done.»);

          Не все потоки могут завершаться. Но некоторые способны на это. Например, если мы читали что-то из файла, поток завершится, когда файл закончится.

          Если где-то произойдет ошибка, мы можем на нее отреагировать:

          public void onError(Throwable e) e.printStackTrace();

          Таким образом мы можем реагировать разными способами на разные события: на следующее значение, на завершение потока и на ошибочную ситуацию.

          Reactive Streams spec

          Реактивные потоки вошли в Java 9 как спецификация.

          Если предыдущие технологии (Completable Future, Fork/Join framework) получили свою имплементацию в JDK, то реактивные потоки имплементации не имеют. Есть только очень короткая спецификация. Там всего 4 интерфейса:

          Reactive Streams spec

          Если рассматривать наш пример из картинки про Твиттер, мы можем сказать, что:

          Publisher — девушка, которая постит твиты;

          Subscriber — подписчик. Он определяет , что делать, если:

          • Начали слушать поток (onSubscribe). Когда мы успешно подписались, вызовется эта функция;
          • Появилось очередное значение в потоке (onNext);
          • Появилось ошибочное значение (onError);
          • Поток завершился (onComplete).

          Subscription — у нас есть подписка, которую можно отменить (cancel) или запросить определенное количество значений (request(long n)). Мы можем определить поведение при каждом следующем значении, а можем забирать значения вручную.

          Processor — обработчик — это два в одном: он одновременно и Subscriber, и Publisher. Он принимает какие-то значения и куда-то их кладет.

          Если мы хотим на что-то подписаться, вызываем Subscribe, подписываемся, и потом каждый раз будем получать обновления. Можно запросить их вручную с помощью request. А можно определить поведение при приходе нового сообщения (onNext): что делать, если появилось новое сообщение, что делать, если пришла ошибка и что делать, если Publisher завершил поток. Мы можем определить эти callbacks, или отписаться (cancel).

          PUSH / PULL модели

          Существует две модели потоков:

          • Push-модель — когда идет «проталкивание» значений.

          Например, вы подписались на кого-то в Telegram или Instagram и получаете оповещения (они так и называются — push-сообщения, вы их не запрашиваете, они приходят сами). Это может быть, например, всплывающее сообщение. Можно определить, как реагировать на каждое новое сообщение.

          • Pull-модель — когда мы сами делаем запрос.

          Например, мы не хотим подписываться, т.к. информации и так слишком много, а хотим сами заходить на сайт и узнавать новости.

          Для Push-модели мы определяем callbacks, то есть функции, которые будут вызваны, когда придет очередное сообщение, а для Pull-модели можно воспользоваться методом request, когда мы захотим узнать, что новенького.

          Pull-модель очень важна для Backpressure — «напирания» сзади. Что же это такое?

          Вы можете быть просто заспамленными своими подписками. В этом случае прочитать их все нереально, и есть шанс потерять действительно важные данные — они просто утонут в этом потоке сообщений. Когда подписчик из-за большого потока информации не справляется со всем, что публикует Publisher, получается Backpressure.

          В этом случае можно использовать Pull-модель и делать request по одному сообщению, прежде всего из тех потоков данных, которые наиболее важны для вас.

          Implementations

          Давайте рассмотрим существующие реализации реактивных потоков:

          • RxJava. Эта библиотека реализована для разных языков. Помимо RxJava существует Rx для C#, JS, Kotlin, Scala и т.д.
          • Reactor Core. Был создан под эгидой Spring, и вошел в Spring 5.
          • Akka-стримы от создателя Scala Мартина Одерски. Они создали фреймворк Akka (подход с Actor), а Akka-стримы — это реализация реактивных потоков, которые дружат с этим фреймворком.

          Во многом эти реализации похожи, и все они реализуют спецификацию реактивных потоков из Java 9.

          Посмотрим подробнее на Spring’овский Reactor.

          Function may return…

          Давайте обобщим, что может возвращать функция:

          Что может возвращать функция

          • Single/Synchronous;

          Обычная функция возвращает одно значение, и делает это синхронно.

          • Multipple/Synchronous;

          Если мы используем Java 8, можем возвращать из функции поток данных Stream. Когда вернулось много значений, их можно отправлять на обработку. Но мы не можем отправить на обработку данные до того, как все они получены — ведь Stream работают только синхронно.

          • Single/Asynchronous;

          Здесь уже используется асинхронный подход, но функция возвращает только одно значение:

          • либо CompletableFuture (Java), и через какое-то время приходит асинхронный ответ;
          • либо Mono, возвращающая одно значение в библиотеке Spring Reactor.
          • Multiple/Asynchronous.

          А вот тут как раз — реактивные потоки. Они асинхронные, то есть возвращают значение не сразу, а через какое-то время. И именно в этом варианте можно получить поток значений, причем эти значения будут растянуты во времени Таким образом, мы комбинируем преимущества потоков Stream, позволяющих вернуть цепочку значений, и асинхронности, позволяющей отложить возврат значения.

          Например, вы читаете файл, а он меняется. В случае Single/Asynchronous вы через какое-то время получаете целиком весь файл. В случае Multiple/Asynchronous вы получаете поток данных из файла, который сразу же можно начинать обрабатывать. То есть можно одновременно читать данные, обрабатывать их, и, возможно, куда-то записывать. . Реактивные асинхронные потоки называются:

          • Publisher (в спецификации Java 9);
          • Observable (в RxJava);
          • Flux (в Spring Reactor).

          Netty as a non-blocking server

          Рассмотрим пример использования реактивных потоков Flux вместе со Spring Reactor. В основе Reactor лежит сервер Netty. Spring Reactor — это основа технологии, которую мы будем использовать. А сама технология называется WebFlux. Чтобы WebFlux работал, нужен асинхронный неблокирующий сервер.

          Схема работы сервера Netty

          Схема работы сервера Netty похожа на то, как работает Node.js. Есть Selector — входной поток, который принимает запросы от клиентов и отправляет их на выполнение в освободившиеся потоки. Если в качестве синхронного сервера (Servlet-контейнера) используется Tomcat, то в качестве асинхронного используется Netty.

          Давайте посмотрим, сколько вычислительных ресурсов расходуют Netty и Tomcat на выполнение одного запроса:

          CPU

          Throughput — это общее количество обработанных данных. При небольшой нагрузке, до первых 300 пользователей у RxNetty и Tomcat оно одинаковое, а после Netty уходит в приличный отрыв — почти в 2 фраза.

          Throughput

          Blocking vs Reactive

          У нас есть два стека обработки запросов:

          • Традиционный блокирующий стек.
          • Неблокирующий стек — в нем все происходит асинхронно и реактивно.

          Два стека обработки запросов

          В блокирующем стеке все строится на Servlet API, в реактивном неблокирующем стеке — на Netty.

          Сравним реактивный стек и стек Servlet.

          В Reactive Stack применяется технология Spring WebFlux. Например, вместо Servlet API используются реактивные стримы.

          Reactive Stack

          Чтобы мы получили ощутимое преимущество в производительности, весь стек должен быть реактивным. Поэтому чтение данных тоже должно происходить из реактивного источника.

          Например, если у нас используется стандартный JDBC, он является не реактивным блокирующим источником, потому что JDBC не поддерживает неблокирующий ввод/вывод. Когда мы отправляем запрос в базу данных, приходится ждать, пока результат этого запроса придет. Соответственно, получить преимущество не удается.

          В Reactive Stack мы получаем преимущество за счет реактивности. Netty работает с пользователем, Reactive Streams Adapters — со Spring WebFlux, а в конце находится реактивная база: то есть весь стек получается реактивным. Давайте посмотрим на него на схеме:

          Схема реактивного стека

          Data Repo — репозиторий, где хранятся данные. В случае, если есть запросы, допустим, от клиента или внешнего сервера, они через Flux поступают в контроллер, обрабатываются, добавляются в репозиторий, а потом ответ идет в обратную сторону.

          При этом все это делается неблокирующим способом: мы можем использовать либо Push-подход, когда мы определяем, что делать при каждой следующей операции, либо Pull-подход, если есть вероятность Backpressure, и мы хотим сами контролировать скорость обработки данных, а не получать все данные разом.

          Операторы

          В реактивных потоках огромное количество операторов. Многие из них похожи на те, которые есть в обычных стримах Java. Мы рассмотрим только несколько самых распространенных операторов, которые понадобятся нам для практического примера применения реактивности.

          Filter operator

          Скорее всего, вы уже знакомы с фильтрами из интерфейса Stream.

          Filter operator

          По синтаксису этот фильтр точно такой же, как обычный. Но если в стриме Java 8 все данные есть сразу, здесь они могут появляться постепенно. Стрелки вправо — это временная шкала, а в кружочках находятся появляющиеся данные. Мы видим, что фильтр оставляет в итоговом потоке только значения, превышающие 10.

          Take 2

          Take 2 означает, что нужно взять только первые два значения.

          Map operator

          Оператор Map тоже хорошо знаком:

          Map operator

          Это действие, происходящее с каждым значением. Здесь — умножить на десять: было 3, стало 30; было 2, стало 20 и т.д.

          Delay operator

          Delay operator

          Задержка: все операции сдвигаются. Этот оператор может понадобиться, когда значения уже генерируются, но подготовительные процессы еще происходят, поэтому приходится отложить обработку данных из потока.

          Reduce operator

          Еще один всем известный оператор:

          Reduce operator

          Он дожидается конца работы потока (onComplete) — на схеме она представлена вертикальной чертой. После чего мы получаем результат — здесь это число 15. Оператор reduce сложил все значения, которые были в потоке.

          Scan operator

          Этот оператор отличается от предыдущего тем, что не дожидается конца работы потока.

          Scan operator

          Оператор scan рассчитывает текущее значение нарастающим итогом: сначала был 1, потом прибавил к предыдущему значению 2, стало 3, потом прибавил 3, стало 6, еще 4, стало 10 и т.д. На выходе получили 15. Дальше мы видим вертикальную черту — onComplete. Но, может быть, его никогда не произойдет: некоторые потоки не завершаются. Например, у термометра или датчика дыма нет завершения, но scan поможет рассчитать текущее суммарное значение, а при некоторой комбинации операторов — текущее среднее значение всех данных в потоке.

          Merge operator

          Объединяет значения двух потоков.

          Merge operator

          Например, есть два температурных датчика в разных местах, а нам нужно обрабатывать их единообразно, в общем потоке .

          Combine latest

          Получив новое значение, комбинирует его с последним значением из предыдущего потока.

          Combine latest

          Если в потоке возникает новое событие, мы его комбинируем с последним полученным значением из другого потока. Скажем, таким образом мы можем комбинировать значения от датчика дыма и термометра: при появлении нового значения температуры в потоке temperatureStream оно будет комбинироваться с последним полученным значением задымленности из smokeStream. И мы будем получать пару значений. А уже по этой паре можно выполнить итоговый расчет:

          temperatureStream.combineLatest(smokeStream).map((x, y) -> x > X && y > Y)

          В итоге на выходе у нас получается поток значений true или false — включить или выключить колокольчик. Он будет пересчитываться каждый раз, когда будет появляться новое значение в temperatureStream или в smokeStream.

          FlatMap operator

          Этот оператор вам, скорее всего, знаком по стримам Java 8. Элементами потока в данном случае являются другие потоки. Получается поток потоков. Работать с ними неудобно, и в этих случаях нам может понадобиться «уплостить» поток.

          FlatMap operator

          Можно представить такой поток как конвейер, на который ставят коробки с запчастями. До того, как мы начнем их применять, запчасти нужно достать из коробок. Именно это делает оператор flatMap.

          Flatmap часто используется при обработке потока данных, полученных с сервера. Т.к. сервер возвращает поток, чтобы мы смогли обрабатывать отдельные данные, этот поток сначала надо «развернуть». Это и делает flatMap.

          Buffer operator

          Buffer operator

          Это оператор, который помогает группировать данные. На выходе Buffer получается поток, элементами которого являются списки (List в Java). Он может пригодиться, когда мы хотим отправлять данные не по одному, а порциями.

          Мы с самого начала говорили, что реактивные потоки позволяют разбить задачу на подзадачи, и обрабатывать их маленькими порциями. Но иногда лучше наоборот, собрать много маленьких частей в блоки. Скажем, продолжая пример с конвейером и запчастями, нам может понадобиться отправлять запчасти на другой завод (другой сервер). Но каждую отдельную запчасть отправлять неэффективно. Лучше их собрать в коробки, скажем по 100 штук, и отправлять более крупными партиями.

          На схеме выше мы группируем отдельные значения по три элемента (так как всего их было пять, получилась «коробка» из трех, а потом из двух значений). То есть если flatMap распаковывает данные из коробок, buffer, наоборот, упаковывает их.

          Всего существует более сотни операторов реактивного программирования. Здесь разобрана только небольшая часть.

          Итого

          Есть два подхода:

          Что объединяет два подхода

          • Spring MVC — традиционная модель, в которой используется JDBC, императивная логика и т.д.
          • Spring WebFlux, в котором используется реактивный подход и сервер Netty.

          Есть кое-что, что их объединяет. Tomcat, Jetty, Undertow могут работать и со Spring MVC, и со Spring WebFlux. Однако дефолтным сервером в Spring для работы с реактивным подходом является именно Netty.

          Заинтересовались темой?

          Новый практический online-курс Java Advanced: функциональное, асинхронное и реактивное программирование по изучению современных функциональных, асинхронных и реактивных подходов к разработке на Java. Включает изучение NIO2, CompletableFurure, RxJava, Reactor, R2DBC, SSE, Spring Data reactive, WebClient, reactive WebSocket, RSocket.

          Расскажи друзьям:
          Как не пропустить самое интересное?
          Подписывайтесь на наш ежемесячный дайджест!

          Оценка и обучение ИТ-специалистов по ключевым направлениям разработки программного обеспечения. Курсы от экспертов-практиков по языкам программирования, системному и бизнес-анализу, архитектуре ПО, ручному и автоматизированному тестированию ПО, Big Data и машинному обучению, управлению проектами и Agile. Действует скидка 10% на обучение физических лиц.

          Остались вопросы?
          IBS Training Center Контакты: +7 (495) 609-6967 education@ibs.ru Адрес:
          127018 , Москва , ул. Складочная, д. 3, стр. 1
          © 2024 IBS, all rights reserved
          Пользователь только что записался на курс » »

          Сайт IBS Training Center использует cookie. Это дает нам возможность следить за корректной работой сайта, а также анализировать данные, чтобы развивать наши продукты и сервисы. Посещая сайт, вы соглашаетесь с обработкой ваших персональных данных. В случае несогласия вам следует покинуть его

          Как создать приложение для потоковой обработки данных при помощи Apache Flink

          Среди рассматриваемых нами фреймворков для сложной обработки данных на Java есть и Apache Flink. Хотим предложить вам перевод неплохой статьи из блога Analytics Vidhya на портале Medium, чтобы оценить читательский интерес. Не стесняйтесь участвовать в голосовании!

          В этой статье мы разберем «снизу вверх», как организовать потоковую обработку при помощи Flink; в облачных сервисах и на других платформах предоставляются решения для потоковой обработки (в некоторых из них «под капотом» интегрирован Flink). Если вы хотели разобраться в этой теме с азов, то нашли как раз то, что искали.

          Наше монолитное решение не справлялось с возрастающими объемами входящих данных; следовательно, его требовалось развивать. Настало время перейти к новому поколению в эволюции нашего продукта. Было решено воспользоваться потоковой обработкой. Это новая парадигма поглощения данных, более выигрышная по сравнению с традиционной пакетной обработкой данных.

          Apache Flink: краткая характеристика

          Apache Flink – это фреймворк для масштабируемой распределенной обработки потоков, предназначенный для операций над непрерывными потоками данных. В рамках этого фреймворка используются такие концепции как источники, преобразования потоков, параллельная обработка, планирование, присваивание ресурсов. Поддерживаются разнообразные места назначения данных. В частности, Apache Flink может подключаться к HDFS, Kafka, Amazon Kinesis, RabbitMQ и Cassandra.

          Flink известен своей высокой пропускной способностью и малыми задержками, поддерживает согласованную строго однократную обработку (все данные обрабатываются по одному разу, без дублирования), а также высокую доступность. Как и любой другой успешный опенсорсный продукт, Flink обладает обширным сообществом, в котором культивируются и расширяются возможности этого фреймворка.

          Flink может обрабатывать потоки данных (размер потока является неопределенным) или множества данных (размер множества данных является определенным). В этой статье рассматривается именно обработка потоков (обращение с объектами DataStream ).

          Потоковая обработка и присущие ей вызовы

          В настоящее время, при повсеместной распространенности устройств «Интернета Вещей» и прочих сенсоров, данные непрерывно поступают из множества источников. Такой нескончаемый поток данных требует адаптировать к новым условиям традиционные пакетные вычисления.

          • Потоковые данные неограниченные; у них нет начала и конца.
          • Новые данные поступают в непредсказуемом режиме, с нерегулярными интервалами.
          • Данные могут поступать неупорядоченно, с различными временными метками.

          Apache Flink позволяет справиться с такими проблемами при обработке, поскольку ориентируется на метки времени, которыми входящие данные снабжаются еще в источнике. Во Flink есть механизм аккумулирования событий на основе временных меток, проставленных на них -–и только после аккумулирования система переходит к выполнению обработки. В таком случае удается обойтись без применения микропакетов, а также в данном случае повышается точность результатов.

          Flink реализует согласованную строго однократную обработку, что гарантирует точность вычислений, а разработчику не требуется для этого ничего специально программировать.

          Из чего состоят пакеты Flink

          Как правило, Flink поглощает потоки данных из разных источников. Базовый объект — DataStream , представляющий собой поток однотипных элементов. Тип элемента в таком потоке определяется во время компиляции путем установки обобщенного типа T (подробнее об этом можно почитать здесь).

          Объект DataStream содержит много полезных методов для преобразования, разделения и фильтрации данных. Для начала будет полезно иметь представление о том, что делают map , reduce и filter ; это основные преобразующие методы:

            Map : получает объект T и в результате возвращает объект типа R ; MapFunction строго однократно применяется с каждым элементом объекта DataStream .

          SingleOutputStreamOperator map(MapFunction mapper)
          T reduce(T value1, T value2)
          SingleOutputStreamOperator filter(FilterFunction filter)

          Сток данных

          Одна из основных целей Flink, наряду с преобразованием данных, заключается в управлении потоками и направлении их в те или иные места назначения. Эти места называются «стоками». В Flink есть встроенные стоки (текст, CSV, сокет), а также представляемые «из коробки» механизмы для подключения к иным системам, например, Apache Kafka.

          Метки событий Flink Event

          При обработке потоков данных исключительно важен фактор времени. Существует три способа определить временную метку:

            Время обработки (опция по умолчанию): это системное время машины, выполняющей операцию обработки потоков; следовательно, это самое простое определение времени. Оно не требует какой-либо координации между потоками и машинами. Поскольку данная концепция основана на машинном времени, она обеспечивает наилучшую производительность и минимальные задержки.

          Недостаток, возникающий при использовании времени обработки существенен в распределенных и асинхронных окружениях, поскольку это не детерминистический метод. Метки времени, которыми сопровождаются события потока, могут рассинхронизироваться, если между ходом машинных часов есть разница; сетевая задержка также может привести к запаздыванию во времени, когда событие ушло с одной машины и прибыло на другую.

          // Установка атрибута Processing Time для StreamExecutionEnvironment objectstreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

          Поскольку не сам Flink устанавливает метку времени, должен быть механизм, который просигнализирует, должно быть обработано это событие или нет; данный механизм называется «водяной знак» (watermark). Тема водяных знаков выходит за рамки данной статьи; подробнее об этом можно почитать в документации по Flink.

          // Определение Event Time как метода временной метки streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream dataStream = streamEnv.readFile(auditFormat, dataDir, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000). assignTimestampsAndWatermarks( new TimestampExtractor());// . еще код . // определение класса для извлечения временной метки из событий потока public class TimestampExtractor implements AssignerWithPeriodicWatermarks < @Override public Watermark getCurrentWatermark() < return new Watermark(System.currentTimeMillis()-maxTimeFrame); >@Override public long extractTimestamp(String str, long l) < return InputData.getDataObject(str).timestamp; >>
          // Установка атрибута Ingestion Time для StreamExecutionEnvironment objectstreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

          Подробнее о временных метках и о том, как они влияют на потоковую обработку, можно почитать по следующей ссылке.

          Разбивка на окна

          Поток по определению бесконечен; следовательно, механизм обработки связан с определением фрагментов (например, периодов-окон). Таким образом поток разбивается на партии, удобные для агрегации и анализа. Определение окна – это операция над объектом DataStream или каким-то другим, который его наследует.

          Есть несколько видов окон, зависящих от времени:

          Кувыркающееся окно (конфигурация по умолчанию):

          Поток делится на окна эквивалентного размера, которые не перекрываются друг с другом. Пока поток течет, Flink непрерывно производит вычисления над данными на основе такой фиксированной во времени раскадровки.

          Кувыркающееся окно

          Реализация в коде:

          // будет использоваться с потоком, не снабженным ключами public AllWindowedStream timeWindowAll(Time size) // Кувыркающееся окно для потока, снабженного ключами public WindowedStream timeWindow(Time size)

          Скользящее окно

          Такие окна могут перекрываться друг с другом, а свойства скользящего окна определяются размером данного окна и отступом (когда начинать следующее окно). В таком случае в конкретный момент времени могут обрабатываться события, относящиеся более чем к одному окну.

          Скользящее окно

          А вот как оно выглядит в коде:

          // скользящее окно длиной 1 минуту и с интервалом срабатывания 30 секунд dataStreamObject.timeWindow(Time.minutes(1), Time.seconds(30))

          Сеансовое окно

          Включает все события, ограниченные рамками одного сеанса. Сеанс завершается при отсутствии активности, или если по истечении определенного временного периода не зафиксировано никаких событий. Данный период может быть фиксированным или динамическим, в зависимости от обрабатываемых событий. Теоретически, если промежуток между сеансами меньше размера окна, то сеанс может никогда не закончиться.

          Сеансовое окно

          В первом фрагменте кода ниже показан сеанс с фиксированной временной величиной (2 секунды). Второй пример реализует динамическое сеансовое окно, на основе событий потока.

          // Определение фиксированного сеансового окна длительностью 2 секунды dataStreamObject.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2))) // Определение динамического сеансового окна, которое может быть задано элементами потока dataStreamObject.window(EventTimeSessionWindows.withDynamicGap((elem) -> < // возвращается промежуток между сеансами, который может зависеть от событий потока >))

          Глобальное окно

          Вся система трактуется как единственное окно.

          Глобальное окно

          Flink также позволяет реализовывать собственные окна, логику которых определяет пользователь.

          Кроме окон, зависящих от времени, есть и другие, например, Окно счета, где устанавливается предельное количество входящих событий; по достижении порога X, Flink обрабатывает X событий.

          Окно счета для трех событий

          После теоретического введения давайте подробнее обсудим, что представляет собой поток данных с практической точки зрения. Подробнее об Apache Flink и потоковых процессах рассказано на официальном сайте.

          Описание потока

          В качестве резюме теоретической части на следующей блок-схеме показаны основные потоки данных, реализованные в сниппетах кода из этой статьи. Поток, изображенный ниже, начинается из источника (файлы записываются в каталог) и продолжается при обработке событий, превращаемых в объекты.

          Реализация, изображенная ниже, состоит из двух путей обработки. Тот, что показан в верхней части разделяет один поток на два боковых потока, а затем объединяет их, получая поток третьего типа. Сценарий, изображенный в нижней части схемы, описывает обработку потока, после которой результаты работы передаются в сток.

          Далее попытаемся пощупать руками практическую реализацию вышеизложенной теории; весь исходный код, рассматриваемый далее, выложен на GitHub.

          Базовая обработка потоков (пример #1)

          Усвоить концепции Flink будет проще, если начать с простейшего приложения. В этом приложении продьюсер записывает файлы в каталог, таким образом имитируется поток информации. Flink считывает файлы из этого каталога и записывает обобщенную информацию по ним в каталог назначения; это и есть сток.

          Далее давайте внимательно посмотрим, что происходит при обработке:

          Преобразование сырых данных в объект:

          // Каждая запись преобразуется в объект InputData; каждая новая строка считается новой записью DataStream inputDataObjectStream = dataStream .map((MapFunction) inputStr -> < System.out.println("--- Received Record : " + inputStr); return InputData.getDataObject(inputStr); >);

          В приведенном ниже фрагменте кода потоковый объект ( InputData ) преобразуется в кортеж строки и целого числа. Он извлекает лишь определенные поля из потока объектов, группируя их по одному полю квантами по две секунды.

           // Каждая запись преобразуется в кортеж с именем и счетом DataStream> userCounts = inputDataObjectStream .map(new MapFunction>() < @Override public Tuple2map(InputData item) < return new Tuple2(item.getName() ,item.getScore() ); > >) .returns(Types.TUPLE(Types.STRING, Types.INT)) .keyBy(0) // возвращает KeyedStream на основе первого элемента (поля 'name') //.timeWindowAll(Time.seconds(windowInterval)) // НЕ ИСПОЛЬЗОВАТЬ timeWindowAll с потоком на основе ключей .timeWindow(Time.seconds(2)) // вернуть WindowedStream .reduce((x,y) -> new Tuple2( x.f0+"-"+y.f0, x.f1+y.f1));

          Создание точки назначения для потока (реализация стока данных):

           // Определить временное окно и подсчитать количество записей DataStream> inputCountSummary = inputDataObjectStream .map( item -> new Tuple2 (String.valueOf(System.currentTimeMillis()),1)) // для каждого элемента вернуть кортеж из временной метки и целого числа (1) .returns(Types.TUPLE(Types.STRING ,Types.INT)) .timeWindowAll(Time.seconds(windowInterval)) // кувыркающееся окно .reduce((x,y) -> // суммируем числа, и так до достижения единого результата (new Tuple2(x.f0, x.f1 + y.f1))); // Задаем в качестве стока для потокового файла каталог вывода final StreamingFileSink> countSink = StreamingFileSink .forRowFormat(new Path(outputDir), new SimpleStringEncoder> ("UTF-8")) .build(); // Добавляем поток стока к DataStream; при таком условии inputCountSummary будет вписан в путь countSink inputCountSummary.addSink(countSink); 

          Образец кода, описывающего создание стока данных.

          Расщепление потоков (пример #2)

          В данном примере демонстрируется, как разделить основной поток, используя боковые потоки вывода. Flink обеспечивает создание множества боковых потоков из главного DataStream . Тип данных, располагающихся с каждой стороны потока, может отличаться от типа данных основного потока, равно как и от типа данных каждого из боковых потоков.

          Итак, используя боковой поток вывода, можно убить двух зайцев одним ударом: расщепить поток и преобразовать тип данных потока во множество типов данных (они могут быть уникальны для каждого бокового потока вывода).

          В приведенном ниже фрагменте кода вызывается ProcessFunction , разделяющий поток на два боковых, в зависимости от свойства ввода. Для получения того же результата нам пришлось бы неоднократно использовать функцию filter .

          Функция ProcessFunction собирает определенные объекты (на основе критерия) и отправляет в главный выводной коллектор (заключается в SingleOutputStreamOperator ), а остальные события передаются в боковые выводы. Поток DataStream разделяется по вертикали и публикует различные форматы для каждого бокового потока.

          Обратите внимание: определение бокового потока вывода основано на уникальном теге вывода (объект OutputTag ).

           // Определить отдельный поток для Исполнителей final OutputTag> playerTag = new OutputTag>("player")<>; // Определить отдельный поток для Певцов final OutputTag> singerTag = new OutputTag>("singer")<>; // Преобразовать каждую запись в объект InputData и разделить главный поток на два боковых. SingleOutputStreamOperator inputDataMain = inputStream .process(new ProcessFunction() < @Override public void processElement( String inputStr, Context ctx, CollectorcollInputData) < Utils.print(Utils.COLOR_CYAN, "Received record : " + inputStr); // Преобразовать строку в объект InputData InputData inputData = InputData.getDataObject(inputStr); switch (inputData.getType()) < case "Singer": // Создать выходной кортеж со значениями имени и счета ctx.output(singerTag, new Tuple2(inputData.getName(), inputData.getScore())); break; case "Player": // Создать выходной кортеж со значениями имени и типа; // Если новоиспеченный кортеж не совпадает с типом playerTag, то выбрасывается ошибка компиляции ("вывод метода не может быть применен к указанным типам") ctx.output(playerTag, new Tuple2 (inputData.getName(), inputData.getType())); break; default: // Собрать вывод основного потока как объекты InputData collInputData.collect(inputData); break; > > >);

          Пример кода, демонстрирующий, как разделить поток

          Объединение потоков (пример #3)

          Последняя операция, которая будет рассмотрена в этой статье – объединение потоков. Идея заключается в том, чтобы скомбинировать два разных потока, форматы данных в которых могут отличаться, из которых собрать один поток с унифицированной структурой данных. В отличие от операции объединения из SQL, где слияние данных происходит по горизонтали, объединение потоков осуществляется по вертикали, поскольку поток событий продолжается и никак не ограничен во времени.

          Объединение потоков выполняется путем вызова метода connect, после чего для каждого элемента в каждом отдельном потоке определяется операция отображения. В результате получается объединенный поток.

          // В описании возвращенного потока учтены типы данных обоих потоков ConnectedStreams, Tuple2> mergedStream = singerStream .connect(playerStream); DataStream> combinedStream = mergedStream.map(new CoMapFunction< Tuple2, // Поток 1 Tuple2, // Поток 2 Tuple4 //Вывод >() < @Override public Tuple4//Обработка потока 1 map1(Tuple2 singer) throws Exception < return new Tuple4("Source: singer stream", singer.f0, "", singer.f1); > @Override public Tuple4 // Обработка потока 2 map2(Tuple2 player) throws Exception < return new Tuple4("Source: player stream", player.f0, player.f1, 0); > >);

          Листинг, демонстрирующий получение объединенного потока

          Создание рабочего проекта

          Итак, резюмируем: демо-проект загружен на GitHub. Там описано, как его собрать и скомпилировать. Это хорошая отправная точка, чтобы поупражняться с Flink.

          Выводы

          В этой статье описаны основные операции, позволяющие создать рабочее приложение для обработки потоков на основе Flink. Цель приложения – дать общее представление о важнейших вызовах, присущих потоковой обработке, и заложить базис для последующего создания полнофункционального приложения Flink.

          Поскольку у потоковой обработки множество аспектов, и она сопряжена с разными сложностями, многие вопросы в этой статье остались не раскрыты; в частности, выполнение Flink и управление задачами, использование водяных знаков при установке времени для потоковых событий, подсадка состояния в события потока, выполнение итераций потока, выполнение SQL-подобных запросов к потокам и многое другое.

          Надеемся, этой статьи было достаточно, чтобы вам захотелось попробовать Flink.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *