Мы используем spring-integration-kafka версии 3.1.2.RELEASE и int-kafka: message-driven-channel-adapter для получения сообщений из удаленной темы kafka. Производитель отправляет зашифрованное сообщение, и мы расшифровываем фактическое сообщение с помощью десериализатора. Мы можем использовать все сообщения, опубликованные в теме. Мы использовали автоматическую фиксацию как ложную. Мы хотели бы знать, как зафиксировать или подтвердить сообщение от нашей службы после успешной обработки сообщения. Может ли кто-нибудь помочь нам, как зафиксировать сообщения, прочитанные из канала, управляемого сообщениями, и предоставить некоторую эталонную реализацию?
Когда мы устанавливаем для автоматической фиксации значение true, мы предполагаем, что сообщение будет зафиксировано после интервала фиксации, но мы хотели бы обработать его в нашей службе. Я столкнулся с приведенным ниже примером, но мы получаем настраиваемый объект после десериализации, а не сообщение о весенней интеграции. поэтому мы хотели бы знать, как реализовать подобное подтверждение в преобразователе, чтобы мы не фиксировали сообщение в случае каких-либо ошибок во время преобразования. Зафиксируйте сообщение после успешного преобразования.
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if(acknowledgment != null) { System.out.println("Acknowledgment provided");
acknowledgment.acknowledge(); }
}
<int-kafka:message-driven-channel-adapter
id="kafkaMessageListener"
listener-container="kafkaMessageContainer" auto-startup="true"
phase="100" send-timeout="5000" mode="record"
message-converter="messageConverter"
recovery-callback="recoveryCallback" error-message-strategy="ems"
channel="inputFromKafkaChannel" error-channel="errorChannel" />
<int:transformer id="transformerid"
ref="transformerBean"
input-channel="inputFromKafkaChannel" method="transform"
output-channel="messageTransformer" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${spring.kafka.bootstrap-servers}" />
<entry key="enable.auto.commit" value="false" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="group.id" value="${spring.kafka.consumer.group-id}" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
<entry key="value.deserializer"
value="com.test.CustomDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="${spring.kafka.topics}" />
</bean>
</constructor-arg>
</bean>