go, k8s

Importing Alibaba Cloud Kafka into K8s

Background

To cut costs, I needed to migrate our Kafka from Alibaba Cloud into the K8s cluster. No message data had to be migrated, only the topics and consumer groups. With 77 topics and 163 consumer groups, creating them one by one would have been tedious, so I scripted it.

kafka-export.go

This program calls the Alibaba Cloud API to fetch the topic and consumer group information and exports it to txt files.

package main

import (
    "encoding/json"
    "fmt"
    "os"

    alikafka "github.com/alibabacloud-go/alikafka-20190916/v3/client"
    openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client"
    util "github.com/alibabacloud-go/tea-utils/v2/service"
    "github.com/alibabacloud-go/tea/tea"
)

// GroupInfo holds consumer group information
type GroupInfo struct {
    GroupName string `json:"group_name"`
    Remark    string `json:"remark"`
}

// TopicInfo holds topic information
type TopicInfo struct {
    TopicName    string `json:"topic_name"`
    PartitionNum int32  `json:"partition_num"`
    Remark       string `json:"remark"`
}

// CreateClient initializes the client with an AK/SK pair
func CreateClient(regionId, accessKeyId, accessKeySecret string) (*alikafka.Client, error) {
    config := &openapi.Config{
        AccessKeyId:     tea.String(accessKeyId),
        AccessKeySecret: tea.String(accessKeySecret),
    }
    // Endpoint: see https://api.aliyun.com/product/alikafka
    config.Endpoint = tea.String(fmt.Sprintf("alikafka.%s.aliyuncs.com", regionId))

    result, err := alikafka.NewClient(config)
    return result, err
}

func getConsumerList(client *alikafka.Client, instanceId, regionId string) ([]*GroupInfo, error) {
    request := &alikafka.GetConsumerListRequest{
        InstanceId: tea.String(instanceId),
        RegionId:   tea.String(regionId),
    }

    runtime := &util.RuntimeOptions{}
    var groups []*GroupInfo

    tryErr := func() error {
        defer func() {
            if r := tea.Recover(recover()); r != nil {
                fmt.Printf("捕获到异常: %v\n", r)
            }
        }()

        resp, err := client.GetConsumerListWithOptions(request, runtime)
        if err != nil {
            return err
        }

        if resp.Body.ConsumerList != nil && resp.Body.ConsumerList.ConsumerVO != nil {
            for _, consumer := range resp.Body.ConsumerList.ConsumerVO {
                group := &GroupInfo{
                    GroupName: tea.StringValue(consumer.ConsumerId),
                    Remark:    tea.StringValue(consumer.Remark),
                }
                groups = append(groups, group)
            }
        }

        return nil
    }()

    if tryErr != nil {
        var sdkError = &tea.SDKError{}
        if _t, ok := tryErr.(*tea.SDKError); ok {
            sdkError = _t
        } else {
            sdkError.Message = tea.String(tryErr.Error())
        }
        return nil, fmt.Errorf("获取消费者组列表失败: %s", tea.StringValue(sdkError.Message))
    }

    return groups, nil
}

func getTopicList(client *alikafka.Client, instanceId, regionId string) ([]*TopicInfo, error) {
    request := &alikafka.GetTopicListRequest{
        InstanceId: tea.String(instanceId),
        RegionId:   tea.String(regionId),
    }

    runtime := &util.RuntimeOptions{}
    var topics []*TopicInfo

    tryErr := func() error {
        defer func() {
            if r := tea.Recover(recover()); r != nil {
                fmt.Printf("捕获到异常: %v\n", r)
            }
        }()

        resp, err := client.GetTopicListWithOptions(request, runtime)
        if err != nil {
            return err
        }

        if resp.Body.TopicList != nil && resp.Body.TopicList.TopicVO != nil {
            for _, topic := range resp.Body.TopicList.TopicVO {
                topicInfo := &TopicInfo{
                    TopicName:    tea.StringValue(topic.Topic),
                    PartitionNum: tea.Int32Value(topic.PartitionNum),
                    Remark:       tea.StringValue(topic.Remark),
                }
                topics = append(topics, topicInfo)
            }
        }

        return nil
    }()

    if tryErr != nil {
        var sdkError = &tea.SDKError{}
        if _t, ok := tryErr.(*tea.SDKError); ok {
            sdkError = _t
        } else {
            sdkError.Message = tea.String(tryErr.Error())
        }
        return nil, fmt.Errorf("获取Topic列表失败: %s", tea.StringValue(sdkError.Message))
    }

    return topics, nil
}

func saveToFile(filename string, data interface{}) error {
    jsonData, err := json.MarshalIndent(data, "", "  ")
    if err != nil {
        return err
    }
    return os.WriteFile(filename, jsonData, 0644)
}

func main() {
    // AK/SK and instance configuration (credentials redacted; fill in your own)
    accessKeyId := "<your-access-key-id>"               // replace with your AccessKeyId
    accessKeySecret := "<your-access-key-secret>"       // replace with your AccessKeySecret
    regionId := "cn-hangzhou"                           // region
    instanceId := "alikafka_pre-cn-j4g3tbd2v001"        // instance ID

    // Create the client
    fmt.Printf("Creating client (Region: %s, Instance: %s)...\n", regionId, instanceId)
    client, err := CreateClient(regionId, accessKeyId, accessKeySecret)
    if err != nil {
        fmt.Printf("Failed to create client: %v\n", err)
        os.Exit(1)
    }

    // Fetch the consumer group list
    fmt.Println("Fetching consumer group list...")
    groups, err := getConsumerList(client, instanceId, regionId)
    if err != nil {
        fmt.Printf("Failed to get consumer group list: %v\n", err)
        os.Exit(1)
    }
    fmt.Printf("Fetched %d consumer groups\n", len(groups))

    // Save consumer groups to a file
    err = saveToFile("groups.txt", groups)
    if err != nil {
        fmt.Printf("Failed to save consumer group file: %v\n", err)
        os.Exit(1)
    }
    fmt.Println("Consumer group info saved to groups.txt")

    // Fetch the topic list
    fmt.Println("Fetching topic list...")
    topics, err := getTopicList(client, instanceId, regionId)
    if err != nil {
        fmt.Printf("Failed to get topic list: %v\n", err)
        os.Exit(1)
    }
    fmt.Printf("Fetched %d topics\n", len(topics))

    // Save topics to a file
    err = saveToFile("topics.txt", topics)
    if err != nil {
        fmt.Printf("Failed to save topic file: %v\n", err)
        os.Exit(1)
    }
    fmt.Println("Topic info saved to topics.txt")

    fmt.Println("\nExport complete!")
    fmt.Printf("  - Consumer groups: %d\n", len(groups))
    fmt.Printf("  - Topics: %d\n", len(topics))
}
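
To build and run the exporter, the SDK modules in the import block need to be fetched first. A minimal setup sketch follows; the module name kafka-export is just a placeholder of mine, and the versions that go get resolves may differ from what I used:

go mod init kafka-export            # placeholder module name
go get github.com/alibabacloud-go/alikafka-20190916/v3
go get github.com/alibabacloud-go/darabonba-openapi/v2
go get github.com/alibabacloud-go/tea-utils/v2
go get github.com/alibabacloud-go/tea
go run kafka-export.go              # writes groups.txt and topics.txt

Despite the .txt extension, both output files are JSON arrays (MarshalIndent output), which is the format the import script's jq calls expect.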


kafka-import.sh

#!/bin/bash

# Kafka batch import script
# Purpose: import the topics and consumer groups exported from Alibaba Cloud into the K8s Kafka cluster

set -e  # exit immediately on error

# Color definitions
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Configuration
KAFKA_BROKER="${KAFKA_BROKER:-127.0.0.1:9092}"
TOPICS_FILE="${TOPICS_FILE:-topics.txt}"
GROUPS_FILE="${GROUPS_FILE:-groups.txt}"
REPLICATION_FACTOR="${REPLICATION_FACTOR:-2}"
POD_NAME="${POD_NAME:-kafka-0}"
NAMESPACE="${NAMESPACE:-auto}"

# Colored message helpers
print_info() {
    echo -e "${BLUE}[INFO]${NC} $1"
}

print_success() {
    echo -e "${GREEN}[SUCCESS]${NC} $1"
}

print_warning() {
    echo -e "${YELLOW}[WARNING]${NC} $1"
}

print_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

# Check required files
check_files() {
    print_info "Checking required files..."

    if [ ! -f "$TOPICS_FILE" ]; then
        print_error "File not found: $TOPICS_FILE"
        exit 1
    fi

    if [ ! -f "$GROUPS_FILE" ]; then
        print_error "File not found: $GROUPS_FILE"
        exit 1
    fi

    print_success "File check complete"
}

# Check for kubectl and jq
check_dependencies() {
    print_info "Checking dependencies..."

    if ! command -v kubectl &> /dev/null; then
        print_error "kubectl is not installed; please install kubectl first"
        exit 1
    fi

    if ! command -v jq &> /dev/null; then
        print_error "jq is not installed; please install jq (used to parse JSON)"
        print_info "Install it with: sudo apt-get install jq or brew install jq"
        exit 1
    fi

    print_success "Dependency check complete"
}

# Test the Kafka connection
test_kafka_connection() {
    print_info "Testing Kafka connection: $KAFKA_BROKER"

    if kubectl exec -n "$NAMESPACE" "$POD_NAME" -- \
        kafka-broker-api-versions.sh --bootstrap-server "$KAFKA_BROKER" &> /dev/null; then
        print_success "Kafka connection OK"
    else
        print_error "Cannot connect to Kafka: $KAFKA_BROKER"
        print_info "Please check that:"
        print_info "  1. The pod name is correct: $POD_NAME"
        print_info "  2. The namespace is correct: $NAMESPACE"
        print_info "  3. The Kafka address is correct: $KAFKA_BROKER"
        exit 1
    fi
}

# Create topics
create_topics() {
    print_info "Creating topics..."

    local topic_count=$(jq '. | length' "$TOPICS_FILE")
    print_info "$topic_count topics to create"

    local success_count=0
    local failed_count=0
    local exists_count=0

    # Use process substitution instead of a pipe so the counters are not lost in a subshell
    while IFS= read -r topic; do
        local topic_name=$(echo "$topic" | jq -r '.topic_name')
        local partition_num=$(echo "$topic" | jq -r '.partition_num')
        local remark=$(echo "$topic" | jq -r '.remark')

        print_info "Creating topic: $topic_name (partitions: $partition_num)"

        # Create the topic
        local result=$(kubectl exec -n "$NAMESPACE" "$POD_NAME" -- \
            kafka-topics.sh \
            --bootstrap-server "$KAFKA_BROKER" \
            --create \
            --topic "$topic_name" \
            --partitions "$partition_num" \
            --replication-factor "$REPLICATION_FACTOR" \
            2>&1 || true)

        if echo "$result" | grep -q "Created topic"; then
            print_success "✓ Topic created: $topic_name"
            success_count=$((success_count + 1))
        elif echo "$result" | grep -q "already exists"; then
            print_warning "⊙ Topic already exists: $topic_name"
            exists_count=$((exists_count + 1))
        else
            print_error "✗ Failed to create topic: $topic_name"
            print_error "  Error: $result"
            failed_count=$((failed_count + 1))
        fi

        # Print the remark if there is one
        if [ "$remark" != "null" ] && [ -n "$remark" ]; then
            print_info "  Remark: $remark"
        fi
    done < <(jq -c '.[]' "$TOPICS_FILE")

    echo ""
    print_info "Topic creation summary:"
    print_success "  Created: $success_count"
    print_warning "  Already existed: $exists_count"
    print_error "  Failed: $failed_count"

    # If every topic already exists, let the user know
    if [ $exists_count -eq $topic_count ] && [ $topic_count -gt 0 ]; then
        print_warning "All topics already exist; nothing new was imported in this run."
        print_warning "Note: re-importing an existing topic may require deleting it first."
    fi
}

# Create consumer groups
create_consumer_groups() {
    print_info "Creating consumer groups..."

    local group_count=$(jq '. | length' "$GROUPS_FILE")
    print_info "$group_count consumer groups to create"

    # Use the first topic as a placeholder topic
    local placeholder_topic=$(jq -r '.[0].topic_name' "$TOPICS_FILE" 2>/dev/null || echo "")

    if [ -z "$placeholder_topic" ] || [ "$placeholder_topic" = "null" ]; then
        print_warning "No topic available, skipping consumer group creation"
        return
    fi

    print_info "Using placeholder topic: $placeholder_topic"

    local success_count=0
    local failed_count=0

    # Use process substitution instead of a pipe
    while IFS= read -r group; do
        local group_name=$(echo "$group" | jq -r '.group_name')
        local remark=$(echo "$group" | jq -r '.remark')

        print_info "Creating consumer group: $group_name"

        # Create the group by briefly running a consumer against the placeholder topic;
        # --max-messages 0 is only there to trigger group creation, nothing is consumed
        local result=$(kubectl exec -n "$NAMESPACE" "$POD_NAME" -- \
            timeout 5 kafka-console-consumer.sh \
            --bootstrap-server "$KAFKA_BROKER" \
            --topic "$placeholder_topic" \
            --group "$group_name" \
            --max-messages 0 \
            --timeout-ms 3000 \
            2>&1 || true)

        # Verify that the consumer group was created
        local verify=$(kubectl exec -n "$NAMESPACE" "$POD_NAME" -- \
            kafka-consumer-groups.sh \
            --bootstrap-server "$KAFKA_BROKER" \
            --list 2>/dev/null | grep -w "^$group_name$" || echo "")

        if [ -n "$verify" ]; then
            print_success "✓ Consumer group created: $group_name"
            success_count=$((success_count + 1))
        else
            print_error "✗ Failed to create consumer group: $group_name"
            failed_count=$((failed_count + 1))
        fi

        # Print the remark if there is one
        if [ "$remark" != "null" ] && [ -n "$remark" ]; then
            print_info "  Remark: $remark"
        fi
    done < <(jq -c '.[]' "$GROUPS_FILE")

    echo ""
    print_info "Consumer group creation summary:"
    print_success "  Created: $success_count"
    print_error "  Failed: $failed_count"
}

# Verify the import result
verify_import() {
    print_info "Verifying import result..."

    # List all topics
    print_info "All topics in Kafka:"
    kubectl exec -n "$NAMESPACE" "$POD_NAME" -- \
        kafka-topics.sh \
        --bootstrap-server "$KAFKA_BROKER" \
        --list

    echo ""

    # List all consumer groups
    print_info "All consumer groups in Kafka:"
    kubectl exec -n "$NAMESPACE" "$POD_NAME" -- \
        kafka-consumer-groups.sh \
        --bootstrap-server "$KAFKA_BROKER" \
        --list
}

# Main
main() {
    echo ""
    print_info "================================================"
    print_info "  Kafka batch import tool"
    print_info "================================================"
    echo ""
    print_info "Configuration:"
    print_info "  Kafka broker: $KAFKA_BROKER"
    print_info "  Pod name: $POD_NAME"
    print_info "  Namespace: $NAMESPACE"
    print_info "  Replication factor: $REPLICATION_FACTOR"
    print_info "  Topics file: $TOPICS_FILE"
    print_info "  Groups file: $GROUPS_FILE"
    echo ""

    check_dependencies
    check_files
    test_kafka_connection

    echo ""
    read -p "Proceed with the import? (y/N): " -n 1 -r
    echo ""
    if [[ ! $REPLY =~ ^[Yy]$ ]]; then
        print_warning "Operation cancelled"
        exit 0
    fi

    echo ""
    create_topics

    echo ""
    #create_consumer_groups   # not needed: the cluster auto-creates consumer groups (see the note below)

    echo ""
    verify_import

    echo ""
    print_success "================================================"
    print_success "  导入完成!"
    print_success "================================================"
}

# Show help
show_help() {
    cat << EOF
Usage: $0 [options]

Options:
  -b, --broker BROKER       Kafka broker address (default: 127.0.0.1:9092)
  -p, --pod POD_NAME        Kafka pod name (default: kafka-0)
  -n, --namespace NS        K8s namespace (default: auto)
  -r, --replication RF      Replication factor (default: 2)
  -t, --topics FILE         Topics file path (default: topics.txt)
  -g, --groups FILE         Groups file path (default: groups.txt)
  -h, --help                Show this help message

Environment variables:
  KAFKA_BROKER              Kafka broker address
  POD_NAME                  Kafka pod name
  NAMESPACE                 K8s namespace
  REPLICATION_FACTOR        Replication factor
  TOPICS_FILE               Topics file path
  GROUPS_FILE               Groups file path

Examples:
  $0
  $0 --broker 127.0.0.1:9092 --pod kafka-0
  KAFKA_BROKER=127.0.0.1:9092 POD_NAME=kafka-1 $0
EOF
}

# Parse command-line arguments
while [[ $# -gt 0 ]]; do
    case $1 in
        -b|--broker)
            KAFKA_BROKER="$2"
            shift 2
            ;;
        -p|--pod)
            POD_NAME="$2"
            shift 2
            ;;
        -n|--namespace)
            NAMESPACE="$2"
            shift 2
            ;;
        -r|--replication)
            REPLICATION_FACTOR="$2"
            shift 2
            ;;
        -t|--topics)
            TOPICS_FILE="$2"
            shift 2
            ;;
        -g|--groups)
            GROUPS_FILE="$2"
            shift 2
            ;;
        -h|--help)
            show_help
            exit 0
            ;;
        *)
            print_error "未知选项: $1"
            show_help
            exit 1
            ;;
    esac
done

# Run main
main
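
A typical run looks like the sketch below; the namespace and pod name are examples from my environment, adjust them to yours. Since every Kafka CLI call is wrapped in kubectl exec into the broker pod, 127.0.0.1:9092 refers to the listener inside that pod:

chmod +x kafka-import.sh
# run from the directory that contains topics.txt and groups.txt
./kafka-import.sh --namespace auto --pod kafka-0 --broker 127.0.0.1:9092 --replication 2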

There is no need to create the consumer groups: the Kafka cluster already creates groups automatically, so a group appears the first time a consumer joins with that group id:

bash -c "echo 'test_message' | kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test"
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --group test12--from-beginning --max-messages 1
kafka-consumer-groups.sh   --bootstrap-server 127.0.0.1:9092   --list
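
To double-check that the group really was registered by that single consume, describe it afterwards (reusing the test12 group from the commands above):

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group test12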

