背景
我们的环境需要降本，因此要把阿里云上的 Kafka 迁移到 K8s 集群中。不需要迁移数据，只需迁移 Topic 和 Group（Group 实际上无需预先创建，见文末说明）。Topic 有 77 个，Group 有 163 个，逐个手动创建比较麻烦，所以用脚本来实现。
kafka-export.go
通过调用阿里云 OpenAPI 获取 Topic 和 Group 信息，并以 JSON 格式分别导出到 topics.txt 和 groups.txt 中。
package main
import (
"encoding/json"
"fmt"
"os"
alikafka "github.com/alibabacloud-go/alikafka-20190916/v3/client"
openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
util "github.com/alibabacloud-go/tea-utils/v2/service"
"github.com/alibabacloud-go/tea/tea"
)
// GroupInfo describes one consumer group exported from the Alibaba Cloud
// Kafka instance. GroupName holds the API's ConsumerId and Remark the
// group's remark text (empty when none is set). A slice of these is written
// to groups.txt as JSON; the field order here fixes the key order there.
type GroupInfo struct {
	GroupName string `json:"group_name"`
	Remark    string `json:"remark"`
}
// TopicInfo describes one topic exported from the Alibaba Cloud Kafka
// instance: its name, partition count and remark (empty when none is set).
// A slice of these is written to topics.txt as JSON and read back by
// kafka-import.sh via the topic_name / partition_num / remark keys.
type TopicInfo struct {
	TopicName    string `json:"topic_name"`
	PartitionNum int32  `json:"partition_num"`
	Remark       string `json:"remark"`
}
// CreateClient builds an alikafka API client that authenticates with the
// given AccessKey pair and sends requests to the regional endpoint
// alikafka.<regionId>.aliyuncs.com.
// Endpoint list: https://api.aliyun.com/product/alikafka
func CreateClient(regionId, accessKeyId, accessKeySecret string) (*alikafka.Client, error) {
	endpoint := fmt.Sprintf("alikafka.%s.aliyuncs.com", regionId)
	return alikafka.NewClient(&openapi.Config{
		AccessKeyId:     tea.String(accessKeyId),
		AccessKeySecret: tea.String(accessKeySecret),
		Endpoint:        tea.String(endpoint),
	})
}
// getConsumerList fetches the consumer groups of a Kafka instance through the
// GetConsumerList API and converts them to GroupInfo records.
//
// On success the returned slice is never nil, so an instance without groups
// is saved as the JSON array [] instead of null (kafka-import.sh iterates the
// file with jq '.[]', which errors on null). Errors are returned with the
// prefix "获取消费者组列表失败: ".
//
// NOTE(review): a single request is made. Confirm the API's default page size
// covers all groups of this instance, or page with CurrentPage/PageSize.
func getConsumerList(client *alikafka.Client, instanceId, regionId string) ([]*GroupInfo, error) {
	request := &alikafka.GetConsumerListRequest{
		InstanceId: tea.String(instanceId),
		RegionId:   tea.String(regionId),
	}
	runtime := &util.RuntimeOptions{}

	groups := make([]*GroupInfo, 0)
	tryErr := func() (err error) {
		// Turn an SDK panic into this call's error. Merely printing it would
		// make the closure return nil and report an empty list as success.
		defer func() {
			if r := tea.Recover(recover()); r != nil {
				err = r
			}
		}()

		resp, err := client.GetConsumerListWithOptions(request, runtime)
		if err != nil {
			return err
		}
		if resp == nil || resp.Body == nil {
			return fmt.Errorf("响应为空")
		}
		// The body carries its own Success flag; presumably false signals a
		// business-level failure (TODO confirm) - do not treat it as "no groups".
		if resp.Body.Success != nil && !*resp.Body.Success {
			return fmt.Errorf("接口返回失败: %s", tea.StringValue(resp.Body.Message))
		}
		if resp.Body.ConsumerList == nil {
			return nil
		}
		for _, consumer := range resp.Body.ConsumerList.ConsumerVO {
			groups = append(groups, &GroupInfo{
				GroupName: tea.StringValue(consumer.ConsumerId),
				Remark:    tea.StringValue(consumer.Remark),
			})
		}
		return nil
	}()
	if tryErr != nil {
		// Prefer the concise SDK message over SDKError's multi-line Error().
		msg := tryErr.Error()
		if sdkErr, ok := tryErr.(*tea.SDKError); ok {
			msg = tea.StringValue(sdkErr.Message)
		}
		return nil, fmt.Errorf("获取消费者组列表失败: %s", msg)
	}
	return groups, nil
}
// getTopicList fetches the topics of a Kafka instance through the
// GetTopicList API and converts them to TopicInfo records (name, partition
// count, remark).
//
// On success the returned slice is never nil, so an instance without topics
// is saved as the JSON array [] instead of null (kafka-import.sh iterates the
// file with jq '.[]', which errors on null). Errors are returned with the
// prefix "获取Topic列表失败: ".
//
// NOTE(review): a single request is made. Confirm the API's default page size
// covers all topics of this instance, or page with CurrentPage/PageSize.
func getTopicList(client *alikafka.Client, instanceId, regionId string) ([]*TopicInfo, error) {
	request := &alikafka.GetTopicListRequest{
		InstanceId: tea.String(instanceId),
		RegionId:   tea.String(regionId),
	}
	runtime := &util.RuntimeOptions{}

	topics := make([]*TopicInfo, 0)
	tryErr := func() (err error) {
		// Turn an SDK panic into this call's error. Merely printing it would
		// make the closure return nil and report an empty list as success.
		defer func() {
			if r := tea.Recover(recover()); r != nil {
				err = r
			}
		}()

		resp, err := client.GetTopicListWithOptions(request, runtime)
		if err != nil {
			return err
		}
		if resp == nil || resp.Body == nil {
			return fmt.Errorf("响应为空")
		}
		// The body carries its own Success flag; presumably false signals a
		// business-level failure (TODO confirm) - do not treat it as "no topics".
		if resp.Body.Success != nil && !*resp.Body.Success {
			return fmt.Errorf("接口返回失败: %s", tea.StringValue(resp.Body.Message))
		}
		if resp.Body.TopicList == nil {
			return nil
		}
		for _, topic := range resp.Body.TopicList.TopicVO {
			topics = append(topics, &TopicInfo{
				TopicName:    tea.StringValue(topic.Topic),
				PartitionNum: tea.Int32Value(topic.PartitionNum),
				Remark:       tea.StringValue(topic.Remark),
			})
		}
		return nil
	}()
	if tryErr != nil {
		// Prefer the concise SDK message over SDKError's multi-line Error().
		msg := tryErr.Error()
		if sdkErr, ok := tryErr.(*tea.SDKError); ok {
			msg = tea.StringValue(sdkErr.Message)
		}
		return nil, fmt.Errorf("获取Topic列表失败: %s", msg)
	}
	return topics, nil
}
// saveToFile encodes data as JSON (one space of indentation per level) and
// writes it to filename, creating or truncating the file. Marshalling and
// write errors are returned unchanged.
func saveToFile(filename string, data interface{}) error {
	body, err := json.MarshalIndent(data, "", " ")
	if err != nil {
		return err
	}
	const perm = 0644 // owner read/write, group and others read-only
	return os.WriteFile(filename, body, perm)
}
// main exports the consumer groups and topics of one Alibaba Cloud Kafka
// instance to groups.txt and topics.txt (JSON arrays in the working
// directory) for use by kafka-import.sh.
//
// Credentials and the target instance come from the environment instead of
// being hard-coded, so the AccessKey is never committed with the source:
//
//	ALIBABA_CLOUD_ACCESS_KEY_ID      AccessKey ID (required)
//	ALIBABA_CLOUD_ACCESS_KEY_SECRET  AccessKey secret (required)
//	ALIKAFKA_INSTANCE_ID             Kafka instance ID (required)
//	ALIKAFKA_REGION_ID               region ID (default "cn-hangzhou")
//
// NOTE: an AccessKey pair used to be embedded here in plain text. Treat it as
// leaked and disable/rotate it in the RAM console.
func main() {
	// fail prints a message to stderr and exits with status 1.
	fail := func(format string, args ...interface{}) {
		fmt.Fprintf(os.Stderr, format+"\n", args...)
		os.Exit(1)
	}

	accessKeyId := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
	accessKeySecret := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
	instanceId := os.Getenv("ALIKAFKA_INSTANCE_ID")
	regionId := os.Getenv("ALIKAFKA_REGION_ID")
	if regionId == "" {
		regionId = "cn-hangzhou"
	}
	if accessKeyId == "" || accessKeySecret == "" || instanceId == "" {
		fail("请通过环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET、ALIKAFKA_INSTANCE_ID 提供AK/SK和实例ID")
	}

	fmt.Printf("正在创建客户端 (Region: %s, Instance: %s)...\n", regionId, instanceId)
	client, err := CreateClient(regionId, accessKeyId, accessKeySecret)
	if err != nil {
		fail("创建客户端失败: %v", err)
	}

	fmt.Println("正在获取消费者组列表...")
	groups, err := getConsumerList(client, instanceId, regionId)
	if err != nil {
		// getConsumerList already prefixes its error with "获取消费者组列表失败".
		fail("%v", err)
	}
	fmt.Printf("成功获取 %d 个消费者组\n", len(groups))
	if err := saveToFile("groups.txt", groups); err != nil {
		fail("保存消费者组文件失败: %v", err)
	}
	fmt.Println("消费者组信息已保存到 groups.txt")

	fmt.Println("正在获取Topic列表...")
	topics, err := getTopicList(client, instanceId, regionId)
	if err != nil {
		// getTopicList already prefixes its error with "获取Topic列表失败".
		fail("%v", err)
	}
	fmt.Printf("成功获取 %d 个Topic\n", len(topics))
	if err := saveToFile("topics.txt", topics); err != nil {
		fail("保存Topic文件失败: %v", err)
	}
	fmt.Println("Topic信息已保存到 topics.txt")

	fmt.Println("\n导出完成!")
	fmt.Printf(" - 消费者组: %d 个\n", len(groups))
	fmt.Printf(" - Topic: %d 个\n", len(topics))
}

