Как оказать обратное давление на ActorPublisher

Я пишу несколько примеров, чтобы понять потоки akka и обратное давление. Я пытаюсь понять, как медленный потребитель воздействует на AkkaPublisher.

Мой код выглядит следующим образом.

class DataPublisher extends ActorPublisher[Int] {

  import akka.stream.actor.ActorPublisherMessage._

  var items: List[Int] = List.empty

  def receive = {
    case s: String =>
      println(s"Producer buffer size ${items.size}")
      if (totalDemand == 0)
        items = items :+ s.toInt
      else
        onNext(s.toInt)

    case Request(demand) =>
      if (demand > items.size) {
        items foreach (onNext)
        items = List.empty
      }
      else {
        val (send, keep) = items.splitAt(demand.toInt)
        items = keep
        send foreach (onNext)
      }


    case other =>
      println(s"got other $other")
  }
}

а также

Source.fromPublisher(ActorPublisher[Int](dataPublisherRef)).runWith(sink)

Где приемник — это подписчик со сном, чтобы подражать медленному потребителю. И издатель продолжает выпускать данные, несмотря ни на что.

--EDIT-- Мой вопрос заключается в том, что когда требование равно 0, данные программно буферизуются. Как я могу использовать обратное давление, чтобы замедлить работу издателя

Что-то типа

throttledSource().buffer(10, OverflowStrategy.backpressure).runWith(throttledSink())

Это не повлияет на издателя, и его буфер продолжает работать.

Спасибо, Саджит


person Sajith Silva    schedule 08.01.2018    source источник


Ответы (2)


Не используйте ActorPublisher

Во-первых, не используйте ActorPublisher — это очень низкоуровневый и устаревший API. Мы решили отказаться от поддержки, так как пользователи не должны работать на таком низком уровне абстракции в Akka Streams.

Одна из сложных вещей заключается именно в том, о чем вы спрашиваете: управление противодавлением полностью находится в руках разработчика, пишущего ActorPublisher, если они используют этот API. Таким образом, вы должны получать сообщения Request(n) и следить за тем, чтобы вы никогда не сигнализировали больше элементов, чем получили запросы. Это поведение указано в Спецификации реактивных потоков, которую затем необходимо реализовать. правильно. По сути, вы сталкиваетесь со всеми сложностями Reactive Streams (это полная спецификация со многими крайними случаями - отказ от ответственности: я был / являюсь частью разработки Reactive Streams, а также Akka Streams).

Демонстрация того, как противодавление проявляется в GraphStage

Во-вторых, для создания пользовательских этапов вы должны использовать разработанный для этого API: GraphStage. Обратите внимание, что такой этап также является довольно низким уровнем. Обычно пользователям Akka Streams не нужно писать собственные этапы, однако совершенно ожидаемо и нормально писать собственные этапы, если они будут реализовывать некоторую логику, которую не предоставляют встроенные этапы.

Вот упрощенная реализация фильтра из кодовой базы Akka:


case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
  override def initialAttributes: Attributes = DefaultAttributes.filter

  override def toString: String = "Filter"

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {

    override def onPush(): Unit = {
      val elem = grab(in)
      if (p(elem)) push(out, elem)
      else pull(in)
    }

    // this method will NOT be called, if the downstream has not signalled enough demand!
    // this method being NOT called is how back-pressure manifests in stages
    override def onPull(): Unit = pull(in)

    setHandlers(in, out, this)
  }
}

Как видите, вместо самостоятельной реализации всей логики и правил Reactive Streams (что сложно) вы получаете простые обратные вызовы, такие как onPush и onPull. Akka Streams обрабатывает управление спросом и автоматически вызывает onPull, если нисходящий поток сигнализирует о спросе, и НЕ будет вызывать его, если запроса нет, что будет означать, что нисходящий поток оказывает противодавление на этом этапе.

person Konrad 'ktoso' Malawski    schedule 10.01.2018
comment
Спасибо, Конрад, ваше объяснение проясняет множество проблем, которые у меня были с ActorPublisher. Я бы еще немного проверил этапы графа. - person Sajith Silva; 12.01.2018

Этого можно добиться с помощью промежуточного Flow.buffer:

val flowBuffer = Flow[Int].buffer(10, OverflowStrategy.dropHead)

Source
  .fromPublisher(ActorPublisher[Int](dataPublisherRef))
  .via(flowBuffer)
  .runWith(sink)
person Ramón J Romero y Vigil    schedule 08.01.2018
comment
Спасибо, это работает. Но потом я понимаю, что меня беспокоит не буферизация, а противодавление. Потому что даже если я буферизирую поток, буфер издателя продолжает расти. Чтобы избежать этого, мне нужно оказать обратное давление на издателя, чтобы он замедлился. Как я это сделал. Я обновил вопрос. Извините, что ввел вас в заблуждение - person Sajith Silva; 09.01.2018
comment
Это полезно, хотя и не ограничивает почтовый ящик самого издателя актора (т. е. dataPublisherRef). - person Arunav Sanyal; 15.01.2019