使用go管理es 7.17.3

安装客户端

es的go客户端库有github.com/elastic/go-elasticsearch/v7,这个是官方提供的。包的地址是:https://pkg.go.dev/github.com/elastic/go-elasticsearch/v7

github.com/olivere/elastic/v7,这个是由Oliver Eilhard维护的第三方客户端。包的地址是:https://pkg.go.dev/github.com/olivere/elastic/v7 后者应用的较多,所以这里使用后者。

go get -u github.com/olivere/elastic/v7

file

创建客户端测试连接

使用elastic包中的NewClient函数:

func NewClient(options ...ClientOptionFunc) (*Client, error)

代码如下:

package init

import (
    "context"
    "fmt"
    "github.com/olivere/elastic/v7"
    "log"
    "os"
)

// esurl is the address of the Elasticsearch server to connect to.
var esurl = "http://127.0.0.1:9200"

// InitEs creates an Elasticsearch client for esurl, verifies the connection
// with a ping, prints the server's status code and version, and returns the
// client.
//
// It is exported because the example programs call it as conn.InitEs().
// A failure to create the client or to ping the server terminates the
// process via log.Fatal.
func InitEs() *elastic.Client {
    client, err := elastic.NewClient(
        elastic.SetURL(esurl),
        // Elasticsearch user name and password.
        elastic.SetBasicAuth("elastic", "xxxxxxx"),
        // Disable sniffing, i.e. do not probe the cluster for node changes.
        elastic.SetSniff(false),
        // Write error logs to stderr.
        elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
        // Write info logs to stdout.
        elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)),
    )
    if err != nil {
        log.Fatal("es连接失败:", err)
    }

    // Ping the server address; this yields some server information, the
    // returned status code and an error.
    info, code, err := client.Ping(esurl).Do(context.Background())
    if err != nil {
        log.Fatal("ping失败:", err)
    }
    fmt.Printf("es返回码是: %d 版本号是:%s\n", code, info.Version.Number)
    fmt.Println("es连接成功!")
    return client
}

// initEs is kept as an alias of InitEs for in-package callers that still use
// the original unexported name.
func initEs() *elastic.Client {
    return InitEs()
}

file

添加索引

索引名为user,字段包括name,age,address,id。使用的是Client类型中的CreateIndex方法。索引副本数为0,分片数为1。

func (c *Client) CreateIndex(name string) *IndicesCreateService

代码如下:

package main

import (
    "context"
    "fmt"
    "log"

    conn "elastic/init"
)

// mapping is the JSON body sent when creating the index: one shard, zero
// replicas, and the fields name (keyword), age (long), address (text) and
// id (long).
const mapping = `
{
    "settings":{
      "number_of_shards": 1,
      "number_of_replicas": 0 
   },
    "mappings": {
        "properties": {
            "name": {
                "type": "keyword"
            },
            "age": {
                "type": "long"
            },
            "address": {
                "type": "text"
            },
            "id": {
                "type": "long"
            }
        }
    }
}
`

// AddIndex creates the index named index, using mapping as the request body,
// unless an index with that name already exists.
//
// A failure while checking for the index terminates the process via
// log.Fatal. A failure while creating it is printed and the function returns.
func AddIndex(index string) {
    // InitEs returns a single value, the connected client.
    client := conn.InitEs()
    ctx := context.Background()

    exists, err := client.IndexExists(index).Do(ctx)
    if err != nil {
        log.Fatal(err)
    }
    if exists {
        fmt.Println("索引已存在")
        return
    }

    createIndex, err := client.CreateIndex(index).BodyString(mapping).Do(ctx)
    if err != nil {
        fmt.Println("创建索引失败:", err)
        // The result must not be dereferenced after a failed request.
        return
    }
    if !createIndex.Acknowledged {
        return
    }
    fmt.Println("创建索引成功:", createIndex)
}

// main runs AddIndex for the "user" index.
func main() {
    AddIndex("user")
}

file

再次调用。

file

查看索引。

file

插入数据

使用的是client类型中的Index方法:

