简介
官方地址:https://github.com/fluent/fluent-operator
fluent OperatorF包括fluentbit和fluentd。虽然fluentbit和fluentd都可以收集、处理(解析和过滤)并将日志转发到最终目的地,但它们也有各自的优势。
fluentbit小巧轻量,适合作为agent收集日志;fluentd插件丰富,功能强大,适合对日志进行集中处理,二者可以独立使用,也可以共同使用。
整体架构
fluent-operator有三种工作模式:
- fluentbit-only:如果只需要收集日志并将日志发送到最终目的地,那么可以使用fluentbit。
- fluentbit+fluentd:如果还需要对收集到的日志执行一些高级处理或将其发送到更多的接收器,那么您还需要fluentd。
- fluentd-only:如果需要通过HTTP或Syslog等网络接收日志,然后处理并将日志发送到最终接收器,那么只需要fluentd。
fluentbit是daemonset,fluentd是statefulset。
CRD
fluent-operator为fluentbit和fluent分别定义了两个group:fluentbit.fluent.io 和 fluentd.fluent.io。
fluentbit.fluent.io
- Fluentbit 定义了Fluent Bit 的属性,比如镜像版本、污点、亲和性等参数。
- ClusterFluentbitConfig 定义了Fluent Bit 的配置文件。
- ClusterInput 定义了Fluent Bit 的 input 插件,即输入插件。通过该插件,用户可以自定义采集何种日志。
- ClusterFilter 定义了Fluent Bit 的 filter 插件,该插件主要负责过滤以及处理 fluentbit 采集到的信息。
- ClusterParser 定义了Fluent Bit 的 parser 插件,该插件主要负责解析日志信息,可以将日志信息解析为其他格式。
- ClusterOutput 定义了Fluent Bit 的 output 插件,该插件主要负责将处理后的日志信息转发到目的地。
fluentd.fluent.io
- Fluentd 定义了 Fluentd 的属性,比如镜像版本、污点、亲和性等参数。
- ClusterFluentdConfig 定义了 Fluentd 集群级别的配置文件。
- FluentdConfig 定义了 Fluentd 的 namespace 范围的配置文件。
- ClusterFilter 定义了 Fluentd 集群范围的 filter 插件,该插件主要负责过滤以及处理 Fluentd 采集到的信息。如果安装了 Fluent Bit,则可以更进一步的处理日志信息。
- Filter CRD 该 定义了 Fluentd namespace 的 filter 插件,该插件主要负责过滤以及处理 Fluentd 采集到的信息。如果安装了 Fluent Bit,则可以更进一步的处理日志信息。
- ClusterOutput 定义了 Fluentd 的集群范围的 output 插件,该插件主要负责将处理后的日志信息转发到目的地。
- Output 定义了 Fluentd 的 namespace 范围的 output 插件,该插件主要负责将处理后的日志信息转发到目的地。
工作流
每个ClusterInput、ClusterParser、ClusterFilter、ClusterOutput代表一个FluentBit配置部分,由ClusterFluentBitConfig通过标签选择器选择。Fluent Operator监视这些对象,构造最终配置,最后创建一个Secret来存储将挂载到Fluent Bit DaemonSet中的配置。整个工作流程如下所示:
fluent-operator 的pod中包含了一个fluentbit、fluentd watcher,他可以监控fluentbit和fluentd的配置,当配置发生改变时,会自动热重启,而不用重启进程。
部署fluent-operator
部署方式有yaml和helm两种,我这里采用的是helm安装。
# kubectl apply -f https://raw.githubusercontent.com/fluent/fluent-operator/release-1.7/manifests/setup/setup.yaml
helm install fluent-operator --create-namespace -n fluent https://github.com/fluent/fluent-operator/releases/download/v1.7.0/fluent-operator.tgz
# 我这里的cri是docker,如果你是1.22以上的k8s,使用的containerd,那使用下面的命令
helm repo add elasticsearch https://helm.elastic.co
helm install fluent-operator --create-namespace -n fluent charts/fluent-operator/ --set containerRuntime=containerd
修改valuse文件,可以选择是否安装fluentd,输出到es等。我这里都使用的是false,后面手动安装。安装结果如下:
部署fluentbit/fluentd
部署好fluent-operator后,接着需要部署logging-stack,即日志的输出端,这里介绍es,loki,kafka。部署方式如下:
# Setup a Kafka cluster in the kafka namespace
./deploy-kafka.sh
# Setup an Elasticsearch cluster in the elastic namespace
# run 'export INSTALL_HELM=yes' first if helm is not installed
./deploy-es.sh
# Setup Loki
./deploy-loki.sh
插件
部署fluentbit/fluentd前还需要先介绍一下他们各自支持的插件,因为除了fluentbit/fluentd
外其他的crd实际上就是一个个的插件。每个插件的格式请参照下面的官方文档。
fluentbit
https://github.com/fluent/fluent-operator/blob/master/docs/plugins/fluentbit/index.md
fluentd
https://github.com/fluent/fluent-operator/blob/master/docs/plugins/fluentd/index.md
fluentbit-only
收集kubelet日志输出到es
cat <<EOF | kubectl apply -f -
apiVersion: fluentbit.fluent.io/v1alpha2
kind: FluentBit
metadata:
name: fluent-bit
namespace: fluent
labels:
app.kubernetes.io/name: fluent-bit
spec:
image: kubesphere/fluent-bit:v1.8.11
positionDB:
hostPath:
path: /var/lib/fluent-bit/
resources:
requests:
cpu: 10m
memory: 25Mi
limits:
cpu: 500m
memory: 200Mi
fluentBitConfigName: fluent-bit-only-config
tolerations:
- operator: Exists
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFluentBitConfig
metadata:
name: fluent-bit-only-config
labels:
app.kubernetes.io/name: fluent-bit
spec:
service:
parsersFile: parsers.conf
inputSelector:
matchLabels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "fluentbit-only"
filterSelector:
matchLabels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "fluentbit-only"
outputSelector:
matchLabels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "fluentbit-only"
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterInput
metadata:
name: kubelet
labels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "fluentbit-only"
spec:
systemd:
tag: service.kubelet
path: /var/log/journal
db: /fluent-bit/tail/kubelet.db
dbSync: Normal
systemdFilter:
- _SYSTEMD_UNIT=kubelet.service
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFilter
metadata:
name: systemd
labels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "fluentbit-only"
spec:
match: service.*
filters:
- lua:
script:
key: systemd.lua
name: fluent-bit-lua
call: add_time
timeAsTable: true
---
apiVersion: v1
data:
systemd.lua: |
function add_time(tag, timestamp, record)
new_record = {}
timeStr = os.date("!*t", timestamp["sec"])
t = string.format("%4d-%02d-%02dT%02d:%02d:%02d.%sZ", timeStr["year"], timeStr["month"], timeStr["day"], timeStr["hour"], timeStr["min"], timeStr["sec"], timestamp["nsec"])
kubernetes = {}
kubernetes["pod_name"] = record["_HOSTNAME"]
kubernetes["container_name"] = record["SYSLOG_IDENTIFIER"]
kubernetes["namespace_name"] = "kube-system"
new_record["time"] = t
new_record["log"] = record["MESSAGE"]
new_record["kubernetes"] = kubernetes
return 1, timestamp, new_record
end
kind: ConfigMap
metadata:
labels:
app.kubernetes.io/component: operator
app.kubernetes.io/name: fluent-bit-lua
name: fluent-bit-lua
namespace: fluent
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
name: es
labels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "fluentbit-only"
spec:
matchRegex: (?:kube|service)\.(.*)
es:
host: elasticsearch-master.elastic.svc
port: 9200
generateID: true
logstashPrefix: fluent-log-fb-only
logstashFormat: true
timeKey: "@timestamp"
EOF
查看安装的cr
kubectl -n fluent get daemonset
kubectl -n fluent get fluentbit
kubectl -n fluent get clusterfluentbitconfig
kubectl -n fluent get clusterinput.fluentbit.fluent.io
kubectl -n fluent get clusterfilter.fluentbit.fluent.io
kubectl -n fluent get clusteroutput.fluentbit.fluent.io
收集k8s日志输出到es。loki,kafka
cat <<EOF | kubectl apply -f -
apiVersion: fluentbit.fluent.io/v1alpha2
kind: FluentBit
metadata:
name: fluent-bit
namespace: fluent
labels:
app.kubernetes.io/name: fluent-bit
spec:
image: kubesphere/fluent-bit:v1.8.11
positionDB:
hostPath:
path: /var/lib/fluent-bit/
resources:
requests:
cpu: 10m
memory: 25Mi
limits:
cpu: 500m
memory: 200Mi
fluentBitConfigName: fluent-bit-config
tolerations:
- operator: Exists
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFluentBitConfig
metadata:
name: fluent-bit-config
labels:
app.kubernetes.io/name: fluent-bit
spec:
service:
parsersFile: parsers.conf
inputSelector:
matchLabels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "k8s"
filterSelector:
matchLabels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "k8s"
outputSelector:
matchLabels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "k8s"
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterInput
metadata:
name: tail
labels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "k8s"
spec:
tail:
tag: kube.*
path: /var/log/containers/*.log
# Exclude logs from util pod
excludePath: /var/log/containers/utils_default_utils-*.log
parser: docker #If it is a containerd environment, then this item should be set to cri
refreshIntervalSeconds: 10
memBufLimit: 5MB
skipLongLines: true
db: /fluent-bit/tail/pos.db
dbSync: Normal
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterFilter
metadata:
name: kubernetes
labels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "k8s"
spec:
match: kube.*
filters:
- kubernetes:
kubeURL: https://kubernetes.default.svc:443
kubeCAFile: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
kubeTokenFile: /var/run/secrets/kubernetes.io/serviceaccount/token
labels: false
annotations: false
- nest:
operation: lift
nestedUnder: kubernetes
addPrefix: kubernetes_
- modify:
rules:
- remove: stream
- remove: kubernetes_pod_id
- remove: kubernetes_host
- remove: kubernetes_container_hash
- nest:
operation: nest
wildcard:
- kubernetes_*
nestUnder: kubernetes
removePrefix: kubernetes_
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
name: kafka
labels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "k8s"
spec:
matchRegex: (?:kube|service)\.(.*)
kafka:
brokers: my-cluster-kafka-brokers.kafka.svc:9092
topics: fluent-log
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
name: k8s-app-es
labels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "k8s"
spec:
matchRegex: (?:kube|service)\.(.*)
es:
host: elasticsearch-master.elastic.svc
port: 9200
generateID: true
logstashPrefix: fluent-app-log-fb-only
logstashFormat: true
timeKey: "@timestamp"
---
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
name: k8s-app-loki
labels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "k8s"
spec:
matchRegex: (?:kube|service)\.(.*)
loki:
host: loki.loki.svc
labels:
- job=fluentbit
EOF
Filter过滤过程
参考地址:https://zhuanlan.zhihu.com/p/425167977
查看fluentbit配置
kubectl -n elastic get secrets fluent-bit-config -ojson | jq '.data."fluent-bit.conf"' | awk -F '"' '{printf $2}' | base64 --decode
fluentbit+fluentd
实际上就是把输出端选择插件forward,转发给fluentd.ns.svc。
fluentbit-output配置
cat <<EOF | kubectl apply -f -
apiVersion: fluentbit.fluent.io/v1alpha2
kind: ClusterOutput
metadata:
name: fluentd
labels:
fluentbit.fluent.io/enabled: "true"
fluentbit.fluent.io/mode: "k8s"
spec:
matchRegex: (?:kube|service)\.(.*)
forward:
host: fluentd.fluent.svc
port: 24224
EOF
fluentd接收配置
cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: Fluentd
metadata:
name: fluentd
namespace: fluent
labels:
app.kubernetes.io/name: fluentd
spec:
globalInputs:
- forward:
bind: 0.0.0.0
port: 24224
replicas: 1
image: kubesphere/fluentd:v1.14.6
fluentdCfgSelector:
matchLabels:
config.fluentd.fluent.io/enabled: "true"
# 支持buffer
buffer:
hostPath:
path: "/var/log/fluentd-buffer"
# 需要修改buffer目录权限,否则fluentd会报错没有权限访问。
chmod 777 /var/log/fluentd-buffer
EOF
fluentd配置
cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterFluentdConfig
metadata:
name: cluster-fluentd-config
labels:
config.fluentd.fluent.io/enabled: "true"
spec:
watchedNamespaces:
- kube-system
- default
- kafka
- elastic
- fluent
clusterOutputSelector:
matchLabels:
output.fluentd.fluent.io/scope: "cluster"
output.fluentd.fluent.io/enabled: "true"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterOutput
metadata:
name: cluster-fluentd-output-es
labels:
output.fluentd.fluent.io/scope: "cluster"
output.fluentd.fluent.io/enabled: "true"
spec:
outputs:
- elasticsearch:
host: elasticsearch-master.elastic.svc
port: 9200
logstashFormat: true
logstashPrefix: fluent-log-cluster-fd
buffer:
type: file
path: /buffers/es_buffer
EOF
查看cr
kubectl -n fluent get statefulset
kubectl -n fluent get fluentd
kubectl -n fluent get clusterfluentdconfig.fluentd.fluent.io
kubectl -n fluent get clusteroutput.fluentd.fluent.io
查看fluentd配置
kubectl -n fluent get secrets fluentd-config -ojson | jq '.data."app.conf"' | awk -F '"' '{printf $2}' | base64 --decode
查看es中的数据
kubectl -n elastic exec -it elasticsearch-master-0 -c elasticsearch -- curl -X GET "localhost:9200/fluent-log*/_search?pretty" -H 'Content-Type: application/json' -d '{
"size" : 0,
"aggs" : {
"kubernetes_ns": {
"terms" : {
"field": "kubernetes.namespace_name.keyword"
}
}
}
}'
kubectl -n elastic exec -it elasticsearch-master-0 -c elasticsearch -- curl 'localhost:9200/_cat/indices?v'
其他配置
上面的配置是集群cluster级别的,fluentd还支持namespace级别的,即FluentdConfig指定namespace。
fluentd还支持多租户。即在FluentdConfig,Output/ClusterOutput的labels中加上output.fluentd.fluent.io/user: "user1"
fluentd-only
使用fluentd接收http日志输出到标准输出
cat <<EOF | kubectl apply -f -
apiVersion: fluentd.fluent.io/v1alpha1
kind: Fluentd
metadata:
name: fluentd-http
namespace: fluent
labels:
app.kubernetes.io/name: fluentd
spec:
globalInputs:
- http:
bind: 0.0.0.0
port: 9880
replicas: 1
image: kubesphere/fluentd:v1.14.4
fluentdCfgSelector:
matchLabels:
config.fluentd.fluent.io/mode: "fluentd-only"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: FluentdConfig
metadata:
name: fluentd-only-config
namespace: fluent
labels:
config.fluentd.fluent.io/mode: "fluentd-only"
spec:
outputSelector:
matchLabels:
output.fluentd.fluent.io/mode: "fluentd-only"
---
apiVersion: fluentd.fluent.io/v1alpha1
kind: Output
metadata:
name: fluentd-only-stdout
namespace: fluent
labels:
output.fluentd.fluent.io/mode: "fluentd-only"
spec:
outputs:
- stdout: {}
EOF
发送测试日志
curl -X POST -d 'json={"foo":"bar"}' http://fluentd-http.fluent.svc:9880/app.log
坑
1.es报错:illegal_argument_exception,Action/metadata line [1] contains an unknown parameter [_type]
loggin-stack中部署的es版本是8.5.1。
而index _type
这个字段已经在es8版本以上已经被删掉了,所以无法识别。参考地址:https://elasticsearch.cn/article/14504
解决
将es版本替换为7.x即可。
helm search repo elasticsearch/elasticsearch -l
2.fluentbit无法解析域名
报错信息如下:
failed to flush chunk "xxx.fib",retry in 10 seconds,task_id=49,input=tail.1 > output=forward.0(out_id=0)
chunk "xxx.fib" cannot be retried,task_id=7,input=tail.1 > output=forward.0
connection #134 to fluentd.elastic.svc:24224 timed out after 10 seconds
getaddrinfo(host='fluentd.elastic.svc', err=12): Timeout while contacting DNS servers
解决
相关issue:https://github.com/fluent/fluent-bit/issues/4260
查看ClusterOutput crd,没有配置ndt.dns.mode=TCP
的地方,修改dnsproxy也没有明显效果。
dnsPolicy: none
dnsConfig:
nameservers:
- 10.43.0.10
searchs:
- elastic.svc.cluster.local
- svc.cluster.local
- cluster.local
- localdomain
options:
- name: ndots
values: "2"
替换fluentbit镜像到最新。
查看fluenbit日志恢复正常。
3.fluentd输出到有密码的es,fluent-operator报错
相关问题我已经提交了issue:https://github.com/fluent/fluent-operator/issues/485
输出到没有密码的es就没有报错,不知道是我的环境问题还是哪里,等待回复。