大数据之kafka Consumer 订阅模式 正则表达式 / 主题列表 实践

大数据阿木 发布于 3 天前 1 次阅读


Kafka Consumer 订阅模式实践:正则表达式与主题列表

Apache Kafka 是一个分布式流处理平台,它提供了高吞吐量、可扩展性和容错性的消息队列服务。在 Kafka 中,Consumer 是用于从 Kafka 集群中读取消息的应用程序组件。Consumer 可以订阅一个或多个主题,并从这些主题中消费消息。本文将围绕 Kafka Consumer 的订阅模式,特别是正则表达式和主题列表的订阅方式,进行实践和探讨。

Kafka Consumer 简介

Kafka Consumer 是 Kafka 客户端库的一部分,它允许应用程序订阅 Kafka 主题并消费消息。Consumer 可以使用多种方式来订阅主题,包括正则表达式和主题列表。

正则表达式订阅模式

正则表达式订阅模式允许 Consumer 订阅所有匹配特定正则表达式的主题。这种方式在处理动态主题名称时非常有用。

实践步骤

1. 创建 Kafka 集群:需要有一个运行中的 Kafka 集群。

2. 创建主题:在 Kafka 集群中创建一些主题,例如 `topic1`、`topic2` 和 `topic3`。

3. 编写 Consumer 代码:使用 Kafka 客户端库编写 Consumer 代码,使用正则表达式订阅主题。

代码示例

java

import org.apache.kafka.clients.consumer.ConsumerConfig;


import org.apache.kafka.clients.consumer.ConsumerRecord;


import org.apache.kafka.clients.consumer.ConsumerRecords;


import org.apache.kafka.clients.consumer.KafkaConsumer;


import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;


import java.util.Arrays;


import java.util.Properties;

public class RegexConsumerExample {


public static void main(String[] args) {


Properties props = new Properties();


props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");


props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


props.put(ConsumerConfig.GROUP_ID_CONFIG, "regex-group");


props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);


consumer.subscribe(Arrays.asList("topic."));

while (true) {


ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));


for (ConsumerRecord<String, String> record : records) {


System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());


}


}


}


}


在上面的代码中,我们订阅了所有以 `topic.` 开头的主题。

主题列表订阅模式

主题列表订阅模式允许 Consumer 订阅一组特定的主题。

实践步骤

1. 创建 Kafka 集群:确保 Kafka 集群正在运行。

2. 创建主题:在 Kafka 集群中创建一些主题,例如 `topic1`、`topic2` 和 `topic3`。

3. 编写 Consumer 代码:使用 Kafka 客户端库编写 Consumer 代码,订阅一组特定的主题。

代码示例

java

import org.apache.kafka.clients.consumer.ConsumerConfig;


import org.apache.kafka.clients.consumer.ConsumerRecord;


import org.apache.kafka.clients.consumer.ConsumerRecords;


import org.apache.kafka.clients.consumer.KafkaConsumer;


import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;


import java.util.Arrays;


import java.util.Properties;

public class ListConsumerExample {


public static void main(String[] args) {


Properties props = new Properties();


props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");


props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


props.put(ConsumerConfig.GROUP_ID_CONFIG, "list-group");


props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);


consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3"));

while (true) {


ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));


for (ConsumerRecord<String, String> record : records) {


System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());


}


}


}


}


在上面的代码中,我们订阅了 `topic1`、`topic2` 和 `topic3` 三个主题。

比较与选择

正则表达式订阅模式和主题列表订阅模式各有优缺点:

- 正则表达式订阅模式:

- 优点:可以动态地订阅所有匹配正则表达式的主题,适用于主题名称不固定的情况。

- 缺点:如果主题名称不符合正则表达式,则无法订阅到该主题。

- 主题列表订阅模式:

- 优点:可以精确地控制订阅的主题,适用于主题名称已知且固定的情况。

- 缺点:需要提前知道所有需要订阅的主题名称。

在实际应用中,应根据具体需求选择合适的订阅模式。

总结

本文介绍了 Kafka Consumer 的订阅模式,包括正则表达式订阅模式和主题列表订阅模式。通过实践示例,展示了如何使用这些模式来订阅 Kafka 主题。在实际应用中,应根据具体需求选择合适的订阅模式,以实现高效的 Kafka 消息消费。