近期项目中有一个数据展示功能,其数据来源于Kafka,项目要求Kafka每次都消费最新的数据,简要记录下其实现方案。

固定组实现-不生效

auto.offset.reset设置为latest同时采用固定group,此种方式只在第一次读取时有效,后续再次读取时仍然从上次读取的地方开始继续读,不满足使用要求。

Properties props = new Properties();
props.put("bootstrap.servers", "10.10.2.98:9004");
// 每次将组名动态生成
props.put("group.id", "test-group-1");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

String topic = "raw_message";
consumer.subscribe(Arrays.asList(topic));

动态组实现-生效

auto.offset.reset设置为latest,同时每次消费时将group的名称动态生成,这样即可确保每次读取的都是最新的消息。

其缺点是group的数量会不断增加,偏移量___consumer_offsets 多次保存,且没找到有效的删除group的方法,也没办法做到定期清理,会对性能产生影响,通常只作为测试与实现使用。

Properties props = new Properties();
props.put("bootstrap.servers", "10.10.2.98:9004");
// 每次将组名动态生成
props.put("group.id", "test-group-" + RandomStringUtils.randomAlphabetic(6));
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

String topic = "raw_message";
consumer.subscribe(Arrays.asList(topic));

固定组实现-生效

auto.offset.reset设置为latest的同时,group名称固定不变 ,给对应Consumer调用seekToEnd()方法,此种方式不需要动态切换组,推荐使用此方式。

Properties props = new Properties();
props.put("bootstrap.servers", "10.10.2.98:9004");
props.put("group.id", "test-group-1");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

String topic = "raw_message";
TopicPartition topicPartition = new TopicPartition(topic, 0);
List<TopicPartition> topics = Arrays.asList(topicPartition);
consumer.assign(topics);
consumer.seekToEnd(topics);