func (c *Client) Index() *IndexService
type IndexService struct {
    // contains filtered or unexported fields
}
func (s *IndexService) BodyJson(body interface{}) *IndexService
func (s *IndexService) BodyString(body string) *IndexService
func (s *IndexService) Do(ctx context.Context) (*IndexResponse, error)
func (s *IndexService) Id(id string) *IndexService
func (s *IndexService) Index(index string) *IndexService

在上面的代码中继续添加:

package main

import (
    "context"
    conn "elastic/init"
    "fmt"
    "log"
)

// index is the name of the index used by this example.
var index = "user"

// client is the shared Elasticsearch client; conn.InitEs runs during package
// initialization, before main starts.
var client = conn.InitEs()

// mapping is the JSON body sent when creating the index: one shard, zero
// replicas, and the fields name (keyword), age (long), address (text) and
// id (long).
const mapping = `
{
    "settings":{
      "number_of_shards": 1,
      "number_of_replicas": 0 
   },
    "mappings": {
        "properties": {
            "name": {
                "type": "keyword"
            },
            "age": {
                "type": "long"
            },
            "address": {
                "type": "text"
            },
            "id": {
                "type": "long"
            }
        }
    }
}
`

// User is the document stored in the index; the JSON tags match the field
// names declared in mapping.
type User struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
    Addr string `json:"address"`
    Id   int    `json:"id"`
}

// AddIndex creates the index named index, using mapping as the request body,
// unless an index with that name already exists.
//
// A failure while checking for the index terminates the process via
// log.Fatal. A failure while creating it is printed and the function returns.
func AddIndex(index string) {
    ctx := context.Background()

    exists, err := client.IndexExists(index).Do(ctx)
    if err != nil {
        log.Fatal(err)
    }
    if exists {
        fmt.Println("索引已存在")
        return
    }

    createIndex, err := client.CreateIndex(index).BodyString(mapping).Do(ctx)
    if err != nil {
        fmt.Println("创建索引失败:", err)
        // The result must not be dereferenced after a failed request.
        return
    }
    if !createIndex.Acknowledged {
        return
    }
    fmt.Println("创建索引成功:", createIndex)
}

