Rxjava2 и Retrofit2 Множественные/параллельные/одновременные вызовы api(post) в разных потоках

У меня есть этот код ниже, где я пытаюсь выполнить несколько последовательных вызовов API, используя Retrofit2 + RxJava2

@Override
        public void onClick(View v) {

            count++;
            request.setName("rober");
            request.setVarryingValue(count);

            mApiService.apiService()
                    .getAccessToken(<params>)
                    .subscribeOn(Schedulers.newThread())
                    .flatMap(new Function<Auth, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Auth authentication) throws Exception {

                            Observable<Void> postObservable = mApiService.apiService().postCall(request, authentication.getAuth())
                                    .subscribeOn(Schedulers.io());
                            postObservable.subscribe(new Observer<Void>() {

                                @Override
                                public void onSubscribe(Disposable d) {}

                                @Override
                                public void onNext(Void value) {
                                    Log.e("Thread", " Thread : " + Thread.currentThread());
                                }

                                @Override
                                public void onError(Throwable e) {
                                    e.printStackTrace();
                                }

                                @Override
                                public void onComplete() {}
                            });

                            return postObservable;
                        }
                    }).subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {}

                @Override
                public void onNext(Object value) {}

                @Override
                public void onError(Throwable e) {}

                @Override
                public void onComplete() {}
            });
        }
    });

я ожидаю, что для каждого щелчка отправьте вызов в новый/другой поток для выполнения определенного вызова API POST, но у меня есть эти значения сообщения из внутреннего вызова API

