安装客户端
es的go客户端库有github.com/elastic/go-elasticsearch/v7
,这个是官方提供的。包的地址是:https://pkg.go.dev/github.com/elastic/go-elasticsearch/v7
github.com/olivere/elastic/v7
,这个是由Oliver Eilhard(GitHub用户名olivere)维护的第三方客户端。包的地址是:https://pkg.go.dev/github.com/olivere/elastic/v7 后者应用的较多,所以这里使用后者。
go get -u github.com/olivere/elastic/v7
创建客户端测试连接
使用elastic包中的NewClient函数(它是包级函数,返回*Client):
func NewClient(options ...ClientOptionFunc) (*Client, error)
代码如下:
package init
import (
"context"
"fmt"
"github.com/olivere/elastic/v7"
"log"
"os"
)
// esurl is the Elasticsearch endpoint; it is used both to configure the
// client and as the target of the connectivity ping.
var esurl = "http://127.0.0.1:9200"
// initEs creates an Elasticsearch client for esurl, verifies the connection
// with a ping, and returns the client.
//
// The client uses basic authentication, has sniffing disabled, and writes
// error logs to stderr and info logs to stdout. A failure to create the
// client or to ping the server terminates the process via log.Fatal.
//
// The client is also stored in the package-level esClient variable.
// NOTE(review): esClient is not declared in this snippet; it is assumed to be
// declared elsewhere in the package — confirm.
func initEs() *elastic.Client {
	client, err := elastic.NewClient(
		elastic.SetURL(esurl),
		// Basic-auth user name and password.
		elastic.SetBasicAuth("elastic", "xxxxxxx"),
		// Disable sniffing, i.e. do not probe the cluster for node changes.
		elastic.SetSniff(false),
		// Destination for error logs.
		elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
		// Destination for info logs.
		elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
	)
	if err != nil {
		log.Fatal("es连接失败:", err)
	}

	// Ping the server; this yields server information, the HTTP status code
	// and an error.
	info, code, err := client.Ping(esurl).Do(context.Background())
	if err != nil {
		log.Fatal("ping失败:", err)
	}
	fmt.Printf("es返回码是: %d 版本号是:%s\n", code, info.Version.Number)
	fmt.Println("es连接成功!")

	esClient = client
	return esClient
}

