Notes on Context and Cancellation of goroutines

Original: http://dahernan.github.io/2015/02/04/context-and-cancellation-of-goroutines/

Suppose a program needs to execute a long-running function, called work here, and we run it in a separate goroutine.

package main

import (
    "fmt"
    "sync"
    "time"

)

var (
    wg sync.WaitGroup
)

func work() error {
    defer wg.Done()

    for i := 0; i < 1000; i++ {
        select {
        case <-time.After(2 * time.Second):
            fmt.Println("Doing some work ", i)
        }
    }
    return nil
}


func main() {
    fmt.Println("Hey, I'm going to do some work")

    wg.Add(1)
    go work()
    wg.Wait()

    fmt.Println("Finished. I'm going home")
}

Output

$ go run work.go
Hey, I'm going to do some work
Doing some work  0
Doing some work  1
Doing some work  2
Doing some work  3
...
Doing some work  999
Finished. I'm going home

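If work is called from a user interaction or an HTTP request, we probably do not want to wait indefinitely for the goroutine to finish. In that case we need a timeout, something like this: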

package main

import (
    "fmt"
    "log"
    "time"
)

func work() error {
    for i := 0; i < 1000; i++ {
        select {
        case <-time.After(2 * time.Second):
            fmt.Println("Doing some work ", i)
        }
    }
    return nil
}

func main() {
    fmt.Println("Hey, I'm going to do some work")

    ch := make(chan error, 1)
    go func() {
        ch <- work()
    }()

    select {
    case err := <-ch:
        if err != nil {
            log.Fatal("Something went wrong :(", err)
        }
    case <-time.After(4 * time.Second):
        fmt.Println("Life is to short to wait that long")
    }

    fmt.Println("Finished. I'm going home")
}

Output

$ go run work.go
Hey, I'm going to do some work
Doing some work  0
Doing some work  1
Life is to short to wait that long
Finished. I'm going home

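This is a little better: when the timeout fires, main no longer waits for work.

But there is still a problem. If the program keeps running, as a web server does, work keeps running and consuming resources even though nobody is waiting for its result. So we need a way to cancel that goroutine.

To cancel a goroutine we can use the context package. The function has to be changed to accept a parameter of type context.Context, which by convention is the first parameter.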

package main

import (
    "context" // standard library since Go 1.7; the original imported golang.org/x/net/context
    "fmt"
    "sync"
    "time"
)

var (
    wg sync.WaitGroup
)

func work(ctx context.Context) error {
    defer wg.Done()

    for i := 0; i < 1000; i++ {
        select {
        case <-time.After(2 * time.Second):
            fmt.Println("Doing some work ", i)

        // we received the signal of cancelation in this channel    
        case <-ctx.Done():
            fmt.Println("Cancel the context ", i)
            return ctx.Err()
        }
    }
    return nil
}

func main() {   
    ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
    defer cancel()

    fmt.Println("Hey, I'm going to do some work")

    wg.Add(1)
    go work(ctx)
    wg.Wait()

    fmt.Println("Finished. I'm going home")
}

Output

$ go run work.go
Hey, I'm going to do some work
Doing some work  0
Cancel the context  1
Finished. I'm going home

Now, once the timeout expires, work returns and stops consuming resources.

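These examples are fine for learning the basics, but let's make it more realistic. Now work sends an HTTP request to a server, and the server is a separate program: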

package main

// Lazy and Very Random Server 
import (
    "fmt"
    "math/rand"
    "net/http"
    "time"
)

func main() {
    http.HandleFunc("/", LazyServer)
    http.ListenAndServe(":1111", nil)
}

// sometimes really fast server, sometimes really slow server
func LazyServer(w http.ResponseWriter, req *http.Request) {
    headOrTails := rand.Intn(2)

    if headOrTails == 0 {
        time.Sleep(6 * time.Second)
        fmt.Fprintf(w, "Go! slow %v", headOrTails)
        fmt.Printf("Go! slow %v\n", headOrTails) // newline keeps the server's log lines apart
        return
    }

    fmt.Fprintf(w, "Go! quick %v", headOrTails)
    fmt.Printf("Go! quick %v\n", headOrTails)
}

Testing with curl

$ curl http://localhost:1111/
Go! quick 1
$ curl http://localhost:1111/
Go! quick 1
$ curl http://localhost:1111/
*some seconds later*
Go! slow 0

So we send the HTTP request to this server from a goroutine, and if the server is slow we cancel the request and return quickly.

package main

import (
    "context" // standard library since Go 1.7; the original imported golang.org/x/net/context
    "fmt"
    "io/ioutil"
    "net/http"
    "sync"
    "time"
)

var (
    wg sync.WaitGroup
)

// main is not changed
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    fmt.Println("Hey, I'm going to do some work")

    wg.Add(1)
    go work(ctx)
    wg.Wait()

    fmt.Println("Finished. I'm going home")

}

func work(ctx context.Context) error {
    defer wg.Done()

    tr := &http.Transport{}
    client := &http.Client{Transport: tr}

    // anonymous struct to pack and unpack data in the channel
    c := make(chan struct {
        r   *http.Response
        err error
    }, 1)

    req, _ := http.NewRequest("GET", "http://localhost:1111", nil)
    go func() {
        resp, err := client.Do(req)
        fmt.Println("Doing http request is a hard job")
        pack := struct {
            r   *http.Response
            err error
        }{resp, err}
        c <- pack
    }()

    select {
    case <-ctx.Done():
        tr.CancelRequest(req)
        <-c // Wait for client.Do
        fmt.Println("Cancel the context")
        return ctx.Err()
    case ok := <-c:
        err := ok.err
        resp := ok.r
        if err != nil {
            fmt.Println("Error ", err)
            return err
        }

        defer resp.Body.Close()
        out, _ := ioutil.ReadAll(resp.Body)
        fmt.Printf("Server Response: %s\n", out)

    }
    return nil
}

Output

$ go run work.go
Hey, I'm going to do some work
Doing http request is a hard job
Server Response: Go! quick 1
Finished. I'm going home

$ go run work.go
Hey, I'm going to do some work
Doing http request is a hard job
Cancel the context
Finished. I'm going home
2018/8/25 posted in  golang

Notes on Avoiding Memory Leak in Golang API

Original: https://hackernoon.com/avoiding-memory-leak-in-golang-api-1843ef45fca8

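How to avoid memory leaks when writing an API in Go. Go has a garbage collector, but memory leaks, or leaks of other resources, can still happen.

1 Customize the http.Client configuration

Default: client := http.Client{} // default

Custom http.Transport

Note:

Transport is an http.RoundTripper and is responsible for dispatching HTTP requests. The actual behavior is implemented by the Transport struct in net/http/transport.go, which manages idle connections in addition to dispatching requests. If no Transport is set when the client is created, the default configuration is used.
from https://blog.csdn.net/kdpujie/article/details/73177179

The RoundTripper interface abstracts the ability to execute a single HTTP transaction and obtain the response for a given request.

To reuse connections, configure MaxIdleConns: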

keepAliveTimeout := 600 * time.Second
timeout := 2 * time.Second
defaultTransport := &http.Transport{
    // DialContext replaces the deprecated Dial field
    DialContext: (&net.Dialer{
        KeepAlive: keepAliveTimeout,
    }).DialContext,
    MaxIdleConns:        100,
    MaxIdleConnsPerHost: 100,
}
client := &http.Client{
    Transport: defaultTransport,
    Timeout:   timeout,
}

2 Avoid memory leaks from unclosed response bodies

Read and close the response body, even if the data is not needed.

req, err := http.NewRequest("GET", "http://example.com?q=one", nil)
if err != nil {
  return err
}
resp, err := client.Do(req)
//=================================================
// CLOSE THE RESPONSE BODY
//=================================================
if resp != nil {
    defer resp.Body.Close() // the body must always be closed
}
if err != nil {
  return err
}
//=================================================
// READ THE BODY EVEN IF THE DATA IS NOT NEEDED.
// THIS LETS THE TRANSPORT REUSE THE HTTP CONNECTION
// INSTEAD OF LEAKING IT
//=================================================
_, err = io.Copy(ioutil.Discard, resp.Body) // read and discard the body
if err != nil {
   return err
}

3 Timeout control for Go channels

Sample code that shows the problem

type sampleChannel struct {
    Data *Sample
    Err  error
}

func (u *usecase) GetSample(id int64, someparam string, anotherParam string) ([]*Sample, error) {

    chanSample := make(chan sampleChannel, 3)
    wg := sync.WaitGroup{}

    wg.Add(1)
    go func() {
        defer wg.Done()
        chanSample <- u.getDataFromGoogle(id, anotherParam) // just example of function
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        chanSample <- u.getDataFromFacebook(id, anotherParam)
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        chanSample <- u.getDataFromTwitter(id, anotherParam)
    }()

    // Blocks until all three calls return; one hung upstream stalls the whole request.
    wg.Wait()
    close(chanSample)

    result := make([]*Sample, 0)

    for sampleItem := range chanSample {
        if sampleItem.Err != nil {
            logrus.Error(sampleItem.Err)
        }
        if sampleItem.Data == nil {
            continue
        }
        result = append(result, sampleItem.Data)
    }

    return result, nil
}

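This code runs three tasks concurrently, each requesting one API, and uses a WaitGroup to wait for all goroutines to finish.
But when one of the services has a problem, the function does not return normally.

Solution 1
Fix it by adding a timeout, so that users do not wait this long; they just get an internal server error.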


func (u *usecase) GetSample(id int64, someparam string, anotherParam string) ([]*Sample, error) {
    // chanSample is deliberately not closed: a goroutine that finishes after the
    // timeout would panic when sending on a closed channel. The buffer of 3 lets
    // late senders complete without blocking.
    chanSample := make(chan sampleChannel, 3)

    go func() {
        chanSample <- u.getDataFromGoogle(id, anotherParam) // just example of function
    }()

    go func() {
        chanSample <- u.getDataFromFacebook(id, anotherParam)
    }()

    go func() {
        chanSample <- u.getDataFromTwitter(id, anotherParam)
    }()

    result := make([]*Sample, 0)
    timeout := time.After(time.Second * 2)
    for loop := 0; loop < 3; loop++ {
        select {
        case sampleItem := <-chanSample:
            if sampleItem.Err != nil {
                logrus.Error(sampleItem.Err)
                continue
            }
            if sampleItem.Data == nil {
                continue
            }
            result = append(result, sampleItem.Data)
        case <-timeout:
            err := fmt.Errorf("Timeout to get sample id: %d. ", id)
            result = make([]*Sample, 0)
            return result, err
        }
    }

    return result, nil
}

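4 Use Context to control the timeout

After finishing step 3, the problem is still not fully solved. The main API still consumes a lot of CPU and memory.

The reason is that the request goroutines keep running after the Internal Server Error has been returned.

Use context.Context to abort the goroutines: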

func (u *usecase) GetSample(c context.Context, id int64, someparam string, anotherParam string) ([]*Sample, error) {

    if c == nil {
        c = context.Background()
    }

    ctx, cancel := context.WithTimeout(c, time.Second*2)
    defer cancel()

    // chanSample is deliberately not closed: a goroutine that finishes after the
    // timeout would panic when sending on a closed channel. The buffer of 3 lets
    // late senders complete without blocking.
    chanSample := make(chan sampleChannel, 3)

    go func() {
        chanSample <- u.getDataFromGoogle(ctx, id, anotherParam) // just example of function
    }()

    go func() {
        chanSample <- u.getDataFromFacebook(ctx, id, anotherParam)
    }()

    go func() {
        chanSample <- u.getDataFromTwitter(ctx, id, anotherParam)
    }()

    result := make([]*Sample, 0)
    for loop := 0; loop < 3; loop++ {
        select {
        case sampleItem := <-chanSample:
            if sampleItem.Err != nil {
                continue
            }
            if sampleItem.Data == nil {
                continue
            }
            result = append(result, sampleItem.Data)
        // ============================================================
        // CATCH THE CASE WHERE THE CONTEXT HAS EXCEEDED THE TIMEOUT.
        // TO AVOID INCONSISTENT DATA, WE RETURN AN EMPTY SLICE AND AN
        // ERROR MESSAGE TO THE USER
        // ============================================================
        case <-ctx.Done(): // signals that the context has exceeded the timeout
            err := fmt.Errorf("Timeout to get sample id: %d. ", id)
            result = make([]*Sample, 0)
            return result, err
        }
    }

    return result, nil
}

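So we pass the Context to every goroutine call in the code.

ctx is obtained from context.WithTimeout; we then create a request and bind the context to it with Request.WithContext. When the ctx.Done() channel is closed, the timeout has expired or the context was cancelled.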

func (u *usecase) getDataFromFacebook(ctx context.Context, id int64, param string) sampleChannel {

  req, err := http.NewRequest("GET", "https://facebook.com", nil)
  if err != nil {
    return sampleChannel{
      Err: err,
    }
  }
  // ============================================================
  // THEN WE PASS THE CONTEXT TO OUR REQUEST.
  // THIS FEATURE IS AVAILABLE SINCE GO 1.7
  // ============================================================
  if ctx != nil {
    req = req.WithContext(ctx) // bind the context so the HTTP call is aborted when ctx is done
  }
  resp, err := u.httpClient.Do(req)
  if err != nil {
    return sampleChannel{
      Err: err,
    }
  }
  defer resp.Body.Close() // always close the body (see section 2)
  body, err := ioutil.ReadAll(resp.Body)
  if err != nil {
    return sampleChannel{
      Err: err,
    }
  }
  sample := new(Sample)
  if err := json.Unmarshal(body, sample); err != nil {
    return sampleChannel{
      Err: err,
    }
  }
  return sampleChannel{
    Err:  nil,
    Data: sample,
  }
}

2018/8/25 posted in  golang

Notes on Golang http.RoundTripper

RoundTripper is an interface representing the ability to execute a single HTTP transaction, obtaining the Response for a given Request.

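An HTTP client's behavior can be configured by giving it different Transport implementations that satisfy the RoundTripper interface.

RoundTripper is a bit like middleware for http.Client.

Interface definition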

type RoundTripper interface { 
       RoundTrip(*Request) (*Response, error)
}

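You need to implement the RoundTrip method:

type SomeClient struct{}

func (s *SomeClient) RoundTrip(r *http.Request) (*http.Response, error) {
    // Custom logic goes here; this placeholder just delegates to the default transport.
    return http.DefaultTransport.RoundTrip(r)
}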

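Use cases

Original: https://lanre.wtf/blog/2017/07/24/roundtripper-go/

  • Caching responses. For example, an app needs the GitHub API to fetch trending repos. This data does not change often, say once every 30 minutes, and you clearly do not want to call the GitHub API on every request. One way to solve this is an http.RoundTripper that:

    • returns the response from the cache when one is stored
    • requests the API again when the cached entry has expired
  • Setting HTTP headers as needed. An easy example is go-github, a Go client for the GitHub API. Some GitHub APIs need no authentication; for those that do, you supply your own http client, for example ghinstallation, whose RoundTrip implementation sets the Authorization header.

  • Rate limiting: controlling the request rate.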
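The header-setting use case can be sketched as a RoundTripper that wraps another one. This is an illustration under assumptions and not the ghinstallation code: authTransport, seenAuth, the token value and the Bearer scheme are all hypothetical, and a local httptest server stands in for the real API.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// authTransport is a hypothetical RoundTripper that adds an Authorization
// header and delegates the actual exchange to the wrapped transport.
type authTransport struct {
	token string
	next  http.RoundTripper
}

func (t *authTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	// A RoundTripper should not modify the caller's request, so work on a clone.
	clone := r.Clone(r.Context())
	clone.Header.Set("Authorization", "Bearer "+t.token)
	return t.next.RoundTrip(clone)
}

// seenAuth sends one request through authTransport to a local test server
// and returns the Authorization header the server received.
func seenAuth(token string) (string, error) {
	got := make(chan string, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		got <- r.Header.Get("Authorization")
	}))
	defer srv.Close()

	client := &http.Client{
		Transport: &authTransport{token: token, next: http.DefaultTransport},
	}
	resp, err := client.Get(srv.URL)
	if err != nil {
		return "", err
	}
	resp.Body.Close()
	return <-got, nil
}

func main() {
	header, err := seenAuth("example-token")
	fmt.Println(header, err)
}
```

Because the wrapper only needs an http.RoundTripper as its next hop, several such wrappers (headers, caching, rate limiting) can be stacked around http.DefaultTransport.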

A practical example

Implement http.RoundTripper to cache HTTP responses.

An HTTP server implementation

// server/main.go
package main

import (
    "fmt"
    "net/http"
)

func main() {
    mux := http.NewServeMux()

    mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        // This is here so we can actually see that the responses that have been cached don't get here
        fmt.Println("The request actually got here")

        w.Write([]byte("You got here"))
    })

    http.ListenAndServe(":8000", mux)
}

In the HTTP client, create a new transport that implements the http.RoundTripper interface.

The main program:
https://github.com/adelowo/rounder/blob/master/client/main.go

func main() {
    cachedTransport := newTransport()

    // cachedTransport is a custom Transport that implements the http.RoundTripper interface
    client := &http.Client{
        Transport: cachedTransport,
        Timeout:   time.Second * 5,
    }

    // Clear the cache every 5 seconds
    cacheClearTicker := time.NewTicker(time.Second * 5)

    // Send one request per second, so we can see whether the response comes from the cache or from the server
    reqTicker := time.NewTicker(time.Second * 1)

    terminateChannel := make(chan os.Signal, 1)

    signal.Notify(terminateChannel, syscall.SIGTERM, syscall.SIGHUP)

    req, err := http.NewRequest(http.MethodGet, "http://localhost:8000", strings.NewReader(""))

    if err != nil {
        panic("Whoops")
    }

    for {
        select {
        case <-cacheClearTicker.C:
            // Clear the cache so we can hit the original server
            cachedTransport.Clear()

        case <-terminateChannel:
            cacheClearTicker.Stop()
            reqTicker.Stop()
            return

        case <-reqTicker.C:

            resp, err := client.Do(req)

            if err != nil {
                log.Printf("An error occurred.... %v", err)
                continue
            }

            buf, err := ioutil.ReadAll(resp.Body)
            resp.Body.Close() // release the body whether or not the read succeeded

            if err != nil {
                log.Printf("An error occurred.... %v", err)
                continue
            }

            fmt.Printf("The body of the response is \"%s\" \n\n", string(buf))
        }
    }
}

The RoundTrip method of cacheTransport returns the response from the cache when one is stored:

func (c *cacheTransport) RoundTrip(r *http.Request) (*http.Response, error) {

    // Check if we have the response cached..
    // If yes, we don't have to hit the server
    // We just return it as is from the cache store.
    if val, err := c.Get(r); err == nil {
        fmt.Println("Fetching the response from the cache")
        return cachedResponse([]byte(val), r)
    }

    // Ok, we don't have the response cached, the store was probably cleared.
    // Make the request to the server.
    resp, err := c.originalTransport.RoundTrip(r)

    if err != nil {
        return nil, err
    }

    // Get the body of the response so we can save it in the cache for the next request.
    buf, err := httputil.DumpResponse(resp, true)

    if err != nil {
        return nil, err
    }

    // Saving it to the cache store
    c.Set(r, string(buf))

    fmt.Println("Fetching the data from the real source")
    return resp, nil
}
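RoundTrip relies on Get, Set, Clear, newTransport and cachedResponse, none of which appear in these notes. The following is a minimal sketch of how they could look, under assumptions: an in-memory map keyed by URL and guarded by a mutex, with serverHits as a hypothetical helper that counts how many requests reach a local test server.

```go
package main

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"sync"
	"sync/atomic"
)

// cacheTransport is a sketch: dumped responses keyed by URL, guarded by a mutex.
type cacheTransport struct {
	mu                sync.RWMutex
	data              map[string]string
	originalTransport http.RoundTripper
}

func newTransport() *cacheTransport {
	return &cacheTransport{data: map[string]string{}, originalTransport: http.DefaultTransport}
}

// Get returns the dumped response stored for r, or an error on a cache miss.
func (c *cacheTransport) Get(r *http.Request) (string, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if val, ok := c.data[r.URL.String()]; ok {
		return val, nil
	}
	return "", errors.New("key not found in cache")
}

func (c *cacheTransport) Set(r *http.Request, value string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[r.URL.String()] = value
}

func (c *cacheTransport) Clear() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data = map[string]string{}
}

// cachedResponse parses bytes written by httputil.DumpResponse back into a response.
func cachedResponse(b []byte, r *http.Request) (*http.Response, error) {
	return http.ReadResponse(bufio.NewReader(bytes.NewReader(b)), r)
}

// RoundTrip follows the same steps as the method in the text, without the logging.
func (c *cacheTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	if val, err := c.Get(r); err == nil {
		return cachedResponse([]byte(val), r)
	}
	resp, err := c.originalTransport.RoundTrip(r)
	if err != nil {
		return nil, err
	}
	buf, err := httputil.DumpResponse(resp, true)
	if err != nil {
		return nil, err
	}
	c.Set(r, string(buf))
	return resp, nil
}

// serverHits sends n GETs through the caching client and counts server-side hits.
func serverHits(n int) (int32, error) {
	var hits int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		atomic.AddInt32(&hits, 1)
		w.Write([]byte("You got here"))
	}))
	defer srv.Close()
	client := &http.Client{Transport: newTransport()}
	for i := 0; i < n; i++ {
		resp, err := client.Get(srv.URL)
		if err != nil {
			return 0, err
		}
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}
	return atomic.LoadInt32(&hits), nil
}

func main() {
	hits, err := serverHits(3)
	fmt.Println(hits, err)
}
```

Keying the cache by URL alone is a simplification; a real cache would probably also consider the method and relevant headers, and would expire entries instead of relying on a periodic Clear.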


2018/8/24 posted in  golang

Notes on Golang and rsyslog

The Go standard library includes a syslog package:

import "log/syslog"

facility and priority in rsyslog

  • facility

    The facility identifies the subsystem that produced the log message. Possible values are auth, authpriv, cron, daemon, kern, lpr, mail, news, syslog, user, ftp, uucp, and local0 to local7.

  • priority

    The priority gives the severity of the log message. Available levels are debug (7), info (6), notice (5), warning (4), err (3), crit (2), alert (1), and emerg (0).

func New(priority Priority, tag string) (*Writer, error)

The first argument is the priority, a combination of facility and severity; the second is the tag. If tag is empty, os.Args[0] is used.

demo

package main

import (
        "log"
        "log/syslog"
)

func main() {
        w, err := syslog.New(syslog.LOG_INFO|syslog.LOG_LOCAL6, "test_golang")
        if err != nil {
                log.Fatal(err)
        }
        log.SetOutput(w)
        log.Println("this is a test message")
}

The output has facility syslog.LOG_LOCAL6 and tag test_golang. In rsyslog the tag is the programname.

rsyslog configuration

lidashuang@instance-2:~$ cat /etc/rsyslog.d/00-testgo.conf 
$template GLFile,"/data/log/%programname%.log"
$FileOwner syslog
$FileGroup syslog
$CreateDirs on
$DirCreateMode 0755
$FileCreateMode 0644
$RepeatedMsgReduction off
local6.* ?GLFile;RSYSLOG_SyslogProtocol23Format

& stop

This matches logs with facility syslog.LOG_LOCAL6 and saves them in RSYSLOG_SyslogProtocol23Format to "/data/log/%programname%.log".

The result:

The file name is the tag argument from the program, test_golang.

lidashuang@instance-2:~$ cat /data/log/test_golang.log 
<182>1 2018-01-26T11:41:13.505188+00:00 instance-2 test_golang 19376 - -  2018/01/26 11:41:13 this is a test message
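The <182> at the start of the line is the syslog PRI value, which encodes facility and severity as facility * 8 + severity. The following is a minimal sketch of that arithmetic; pri and splitPRI are hypothetical helper names, and the numeric codes 22 (local6) and 6 (info) are the standard syslog codes.

```go
package main

import "fmt"

// pri computes the syslog PRI value from numeric facility and severity codes.
func pri(facility, severity int) int {
	return facility*8 + severity
}

// splitPRI reverses pri and returns the facility and severity codes.
func splitPRI(p int) (facility, severity int) {
	return p / 8, p % 8
}

func main() {
	const local6, info = 22, 6 // standard codes for facility local6 and severity info

	fmt.Println(pri(local6, info)) // 182
	fmt.Println(splitPRI(182))     // 22 6
}
```

This matches the demo, where syslog.LOG_INFO|syslog.LOG_LOCAL6 was passed to syslog.New.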

2018/1/26 posted in  golang