{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1

{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1


{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1


{"name ":"rober"," some_varrying_number ":"4"}
04-03 07:23:08.319 27225-27378/edu.rx.test D/OkHttp: --> END POST (1679-byte 
body)
04-03 07:23:08.322 27225-27380/edu.rx.test D/OkHttp: --> POST 
http://mydomain.test.post.server http/1.1

похоже, что он выполняет только последний вызов, я узнал, что в Rx нет точно «параллельного» вызова, потому что если это произойдет, это нарушит все реактивные принципы, но они говорят, что есть много обходных путей для этого, и теперь Я пытаюсь выполнить этот «параллельный» вызов с кодами, которые я разместил, но не повезло :(, мне нужна помощь здесь,

Любая помощь будет принята с благодарностью. Спасибо!

РЕДАКТИРОВАТЬ: процесс будет таким: 1. сначала получить токен аутентификации 2. после успешного токена аутентификации перейти к вызовам API POST

число 2 зависит от числа 1

1 и 2 всегда будут выполняться по событиям щелчка.

Изменить: я опубликую изображение, чтобы наглядно проиллюстрировать его.

  • У меня есть 2 источника (это может быть 3 или больше)
  • Эти источники поступают из событий кликов
  • Каждый источник будет обрабатывать разные сетевые вызовы
  • Я пытаюсь выполнить его в другом потоке (чтобы сделать его асинхронным)
  • каждый будет публиковать разные значения на сервере введите здесь описание изображения

  • но похоже, что он обрабатывает только последнее событие (щелкнул)

  • используя изображение выше, я ожидаю, что POST A и POST B будут происходить параллельно/по-разному
  • но он выполняет только POST B (фактически 2 POST B)
  • я ожидаю, что он будет выполнять POST A и POST B по-разному

person Robert    schedule 03.04.2017    source источник
comment
Является ли «запрос» полем в классе, которое используется для каждого onClick? в чем проблема с потоком, если есть? У вас в журнале 4 звонка, не так ли? с теми же параметрами в звонилке, что у тебя проблемы?   -  person yosriz    schedule 03.04.2017
comment
я не совсем уверен, что это проблема с потоком, я ожидаю, что счетчик будет увеличиваться, а some_varrying_number будет иметь значение из счетчика (some_varrying_number также будет увеличиваться), some_varrying_number: 1 some_varrying_number: 2 some_varrying_number: 3 some_varrying_number: 4 , но похоже, что some_varrying_number : 4 — единственное значение, которое выполняется   -  person Robert    schedule 04.04.2017


Ответы (3)


В то время, когда вы отправляете самый первый запрос POST, переменное значение request уже равно 4. Это потому, что после того, как вы щелкнете 4 раза, первый запрос POST примет request в то время (с 4), а не request в то время устанавливается его изменяющееся значение.

Решение заключается в том, чтобы сделать объект request локальным внутри метода onClick().

public void onClick(View v) {
    RequestClass request = new RequestClass();
    request.setName("rober");
    request.setVarryingValue(count);

    //Your code
}

Или присвойте значение count временной переменной и установите его в request прямо перед отправкой POST-запроса, но следите за потокобезопасностью.

public void onClick(View v) {
    //Saving the value
    int temp = count++;
    request.setName("rober");

    mApiService.apiService()
            .getAccessToken(<params>)
            .subscribeOn(Schedulers.newThread())
            .flatMap(new Function<Auth, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Auth authentication) throws Exception {
                    //Set it to the request
                    //BE CAREFUL  because the `request` object is now being accessed from multiple threads
                    request.setVarryingValue(temp);

                    Observable<Void> postObservable = 
                        mApiService
                            .apiService()
                            .postCall(request, authentication.getAuth())
                            .subscribeOn(Schedulers.io())
                            .subscribe();

                    return postObservable;
                }
            })
            .subscribe();
}
person Lê Thái Phúc Quang    schedule 04.04.2017
comment
Я хочу поддержать как йосриз, так и ваш ответ как принятый, мне удается сделать то, что я хочу (это решило это), сделав поле локальным, а не полем, общим для класса, но, пожалуйста, не могли бы вы объяснить, почему ? Я знаю, что это как-то связано с потоками, или вы можете дать мне ссылку, связанную с моей проблемой и решением, которое вы дали? там по крайней мере я могу понять разницу между полем и локалом, когда дело доходит до rx - person Robert; 05.04.2017
comment
Изменить: я сделал запрос локальным вместо поля, а не переменной count, как вы правы, здесь могут возникнуть условия гонки, и я не хочу вводить синхронизацию - person Robert; 05.04.2017
comment
Если вы используете request в качестве поля, все вызовы onClick будут обращаться к одному и тому же экземпляру RequestClass. В вашей ситуации, когда один из четырех запросов GET возвращает ПОСЛЕ 4 onClick выполнения, вы отправляете первый запрос POST с общим экземпляром RequestClass, его переменное значение равно 4 после четвертого onClick. В локальном случае все четыре выполнения onClick будут использовать четыре разных экземпляра RequestCall, поэтому ни один onClick не сможет редактировать значение других. - person Lê Thái Phúc Quang; 05.04.2017
comment
Это совершенно не связано с потоками, так как все четыре onClick выполняются в основном потоке, но это разница между локальной переменной и полем. Попробуйте несколько руководств по ООП по этой теме, которые помогут вам прояснить ситуацию. - person Lê Thái Phúc Quang; 05.04.2017
comment
Попробуйте несколько руководств по ООП по этой теме, которые помогут вам прояснить ситуацию - да, я думаю, мне нужно немного освежить в памяти основы, Изменить: я пошел в комнату с белой доской и нарисовал все, что вы сказали, да, вы правы, это не параллелизм , я просто пропустил фундаментальные характеристики java, при 4-м запросе GET значение уже увеличено для объекта запроса, теперь, когда GET ответил, он будет использовать последнее значение объекта запроса, равное 4, большое спасибо, действительно Благодарность - person Robert; 05.04.2017
comment
Я должен был немного отступить, я забыл основы, слишком много думая о вещах Reactive/Concurrency. - person Robert; 05.04.2017
comment
Я рад, что вы сделали. Удачного программирования! - person Lê Thái Phúc Quang; 05.04.2017

Мне не совсем понятно, почему все почтовые запросы выполняются одновременно, полный журнал с onClick() временем и getAccessToken() запросами, и onNext() выбросами может помочь.

Кажется, ваш код делает это правильно, вы будете открывать новый поток для каждого запроса.
Но, так или иначе, поскольку у вас есть 4 отпечатка в вашем журнале, но все они имеют одинаковое количество, проблема, вероятно, связана с параметром request является полем и, таким образом, используется совместно и доступно после getAccessToken() создания элемента, таким образом, вы в основном выполняете 4 запроса, но с одними и теми же данными.
Вы должны назначить для каждого нового созданного flatMap() выделенную переменную счетчика.

Кроме того, неправильно подписываться на postObservable внутри оператора flatMap(), вы должны просто вернуть его, и поток подпишется на него и объединит его обратно в виде onNext() выбросов.
С вашим кодом вы выполняете каждый пост-запрос < strong>дважды, один явно вами, а другой оператором flatMap() (это также может объяснить наличие нескольких журналов одного и того же запроса, но опять же трудно сказать, не видя всех данных в журнале).

person yosriz    schedule 04.04.2017
comment
спасибо вам обоим за разъяснение проблемы, что я должен сделать значение локальным для метода, а не для поля, не могли бы вы уточнить это, неправильно подписываться на postObservable внутри оператора flatMap(), вы должны просто вернуть его и поток подпишется на него и объединит его обратно как выбросы onNext(). Как я могу связать вызов без использования/подписки другого наблюдателя внутри плоской карты? - person Robert; 05.04.2017
comment
Я предлагаю вам прочитать немного больше о flatMap, это именно то, что делает flatMap(), он создаст Observable из каждого значения onNext (в соответствии с вашей функцией сопоставления), затем он подпишется на него и объединит эмиссию из этого нового Observable в исходный поток. - person yosriz; 05.04.2017

Основываясь на обоих предложенных вами ответах, я придумал следующий код:

  • Запрос стал локальной переменной вместо поля
  • Удаление подписки внутри плоской карты
  • Привязал его к onNext внешнего наблюдаемого

    @Override
    public void onClick(View v) {
    
    count++;
    Request request = new Request();
    request.setName("rober");
    request.setVarryingValue(count);
    
    mApiService.apiService()
            .getAccessToken(<params>)             
            .subscribeOn(Schedulers.io())
            .flatMap(new Function<Auth, ObservableSource<Void>>() {
                @Override
                public ObservableSource<Void> apply(Auth authentication) throws Exception {
                    return mApiService.apiService().postCall(request, authentication.getAuth());
                }
            })  
            .subscribe(new Observer<ObservableSource<Void>>() {
    
                @Override
                public void onSubscribe(Disposable d) {}
    
                @Override
                public void onNext(ObservableSource<Void> sourceFromFlatMap) {
    
                    sourceFromFlatMap.subscribe(new Observer<Void>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {}
    
                        @Override
                        public void onNext(Void value) {
                        }
    
                        @Override
                        public void onError(Throwable e) {}
    
                        @Override
                        public void onComplete() {}
                    });
                }
    
                @Override
                public void onError(Throwable e) {}
    
                @Override
                public void onComplete() {}
            });
     });
    

хотя я до сих пор не уверен, правильно ли я использую Rx

person Robert    schedule 05.04.2017