Поддерживают ли Play WebSockets обратное давление?

Мне нужно добавить прокси-сервер WebSocket-to-TCP в мое приложение Play 2.3, но, хотя исходящее TCP-соединение с использованием Akka I/O поддерживает обратное давление, я ничего не вижу для WebSocket. API на основе акторов явно не поддерживается, но Джеймс Ропер говорит:

Итерации обрабатывают это по дизайну, вы не можете передать новый элемент в итерацию до тех пор, пока последнее будущее, которое он возвращает, не будет погашено, потому что до этого у вас нет ссылки на него.

Однако я не понимаю, о чем он говорит. Iteratee.foreach, используемый в примерах, кажется слишком простым. Единственное будущее, которое я вижу в итерируемом API, — это завершение результата вычисления. Должен ли я заполнять Future[Unit] для каждого сообщения или как?


person Isvara    schedule 06.01.2015    source источник


Ответы (2)


Iteratee.foldM позволяет передавать состояние каждому шагу, подобно обычной операции сворачивания, и возвращать будущее. Если у вас нет такого состояния, вы можете просто передать Unit, и оно будет вести себя как foreach, который не примет следующий шаг, пока не завершится будущее.

Вот пример функции полезности, которая делает именно это:

 def foreachM[E](f: E => Future[Unit])(implicit ec: ExecutionContext): Iteratee[E, Unit] = 
   Iteratee.foldM[E, Unit](Unit)((_, e) => f(e))
person johanandren    schedule 20.01.2015

Iteratee не то же самое, что Iterator. Iteratee действительно по своей сути поддерживает обратное давление (на самом деле вы столкнетесь с противоположной проблемой - по умолчанию они не выполняют никакой буферизации (по крайней мере, внутри конвейера - конечно, асинхронные сокеты все еще имеют приемные буферы), поэтому вы иногда приходится добавлять явный шаг буферизации в конвейер перечислителя/итерируемого, чтобы получить разумную производительность). Примеры выглядят простыми, но это просто означает, что фреймворк делает то, что делает фреймворк, и упрощает задачу. Если вы выполняете значительный объем работы или выполняете асинхронные вызовы в своих обработчиках, вам не следует использовать простой Iteratee.foreach, а вместо этого использовать API, который принимает обработчик на основе Future; если вы блокируете внутри Iteratee, то вы блокируете все это, тратите впустую свои потоки и теряете смысл их использовать вообще.

person lmm    schedule 06.01.2015
comment
Iteratee — это не то же самое, что Iterator — это опечатка. Отредактировано. - person Isvara; 06.01.2015
comment
используйте API, который принимает обработчик на основе Future - есть ли он? Можете ли вы привести пример? - person Isvara; 06.01.2015
comment
Вы можете сделать это вручную, возвращая Future[Iteratee[...]] на каждом шаге (используя явный Cont, а не Iteratee.foreach) и используя Iteratee.flatten. Я недостаточно хорошо знаю игровой API, чтобы знать, есть ли более приятный вариант (я привык к scalaz iteratees), но, черт возьми, вы могли бы написать его, если потребуется. - person lmm; 06.01.2015
comment
Iteratee.foldM позволяет передавать состояние каждому шагу, подобно обычной операции сворачивания, и возвращать будущее. Если у вас нет такого состояния, вы можете просто передать Unit, и он будет вести себя как foreach, который не примет следующий шаг, пока не завершится будущее. - person johanandren; 07.01.2015
comment
@johanandren У меня это работает сейчас. Если вы поставите это как ответ, я приму это. - person Isvara; 20.01.2015
comment
@johanandren Вы знаете, как это работает в другом направлении? Если я нажму на канал Enumerator, как я узнаю, что данные были обработаны? - person Isvara; 20.01.2015
comment
stackoverflow.com/questions/28041042/ - person Isvara; 20.01.2015