Синхронизируйте два асинхронных вызова API с RxJava

Как мы можем синхронизировать два асинхронных вызова с помощью RxJava? В приведенном ниже примере метод contentService.listContents, который является вызовом API, должен сначала завершиться перед выполнением метода processSchema для каждой схемы.

schemaService.listSchema()
    .toObservable()
    .flatMapIterable(schemas -> {
        schemas.forEach(schema -> {
            // async call
            contentService.listContents(schema.getName()).subscribe(contents -> {
                   doSomethingWithThe(contents); 
            });
        });
        // contentService.listContents` must complete first before 
        // processSchema should be called for each schema
        return schemas;
    }).subscribe(schema -> { processSchema(schema); }, 
                 error -> { Console.error(error.getMessage()); });

Проблема с кодом над processSchema не будет ждать contentService.listContents, так как он асинхронный и не синхронизируется друг с другом.


person quarks    schedule 21.01.2020    source источник


Ответы (2)


Вы должны использовать flatMap для обработки schemas и, поскольку это список, вам нужно развернуть его и flatMap снова:

schemaService.listSchema()
.toObservable()
.flatMap(schemas -> 
     Observable.fromIterable(schemas)
     .flatMap(schema -> 
         contentService.listContents(schema.getName())
         .doOnNext(contents -> doSomethingWith(contents))
     )
     // probably you don't care about the inner contents
     .ignoreElements()
     // andThen will switch to this only when the sequence above completes
     .andThen(Observable.just(schemas))
)
.subscribe(
    schema -> processSchema(schema), 
    error -> Console.error(error.getMessage())
);

Обратите внимание, что вы не определили возвращаемые типы вызовов службы, поэтому вам, возможно, придется использовать, например, flatMapSingle и doOnSuccess.

person akarnokd    schedule 22.01.2020

Вероятно, вы ищете flatMap.

Из docs

Продолжение

Иногда, когда элемент становится доступным, нужно выполнить над ним некоторые зависимые вычисления. Это иногда называют продолжениями, и, в зависимости от того, что должно произойти и какие типы задействованы, для выполнения могут потребоваться различные операторы.

Зависимый

Наиболее типичным сценарием является присвоение значения, вызов другой службы, ожидание и продолжение с результатом:

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

Также часто случается так, что более поздние последовательности требуют значений из более ранних отображений. Этого можно добиться, переместив внешнюю плоскую карту во внутренние части предыдущей плоской карты, например:

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)
person Abdullah Khan    schedule 21.01.2020
comment
Обратите внимание, что значение в моем случае является Iterable, поэтому ваш код не показывает, что - person quarks; 21.01.2020