Повторное подключение WebSocket с общим наблюдаемым RxJS

У меня есть такая наблюдаемая:

const records$ =
    Rx.DOM.fromWebSocket('ws://192.168.2.4:9001/feed/', null)
    .map(ev => parseRecord(ev.data))
    .share();

У меня много подписчиков. При потере связи все абоненты отписываются:

let records$Subscription;
records$Subscription = records$.subscribe(
    record => { ... },
    error => records$Subscription.dispose()
);

Я убедился, что вызов dispose действительно выполняется один раз для каждой подписки. Таким образом, счетчик ссылок share достиг нуля.

Однако, когда я снова подписываюсь на records$, новое соединение WebSocket не устанавливается. Однако когда я удаляю вызов share, он удаляется. Почему это не работает должным образом с share?


person rightfold    schedule 17.04.2016    source источник
comment
какую версию Rxjs вы используете?   -  person user3743222    schedule 17.04.2016
comment
@ user3743222 RxJS v4.1.0 и RxJS-DOM v7.0.3   -  person rightfold    schedule 17.04.2016


Ответы (1)


Я верю в rxjs v5, share позволяет вам повторно подключаться, но не в Rxjs v4.

В Rxjs 4 share в основном multicast.refCount, и после того, как тема, используемая для многоадресной рассылки, завершена, ее нельзя использовать повторно (согласно правилам грамматики Rxjs, посмотрите Какова семантика различных тем RxJS? тоже), что приводит к поведению, которое вы наблюдали.

В Rxjs 5 используется фабрика тем (что-то вроде multicast(() => new Rx.Suject().refCount())), поэтому при необходимости тема создается заново.

См. проблемы здесь и здесь для более подробной информации.

Короче говоря, если вам не подходит текущее поведение, вы можете переключиться на версию 5 (обратите внимание, что она все еще находится в стадии бета-тестирования, и есть некоторые критические изменения).

person user3743222    schedule 17.04.2016