// InitEs is the exported entry point for other packages, which cannot call
// the unexported initEs directly. It delegates to initEs unchanged.
func InitEs() *elastic.Client {
	return initEs()
}
添加索引
索引名为user,包含name、age、address、id四个字段。使用的是Client类型的CreateIndex方法。索引副本数为0,分片数为1。
func (c *Client) CreateIndex(name string) *IndicesCreateService
代码如下:
package main
import (
	"context"
	conn "elastic/init"
	"fmt"
	"log"
)
// mapping is the JSON request body used when creating the index: one shard,
// zero replicas, and four mapped fields (name: keyword, age: long,
// address: text, id: long).
const mapping = `
{
"settings":{
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"name": {
"type": "keyword"
},
"age": {
"type": "long"
},
"address": {
"type": "text"
},
"id": {
"type": "long"
}
}
}
}
`
// AddIndex creates the index named index using the settings and mappings in
// mapping, unless an index with that name already exists.
//
// A failed existence check terminates the process via log.Fatal. A failed or
// unacknowledged creation is reported on stdout and the function returns.
func AddIndex(index string) {
	// conn.InitEs is a single-value call; the original two-value form that
	// bound a variable named nil did not match it.
	client := conn.InitEs()
	ctx := context.Background()

	exists, err := client.IndexExists(index).Do(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if exists {
		fmt.Println("索引已存在")
		return
	}

	createIndex, err := client.CreateIndex(index).BodyString(mapping).Do(ctx)
	if err != nil {
		// The result must not be used when err is non-nil, so stop here
		// instead of dereferencing it below.
		fmt.Println("创建索引失败:", err)
		return
	}
	if !createIndex.Acknowledged {
		fmt.Println("创建索引未被确认")
		return
	}
	fmt.Println("创建索引成功:", createIndex)
}
// main creates the "user" index by calling AddIndex.
func main() {
AddIndex("user")
}
再次调用。
查看索引。
插入数据
使用的是client类型中的Index方法:
func (c *Client) Index() *IndexService
type IndexService struct {
// contains filtered or unexported fields
}
func (s *IndexService) BodyJson(body interface{}) *IndexService
func (s *IndexService) BodyString(body string) *IndexService
func (s *IndexService) Do(ctx context.Context) (*IndexResponse, error)
func (s *IndexService) Id(id string) *IndexService
func (s *IndexService) Index(index string) *IndexService
在上面的代码中继续添加:
package main
import (
"context"
conn "elastic/init"
"fmt"
"log"
)
// Package-level state shared by the functions in this program.
var (
	// index is the name of the index the program works on.
	index = "user"
	// client is obtained from conn.InitEs during package initialisation.
	client = conn.InitEs()
)
// mapping is the JSON request body used when creating the index: one shard,
// zero replicas, and four mapped fields (name: keyword, age: long,
// address: text, id: long).
const mapping = `
{
"settings":{
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"name": {
"type": "keyword"
},
"age": {
"type": "long"
},
"address": {
"type": "text"
},
"id": {
"type": "long"
}
}
}
}
`
// User is the document type passed to AddData. Its json tags name the keys
// name, age, address and id.
type User struct {
	Name string `json:"name"`    // user name
	Age  int    `json:"age"`     // age
	Addr string `json:"address"` // address
	Id   int    `json:"id"`      // numeric id field of the document
}
// AddIndex creates the index named index using the settings and mappings in
// mapping, unless an index with that name already exists. It uses the
// package-level client.
//
// A failed existence check terminates the process via log.Fatal. A failed or
// unacknowledged creation is reported on stdout and the function returns.
func AddIndex(index string) {
	ctx := context.Background()

	exists, err := client.IndexExists(index).Do(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if exists {
		fmt.Println("索引已存在")
		return
	}

	createIndex, err := client.CreateIndex(index).BodyString(mapping).Do(ctx)
	if err != nil {
		// The result must not be used when err is non-nil, so stop here
		// instead of dereferencing it below.
		fmt.Println("创建索引失败:", err)
		return
	}
	if !createIndex.Acknowledged {
		fmt.Println("创建索引未被确认")
		return
	}
	fmt.Println("创建索引成功:", createIndex)
}
// AddData writes data into index as the document with ID "1", supplying the
// struct through BodyJson. A request error terminates the process via
// log.Fatal; on success the response is printed.
func AddData(index string, data User) {
	request := client.Index().
		Index(index).
		Id("1").
		BodyJson(data)

	resp, err := request.Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("添加数据成功", resp)
}
// AddDataString writes data into index as the document with ID "2",
// supplying the body as a raw string through BodyString. A request error
// terminates the process via log.Fatal; on success the response is printed.
func AddDataString(index string, data string) {
	request := client.Index().
		Index(index).
		Id("2").
		BodyString(data)

	resp, err := request.Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("添加数据成功", resp)
}
// main ensures the index exists, then inserts two documents: one given as a
// User struct and one given as a raw JSON string.
func main() {
	AddIndex(index)

	// Document supplied as a struct.
	AddData(index, User{Name: "user1", Age: 18, Addr: "hz", Id: 1})

	// Document supplied as a JSON string.
	AddDataString(index, `{"name":"user2", "age":20, "address":"hz", "id":2}`)
}
查看索引数据。
查询数据
elastic包有多种查询方式,
- BoolQuery组合查询
func NewBoolQuery() *BoolQuery type BoolQuery struct { Query // contains filtered or unexported fields } func (q *BoolQuery) Filter(filters ...Query) *BoolQuery 条件过滤(不参与相关性评分) func (q *BoolQuery) Must(queries ...Query) *BoolQuery 相当于and func (q *BoolQuery) MustNot(queries ...Query) *BoolQuery 相当于not(必须不匹配) func (q *BoolQuery) Should(queries ...Query) *BoolQuery 相当于or func (q *BoolQuery) Source() (interface{}, error)
- TermQuery精确查询
func NewTermQuery(name string, value interface{}) *TermQuery func (q *TermQuery) Boost(boost float64) *TermQuery func (q *TermQuery) QueryName(queryName string) *TermQuery func (q *TermQuery) Source() (interface{}, error)
- MatchQuery匹配查询
func NewMatchQuery(name string, text interface{}) *MatchQuery type MatchQuery struct { // contains filtered or unexported fields }
- RangeQuery范围查询
func NewRangeQuery(name string) *RangeQuery type RangeQuery struct { // contains filtered or unexported fields } func (q *RangeQuery) From(from interface{}) *RangeQuery func (q *RangeQuery) Gt(from interface{}) *RangeQuery > func (q *RangeQuery) Gte(from interface{}) *RangeQuery ≥ func (q *RangeQuery) Lt(to interface{}) *RangeQuery < func (q *RangeQuery) Lte(to interface{}) *RangeQuery ≤
- reflect
https://pkg.go.dev/reflect
package main
import (
"context"
conn "elastic/init"
"fmt"
elastic "github.com/olivere/elastic/v7"
"log"
"reflect"
)
// Package-level state shared by the functions in this program.
var (
	// index is the name of the index that is searched.
	index = "user"
	// client is obtained from conn.InitEs during package initialisation.
	client = conn.InitEs()
)
// User is the type that search results are converted to in printQueryInfo.
// Its json tags name the keys name, age, address and id.
type User struct {
	Name string `json:"name"`    // user name
	Age  int    `json:"age"`     // age
	Addr string `json:"address"` // address
	Id   int    `json:"id"`      // numeric id field of the document
}
// search queries index for documents whose name is exactly "user3" and whose
// id is at least 1, requesting the first 10 hits, and prints them.
//
// Both conditions are attached as filter clauses of one bool query. A request
// error terminates the process via log.Fatal.
func search() {
	nameQuery := elastic.NewTermQuery("name", "user3")
	// An additional condition could be added the same way, e.g.
	// elastic.NewMatchQuery("age", 20).
	idQuery := elastic.NewRangeQuery("id").Gte(1)

	// The original also built a second bool query with Must clauses that was
	// never passed to the search; that dead code is removed.
	boolSearch := elastic.NewBoolQuery().Filter(nameQuery, idQuery)

	res, err := client.Search().
		Index(index).
		Query(boolSearch).
		From(0).
		Size(10).
		Pretty(true).
		Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	total := res.TotalHits()
	fmt.Printf("Found %d results\n", total)
	if total > 0 {
		printQueryInfo(res, err)
	} else {
		fmt.Println("no result")
	}
}
// printQueryInfo prints every value from res.Each that is a User. A non-nil
// err terminates the process via log.Fatal before anything is printed.
func printQueryInfo(res *elastic.SearchResult, err error) {
	if err != nil {
		log.Fatal(err)
	}

	userType := reflect.TypeOf(User{})
	for _, hit := range res.Each(userType) {
		user, isUser := hit.(User)
		if !isUser {
			continue
		}
		fmt.Printf("Found a user: %+v\n", user)
	}
}
// main runs the example search.
func main() {
search()
}
更新数据
可以复用插入数据的代码:文档id不变,修改文档的字段值后重新写入即可更新数据。也可以使用UpdateByQuery方法。
type UpdateByQueryService struct {
// contains filtered or unexported fields
}
func NewUpdateByQueryService(client *Client) *UpdateByQueryService
func (s *UpdateByQueryService) Index(index ...string) *UpdateByQueryService
func (s *UpdateByQueryService) Query(query Query) *UpdateByQueryService
func (s *UpdateByQueryService) Script(script *Script) *UpdateByQueryService
func (s *UpdateByQueryService) Do(ctx context.Context) (*BulkIndexByScrollResponse, error)
func (r *BulkResponse) Updated() []*BulkResponseItem
type BulkIndexByScrollResponse struct {
Header http.Header `json:"-"`
Took int64 `json:"took"`
SliceId *int64 `json:"slice_id,omitempty"`
TimedOut bool `json:"timed_out"`
Total int64 `json:"total"`
Updated int64 `json:"updated,omitempty"`
Created int64 `json:"created,omitempty"`
Deleted int64 `json:"deleted"`
Batches int64 `json:"batches"`
VersionConflicts int64 `json:"version_conflicts"`
Noops int64 `json:"noops"`
Retries struct {
Bulk int64 `json:"bulk"`
Search int64 `json:"search"`
} `json:"retries,omitempty"`
Throttled string `json:"throttled"`
ThrottledMillis int64 `json:"throttled_millis"`
RequestsPerSecond float64 `json:"requests_per_second"`
Canceled string `json:"canceled,omitempty"`
ThrottledUntil string `json:"throttled_until"`
ThrottledUntilMillis int64 `json:"throttled_until_millis"`
Failures []bulkIndexByScrollResponseFailure `json:"failures"`
}
type Script struct {
// contains filtered or unexported fields
}
func NewScript(script string) *Script
代码如下:
package main
import (
"context"
conn "elastic/init"
"fmt"
"github.com/olivere/elastic/v7"
"log"
)
// Package-level state shared by the functions in this program.
var (
	// index is the name of the index whose documents are updated.
	index = "user"
	// client is obtained from conn.InitEs during package initialisation.
	client = conn.InitEs()
)
func updateData() {
query := elastic.NewTermQuery("name", "user3")
updateQuery := client.UpdateByQuery(index).Query(query)
script := elastic.NewScript("ctx._source.age = ctx._source.age + 1")
updateQuery = updateQuery.Script(script)
res, err := updateQuery.Do(context.Background())
if err != nil {
log.Fatal(err)
}
if res.Updated > 0 {
fmt.Println("更新数据成功")
} else {
fmt.Println("更新数据失败")
}
}
// main runs the example update.
func main() {
updateData()
}
age从22变为了23。
删除数据
使用的是DeleteByQuery方法
func NewDeleteByQueryService(client *Client) *DeleteByQueryService
type DeleteByQueryService struct {
// contains filtered or unexported fields
}
func (s *DeleteByQueryService) Do(ctx context.Context) (*BulkIndexByScrollResponse, error)
func (s *DeleteByQueryService) Index(index ...string) *DeleteByQueryService
func (s *DeleteByQueryService) Query(query Query) *DeleteByQueryService
func (s *DeleteByQueryService) Do(ctx context.Context) (*BulkIndexByScrollResponse, error)
代码如下:
package main
import (
"context"
conn "elastic/init"
"fmt"
"github.com/olivere/elastic/v7"
"log"
)
// Package-level state shared by the functions in this program.
var (
	// index is the name of the index whose documents are deleted.
	index = "user"
	// client is obtained from conn.InitEs during package initialisation.
	client = conn.InitEs()
)
// deleteData runs a delete-by-query on index for documents matching the term
// query name == "user3" (wrapped as a bool-query filter). A request error
// terminates the process via log.Fatal; otherwise it reports whether any
// document was deleted and how many.
func deleteData() {
	query := elastic.NewBoolQuery().Filter(elastic.NewTermQuery("name", "user3"))

	res, err := client.DeleteByQuery(index).Query(query).Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	if res.Deleted > 0 {
		fmt.Println("删除数据成功")
		// Report the Deleted counter, which is what the message describes;
		// the original printed the separate Total field here.
		fmt.Println("删除的数量为:", res.Deleted)
	} else {
		fmt.Println("删除数据失败")
	}
}
// main runs the example deletion.
func main() {
deleteData()
}