fluent-operator Deployment

Introduction

Official repository: https://github.com/fluent/fluent-operator
Fluent Operator manages both Fluent Bit and Fluentd. Although both can collect, process (parse and filter), and forward logs to a final destination, each has its own strengths.

Fluent Bit is small and lightweight, which makes it well suited to run as an agent that collects logs; Fluentd has a rich plugin ecosystem and is better suited to centralized log processing. The two can be used independently or together.

Overall Architecture

[Figure: fluent-operator overall architecture]

fluent-operator supports three working modes:

  • fluentbit-only: if you only need to collect logs and send them to a final destination, Fluent Bit alone is enough.
  • fluentbit+fluentd: if you also need to perform advanced processing on the collected logs or send them to additional sinks, you need Fluentd as well.
  • fluentd-only: if you need to receive logs over the network via HTTP or Syslog, process them, and then send them to the final sinks, Fluentd alone is enough.

Fluent Bit runs as a DaemonSet, while Fluentd runs as a StatefulSet.

CRD

fluent-operator defines two API groups, one for Fluent Bit and one for Fluentd: fluentbit.fluent.io and fluentd.fluent.io.

fluentbit.fluent.io

  • FluentBit defines the properties of the Fluent Bit instance, such as image version, tolerations, affinity, and other parameters.
  • ClusterFluentBitConfig defines the Fluent Bit configuration file.
  • ClusterInput defines a Fluent Bit input plugin, i.e. which logs to collect.
  • ClusterFilter defines a Fluent Bit filter plugin, which filters and processes the records collected by Fluent Bit.
  • ClusterParser defines a Fluent Bit parser plugin, which parses log records into other formats.
  • ClusterOutput defines a Fluent Bit output plugin, which forwards the processed records to their destination.

fluentd.fluent.io

  • Fluentd defines the properties of the Fluentd instance, such as image version, tolerations, affinity, and other parameters.
  • ClusterFluentdConfig defines a cluster-scoped Fluentd configuration.
  • FluentdConfig defines a namespace-scoped Fluentd configuration.
  • ClusterFilter defines a cluster-scoped Fluentd filter plugin, which filters and processes the records received by Fluentd. If Fluent Bit is installed, the records it forwards can be processed further here.
  • Filter defines a namespace-scoped Fluentd filter plugin, which filters and processes the records received by Fluentd. If Fluent Bit is installed, the records it forwards can be processed further here.
  • ClusterOutput defines a cluster-scoped Fluentd output plugin, which forwards the processed records to their destination.
  • Output defines a namespace-scoped Fluentd output plugin, which forwards the processed records to their destination.

Workflow

Each ClusterInput, ClusterParser, ClusterFilter, and ClusterOutput represents a section of the Fluent Bit configuration and is selected by a ClusterFluentBitConfig through label selectors. The Fluent Operator watches these objects, constructs the final configuration, and creates a Secret that stores it and is mounted into the Fluent Bit DaemonSet. The overall workflow is shown below:

[Figure: configuration rendering workflow]

The fluent-operator pod also includes fluent-bit and fluentd watchers. They monitor the Fluent Bit and Fluentd configuration, and when it changes they trigger a hot reload automatically instead of restarting the process.

[Figure: fluent-bit / fluentd watcher]
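To observe the hot reload, one simple check (a sketch; the label below assumes the DaemonSet pods carry the app.kubernetes.io/name=fluent-bit label used in the examples in this article) is to tail the agent logs while applying a change to any ClusterFilter or ClusterOutput:

kubectl -n fluent logs -l app.kubernetes.io/name=fluent-bit -f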

Deploying fluent-operator

There are two installation methods, plain YAML and Helm; I use Helm here.

# kubectl apply -f https://raw.githubusercontent.com/fluent/fluent-operator/release-1.7/manifests/setup/setup.yaml
helm install fluent-operator --create-namespace -n fluent https://github.com/fluent/fluent-operator/releases/download/v1.7.0/fluent-operator.tgz
# My container runtime is docker. If you are on Kubernetes 1.22+ with containerd, use the commands below instead
helm repo add elasticsearch https://helm.elastic.co
helm install fluent-operator --create-namespace -n fluent charts/fluent-operator/  --set containerRuntime=containerd

By editing the values file you can choose whether to install Fluentd, whether to output to Elasticsearch, and so on. I set these options to false here and install those components manually later.
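A quick way to confirm the installation (the fluent namespace comes from the helm command above; output will vary with your cluster):

# the operator pod should be Running
kubectl -n fluent get pods
# the fluentbit.fluent.io / fluentd.fluent.io CRDs should be registered
kubectl get crd | grep fluent.io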

Deploying Fluent Bit / Fluentd

Reference: https://github.com/kubesphere-sigs/fluent-operator-walkthrough#how-to-check-the-configuration-and-data

With fluent-operator deployed, the next step is to deploy the logging stack, i.e. the log sinks. This article covers Elasticsearch, Loki, and Kafka. Deploy them as follows:

# Setup a Kafka cluster in the kafka namespace
./deploy-kafka.sh

# Setup an Elasticsearch cluster in the elastic namespace
# run 'export INSTALL_HELM=yes' first if helm is not installed
./deploy-es.sh

# Setup Loki
./deploy-loki.sh
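These scripts come from the walkthrough repository referenced above. Assuming they install into the kafka, elastic, and loki namespaces (the service addresses used later in this article suggest so), you can confirm the sinks are up with:

kubectl -n kafka get pods
kubectl -n elastic get pods
kubectl -n loki get pods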

Plugins

Before deploying Fluent Bit/Fluentd, let's first look at the plugins each of them supports, because apart from the FluentBit and Fluentd CRDs themselves, the other CRDs are essentially individual plugins. See the official documentation below for each plugin's format.

fluentbit

https://github.com/fluent/fluent-operator/blob/master/docs/plugins/fluentbit/index.md

fluentd

https://github.com/fluent/fluent-operator/blob/master/docs/plugins/fluentd/index.md

fluentbit-only

Collect kubelet logs and send them to Elasticsearch

cat <<EOF | kubectl apply -f -
apiVersion: fluentbit.fluent.io/v1alpha2
kind: FluentBit
metadata:
  name: fluent-bit
  namespace: fluent
  labels:
    app.kubernetes.io/name: fluent-bit
spec:
  image: kubesphere/fluent-bit:v1.8.11
  positionDB:
    hostPath:
      path: /var/lib/fluent-bit/
  resources:
    requests:
      cpu: 10m
      memory: 25Mi
    limits:
      cpu: 500m
      memory: 200Mi
  fluentBitConfigName: fluent-bit-only-config
  tolerations:
    - operator: Exists
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFluentBitConfig
metadata:
  name: fluent-bit-only-config
  labels:
    app.kubernetes.io/name: fluent-bit
spec:
  service:
    parsersFile: parsers.conf
  inputSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "fluentbit-only"
  filterSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "fluentbit-only"
  outputSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "fluentbit-only"
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterInput
metadata:
  name: kubelet
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "fluentbit-only"
spec:
  systemd:
    tag: service.kubelet
    path: /var/log/journal
    db: /fluent-bit/tail/kubelet.db
    dbSync: Normal
    systemdFilter:
      - _SYSTEMD_UNIT=kubelet.service
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFilter
metadata:
  name: systemd
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "fluentbit-only"
spec:
  match: service.*
  filters:
  - lua:
      script:
        key: systemd.lua
        name: fluent-bit-lua
      call: add_time
      timeAsTable: true
---
apiVersion: v1
data:
  systemd.lua: |
    function add_time(tag, timestamp, record)
      new_record = {}
      timeStr = os.date("!*t", timestamp["sec"])
      t = string.format("%4d-%02d-%02dT%02d:%02d:%02d.%sZ", timeStr["year"], timeStr["month"], timeStr["day"], timeStr["hour"], timeStr["min"], timeStr["sec"], timestamp["nsec"])
      kubernetes = {}
      kubernetes["pod_name"] = record["_HOSTNAME"]
      kubernetes["container_name"] = record["SYSLOG_IDENTIFIER"]
      kubernetes["namespace_name"] = "kube-system"
      new_record["time"] = t
      new_record["log"] = record["MESSAGE"]
      new_record["kubernetes"] = kubernetes
      return 1, timestamp, new_record
    end
kind: ConfigMap
metadata:
  labels:
    app.kubernetes.io/component: operator
    app.kubernetes.io/name: fluent-bit-lua
  name: fluent-bit-lua
  namespace: fluent
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
  name: es
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "fluentbit-only"
spec:
  matchRegex: (?:kube|service)\.(.*)
  es:
    host: elasticsearch-master.elastic.svc
    port: 9200
    generateID: true
    logstashPrefix: fluent-log-fb-only
    logstashFormat: true
    timeKey: "@timestamp"  
EOF

Check the created CRs

kubectl -n fluent get daemonset
kubectl -n fluent get fluentbit
kubectl -n fluent get clusterfluentbitconfig
kubectl -n fluent get clusterinput.fluentbit.fluent.io
kubectl -n fluent get clusterfilter.fluentbit.fluent.io
kubectl -n fluent get clusteroutput.fluentbit.fluent.io
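Since the ClusterOutput above writes with logstashPrefix fluent-log-fb-only, one way to confirm the kubelet logs are actually arriving is to list the matching indices (same style as the Elasticsearch checks later in this article):

kubectl -n elastic exec -it elasticsearch-master-0 -c elasticsearch -- curl 'localhost:9200/_cat/indices/fluent-log-fb-only*?v'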

Collect Kubernetes container logs and send them to Elasticsearch, Loki, and Kafka

cat <<EOF | kubectl apply -f -
apiVersion: fluentbit.fluent.io/v1alpha2
kind: FluentBit
metadata:
  name: fluent-bit
  namespace: fluent
  labels:
    app.kubernetes.io/name: fluent-bit
spec:
  image: kubesphere/fluent-bit:v1.8.11
  positionDB:
    hostPath:
      path: /var/lib/fluent-bit/
  resources:
    requests:
      cpu: 10m
      memory: 25Mi
    limits:
      cpu: 500m
      memory: 200Mi
  fluentBitConfigName: fluent-bit-config
  tolerations:
    - operator: Exists
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFluentBitConfig
metadata:
  name: fluent-bit-config
  labels:
    app.kubernetes.io/name: fluent-bit
spec:
  service:
    parsersFile: parsers.conf
  inputSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "k8s"
  filterSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "k8s"
  outputSelector:
    matchLabels:
      fluentbit.fluent.io/enabled: "true"
      fluentbit.fluent.io/mode: "k8s"
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterInput
metadata:
  name: tail
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  tail:
    tag: kube.*
    path: /var/log/containers/*.log
    # Exclude logs from util pod
    excludePath: /var/log/containers/utils_default_utils-*.log
    parser: docker   #If it is a containerd environment, then this item should be set to cri
    refreshIntervalSeconds: 10
    memBufLimit: 5MB
    skipLongLines: true
    db: /fluent-bit/tail/pos.db
    dbSync: Normal
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFilter
metadata:
  name: kubernetes
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  match: kube.*
  filters:
  - kubernetes:
      kubeURL: https://kubernetes.default.svc:443
      kubeCAFile: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
      kubeTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
      labels: false
      annotations: false
  - nest:
      operation: lift
      nestedUnder: kubernetes
      addPrefix: kubernetes_
  - modify:
      rules:
      - remove: stream
      - remove: kubernetes_pod_id
      - remove: kubernetes_host
      - remove: kubernetes_container_hash
  - nest:
      operation: nest
      wildcard:
      - kubernetes_*
      nestUnder: kubernetes
      removePrefix: kubernetes_
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
  name: kafka
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  matchRegex: (?:kube|service)\.(.*)
  kafka:
    brokers: my-cluster-kafka-brokers.kafka.svc:9092
    topics: fluent-log
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
  name: k8s-app-es
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  matchRegex: (?:kube|service)\.(.*)
  es:
    host: elasticsearch-master.elastic.svc
    port: 9200
    generateID: true
    logstashPrefix: fluent-app-log-fb-only
    logstashFormat: true
    timeKey: "@timestamp"
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
  name: k8s-app-loki
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  matchRegex: (?:kube|service)\.(.*)
  loki:
    host: loki.loki.svc
    labels:
      - job=fluentbit
EOF
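After applying the above, one way to confirm that records are reaching Kafka is to read the fluent-log topic with the console consumer shipped in the broker image. This is only a sketch: it assumes a Strimzi-style cluster named my-cluster (consistent with the broker address above), so the first broker pod is my-cluster-kafka-0.

kubectl -n kafka exec -it my-cluster-kafka-0 -- \
  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fluent-log --from-beginning --max-messages 5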

Filter processing flow

Reference: https://zhuanlan.zhihu.com/p/425167977
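To make the filter chain from the k8s example above concrete, here is roughly how a record evolves (illustrative values only, not actual output): the kubernetes filter enriches each record with pod metadata; the first nest filter lifts that metadata to top-level keys prefixed with kubernetes_; the modify filter removes stream, kubernetes_pod_id, kubernetes_host, and kubernetes_container_hash; and the second nest filter folds the remaining kubernetes_* keys back under kubernetes.

# after the kubernetes filter
{"log": "...", "stream": "stdout", "kubernetes": {"pod_name": "my-app-6f9c", "namespace_name": "default", "pod_id": "...", "host": "node-1", "container_name": "app"}}

# after nest (lift) + modify
{"log": "...", "kubernetes_pod_name": "my-app-6f9c", "kubernetes_namespace_name": "default", "kubernetes_container_name": "app"}

# after nest (nest)
{"log": "...", "kubernetes": {"pod_name": "my-app-6f9c", "namespace_name": "default", "container_name": "app"}}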

Check the generated Fluent Bit configuration

kubectl -n fluent get secrets fluent-bit-config -ojson | jq '.data."fluent-bit.conf"' | awk -F '"' '{printf $2}' | base64 --decode

fluentbit+fluentd

In practice this mode just switches the Fluent Bit output to the forward plugin, which forwards records to the Fluentd service (fluentd.<namespace>.svc, here fluentd.fluent.svc).

Fluent Bit output configuration

cat <<EOF | kubectl apply -f -
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
  name: fluentd
  labels:
    fluentbit.fluent.io/enabled: "true"
    fluentbit.fluent.io/mode: "k8s"
spec:
  matchRegex: (?:kube|service)\.(.*)
  forward:
    host: fluentd.fluent.svc
    port: 24224
EOF

Fluentd receiver configuration

cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: Fluentd
metadata:
  name: fluentd
  namespace: fluent
  labels:
    app.kubernetes.io/name: fluentd
spec:
  globalInputs:
  - forward: 
      bind: 0.0.0.0
      port: 24224
  replicas: 1
  image: kubesphere/fluentd:v1.14.6
  fluentdCfgSelector: 
    matchLabels:
      config.fluentd.fluent.io/enabled: "true"
# buffer support (backed by a hostPath)
  buffer:
    hostPath:
      path: "/var/log/fluentd-buffer"

EOF

# The hostPath buffer directory must be writable on the node where the Fluentd
# pod is scheduled, otherwise Fluentd reports a permission error when accessing it.
chmod 777 /var/log/fluentd-buffer

Fluentd configuration

cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFluentdConfig
metadata:
  name: cluster-fluentd-config
  labels:
    config.fluentd.fluent.io/enabled: "true"
spec:
  watchedNamespaces: 
  - kube-system
  - default
  - kafka
  - elastic
  - fluent
  clusterOutputSelector:
    matchLabels:
      output.fluentd.fluent.io/scope: "cluster"
      output.fluentd.fluent.io/enabled: "true"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterOutput
metadata:
  name: cluster-fluentd-output-es
  labels:
    output.fluentd.fluent.io/scope: "cluster"
    output.fluentd.fluent.io/enabled: "true"
spec: 
  outputs: 
  - elasticsearch:
      host: elasticsearch-master.elastic.svc
      port: 9200
      logstashFormat: true
      logstashPrefix: fluent-log-cluster-fd
    buffer:
      type: file
      path: /buffers/es_buffer
EOF

Check the CRs

kubectl -n fluent get statefulset
kubectl -n fluent get fluentd
kubectl -n fluent get clusterfluentdconfig.fluentd.fluent.io
kubectl -n fluent get clusteroutput.fluentd.fluent.io

Check the generated Fluentd configuration

kubectl -n fluent get secrets fluentd-config -ojson | jq '.data."app.conf"' | awk -F '"' '{printf $2}' | base64 --decode 


Check the data in Elasticsearch

kubectl -n elastic exec -it elasticsearch-master-0 -c elasticsearch --  curl -X GET "localhost:9200/fluent-log*/_search?pretty" -H 'Content-Type: application/json' -d '{
   "size" : 0,
   "aggs" : {
      "kubernetes_ns": {
         "terms" : {
           "field": "kubernetes.namespace_name.keyword"
         }
      }
   }
}'

kubectl -n elastic exec -it elasticsearch-master-0 -c elasticsearch -- curl 'localhost:9200/_cat/indices?v'


Other configuration

The configuration above is cluster-scoped. Fluentd also supports namespace-scoped configuration, i.e. a FluentdConfig created in a specific namespace.

Fluentd also supports multi-tenancy: add the label output.fluentd.fluent.io/user: "user1" to the labels of the FluentdConfig and of the Output/ClusterOutput.

fluentd-only

Receive logs over HTTP with Fluentd and write them to stdout

cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: Fluentd
metadata:
  name: fluentd-http
  namespace: fluent
  labels:
    app.kubernetes.io/name: fluentd
spec:
  globalInputs:
    - http: 
        bind: 0.0.0.0
        port: 9880
  replicas: 1
  image: kubesphere/fluentd:v1.14.4
  fluentdCfgSelector: 
    matchLabels:
      config.fluentd.fluent.io/mode: "fluentd-only" 
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: FluentdConfig
metadata:
  name: fluentd-only-config
  namespace: fluent
  labels:
    config.fluentd.fluent.io/mode: "fluentd-only" 
spec:
  outputSelector:
    matchLabels:
      output.fluentd.fluent.io/mode: "fluentd-only"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: Output
metadata:
  name: fluentd-only-stdout
  namespace: fluent
  labels:
    output.fluentd.fluent.io/mode: "fluentd-only"
spec: 
  outputs: 
  - stdout: {}
EOF

Send a test log

curl -X POST -d 'json={"foo":"bar"}' http://fluentd-http.fluent.svc:9880/app.log
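Note that fluentd-http.fluent.svc only resolves inside the cluster, so run the curl from a pod in the cluster (or port-forward the corresponding service first). Assuming the operator names the StatefulSet after the Fluentd CR, the record should then show up in the pod's stdout:

kubectl -n fluent logs fluentd-http-0 | tail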

Troubleshooting

1. Elasticsearch error: illegal_argument_exception, "Action/metadata line [1] contains an unknown parameter [_type]"

The Elasticsearch deployed by the logging-stack scripts is version 8.5.1.


The index _type field was removed in Elasticsearch 8 and later, so it is no longer recognized. Reference: https://elasticsearch.cn/article/14504

Solution

Replace the Elasticsearch version with 7.x.

helm search repo elasticsearch/elasticsearch -l
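For example, to switch to a 7.x release using the repo added earlier (the chart version below is only an example; pick one from the search output, and adjust the release name to match what deploy-es.sh installed):

helm -n elastic upgrade --install elasticsearch elasticsearch/elasticsearch --version 7.17.3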


2. Fluent Bit cannot resolve domain names

The error messages are as follows:

failed to flush chunk "xxx.fib",retry in 10 seconds,task_id=49,input=tail.1 > output=forward.0(out_id=0)
chunk "xxx.fib" cannot be retried,task_id=7,input=tail.1 > output=forward.0
connection #134 to fluentd.elastic.svc:24224 timed out after 10 seconds
getaddrinfo(host='fluentd.elastic.svc', err=12): Timeout while contacting DNS servers


Solution

Related issue: https://github.com/fluent/fluent-bit/issues/4260


Looking at the ClusterOutput CRD, there is nowhere to set net.dns.mode=TCP, and adjusting the pod DNS settings (dnsPolicy/dnsConfig) as below did not noticeably help either:

      dnsPolicy: None
      dnsConfig:
        nameservers:
        - 10.43.0.10
        searches:
        - elastic.svc.cluster.local
        - svc.cluster.local
        - cluster.local
        - localdomain
        options:
        - name: ndots
          value: "2"

Replace the Fluent Bit image with a newer version.
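One way to do this is to edit the FluentBit CR and bump spec.image to a newer kubesphere/fluent-bit tag; the operator then rolls the DaemonSet with the new image:

kubectl -n fluent edit fluentbit fluent-bit
# change spec.image, e.g. image: kubesphere/fluent-bit:<newer tag>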


Checking the Fluent Bit logs, everything is back to normal.


3. fluent-operator errors when Fluentd outputs to a password-protected Elasticsearch

I have already filed an issue for this: https://github.com/fluent/fluent-operator/issues/485
Outputting to an Elasticsearch instance without a password does not produce the error, so I am not sure whether this is caused by my environment or something else; waiting for a reply.