kafka-import.sh
#!/bin/bash
# Kafka bulk-import script.
# Creates the Topics (and, optionally, the consumer groups) exported from
# Alibaba Cloud on a Kafka cluster running in Kubernetes. Every Kafka CLI
# call is executed inside the Kafka pod via `kubectl exec`.
set -e # abort on the first failing command

# ANSI colour codes, stored as literal backslash sequences that the print_*
# helpers expand when printing.
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # reset / no colour

# Settings: a non-empty value already present in the environment is kept,
# otherwise the default shown is assigned. The command-line options parsed at
# the bottom of the script override both.
: "${KAFKA_BROKER:=127.0.0.1:9092}"
: "${TOPICS_FILE:=topics.txt}"
: "${GROUPS_FILE:=groups.txt}"
: "${REPLICATION_FACTOR:=2}"
: "${POD_NAME:=kafka-0}"
: "${NAMESPACE:=auto}"
# Coloured log helpers. Each prints "[TAG] message" on stdout with the tag
# coloured and backslash escapes in the text expanded (printf %b behaves like
# echo -e here).
_print_tagged() {
    printf '%b\n' "${1}[${2}]${NC} ${3}"
}
print_info()    { _print_tagged "$BLUE" INFO "$1"; }
print_success() { _print_tagged "$GREEN" SUCCESS "$1"; }
print_warning() { _print_tagged "$YELLOW" WARNING "$1"; }
print_error()   { _print_tagged "$RED" ERROR "$1"; }
# Abort the script unless both export files ($TOPICS_FILE, then
# $GROUPS_FILE) exist as regular files.
check_files() {
    print_info "检查必要文件..."
    local required
    for required in "$TOPICS_FILE" "$GROUPS_FILE"; do
        [ -f "$required" ] || {
            print_error "文件不存在: $required"
            exit 1
        }
    done
    print_success "文件检查完成"
}
# Abort the script unless kubectl (to reach the Kafka pod) and jq (to parse
# the exported JSON) are available on PATH.
check_dependencies() {
    print_info "检查依赖..."
    command -v kubectl >/dev/null 2>&1 || {
        print_error "kubectl 未安装,请先安装 kubectl"
        exit 1
    }
    command -v jq >/dev/null 2>&1 || {
        print_error "jq 未安装,请先安装 jq (用于解析JSON)"
        print_info "安装方法: sudo apt-get install jq 或 brew install jq"
        exit 1
    }
    print_success "依赖检查完成"
}
# Verify that kafka-broker-api-versions.sh, run inside $POD_NAME, can reach
# $KAFKA_BROKER. Otherwise print troubleshooting hints and exit.
test_kafka_connection() {
    print_info "测试Kafka连接: $KAFKA_BROKER"
    if ! kubectl exec -n "$NAMESPACE" "$POD_NAME" -- \
        kafka-broker-api-versions.sh --bootstrap-server "$KAFKA_BROKER" >/dev/null 2>&1; then
        print_error "无法连接到Kafka: $KAFKA_BROKER"
        print_info "请检查:"
        print_info " 1. POD名称是否正确: $POD_NAME"
        print_info " 2. Namespace是否正确: $NAMESPACE"
        print_info " 3. Kafka地址是否正确: $KAFKA_BROKER"
        exit 1
    fi
    print_success "Kafka连接成功"
}
# Create every Topic listed in $TOPICS_FILE (a JSON array of objects with
# topic_name / partition_num / remark) using $REPLICATION_FACTOR replicas.
# Each outcome is classified as created, already existing or failed from the
# kafka-topics.sh output, and a summary is printed at the end.
create_topics() {
    print_info "开始创建Topics..."
    local topic_count=$(jq '. | length' "$TOPICS_FILE")
    print_info "需要创建 $topic_count 个Topic"

    local success_count=0 failed_count=0 exists_count=0

    # The loop reads from process substitution rather than a pipe, so the
    # counters are updated in this shell and not in a subshell.
    while IFS= read -r topic; do
        local topic_name=$(jq -r '.topic_name' <<< "$topic")
        local partition_num=$(jq -r '.partition_num' <<< "$topic")
        local remark=$(jq -r '.remark' <<< "$topic")

        print_info "创建Topic: $topic_name (分区数: $partition_num)"

        local result=$(kubectl exec -n "$NAMESPACE" "$POD_NAME" -- \
            kafka-topics.sh \
            --bootstrap-server "$KAFKA_BROKER" \
            --create \
            --topic "$topic_name" \
            --partitions "$partition_num" \
            --replication-factor "$REPLICATION_FACTOR" \
            2>&1 || true)

        case $result in
            *"Created topic"*)
                print_success "✓ Topic创建成功: $topic_name"
                ((++success_count))
                ;;
            *"already exists"*)
                print_warning "⊙ Topic已存在: $topic_name"
                ((++exists_count))
                ;;
            *)
                print_error "✗ Topic创建失败: $topic_name"
                print_error " 错误信息: $result"
                ((++failed_count))
                ;;
        esac

        # Show the remark carried over from Alibaba Cloud, if there is one.
        if [[ -n $remark && $remark != null ]]; then
            print_info " 备注: $remark"
        fi
    done < <(jq -c '.[]' "$TOPICS_FILE")

    echo ""
    print_info "Topic创建统计:"
    print_success " 成功: $success_count"
    print_warning " 已存在: $exists_count"
    print_error " 失败: $failed_count"

    # Hint when nothing was created because every Topic was already there.
    if [ $exists_count -eq $topic_count ] && [ $topic_count -gt 0 ]; then
        print_warning "所有Topic都已存在,是否要重新执行导入所有Topic?"
        print_warning "注意:重新创建已存在的Topic可能需要先删除现有Topic"
    fi
}
# Pre-create every consumer group listed in $GROUPS_FILE (a JSON array of
# objects with group_name / remark).
#
# Kafka has no "create group" command: a group comes into existence when a
# consumer joins it. For each group id, a short-lived console consumer is run
# on a placeholder topic (the first topic in $TOPICS_FILE). The function then
# checks that the group appears in `kafka-consumer-groups.sh --list`.
#
# --max-messages must be at least 1. With 0 the console consumer exits before
# its first poll and never joins the group, so nothing would be created.
# NOTE(review): on exit the consumer may commit offsets for the placeholder
# topic under the group; confirm this is acceptable for the real consumers.
#
# main() does not call this function (the call is commented out there).
create_consumer_groups() {
    print_info "开始创建消费者组..."
    local group_count=$(jq '. | length' "$GROUPS_FILE")
    print_info "需要创建 $group_count 个消费者组"

    local placeholder_topic=$(jq -r '.[0].topic_name' "$TOPICS_FILE" 2>/dev/null || echo "")
    if [ -z "$placeholder_topic" ] || [ "$placeholder_topic" = "null" ]; then
        print_warning "没有可用的Topic,跳过消费者组创建"
        return
    fi
    print_info "使用占位Topic: $placeholder_topic"

    local success_count=0
    local failed_count=0
    # The loop reads from process substitution rather than a pipe, so the
    # counters are updated in this shell.
    while IFS= read -r group; do
        local group_name=$(jq -r '.group_name' <<< "$group")
        local remark=$(jq -r '.remark' <<< "$group")
        print_info "创建消费者组: $group_name"

        # One real poll is needed to join the group. --timeout-ms ends the
        # consumer when no message arrives, and `timeout` is a hard stop. The
        # join itself may take a few seconds, so the limits leave headroom.
        kubectl exec -n "$NAMESPACE" "$POD_NAME" -- \
            timeout 20 kafka-console-consumer.sh \
            --bootstrap-server "$KAFKA_BROKER" \
            --topic "$placeholder_topic" \
            --group "$group_name" \
            --max-messages 1 \
            --timeout-ms 10000 \
            >/dev/null 2>&1 || true

        # Check that the group now exists. Match the whole line as a fixed
        # string, because group ids may contain regex metacharacters such as '.'.
        if kubectl exec -n "$NAMESPACE" "$POD_NAME" -- \
            kafka-consumer-groups.sh \
            --bootstrap-server "$KAFKA_BROKER" \
            --list 2>/dev/null | grep -Fxq -- "$group_name"; then
            print_success "✓ 消费者组创建成功: $group_name"
            success_count=$((success_count + 1))
        else
            print_error "✗ 消费者组创建失败: $group_name"
            failed_count=$((failed_count + 1))
        fi

        # Show the remark carried over from Alibaba Cloud, if there is one.
        if [ "$remark" != "null" ] && [ -n "$remark" ]; then
            print_info " 备注: $remark"
        fi
    done < <(jq -c '.[]' "$GROUPS_FILE")

    echo ""
    print_info "消费者组创建统计:"
    print_success " 成功: $success_count"
    print_error " 失败: $failed_count"
}
# List every Topic and every consumer group now present on the cluster so the
# operator can check the result of the import by eye.
verify_import() {
    local -a in_pod=(kubectl exec -n "$NAMESPACE" "$POD_NAME" --)
    print_info "验证导入结果..."

    print_info "Kafka中的所有Topic:"
    "${in_pod[@]}" kafka-topics.sh --bootstrap-server "$KAFKA_BROKER" --list
    echo ""

    print_info "Kafka中的所有消费者组:"
    "${in_pod[@]}" kafka-consumer-groups.sh --bootstrap-server "$KAFKA_BROKER" --list
}
# Entry point:
#   1. Print the effective settings.
#   2. Run the pre-flight checks.
#   3. Ask for confirmation.
#   4. Create the Topics.
#   5. List what the cluster now contains.
# Consumer-group creation is currently disabled (the call is commented out).
main() {
    echo ""
    print_info "================================================"
    print_info " Kafka 批量导入工具"
    print_info "================================================"
    echo ""
    print_info "配置信息:"
    print_info " Kafka Broker: $KAFKA_BROKER"
    print_info " Pod名称: $POD_NAME"
    print_info " Namespace: $NAMESPACE"
    print_info " 副本因子: $REPLICATION_FACTOR"
    print_info " Topics文件: $TOPICS_FILE"
    print_info " Groups文件: $GROUPS_FILE"
    echo ""
    check_dependencies
    check_files
    test_kafka_connection
    echo ""
    # `read` returns non-zero on EOF (stdin closed or not a terminal). Without
    # the fallback, `set -e` would end the script here with no message, so
    # treat EOF as "no".
    read -p "确认开始导入? (y/N): " -n 1 -r || REPLY=""
    echo ""
    if [[ ! $REPLY =~ ^[Yy]$ ]]; then
        print_warning "操作已取消"
        exit 0
    fi
    echo ""
    create_topics
    echo ""
    #create_consumer_groups
    echo ""
    verify_import
    echo ""
    print_success "================================================"
    print_success " 导入完成!"
    print_success "================================================"
}
# Print usage for the command-line options and environment variables this
# script understands. The namespace default shown is "auto", matching the
# NAMESPACE default assigned at the top of the script.
show_help() {
    cat << EOF
用法: $0 [选项]
选项:
 -b, --broker BROKER Kafka broker地址 (默认: 127.0.0.1:9092)
 -p, --pod POD_NAME Kafka pod名称 (默认: kafka-0)
 -n, --namespace NS K8s namespace (默认: auto)
 -r, --replication RF 副本因子 (默认: 2)
 -t, --topics FILE Topics文件路径 (默认: topics.txt)
 -g, --groups FILE Groups文件路径 (默认: groups.txt)
 -h, --help 显示此帮助信息
环境变量:
 KAFKA_BROKER Kafka broker地址
 POD_NAME Kafka pod名称
 NAMESPACE K8s namespace
 REPLICATION_FACTOR 副本因子
 TOPICS_FILE Topics文件路径
 GROUPS_FILE Groups文件路径
示例:
 $0
 $0 --broker 127.0.0.1:9092 --pod kafka-0
 KAFKA_BROKER=127.0.0.1:9092 POD_NAME=kafka-1 $0
EOF
}
# Parse command-line options. Each option overrides the corresponding
# environment variable or default set at the top of the script.

