Я пишу несколько примеров, чтобы понять потоки akka и обратное давление. Я пытаюсь понять, как медленный потребитель воздействует на AkkaPublisher.
Мой код выглядит следующим образом.
class DataPublisher extends ActorPublisher[Int] {
import akka.stream.actor.ActorPublisherMessage._
var items: List[Int] = List.empty
def receive = {
case s: String =>
println(s"Producer buffer size ${items.size}")
if (totalDemand == 0)
items = items :+ s.toInt
else
onNext(s.toInt)
case Request(demand) =>
if (demand > items.size) {
items foreach (onNext)
items = List.empty
}
else {
val (send, keep) = items.splitAt(demand.toInt)
items = keep
send foreach (onNext)
}
case other =>
println(s"got other $other")
}
}
а также
Source.fromPublisher(ActorPublisher[Int](dataPublisherRef)).runWith(sink)
Где приемник — это подписчик со сном, чтобы подражать медленному потребителю. И издатель продолжает выпускать данные, несмотря ни на что.
--EDIT-- Мой вопрос заключается в том, что когда требование равно 0, данные программно буферизуются. Как я могу использовать обратное давление, чтобы замедлить работу издателя
Что-то типа
throttledSource().buffer(10, OverflowStrategy.backpressure).runWith(throttledSink())
Это не повлияет на издателя, и его буфер продолжает работать.
Спасибо, Саджит