Merge pull request #17389 from sparty02

* gh-17389:
  Polish "Add config property for Kafka consumer isolation level"
  Add config property for Kafka consumer isolation level

Closes gh-17389
pull/17448/head
Andy Wilkinson 5 years ago
commit dde79e5308

@ -23,12 +23,14 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
@ -266,6 +268,11 @@ public class KafkaProperties {
*/ */
private Duration heartbeatInterval; private Duration heartbeatInterval;
/**
* Isolation level for reading messages that have been written transactionally.
*/
private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
/** /**
* Deserializer class for keys. * Deserializer class for keys.
*/ */
@ -362,6 +369,14 @@ public class KafkaProperties {
this.heartbeatInterval = heartbeatInterval; this.heartbeatInterval = heartbeatInterval;
} }
public IsolationLevel getIsolationLevel() {
return this.isolationLevel;
}
public void setIsolationLevel(IsolationLevel isolationLevel) {
this.isolationLevel = isolationLevel;
}
public Class<?> getKeyDeserializer() { public Class<?> getKeyDeserializer() {
return this.keyDeserializer; return this.keyDeserializer;
} }
@ -406,6 +421,8 @@ public class KafkaProperties {
map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG)); map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG));
map.from(this::getHeartbeatInterval).asInt(Duration::toMillis) map.from(this::getHeartbeatInterval).asInt(Duration::toMillis)
.to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)); .to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
map.from(() -> getIsolationLevel().name().toLowerCase(Locale.ROOT))
.to(properties.in(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));

@ -105,6 +105,7 @@ class KafkaAutoConfigurationTests {
"spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.fetch-max-wait=456", "spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.fetch-max-wait=456",
"spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB", "spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB",
"spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234", "spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234",
"spring.kafka.consumer.isolation-level = read-committed",
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer", "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer") "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
.run((context) -> { .run((context) -> {
@ -133,6 +134,7 @@ class KafkaAutoConfigurationTests {
assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(1024); assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(1024);
assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar"); assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234); assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234);
assertThat(configs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)).isEqualTo("read_committed");
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
.isEqualTo(LongDeserializer.class); .isEqualTo(LongDeserializer.class);
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))

Loading…
Cancel
Save