# need_value aborts with a usage message when an option that takes a value
# is the last argument. Without this check, `shift 2` would fail and `set -e`
# would end the script silently, leaving the setting empty.
need_value() {
    if [[ $# -lt 2 ]]; then
        print_error "选项 $1 缺少参数值"
        show_help
        exit 1
    fi
}

while [[ $# -gt 0 ]]; do
    case $1 in
        -b|--broker)
            need_value "$@"
            KAFKA_BROKER="$2"
            shift 2
            ;;
        -p|--pod)
            need_value "$@"
            POD_NAME="$2"
            shift 2
            ;;
        -n|--namespace)
            need_value "$@"
            NAMESPACE="$2"
            shift 2
            ;;
        -r|--replication)
            need_value "$@"
            REPLICATION_FACTOR="$2"
            shift 2
            ;;
        -t|--topics)
            need_value "$@"
            TOPICS_FILE="$2"
            shift 2
            ;;
        -g|--groups)
            need_value "$@"
            GROUPS_FILE="$2"
            shift 2
            ;;
        -h|--help)
            show_help
            exit 0
            ;;
        *)
            print_error "未知选项: $1"
            show_help
            exit 1
            ;;
    esac
done

# Run the import.
main
不需要单独创建 Group：Kafka 会在消费者首次以某个 group.id 加入时自动创建该 Group，因此导入脚本中对 create_consumer_groups 的调用已被注释掉。可以用下面的命令验证：
bash -c "echo 'test_message' | kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test"
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --group test12 --from-beginning --max-messages 1
kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