// AddData stores data in index as a JSON document.
//
// The document id is taken from data.Id, so users with different ids are
// stored as separate documents and indexing the same id again replaces the
// existing document. A failed request terminates the process via log.Fatal.
func AddData(index string, data User) {
    // Use the user's own id rather than a fixed one, so that different users
    // do not overwrite each other.
    docID := fmt.Sprint(data.Id)

    // Send the struct as the JSON body.
    res, err := client.Index().Index(index).Id(docID).BodyJson(data).Do(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("添加数据成功", res)
}

// AddDataString stores the raw JSON string data in index under the fixed
// document id "2". A failed request terminates the process via log.Fatal.
func AddDataString(index string, data string) {
    // Build the request first; the body is passed through as a string.
    request := client.Index().
        Index(index).
        Id("2").
        BodyString(data)

    resp, err := request.Do(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("添加数据成功", resp)
}

// main runs AddIndex for index, then adds two sample users: one passed as a
// User struct and one passed as a raw JSON string.
func main() {
    AddIndex(index)

    // First document: a User value passed to AddData.
    AddData(index, User{Name: "user1", Age: 18, Addr: "hz", Id: 1})

    // Second document: a hand-written JSON string.
    const rawUser = `{"name":"user2", "age":20, "address":"hz", "id":2}`
    AddDataString(index, rawUser)
}

file
file

查看索引数据。

file

查询数据

elastic包有多种查询方式,

  • BoolQuery组合查询
    func NewBoolQuery() *BoolQuery
    type BoolQuery struct {
    Query
    // contains filtered or unexported fields
    }
    func (q *BoolQuery) Filter(filters ...Query) *BoolQuery 条件过滤
    func (q *BoolQuery) Must(queries ...Query) *BoolQuery 相当于and
    func (q *BoolQuery) MustNot(queries ...Query) *BoolQuery 相当于not
    func (q *BoolQuery) Should(queries ...Query) *BoolQuery 相当于or
    func (q *BoolQuery) Source() (interface{}, error)
  • TermQuery精确查询
    func NewTermQuery(name string, value interface{}) *TermQuery
    func (q *TermQuery) Boost(boost float64) *TermQuery
    func (q *TermQuery) CaseInsensitive(caseInsensitive bool) *TermQuery
    func (q *TermQuery) QueryName(queryName string) *TermQuery
    func (q *TermQuery) Source() (interface{}, error)
  • MatchQuery匹配查询
    func NewMatchQuery(name string, text interface{}) *MatchQuery
    type MatchQuery struct {
    // contains filtered or unexported fields
    }
  • RangeQuery范围查询
    func NewRangeQuery(name string) *RangeQuery
    type RangeQuery struct {
    // contains filtered or unexported fields
    }
    func (q *RangeQuery) From(from interface{}) *RangeQuery
    func (q *RangeQuery) Gt(from interface{}) *RangeQuery >
    func (q *RangeQuery) Gte(from interface{}) *RangeQuery ≥
    func (q *RangeQuery) Lt(to interface{}) *RangeQuery <
    func (q *RangeQuery) Lte(to interface{}) *RangeQuery ≤
  • reflect
    https://pkg.go.dev/reflect
package main

import (
    "context"
    conn "elastic/init"
    "fmt"
    elastic "github.com/olivere/elastic/v7"
    "log"
    "reflect"
)

// index is the name of the index that is searched.
var index = "user"

// client is the shared Elasticsearch client; conn.InitEs runs during package
// initialization, before main starts.
var client = conn.InitEs()

// User is the type that search hits are decoded into; the JSON tags name the
// document fields name, age, address and id.
type User struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
    Addr string `json:"address"`
    Id   int    `json:"id"`
}

// search queries index for users whose name is exactly "user3" and whose id
// is at least 1, prints the total number of hits and then each matching user.
//
// Both conditions are combined as filters of a single bool query, and the
// request asks for the first 10 hits. A failed search terminates the process
// via log.Fatal.
func search() {
    nameQuery := elastic.NewTermQuery("name", "user3")
    idQuery := elastic.NewRangeQuery("id").Gte(1)
    boolSearch := elastic.NewBoolQuery().Filter(nameQuery, idQuery)

    res, err := client.Search().
        Index(index).
        Query(boolSearch).
        From(0).
        Size(10).
        Pretty(true).
        Do(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    total := res.TotalHits()
    fmt.Printf("Found %d results\n", total)
    if total <= 0 {
        fmt.Println("no result")
        return
    }
    // The search error was already handled above, so there is none to pass on.
    printQueryInfo(res, nil)
}

// printQueryInfo prints every hit in res that is returned as a User.
// A non-nil err terminates the process via log.Fatal before anything is
// printed.
func printQueryInfo(res *elastic.SearchResult, err error) {
    if err != nil {
        log.Fatal(err)
    }

    userType := reflect.TypeOf(User{})
    for _, hit := range res.Each(userType) {
        user, isUser := hit.(User)
        if !isUser {
            // Skip anything that did not come back as a User.
            continue
        }
        fmt.Printf("Found a user: %+v\n", user)
    }
}

// main runs the example search.
func main() {
    search()
}

file

更新数据

可以复用插入数据的代码,保持id不变,修改文档字段的值后重新写入即可更新数据。也可以使用UpdateByQuery方法。

type UpdateByQueryService struct {
    // contains filtered or unexported fields
}
func NewUpdateByQueryService(client *Client) *UpdateByQueryService
func (s *UpdateByQueryService) Index(index ...string) *UpdateByQueryService
func (s *UpdateByQueryService) Query(query Query) *UpdateByQueryService
func (s *UpdateByQueryService) Script(script *Script) *UpdateByQueryService

func (s *UpdateByQueryService) Do(ctx context.Context) (*BulkIndexByScrollResponse, error)
func (r *BulkResponse) Updated() []*BulkResponseItem
type BulkIndexByScrollResponse struct {
    Header           http.Header `json:"-"`
    Took             int64       `json:"took"`
    SliceId          *int64      `json:"slice_id,omitempty"`
    TimedOut         bool        `json:"timed_out"`
    Total            int64       `json:"total"`
    Updated          int64       `json:"updated,omitempty"`
    Created          int64       `json:"created,omitempty"`
    Deleted          int64       `json:"deleted"`
    Batches          int64       `json:"batches"`
    VersionConflicts int64       `json:"version_conflicts"`
    Noops            int64       `json:"noops"`
    Retries          struct {
        Bulk   int64 `json:"bulk"`
        Search int64 `json:"search"`
    } `json:"retries,omitempty"`
    Throttled            string                             `json:"throttled"`
    ThrottledMillis      int64                              `json:"throttled_millis"`
    RequestsPerSecond    float64                            `json:"requests_per_second"`
    Canceled             string                             `json:"canceled,omitempty"`
    ThrottledUntil       string                             `json:"throttled_until"`
    ThrottledUntilMillis int64                              `json:"throttled_until_millis"`
    Failures             []bulkIndexByScrollResponseFailure `json:"failures"`
}

type Script struct {
    // contains filtered or unexported fields
}
func NewScript(script string) *Script

代码如下:

package main

import (
    "context"
    conn "elastic/init"
    "fmt"
    "github.com/olivere/elastic/v7"
    "log"
)

// index is the name of the index whose documents are updated.
var index = "user"

// client is the shared Elasticsearch client; conn.InitEs runs during package
// initialization, before main starts.
var client = conn.InitEs()

// updateData runs an update-by-query on index that adds 1 to the age of every
// document whose name is exactly "user3", then reports whether any document
// was updated. A failed request terminates the process via log.Fatal.
func updateData() {
    byName := elastic.NewTermQuery("name", "user3")
    incrementAge := elastic.NewScript("ctx._source.age = ctx._source.age + 1")

    resp, err := client.UpdateByQuery(index).
        Query(byName).
        Script(incrementAge).
        Do(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    // Updated is the number of documents reported as updated.
    if resp.Updated <= 0 {
        fmt.Println("更新数据失败")
        return
    }
    fmt.Println("更新数据成功")
}

// main runs the example update.
func main() {
    updateData()
}

age从22变为了23。

file

删除数据

使用的是DeleteByQuery方法

func NewDeleteByQueryService(client *Client) *DeleteByQueryService
type DeleteByQueryService struct {
    // contains filtered or unexported fields
}
func (s *DeleteByQueryService) Do(ctx context.Context) (*BulkIndexByScrollResponse, error)
func (s *DeleteByQueryService) Index(index ...string) *DeleteByQueryService
func (s *DeleteByQueryService) Query(query Query) *DeleteByQueryService
func (s *DeleteByQueryService) Do(ctx context.Context) (*BulkIndexByScrollResponse, error)

代码如下:

package main

import (
    "context"
    conn "elastic/init"
    "fmt"
    "github.com/olivere/elastic/v7"
    "log"
)

// index is the name of the index from which documents are deleted.
var index = "user"

// client is the shared Elasticsearch client; conn.InitEs runs during package
// initialization, before main starts.
var client = conn.InitEs()

// deleteData runs a delete-by-query on index for every document whose name is
// exactly "user3" and reports how many documents were deleted.
// A failed request terminates the process via log.Fatal.
func deleteData() {
    query := elastic.NewBoolQuery().Filter(elastic.NewTermQuery("name", "user3"))
    res, err := client.DeleteByQuery(index).Query(query).Do(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    if res.Deleted <= 0 {
        fmt.Println("删除数据失败")
        return
    }
    fmt.Println("删除数据成功")
    // Report Deleted, the response's own deleted counter, rather than Total.
    fmt.Println("删除的数量为:", res.Deleted)
}

// main runs the example deletion.
func main() {
    deleteData()
}

file

file

0 0 投票数
文章评分
订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论

相关文章

开始在上面输入您的搜索词,然后按回车进行搜索。按ESC取消。

返回顶部
0
希望看到您的想法,请您发表评论x