Как подтвердить сообщение kafka, прочитанное потребителем, с помощью Spring интеграции kafka

Мы используем spring-integration-kafka версии 3.1.2.RELEASE и int-kafka: message-driven-channel-adapter для получения сообщений из удаленной темы kafka. Производитель отправляет зашифрованное сообщение, и мы расшифровываем фактическое сообщение с помощью десериализатора. Мы можем использовать все сообщения, опубликованные в теме. Мы использовали автоматическую фиксацию как ложную. Мы хотели бы знать, как зафиксировать или подтвердить сообщение от нашей службы после успешной обработки сообщения. Может ли кто-нибудь помочь нам, как зафиксировать сообщения, прочитанные из канала, управляемого сообщениями, и предоставить некоторую эталонную реализацию?

Когда мы устанавливаем для автоматической фиксации значение true, мы предполагаем, что сообщение будет зафиксировано после интервала фиксации, но мы хотели бы обработать его в нашей службе. Я столкнулся с приведенным ниже примером, но мы получаем настраиваемый объект после десериализации, а не сообщение о весенней интеграции. поэтому мы хотели бы знать, как реализовать подобное подтверждение в преобразователе, чтобы мы не фиксировали сообщение в случае каких-либо ошибок во время преобразования. Зафиксируйте сообщение после успешного преобразования.

  Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); 
  if(acknowledgment != null) { System.out.println("Acknowledgment provided");
  acknowledgment.acknowledge(); }
   }



<int-kafka:message-driven-channel-adapter
    id="kafkaMessageListener"
    listener-container="kafkaMessageContainer" auto-startup="true"
    phase="100" send-timeout="5000" mode="record"
    message-converter="messageConverter"
    recovery-callback="recoveryCallback" error-message-strategy="ems"
    channel="inputFromKafkaChannel" error-channel="errorChannel" />

<int:transformer id="transformerid"
    ref="transformerBean"
    input-channel="inputFromKafkaChannel" method="transform"
    output-channel="messageTransformer" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="${spring.kafka.bootstrap-servers}" />
                    <entry key="enable.auto.commit" value="false" />
                    <entry key="auto.commit.interval.ms" value="100" />
                    <entry key="session.timeout.ms" value="15000" />
                    <entry key="group.id" value="${spring.kafka.consumer.group-id}" />
                    <entry key="key.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                    <entry key="value.deserializer"
                        value="com.test.CustomDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.ContainerProperties">
            <constructor-arg name="topics" value="${spring.kafka.topics}" />
        </bean>
    </constructor-arg>
</bean>

person Muthu    schedule 19.05.2019    source источник


Ответы (1)


auto.commit.offset=true означает, что библиотека kafka-clients фиксирует смещения.

При значении false (предпочтительно в Spring для Apache Kafka) контейнер слушателя фиксирует смещения после каждого пакета, полученного poll() по умолчанию, но механизм управляется свойством контейнера AckMode.

См. Выполнение смещений. .

Если вы установите AckMode контейнера в MANUAL или MANUAL_IMMEDIATE, тогда ваше приложение должно выполнять фиксации, используя объект Acknowledgment.

При использовании Spring Integration объект Acknowledgment доступен в заголовке KafkaHeaders.ACKNOWLEDGMENT.

В большинстве случаев следует использовать AckMode.BATCH (по умолчанию) или AckMode.RECORD, и вашему приложению не нужно беспокоиться о фиксации смещений.

person Gary Russell    schedule 19.05.2019
comment
Спасибо, Гэри Рассел, за оперативный ответ. - person Muthu; 19.05.2019