Как создать записи в базе данных с инкрементным идентификатором в реактивном программировании?

Я новичок в реактивном программировании. Нужна помощь, чтобы понять поведение.

Чего я хочу достичь?

Я хочу создать 50 записей, идентификатор которых должен быть в возрастающем порядке. Если запись с идентификатором 1 присутствует в db, она должна создать запись с идентификатором 2.

Моя текущая реализация выглядит следующим образом:

// create entry 50 times 
 void createEntries() {
        LOGGER.info("going to create 50 entries);
        Flux.range(1, 50)
            .flatMap(i -> createEntry(5))
            .subscribe();
    }

//method to create an entry in db with incremental id  
private Mono<Integer> createEntry(long retryInterval) {
    return (customRepository.findAllEntry()) //-->A db call which returns all entries flux<Entrys> 
            .map(entry -> entry.getEntryId())
            .sort()
            //get last existing entry id
            .last(0)          
            //try to create the entry with new incremented id
            .flatMap(id -> createEntry(id + 1, retryInterval));
}

private Mono<? extends Integer> createEntry(int newEntryId, long retryInterval) {
    return saveEntry(newEntryId) //--> return Mono<Boolean> true if saved false if id already exists
            .doOnNext(applied -> LOGGER.info("Successfully created entry with id: {} ? {} ", newEntryId, applied)) //--> Why this is called multiple times??
            .flatMap(applied -> !applied
                    //applied false shows id already exists, so try again recursively with new incremented id
                    ? createEntry(newEntryId + 1, retryInterval)
                    : Mono.just(newEntryId))
            .doOnError(e -> LOGGER.warn("Error creating entry with id {} ? {} : ", newEntryId, e));
            .retryWhen(Retry.anyOf(RuntimeException.class)
                            .exponentialBackoff(Duration.ofSeconds(retryInterval), Duration.ofSeconds(retryInterval))); //-->retry on creation if any exception 
}

Вышеупомянутая реализация дает мне неожиданное поведение: регистратор данных Успешно созданная запись с идентификатором: вызывается несколько раз для одного и того же идентификатора .. однако я ожидаю, что он будет вызван только один раз. Примечание: поведение останется таким же, даже если я удалю retryWhen.


person Laxmikant    schedule 18.09.2020    source источник


Ответы (1)


Наконец я выяснил проблему в своем коде. Проблема была в фрагменте кода ниже

// create entry 50 times 
 void createEntries() {
        LOGGER.info("going to create 50 entries);
        Flux.range(1, 50)
            .flatMap(i -> createEntry(5))
            .subscribe();
    }

Это вызвало метод 50 раз, прежде чем даже saveEntry(newEntryId) был завершен. Я исправил проблему, используя repeat api, как показано ниже:

// create entry 50 times 
 void createEntries() {
        LOGGER.info("going to create 50 entries);
        createEntry(5).subscribe();
    }

//method to create an entry in db with incremental id  
private Flux<? extends Integer> createEntry(long retryInterval) {
    return (customRepository.findAllEntry()) //-->A db call which returns all entries flux<Entrys> 
            .map(entry -> entry.getEntryId())
            .sort()
            //get last existing entry id
            .last(0)          
            //try to create the entry with new incremented id
            .flatMap(id -> createEntry(id + 1, retryInterval))
            .repeat(49); //-->This fixes my issue will only be invoked  49 times again onComplete(). And hence will create 50 entries
}

private Mono<? extends Integer> createEntry(int newEntryId, long retryInterval) {
    return saveEntry(newEntryId) //--> return Mono<Boolean> true if saved false if id already exists
            .doOnNext(applied -> LOGGER.info("Successfully created entry with id: {} ? {} ", newEntryId, applied)) 
            .flatMap(applied -> !applied
                    //applied false shows id already exists, so try again recursively with new incremented id
                    ? createEntry(newEntryId + 1, retryInterval)
                    : Mono.just(newEntryId))
            .doOnError(e -> LOGGER.warn("Error creating entry with id {} ? {} : ", newEntryId, e));
            .retryWhen(Retry.anyOf(RuntimeException.class)
                            .exponentialBackoff(Duration.ofSeconds(retryInterval), Duration.ofSeconds(retryInterval))); //-->retry on creation if any exception 
}
person Laxmikant    schedule 20.09.2020