Go常见架构模式的实现

实现pipe-filter framework

Pipe-Filter 模式:

  • ⾮常适合与数据处理及数据分析系统
  • Filter封装数据处理的功能
  • Pipe⽤于连接Filter传递数据或者在异步处理过程中缓冲数据流
  • 进程内同步调⽤时,pipe演变为数据在⽅法调⽤间传递
  • 松耦合:Filter只跟数据(格式)耦合

Filter和组合模式:

示例:

简单示例代码:

filter.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Package pipefilter is to define the interfaces and the structures for pipe-filter style implementation
package pipefilter

// Request is the input of the filter
type Request interface{}

// Response is the output of the filter
type Response interface{}

// Filter interface is the definition of the data processing components
// Pipe-Filter structure
type Filter interface {
Process(data Request) (Response, error)
}

split_filter.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package pipefilter

import (
"errors"
"strings"
)

var SplitFilterWrongFormatError = errors.New("input data should be string")

type SplitFilter struct {
delimiter string
}

func NewSplitFilter(delimiter string) *SplitFilter {
return &SplitFilter{delimiter}
}

func (sf *SplitFilter) Process(data Request) (Response, error) {
str, ok := data.(string) //检查数据格式/类型,是否可以处理
if !ok {
return nil, SplitFilterWrongFormatError
}
parts := strings.Split(str, sf.delimiter)
return parts, nil
}

split_filter_test.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package pipefilter

import (
"reflect"
"testing"
)

func TestStringSplit(t *testing.T) {
sf := NewSplitFilter(",")
resp, err := sf.Process("1,2,3")
if err != nil {
t.Fatal(err)
}
parts, ok := resp.([]string)
if !ok {
t.Fatalf("Repsonse type is %T, but the expected type is string", parts)
}
if !reflect.DeepEqual(parts, []string{"1", "2", "3"}) {
t.Errorf("Expected value is {\"1\",\"2\",\"3\"}, but actual is %v", parts)
}
}

func TestWrongInput(t *testing.T) {
sf := NewSplitFilter(",")
_, err := sf.Process(123)
if err == nil {
t.Fatal("An error is expected.")
}
}

实现micro-kernel framework

  • 特点

    • 易于扩展
    • 错误隔离
    • 保持架构⼀致性
  • 要点

  • 内核包含公共流程或通⽤逻辑

    • 将可变或可扩展部分规划为扩展点
  • 抽象扩展点⾏为,定义接⼝

    • 利⽤插件进⾏扩展

示例:

简单示例代码:

agent.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package microkernel

import (
"context"
"errors"
"fmt"
"strings"
"sync"
)

const (
Waiting = iota
Running
)

var WrongStateError = errors.New("can not take the operation in the current state")

type CollectorsError struct {
CollectorErrors []error
}

func (ce CollectorsError) Error() string {
var strs []string
for _, err := range ce.CollectorErrors {
strs = append(strs, err.Error())
}
return strings.Join(strs, ";")
}

type Event struct {
Source string
Content string
}

type EventReceiver interface {
OnEvent(evt Event)
}

type Collector interface {
Init(evtReceiver EventReceiver) error
Start(agtCtx context.Context) error
Stop() error
Destory() error
}

type Agent struct {
collectors map[string]Collector
evtBuf chan Event
cancel context.CancelFunc
ctx context.Context
state int
}

func (agt *Agent) EventProcessGroutine() {
var evtSeg [10]Event
for {
for i := 0; i < 10; i++ {
select {
case evtSeg[i] = <-agt.evtBuf:
case <-agt.ctx.Done():
return
}
}
fmt.Println(evtSeg)
}

}

func NewAgent(sizeEvtBuf int) *Agent {
agt := Agent{
collectors: map[string]Collector{},
evtBuf: make(chan Event, sizeEvtBuf),
state: Waiting,
}

return &agt
}

func (agt *Agent) RegisterCollector(name string, collector Collector) error {
if agt.state != Waiting {
return WrongStateError
}
agt.collectors[name] = collector
return collector.Init(agt)
}

func (agt *Agent) startCollectors() error {
var err error
var errs CollectorsError
var mutex sync.Mutex

for name, collector := range agt.collectors {
go func(name string, collector Collector, ctx context.Context) {
defer func() {
mutex.Unlock()
}()
err = collector.Start(ctx)
mutex.Lock()
if err != nil {
errs.CollectorErrors = append(errs.CollectorErrors,
errors.New(name+":"+err.Error()))
}
}(name, collector, agt.ctx)
}
if len(errs.CollectorErrors) == 0 {
return nil
}
return errs
}

func (agt *Agent) stopCollectors() error {
var err error
var errs CollectorsError
for name, collector := range agt.collectors {
if err = collector.Stop(); err != nil {
errs.CollectorErrors = append(errs.CollectorErrors,
errors.New(name+":"+err.Error()))
}
}
if len(errs.CollectorErrors) == 0 {
return nil
}

return errs
}

func (agt *Agent) destoryCollectors() error {
var err error
var errs CollectorsError
for name, collector := range agt.collectors {
if err = collector.Destory(); err != nil {
errs.CollectorErrors = append(errs.CollectorErrors,
errors.New(name+":"+err.Error()))
}
}
if len(errs.CollectorErrors) == 0 {
return nil
}
return errs
}

func (agt *Agent) Start() error {
if agt.state != Waiting {
return WrongStateError
}
agt.state = Running
agt.ctx, agt.cancel = context.WithCancel(context.Background())
go agt.EventProcessGroutine()
return agt.startCollectors()
}

func (agt *Agent) Stop() error {
if agt.state != Running {
return WrongStateError
}
agt.state = Waiting
agt.cancel()
return agt.stopCollectors()
}

func (agt *Agent) Destory() error {
if agt.state != Waiting {
return WrongStateError
}
return agt.destoryCollectors()
}

func (agt *Agent) OnEvent(evt Event) {
agt.evtBuf <- evt
}

agent_test.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package microkernel

import (
"context"
"errors"
"fmt"
"testing"
"time"
)

type DemoCollector struct {
evtReceiver EventReceiver
agtCtx context.Context
stopChan chan struct{}
name string
content string
}

func NewCollect(name string, content string) *DemoCollector {
return &DemoCollector{
stopChan: make(chan struct{}),
name: name,
content: content,
}
}

func (c *DemoCollector) Init(evtReceiver EventReceiver) error {
fmt.Println("initialize collector", c.name)
c.evtReceiver = evtReceiver
return nil
}

func (c *DemoCollector) Start(agtCtx context.Context) error {
fmt.Println("start collector", c.name)
for {
select {
case <-agtCtx.Done():
c.stopChan <- struct{}{}
break
default:
time.Sleep(time.Millisecond * 50)
c.evtReceiver.OnEvent(Event{c.name, c.content})
}
}
}

func (c *DemoCollector) Stop() error {
fmt.Println("stop collector", c.name)
select {
case <-c.stopChan:
return nil
case <-time.After(time.Second * 1):
return errors.New("failed to stop for timeout")
}
}

func (c *DemoCollector) Destory() error {
fmt.Println(c.name, "released resources.")
return nil
}

func TestAgent(t *testing.T) {
agt := NewAgent(100)
c1 := NewCollect("c1", "1")
c2 := NewCollect("c2", "2")
agt.RegisterCollector("c1", c1)
agt.RegisterCollector("c2", c2)
if err := agt.Start(); err != nil {
fmt.Printf("start error %v\n", err)
}
fmt.Println(agt.Start())
time.Sleep(time.Second * 1)
agt.Stop()
agt.Destory()
}

Go反射编程

reflect.TypeOf vs. reflect.ValueOf:

  • reflflect.TypeOf 返回类型 (reflflect.Type)
  • reflflect.ValueOf 返回值 (reflflect.Value)
  • 可以从 reflflect.Value 获得类型
  • 通过 kind 的来判断类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func CheckType(v interface{}) {
t := reflect.TypeOf(v)
switch t.Kind() {
case reflect.Float32, reflect.Float64:
fmt.Println("Float")
case reflect.Int, reflect.Int32, reflect.Int64:
fmt.Println("Integer")
default:
fmt.Println("Unknown", t)
}
}

func TestBasicType(t *testing.T) {
var f float64 = 12
CheckType(f)
/** 运行结果:
=== RUN TestBasicType
Float
--- PASS: TestBasicType (0.00s)
*/
}

利用反射编写灵活的代码:

  • 按名字访问结构的成员

    reflect.ValueOf(*e).FieldByName("Name")

  • 按名字访问结构的方法

    reflect.ValueOf(*e).MethodByName("UpdateAge").Call([]reflect.Value{reflect.ValueOf(1)})

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type Employee struct {
EmployeeId string
Name string `format:"normal"`
Age int
}

func (e *Employee) UpdateAge(newVal int) {
e.Age = newVal
}

func TestInvokeByName(t *testing.T) {
e := &Employee{"1", "Mike", 30}
// 按名字获取成员
t.Logf("Name:value(%[1]v),Type(%[1]T)", reflect.ValueOf(*e).FieldByName("Name"))
if nameField, ok := reflect.TypeOf(*e).FieldByName("Name"); !ok {
t.Error("Failed to get 'Name' field.")
} else {
t.Log("Tag:format", nameField.Tag.Get("format"))
}
reflect.ValueOf(e).MethodByName("UpdateAge").Call([]reflect.Value{reflect.ValueOf(1)})
t.Log("Updated Age:", e)
/** 运行结果:
=== RUN TestInvokeByName
TestInvokeByName: reflect_test.go:28: Name:value(Mike),Type(reflect.Value)
TestInvokeByName: reflect_test.go:32: Tag:format normal
TestInvokeByName: reflect_test.go:35: Updated Age: &{1 Mike 1}
--- PASS: TestInvokeByName (0.00s)
*/
}

Struct Tag:

1
2
3
4
type BasicInfo struct {
Name string `json:"name"`
Age int `json:"age"`
}

访问Struct:

1
2
3
4
if nameField, ok := reflect.TypeOf(*e).FieldByName("Name"); !ok {
t.Error("Failed to get 'Name' field.")
} else {
t.Log("Tag:format", nameField.Tag.Get("format")) }

Reflect.Type 和 Reflflect.Value 都有 FieldByName ⽅法,注意他们的区别。

DeepEqual:

比较切片和map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type Customer struct {
CookieID string
Name string
Age int
}

func TestDeepEqual(t *testing.T) {
a := map[int]string{1: "one", 2: "two", 3: "three"}
b := map[int]string{1: "one", 2: "two", 4: "three"}
fmt.Println(reflect.DeepEqual(a, b))

s1 := []int{1, 2, 3}
s2 := []int{1, 2, 3}
s3 := []int{2, 3, 1}
t.Log("s1 == s2?", reflect.DeepEqual(s1, s2))
t.Log("s1 == s3?", reflect.DeepEqual(s1, s3))

c1 := Customer{"1", "Mike", 40}
c2 := Customer{"1", "Mike", 40}

fmt.Println(reflect.DeepEqual(c1, c2))
/** 运行结果:
=== RUN TestDeepEqual
false
TestDeepEqual: fiexible_reflect_test.go:23: s1 == s2? true
TestDeepEqual: fiexible_reflect_test.go:24: s1 == s3? false
true
--- PASS: TestDeepEqual (0.00s)
*/
}

关于“反射”你应该知道的:

  • 提⾼了程序的灵活性

  • 降低了程序的可读性

  • 降低了程序的性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
type Employee struct {
EmployeeID string
Name string `format:"normal"`
Age int
}

func (e *Employee) UpdateAge(newVal int) {
e.Age = newVal
}

type Customer struct {
CookieID string
Name string
Age int
}

func fillBySettings(st interface{}, settings map[string]interface{}) error {

// func (v Value) Elem() Value
// Elem returns the value that the interface v contains or that the pointer v points to.
// It panics if v's Kind is not Interface or Ptr.
// It returns the zero Value if v is nil.

if reflect.TypeOf(st).Kind() != reflect.Ptr {
return errors.New("the first param should be a pointer to the struct type.")
}
// Elem() 获取指针指向的值
if reflect.TypeOf(st).Elem().Kind() != reflect.Struct {
return errors.New("the first param should be a pointer to the struct type.")
}

if settings == nil {
return errors.New("settings is nil.")
}

var (
field reflect.StructField
ok bool
)

for k, v := range settings {
if field, ok = (reflect.ValueOf(st)).Elem().Type().FieldByName(k); !ok {
continue
}
if field.Type == reflect.TypeOf(v) {
vstr := reflect.ValueOf(st)
vstr = vstr.Elem()
vstr.FieldByName(k).Set(reflect.ValueOf(v))
}

}
return nil
}

func TestFillNameAndAge(t *testing.T) {
settings := map[string]interface{}{"Name": "Mike", "Age": 30}
e := Employee{}
if err := fillBySettings(&e, settings); err != nil {
t.Fatal(err)
}
t.Log(e)
c := new(Customer)
if err := fillBySettings(c, settings); err != nil {
t.Fatal(err)
}
t.Log(*c)
/** 运行结果:
=== RUN TestFillNameAndAge
TestFillNameAndAge: fiexible_reflect_test.go:69: { Mike 30}
TestFillNameAndAge: fiexible_reflect_test.go:74: { Mike 30}
--- PASS: TestFillNameAndAge (0.00s)
*/
}

”不安全“行为的危险性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
func TestUnsafe(t *testing.T) {
i := 10
f := *(*float64)(unsafe.Pointer(&i))
t.Log(unsafe.Pointer(&i))
t.Log(f)
/** 运行结果:
=== RUN TestUnsafe
TestUnsafe: unsafe_test.go:11: 0xc000016268
TestUnsafe: unsafe_test.go:12: 5e-323
--- PASS: TestUnsafe (0.00s)
*/
}

// The cases is suitable for unsafe
type MyInt int

// 合理的类型转换
func TestConvert(t *testing.T) {
a := []int{1, 2, 3, 4}
b := *(*[]MyInt)(unsafe.Pointer(&a))
t.Log(b)
/** 运行结果:
=== RUN TestConvert
TestConvert: unsafe_test.go:26: [1 2 3 4]
--- PASS: TestConvert (0.00s)
*/
}

// 原子类型操作
func TestAtomic(t *testing.T) {
var shareBuffer unsafe.Pointer
writeDataFn := func() {
data := []int{}
for i := 0; i < 100; i++ {
data = append(data, i)
}
atomic.StorePointer(&shareBuffer, unsafe.Pointer(&data))
}
readDataFn := func() {
data := atomic.LoadPointer(&shareBuffer)
fmt.Println(data, *(*[]int)(data))
}
var wg sync.WaitGroup
writeDataFn()
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for i := 0; i < 10; i++ {
writeDataFn()
time.Sleep(time.Microsecond * 100)
}
wg.Done()
}()
wg.Add(1)
go func() {
for i := 0; i < 10; i++ {
readDataFn()
time.Sleep(time.Microsecond * 100)
}
wg.Done()
}()
}
wg.Wait()
}

Go单元测试及Benchmark

单元测试

之前在刚开始写了如何编写测试程序

内置单元测试框架:

  • Fail, Error: 该测试失败,该测试继续,其他测试继续执⾏

  • FailNow, Fatal: 该测试失败,该测试中⽌,其他测试继续执⾏

  • 代码覆盖率

    go test -v -cover

  • 断言

    https://github.com/stretchr/testify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func TestErrorInCode(t *testing.T) {
fmt.Println("Start")
t.Error("Error")
fmt.Println("End")
/** 运行结果:
=== RUN TestErrorInCode
Start
TestErrorInCode: functions_test.go:25: Error
End
--- FAIL: TestErrorInCode (0.00s)
*/
}

func TestFatalInCode(t *testing.T) {
fmt.Println("Start")
t.Fatal("Error")
fmt.Println("End")
/** 运行结果:
=== RUN TestFatalInCode
Start
TestFatalInCode: functions_test.go:38: Error
--- FAIL: TestFatalInCode (0.00s)
*/
}

使用断言:

go get -u github.com/stretchr/testify

1
2
3
4
5
6
7
8
9
10
11
12
func square(op int) int {
return op * op
}

func TestSquareWithAssert(t *testing.T) {
inputs := [...]int{1, 2, 3}
expected := [...]int{1, 4, 9}
for i := 0; i < len(inputs); i++ {
ret := square(inputs[i])
assert.Equal(t, expected[i], ret)
}
}

Benchmark

文件名以下划线_benchmark结尾,方法名以Benchmark开头,参数为b *testing.B

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 利用+=连接
func TestConcatStringByAdd(t *testing.T) {
assert := assert.New(t)
elems := []string{"1", "2", "3", "4", "5"}
ret := ""
for _, elem := range elems {
ret += elem
}
assert.Equal("12345", ret)
}

// 利用buffer连接
func TestConcatStringBytesBuffer(t *testing.T) {
assert := assert.New(t)
var buf bytes.Buffer
elems := []string{"1", "2", "3", "4", "5"}
for _, elem := range elems {
buf.WriteString(elem)
}
assert.Equal("12345", buf.String())
}

func BenchmarkConcatStringByAdd(b *testing.B) {
elems := []string{"1", "2", "3", "4", "5"}
b.ResetTimer()
for i := 0; i < b.N; i++ {
ret := ""
for _, elem := range elems {
ret += elem
}
}
b.StopTimer()
}

func BenchmarkConcatStringBytesBuffer(b *testing.B) {
elems := []string{"1", "2", "3", "4", "5"}
b.ResetTimer()
for i := 0; i < b.N; i++ {
var buf bytes.Buffer
for _, elem := range elems {
buf.WriteString(elem)
}
}
}

在命令行输入 go test -bench=. -benchmem

Windows 下使⽤ go test 命令⾏时,-bench=.应写为-bench=”.”

运行结果:

1
2
3
4
5
6
7
8
$ go test -bench=. -benchmem
goos: darwin
goarch: amd64
pkg: eighteen/benchmark
BenchmarkConcatStringByAdd-8 8982729 130 ns/op 16 B/op 4 allocs/op
BenchmarkConcatStringBytesBuffer-8 17703706 64.9 ns/op 64 B/op 1 allocs/op
PASS
ok eighteen/benchmark 2.532s

使用 buffer 连接字符串的性能比 += 要好很多。

BDD

BDD in Go:

项⽬⽹站 :

https://github.com/smartystreets/goconvey

安装:

go get -u github.com/smartystreets/goconvey/convey

启动 WEB UI :

$GOPATH/bin/goconvey

1
2
3
4
5
6
7
8
9
10
11
12
func TestSpec(t *testing.T) {
convey.Convey("Given 2 even numbers", t, func() {
a := 2
b := 4
convey.Convey("When add the two numbers", func() {
c := a + b
convey.Convey("Then the result is still even", func() {
convey.So(c%2, convey.ShouldEqual, 0)
})
})
})
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ go test -v  bdd_spec_test.go 
=== RUN TestSpec

Given 2 even numbers
When add the two numbers
Then the result is still even ✔


1 total assertion

--- PASS: TestSpec (0.00s)
PASS
ok command-line-arguments 0.006s

可以看到最后一步为 ✔

Go典型并发任务

仅运行一次

最容易联想到的单例模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
type Singleton struct {
}

var singleInstance *Singleton
var once sync.Once

func GetSingletonObj() *Singleton {
once.Do(func() {
fmt.Println("Create Obj")
singleInstance = new(Singleton)
})
return singleInstance
}

func TestGetSingletonObj(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
obj := GetSingletonObj()
fmt.Printf("%x\n", unsafe.Pointer(obj))
wg.Done()
}()
}
wg.Wait()
/** 运行结果:
=== RUN TestGetSingletonObj
Create Obj
1269f78
1269f78
1269f78
1269f78
1269f78
1269f78
1269f78
1269f78
1269f78
1269f78
--- PASS: TestGetSingletonObj (0.00s)
*/
}

仅需任意任务完成

任务堆里面,只需任务一个完成就返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("the result is from %d", id)
}

func FirstResponse() string {
numOfRunner := 10
ch := make(chan string) // 非缓冲channel
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
return <-ch
}

func TestFirstResponse(t *testing.T) {
t.Log(FirstResponse())
/** 第一次运行结果:
=== RUN TestFirstResponse
TestFirstResponse: first_response_test.go:27: the result is from 0
--- PASS: TestFirstResponse (0.01s)
*/
/** 第二次运行结果:
=== RUN TestFirstResponse
TestFirstResponse: first_response_test.go:27: the result is from 3
--- PASS: TestFirstResponse (0.01s)
*/
}

因为协程的调度机制,所以返回结果不一样。

但这样是存在很大的问题,修改TestFirstResponse

1
2
3
4
5
6
7
8
9
10
11
12
13
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine()) // 获取协程数量
t.Log(FirstResponse())
time.Sleep(time.Second * 1)
t.Log("After:", runtime.NumGoroutine()) // 获取协程数量
/** 运行结果:
=== RUN TestFirstResponse
TestFirstResponse: first_response_test.go:28: Before: 2
TestFirstResponse: first_response_test.go:29: the result is from 6
TestFirstResponse: first_response_test.go:30: After: 11
--- PASS: TestFirstResponse (0.01s)
*/
}

因为使用的是非缓冲channelFirstResponse方法只取走了一次,往channel放入数据的时候,没有被取走,会造成阻塞。

修改非缓冲channel 为缓冲channel就行,否则会造成资源耗尽。

所有任务完成

之前都是用sync.waitGroup实现,这次利用csp机制实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("the result is from %d", id)
}

func AllResponse() string {
numOfRunner := 10
ch := make(chan string) // 非缓冲channel
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}

finalRet := ""
for i := 0; i < numOfRunner; i++ {
finalRet += <-ch + "\n"
}

return finalRet
}

func TestFirstResponse(t *testing.T) {
t.Log(AllResponse())
/** 运行结果:
=== RUN TestFirstResponse
TestFirstResponse: all_done_test.go:33: the result is from 9
the result is from 0
the result is from 2
the result is from 7
the result is from 4
the result is from 6
the result is from 1
the result is from 5
the result is from 8
the result is from 3

--- PASS: TestFirstResponse (0.01s)
*/
}

对象池

使用 buffered channel 实现对象池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
type ReusableObj struct {
}

type ObjPool struct {
bufChan chan *ReusableObj // 用于缓冲可重用对象
}

func NewObjPool(numOfObj int) *ObjPool {
objPool := ObjPool{}
objPool.bufChan = make(chan *ReusableObj, numOfObj)
// 提前建立好连接
for i := 0; i < numOfObj; i++ {
objPool.bufChan <- &ReusableObj{}
}
return &objPool
}

// 获取连接
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
select {
case ret := <-p.bufChan:
return ret, nil
case <-time.After(timeout): // 超时控制
return nil, errors.New("time out")
}
}

// 放入连接
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
select {
case p.bufChan <- obj:
return nil
default:
return errors.New("overflow")
}
}

func TestObjPool(t *testing.T) {
pool := NewObjPool(10) // 创建对象池

for i := 0; i < 11; i++ {
// 从对象池中获取
if v, err := pool.GetObj(time.Second * 1); err != nil {
t.Error(err)
} else {
fmt.Println(v)
// 放入对象池
if err := pool.ReleaseObj(v); err != nil {
t.Error(err)
}
}
}

fmt.Println("Done")
}

sync.Pool对象缓存

sync.Pool 对象获取:

  • 尝试从私有对象获取
  • 私有对象不存在,尝试从当前 Processor 的共享池获取
  • 如果当前 Processor 共享池也是空的,那么就尝试去其他 Processor 的共享池获取
  • 如果所有⼦池都是空的,最后就⽤⽤户指定的 New 函数,产⽣⼀个新的对象返回

sync.Pool 对象放回:

  • 如果私有对象不存在则保存为私有对象
  • 如果私有对象存在,放⼊当前 Processor ⼦池的共享池中

sync.Pool 对象生命周期:

  • GC 会清除 sync.Pool 缓存的对象
  • 对象的缓存有效期为下⼀次 GC 之前
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func TestSyncPool(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} {
fmt.Println("Create a new object.")
return 100
},
}

v := pool.Get().(int) // 从池中获取并断言类型
fmt.Println(v) // 100

pool.Put(3)
v1, _ := pool.Get().(int)
fmt.Println(v1) // 3

//在放进去个 2
pool.Put(2)
//为了验证生命周期 这里GC一下
runtime.GC()
v3, _ := pool.Get().(int)
fmt.Println(v3) // 100 而不是 2
/** 运行结果:
=== RUN TestSyncPool
Create a new object.
100
3
Create a new object.
100
--- PASS: TestSyncPool (0.00s)
*/
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func TestSyncPoolMultiGoroutine(t *testing.T) {
pool := sync.Pool{
New: func() interface{} {
fmt.Println("Create a new object.")
return 10
},
}

pool.Put(100)
pool.Put(100)
pool.Put(100)

var wg sync.WaitGroup

for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
t.Log(pool.Get())
wg.Done()
}()
}
wg.Wait()
/** 运行结果:
=== RUN TestSyncPoolMultiGoroutine
TestSyncPoolMultiGoroutine: sync_pool_test.go:59: 100
Create a new object.
TestSyncPoolMultiGoroutine: sync_pool_test.go:59: 10
Create a new object.
TestSyncPoolMultiGoroutine: sync_pool_test.go:59: 10
TestSyncPoolMultiGoroutine: sync_pool_test.go:59: 100
Create a new object.
Create a new object.
TestSyncPoolMultiGoroutine: sync_pool_test.go:59: 10
Create a new object.
Create a new object.
Create a new object.
TestSyncPoolMultiGoroutine: sync_pool_test.go:59: 100
TestSyncPoolMultiGoroutine: sync_pool_test.go:59: 10
TestSyncPoolMultiGoroutine: sync_pool_test.go:59: 10
TestSyncPoolMultiGoroutine: sync_pool_test.go:59: 10
TestSyncPoolMultiGoroutine: sync_pool_test.go:59: 10
--- PASS: TestSyncPoolMultiGoroutine (0.00s)
*/
}

sync.Pool 总结:

  • 适合于通过复用,降低复杂对象的创建和GC代价
  • 协程安全,会有锁的开销
  • 生命周期受GC影响,不适合于做连接池等,需自己管理生命周期的资源的池化

Go并发编程

协程机制

Thead vs. Groutine

  • 创建时默认的 stack 的大小
    • JDK5 以后的 Java Thread stack 默认为1M
    • Groutine 的 Stack 初始化大小为2k
  • 和 KSE(Kernel Space Entity)的对应关系
    • Java Thread 是 1:1
    • Groutine 是 M:N

GoGMP调度:

M:系统线程

P:Go实现的协程处理器

G:协程

从图中可看出,Processor 在不同的系统线程中,每个 Processor 都挂着准备运行的协程队列。

Processor 依次运行协程队列中的协程。

这时候问题就来了,假如一个协程运行的时间特别长,把整个 Processor 都占住了,那么在队列中的协程是不是就会被延迟的很久?

在Go启动的时候,会有一个守护线程来去做一个计数,计每个 Processor 运行完成的协程的数量,当一段时间内发现,某个 Processor 完成协程的数量没有发生变化的时候,就会往这个正在运行的协程任务栈插入一个特别的标记,协程在运行的时候遇到非内联函数,就会读到这个标记,就会把自己中断下来,然后插到这个等候协程队列的队尾,切换到别的协程进行运行。

当某一个协程被系统中断了,例如说 io 需要等待的时候,为了提高整体的并发,Processor 会把自己移到另一个可使用的系统线程当中,继续执行它所挂的协程队列,当这个被中断的协程被唤醒完成之后,会把自己加入到其中某个 Processor 的队列里,会加入到全局等待队列中。

当一个协程被中断的时候,它在寄存器里的运行状态也会保存到这个协程对象里,当协程再次获得运行状态的时候,重写写入寄存器,继续运行。

话不多说,直接上代码,如何在代码里启动一个协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func TestGroutine(t *testing.T) {
for i := 0; i < 10; i++ {
// int 参数
go func(i int) {
fmt.Println(i)
}(i) // 传入参数
}
// 有可能测试程序结束的非常快 加个等待
time.Sleep(time.Millisecond * 50)
/** 运行结果
=== RUN TestGroutine
1
4
5
2
6
3
0
8
9
7
--- PASS: TestGroutine (0.05s)
*/
}

共享内存并发机制

Lock

如果你是 Java 或者 C++ 程序员,那么以下代码非常常见,使用锁来进行并发控制(可惜我是个Phper🙈):

1
2
3
4
5
6
7
8
9
lock lock = ...;
lock.lock();
try{
// process (thread-safe)
}catch(Exception ex){

}finally{
lock.unlock();
}

同样Go也提供了这样的机 package sync:

Mutex 互斥锁

RWLock 读写锁

不使用锁的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func TestCounter(t *testing.T) {
counter := 0
for i := 0; i < 5000; i++ {
go func(i int) {
counter++
}(i)
}
time.Sleep(time.Second * 1)
t.Logf("counter = %d", counter)
/** 运行结果:
=== RUN TestCounter
TestCounter: share_memory_test.go:16: counter = 4627
--- PASS: TestCounter (1.01s)
*/
}

可以发现结果与预期结果不一样,这是因为 conuter 变量在不同的协程里面去做自增,导致了一个并发的竞争条件,传统意义来讲就是一个不是线程安全的程序。要保证线程安全,就要对共享的内存进行锁保护。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func TestCounterThreadSafe(t *testing.T) {
var mut sync.Mutex
counter := 0
for i := 0; i < 5000; i++ {
go func(i int) {
// defer 释放锁
defer func() {
mut.Unlock()
}()
// 加锁
mut.Lock()
counter++
}(i)
}
time.Sleep(time.Second * 1)
t.Logf("counter = %d", counter)
/** 运行结果:
=== RUN TestCounterThreadSafe
TestCounterThreadSafe: share_memory_test.go:40: counter = 5000
--- PASS: TestCounterThreadSafe (1.01s)
*/
}

这次就得到了预期结果。

WaitGroup

等待所有协程完成,才能往下执行操作。

上面代码中,怕代码执行太快,所以加了 sleep

但我们无法控制这个 sleep 需要睡眠时间。

下面来用 WaitGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func TestCounterWaitGroup(t *testing.T) {
var mut sync.Mutex
var wg sync.WaitGroup
counter := 0
for i := 0; i < 5000; i++ {
wg.Add(1) // 增加一个要等待的协程
go func(i int) {
// defer 释放锁
defer func() {
mut.Unlock()
}()
// 加锁
mut.Lock()
counter++
wg.Done() // 一个协程完成了
}(i)
}
wg.Wait() // 等待所有添加的协程完成 才继续向下运行
t.Logf("counter = %d", counter)
/** 运行结果:
=== RUN TestCounterWaitGroup
TestCounterWaitGroup: share_memory_test.go:66: counter = 5000
--- PASS: TestCounterWaitGroup (0.00s)
*/
}

CSP并发机制

有人可能会说不就是 Actor Model

Actor Model

CSP vs. Actor

  • Actor 的直接通讯不同,CSP模式则是通过Channel进行通讯的,更松耦合一些。
  • Go中的channel是有容量限制并且独立于处理Groutine,而如ErlangActor模式中的mailbox容量是无限的,接收进程也总是被动地处理消息。

Channel

channel

GoChannel的基本机制:

  • 上图左边(非缓冲channel):

    通讯的两方必须同时在channel的两边,才能完成这次交互。任何一方不在,另一方就会被阻塞在那里等待,直到等到另一方才能完成这次交互。

  • 上图右边(缓冲channel):

    就是对这个channel设置容量,在未满的情况下,放消息的人就能放进去,如果满了,就会发生阻塞等待。

    等待拿消息的人去拿,空出来容量。反之,拿消息一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func service() string {
time.Sleep(time.Millisecond * 50) // 模拟阻塞
return "Done"
}

func otherTask() {
fmt.Println("working on something else")
time.Sleep(time.Millisecond * 100) // 模拟阻塞
fmt.Println("Task is done.")
}

func TestService(t *testing.T) {
fmt.Println(service())
otherTask()
/** 运行结果:
=== RUN TestService
Done
working on something else
Task is done.
--- PASS: TestService (0.16s)
*/
}

由运行结果可知,完全是串行的,耗时为 0.16s

service进行改造,在调用的时候启动另外一个协程去执行,而不是阻塞当前写的协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func service() string {
time.Sleep(time.Millisecond * 50) // 模拟阻塞
return "Done"
}

func otherTask() {
fmt.Println("working on something else")
time.Sleep(time.Millisecond * 100) // 模拟阻塞
fmt.Println("Task is done.")
}

func AsyncService() chan string {
retCh := make(chan string) // 创建一个非缓冲string类型的channel
//retCh := make(chan string, 1) // 创建一个容量为1 string类型的缓冲channel
go func() {
ret := service()
fmt.Println("returned result.")
retCh <- ret // 放入 channel
fmt.Println("service exited.")
}()
return retCh
}

func TestAsyncService(t *testing.T) {
retCh := AsyncService()
otherTask()
fmt.Println(<-retCh) // 从channel拿出
/** 运行结果
=== RUN TestAsyncService
working on something else
returned result.
Task is done.
Done
service exited.
--- PASS: TestAsyncService (0.10s)
*/
}

可以看打印的顺序,实现了一个异步返回结果,耗时 0.1s

多路选择和超时

select

  • 多渠道的选择:

    1
    2
    3
    4
    5
    6
    7
    8
    select {
    case ret := <-retCh:
    t.Logf("result %s", ret)
    case ret := <-retCh2:
    t.Logf("result %s", ret)
    default:
    t.Error("No one returned")
    }
  • 超时控制:

    1
    2
    3
    4
    5
    6
    select {
    case ret := <-retCh:
    t.Logf("result %s", ret)
    case <-time.After(time.Second * 1):
    t.Error("time out")
    }

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func service() string {
time.Sleep(time.Millisecond * 500) // 模拟阻塞
return "Done"
}

func AsyncService() chan string {
retCh := make(chan string) // 创建一个非缓冲channel
//retCh := make(chan string, 10) // 创建一个容量为10的缓冲channel
go func() {
ret := service()
fmt.Println("returned result.")
retCh <- ret // 放入 channel
fmt.Println("service exited.")
}()
return retCh
}

func TestSelect(t *testing.T) {
// 上面模拟阻塞 500ms
select {
case ret := <-AsyncService():
t.Log(ret)
case <-time.After(time.Millisecond * 100):
t.Error("time out")
}
/** 运行结果:
=== RUN TestSelect
TestSelect: select_test.go:31: time out
--- FAIL: TestSelect (0.10s)
*/
}

channel的关闭和广播

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 生产者
func dataProducer(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 10; i++ {
ch <- i // 放入
}
wg.Done()
}()
}

// 消费者
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 10; i++ {
data := <-ch // 取出
fmt.Println(data)
}
wg.Done()
}()
}

func TestCloseChannel(t *testing.T) {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg)
wg.Add(1)
dataReceiver(ch, &wg)
wg.Wait()
/** 运行结果:
=== RUN TestCloseChannel
0
1
2
3
4
5
6
7
8
9
--- PASS: TestCloseChannel (0.00s)
*/
}

这里可以看到 dataProducer 放数据的时候放了10个,dataReceiver 也是拿了10个。

这是因为我们知道是10,但正常情况 dataReceiver 才能知道 dataProducer 放完了呢。

其一我们可以做个约定,比如 dataProducer 放入个 -1 ,当 dataReceiver 收到 -1 就退出去。

但是又出来一个新问题,如果有多个 dataReceiver 呢,dataProducer 就得知道有多少个 dataReceiver,来放入多个 -1,问题就是不知道。

channel的关闭:

  • 向关闭的 channel 发送数据,会导致 panic
  • v, ok <-ch; okbool 值,true 表示正常接受,false 表示通道关闭
  • 所有的 channel 接收者都会在 channel 关闭时,⽴立刻从阻塞等待中返回且上 述 ok 值为 false。这个⼴广播机制常被利利⽤用,进⾏行行向多个订阅者同时发送信号。 如:退出信号。

改造 dataReceiver

1
2
3
4
5
6
7
8
9
10
11
12
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
go func() {
for {
if data, ok := <-ch; ok { // ok 为 true
fmt.Println(data)
} else { // 结束循环
break
}
}
wg.Done()
}()
}

启动多个 dataReceiver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func TestCloseChannel(t *testing.T) {
var wg sync.WaitGroup
ch := make(chan int)
wg.Add(1)
dataProducer(ch, &wg)
wg.Add(1)
dataReceiver(ch, &wg)
wg.Add(1)
dataReceiver(ch, &wg)
wg.Wait()
/** 运行结果:
=== RUN TestCloseChannel
0
1
2
3
5
6
7
8
4
9
--- PASS: TestCloseChannel (0.00s)
*/
}

假如不判断 ok 呢:

1
2
3
4
5
6
7
8
9
func dataReceiver(ch chan int, wg *sync.WaitGroup) {
go func() {
for i := 0; i < 11; i++ { // 上面 dataProducer 放进去了10个
data := <-ch
fmt.Println(data) // 当通道被关闭 会返回一个这个通道定义类型的零值
}
wg.Done()
}()
}

任务的取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func cancel_1(cancelChan chan struct{}) {
cancelChan <- struct{}{} // 往 channel 中放入消息
}
func cancel_2(cancelChan chan struct{}) {
close(cancelChan)
}

func isCancelled(cancelChan chan struct{}) bool {
select {
case <-cancelChan: // 从 channel 中收到消息 返回true
return true
default:
return false
}
}

func TestCancel(t *testing.T) {
cancelChan := make(chan struct{}, 0)
for i := 0; i < 5; i++ { // 启动5个协程任务
go func(i int, cancelChan chan struct{}) {
for { // 每个任务一直在执行
if isCancelled(cancelChan) { // 每次检查是否任务是否进行停止 进行停止
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, cancelChan)
}
cancel_1(cancelChan)
time.Sleep(time.Second * 1)
/** 运行结果:
=== RUN TestCancel
4 Cancelled
--- PASS: TestCancel (1.00s)
*/
}

只有一个 任务被取消掉了,因为 channel 传递过去只有一个信号,而这里有5个协程,其它协程没有被取消。

而我们可以传递5个,将它们全部取消,这样的编程坏处,前面的逻辑和有多少个task进行耦合,必须事先知道有多少个task

换成第二个取消方法,因为是广播机制,所以所有协程都会收到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func TestCancel(t *testing.T) {
cancelChan := make(chan struct{}, 0)
for i := 0; i < 5; i++ { // 启动5个协程任务
go func(i int, cancelChan chan struct{}) {
for { // 每个任务一直在执行
if isCancelled(cancelChan) { // 每次检查是否任务是否进行停止 进行停止
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, cancelChan)
}
cancel_2(cancelChan)
time.Sleep(time.Second * 1)
/** 运行结果:
=== RUN TestCancel
4 Cancelled
2 Cancelled
1 Cancelled
0 Cancelled
3 Cancelled
--- PASS: TestCancel (1.00s)
*/
}

Context与任务取消

我们直接取消叶子节点的任务是可以的,但是取消一个父节点,子节点任务不会被取消,当然可以自己去做这种机制。在 Go 的1.9版本之后把 Context 并入到内置包里面了。帮我们做这些事。

Context

  • 根 Context: 通过 context.Background () 创建
  • ⼦ Context: context.WithCancel(parentContext) 创建
    • ctx, cancel := context.WithCancel(context.Background())
  • 当前 Context 被取消时,基于他的⼦子 context 都会被取消
  • 接收取消通知 <-ctx.Done()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func isCancelled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}

func TestCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 5; i++ { // 启动5个协程任务
go func(i int, ctx context.Context) {
for { // 每个任务一直在执行
if isCancelled(ctx) {
break
}
time.Sleep(time.Millisecond * 5)
}
fmt.Println(i, "Cancelled")
}(i, ctx)
}
cancel()
time.Sleep(time.Second * 1)
/** 运行结果:
=== RUN TestCancel
4 Cancelled
2 Cancelled
3 Cancelled
0 Cancelled
1 Cancelled
--- PASS: TestCancel (1.00s)
*/
}

Go包和依赖管理

构建可复用的模块(包)

package:

  • 基本复用模块单元

    以首字母大写来表明可被包外代码访问

  • 代码的 package 可以和所在的目录不一致

  • 同一目录里的 Go 代码的 package 要保持一致

需要把包目录加入到GOPATH

目录结构:

1
2
3
4
5
6
7
8
~/Documents/Go
- learning
- src
- fifteen
- client
- package_test.go
- series
- my_series.go

查看 go env

1
2
$ go env
GOPATH="/Users/gaobinzhan/Documents/Go/learning:/Users/gaobinzhan/Documents/Go"

可以看到这个目录已经加入GOPATH里了。

my_series.go

1
2
3
4
5
6
7
8
9
10
11
package series

// 首字母必须大写 才可被包外代码访问
func GetFibonacci(n int) ([]int, error) {
fibList := []int{1, 2}

for i := 2; i < n; i++ {
fibList = append(fibList, fibList[i-2]+fibList[i-1])
}
return fibList, nil
}

package_test.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package client

import (
"fifteen/series"
"testing"
)

func TestPackage(t *testing.T) {
t.Log(series.GetFibonacci(5))
/** 运行结果:
=== RUN TestPackage
TestPackage: package_test.go:9: [1 2 3 5 8] <nil>
--- PASS: TestPackage (0.00s)
*/
}

init方法:

  • main 被执行前,所有依赖的 packageinit 方法都会被执行
  • 不同包的 init 函数按照包导入的依赖关系决定执行顺序
  • 每个包可以有多个 init 函数
  • 包的每个源文件

下面修改文件

my_series.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package series

import "fmt"

func init() {
fmt.Println("init 1")
}

func init() {
fmt.Println("init 2")
}

func GetFibonacci(n int) ([]int, error) {
fibList := []int{1, 2}

for i := 2; i < n; i++ {
fibList = append(fibList, fibList[i-2]+fibList[i-1])
}
return fibList, nil
}

package_test.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package client

import (
"fifteen/series"
"testing"
)

func TestPackage(t *testing.T) {
t.Log(series.GetFibonacci(5))
/** 运行结果:
init 1
init 2
=== RUN TestPackage
TestPackage: package_test.go:10: [1 2 3 5 8] <nil>
--- PASS: TestPackage (0.00s)
*/
}

获取远程package:

  • 通过 go get 来获取远程依赖

    go get -u 强制从网络更新远程依赖

  • 注意代码在 Github 上的组织形式,以适应 go get

    直接以代码路径开始,不要有 src

示例:go get -u https://github.com/easierway/concurrent_map

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package remote_package

import (
cm "github.com/easierway/concurrent_map"
"testing"
)

func TestConcurrentMap(t *testing.T) {
m := cm.CreateConcurrentMap(99)
m.Set(cm.StrKey("key"), 10)
t.Log(m.Get(cm.StrKey("key")))
/** 运行结果:
=== RUN TestConcurrentMap
TestConcurrentMap: remote_package_test.go:11: 10 true
--- PASS: TestConcurrentMap (0.00s)
*/

}

依赖管理

Go未解决的依赖问题:

  • 同一环境下,不同项目使用同一包的不同版本
  • 无法管理对包的特定版本的依赖

vendor路径:

随着 Go 1.5 release 版本的发布,vendor ⽬录被添加到除了 GOPATH 和

GOROOT 之外的依赖⽬录查找的解决⽅案。在 Go 1.6 之前,你需要⼿动

的设置环境变量

查找依赖包路径的解决⽅案如下:

  • 当前包下的 vendor ⽬录

  • 向上级⽬录查找,直到找到 src 下的 vendor ⽬录

  • 在 GOPATH 下⾯查找依赖包

  • 在 GOROOT ⽬录下查

常用的依赖管理工具:

简单用一下

安装 gilde

brew install glide

删除我们刚刚 go get 下来的包 然后执行 glide init

然后会在目录下面生成一个 glide.yaml文件

执行 glide install 会生成 vendor 目录 里面就是我们的依赖包

执行原来的测试文件,依然可以执行成功。

Go编写好的错误处理

编写好的错误处理

Go的错误机制:

  • 没有异常机制

  • error 类型实现了 error 接口

    1
    2
    3
    type error interface {
    Error() string
    }
  • 可以通过 errors.News 来快速创建错误实例

    1
    errors.News("n must be in the range [0,10]")

拿Fibonacci举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func GetFibonacci(n int) []int {
fibList := []int{1, 2}

for i := 2; i < n; i++ {
fibList = append(fibList, fibList[i-2]+fibList[i-1])
}
return fibList
}

func TestGetFibonacci(t *testing.T) {
t.Log(GetFibonacci(10))
t.Log(GetFibonacci(-10))
/** 运行结果
=== RUN TestGetFibonacci
TestGetFibonacci: err_test.go:15: [1 2 3 5 8 13 21 34 55 89]
TestGetFibonacci: err_test.go:21: [1 2]
--- PASS: TestGetFibonacci (0.00s)
*/
}

可以看到没有对入参进行校验

现在做下校验:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func GetFibonacci(n int) ([]int, error) {
if n < 2 || n > 100 {
return nil, errors.New("n should be in [2,100]")
}
fibList := []int{1, 2}

for i := 2; i < n; i++ {
fibList = append(fibList, fibList[i-2]+fibList[i-1])
}
return fibList, nil
}

func TestGetFibonacci(t *testing.T) {
// 如果有错误进行错误输出
if v, err := GetFibonacci(-10); err != nil {
t.Error(err)
} else {
t.Log(v)
}
/** 运行结果
=== RUN TestGetFibonacci
TestGetFibonacci: err_test.go:22: n should be in [2,100]
--- FAIL: TestGetFibonacci (0.00s)
*/
}

假设现在有个需求,返回的值是太小了还是太大了,返回不同的错误,最简单的方法直接改造GetFibonacci

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func GetFibonacci(n int) ([]int, error) {
if n < 2 {
return nil, errors.New("n should be not less than 2")
}

if n > 100 {
return nil, errors.New("n should be not larger than 100")
}

fibList := []int{1, 2}

for i := 2; i < n; i++ {
fibList = append(fibList, fibList[i-2]+fibList[i-1])
}
return fibList, nil
}

如果区分错误类型,依靠字符串去匹配简直太麻烦还容易出错,最常见的解决方法,定义两个预置的错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
var LessThanTwoError = errors.New("n should be not less than 2")
var LargerThenHundredError = errors.New("n should be not larger than 100")

func GetFibonacci(n int) ([]int, error) {
if n < 2 {
return nil, LessThanTwoError
}

if n > 100 {
return nil, LargerThenHundredError
}

fibList := []int{1, 2}

for i := 2; i < n; i++ {
fibList = append(fibList, fibList[i-2]+fibList[i-1])
}
return fibList, nil
}

func TestGetFibonacci(t *testing.T) {
// 如果有错误进行错误输出
if v, err := GetFibonacci(-10); err != nil {
// 假如调用者需要判断错误的就比较简单了
if err == LessThanTwoError {
fmt.Println("It is less.")
}
t.Error(err)
} else {
t.Log(v)
}
/** 运行结果
=== RUN TestGetFibonacci
It is less.
TestGetFibonacci: err_test.go:36: n should be not less than 2
--- FAIL: TestGetFibonacci (0.00s)
*/
}

总结:

  • 定义不同的错误变量,以便于判断错误类型

  • 及早失败,避免嵌套,提高代码可读性

panic和recover

panic

panic:

  • panic 用于不可以恢复的错误
  • panic 退出前会执行 defer 指定的内容

panic vs. os.Exit:

  • os.Exit 退出时不会调用 defer 指定的函数
  • os.Exit 退出时不输出当前调用栈的信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func TestExit(t *testing.T) {
fmt.Println("Start")
os.Exit(-1)
/** 运行结果
=== RUN TestExit
Start

Process finished with exit code 1
*/
}

func TestPanic(t *testing.T) {
defer func() {
fmt.Println("Finally!")
}()
fmt.Println("Start")
panic(errors.New("Something wrong!"))
/** 运行结果:
=== RUN TestPanic
Start
Finally!
--- FAIL: TestPanic (0.00s)
panic: Something wrong! [recovered]
panic: Something wrong!

goroutine 6 [running]:
testing.tRunner.func1.1(0x1119860, 0xc000046510)
/usr/local/Cellar/go/1.14.2_1/libexec/src/testing/testing.go:940 +0x2f5
testing.tRunner.func1(0xc00011a120)
/usr/local/Cellar/go/1.14.2_1/libexec/src/testing/testing.go:943 +0x3f9
panic(0x1119860, 0xc000046510)
/usr/local/Cellar/go/1.14.2_1/libexec/src/runtime/panic.go:969 +0x166
command-line-arguments.TestPanic(0xc00011a120)
/Users/gaobinzhan/Documents/Go/learning/src/test/err_test.go:65 +0xd7
testing.tRunner(0xc00011a120, 0x114afa0)
/usr/local/Cellar/go/1.14.2_1/libexec/src/testing/testing.go:991 +0xdc
created by testing.(*T).Run
/usr/local/Cellar/go/1.14.2_1/libexec/src/testing/testing.go:1042 +0x357

Process finished with exit code 1
*/
}

recover

大家在写c++或者php代码的时候,总有一种习惯不希望这个程序被中断或者退出,用来捕获。

php代码:

1
2
3
4
5
try {

} catch (\Throwable $throwable) {

}

c++ 代码:

1
2
3
4
5
try{
...
}catch(...){

}

go代码:

1
2
3
4
5
defer func(){
if err := recover(); err != nil {
// 恢复错误
}
}()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func TestRecover(t *testing.T) {
defer func() {
if err := recover(); err != nil {
// 没有写错误恢复 只是打印出来了
fmt.Println("recovered from", err)
}
}()
fmt.Println("Start")
panic(errors.New("Something wrong!"))
/** 运行结果:
=== RUN TestRecover
Start
recovered from Something wrong!
--- PASS: TestRecover (0.00s)
*/
}

最常见的”错误恢复”:

1
2
3
4
5
defer func() {
if err := recover(); err != nil {
log.Error("recovered panic",err)
}
}()

当心!recover 成为恶魔:

  • 形成僵尸服务进程,导致 health check 失效。
  • “Let it Crash!” 往往是我们恢复不确定性错误的最好方法。

就如上常见的“错误恢复”只是记录了一下,这样的恢复方式是非常危险的。

一定要当心我们自己 recover 在做的事,因为我们 recover 的时候并不去检测错误到底发生了什么错误,而是简单的记录了一下或者忽略。

这时候可能是系统里面的某些核心资源已经消耗完了,我们这样把它强制恢复掉,其实系统依然不能够正常地工作的,还是导致我们的一些健康检查程序 health check 没有办法检查出当前系统的问题。

因为很多的这种 health check 只是检查当前的系统进程在还是不在,因为我们的进程是在的,所以就会导致一种僵尸服务进程,它好像活着,但它也不能提供服务。

这种情况下个人认为倒不如采用一种可恢复的设计模式其中的一种叫 Let it Crash ,干脆 Crash掉,一旦Crash掉 守护进程 ,就会帮我们的服务进程重新提起来。

Go面向对象编程

Is Go an object-oriented language?

Yes and no. Although Go has types and methods and allows an object

oriented style of programming, there is no type hierarchy. The concept

of “interface” in Go provides a different approach that we believe is

easy to use and in some ways more general.

Also, the lack of a type hierarchy makes “objects” in Go feel much more

lightweight than in languages such as C++ or Java.

行为的定义和实现

结构体定义:

1
2
3
4
5
type Employee struct {
Id string
Name string
Age int
}

实例创建及初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func TestCreateEmployee(t *testing.T) {
e := Employee{"0", "Bob", 20} // 分别把值放进去
e1 := Employee{Name: "Mike", Age: 30} // 指定某个field的值
e2 := new(Employee) // new关键字 去创建指向实例的指针 这里返回的引用/指针 相当于 e:=Employee{}
e2.Id = "2" // 通过 example.filed 去赋值
e2.Name = "Rose"
e2.Age = 22
t.Log(e)
t.Log(e1)
t.Log(e1.Id)
t.Log(e2)
t.Logf("e is %T", e)
t.Logf("e2 is %T", e2)
/** 运行结果:
=== RUN TestCreateEmployee
TestCreateEmployee: encap_test.go:18: {0 Bob 20}
TestCreateEmployee: encap_test.go:19: { Mike 30}
TestCreateEmployee: encap_test.go:20:
TestCreateEmployee: encap_test.go:21: &{2 Rose 22}
TestCreateEmployee: encap_test.go:22: e is test.Employee
TestCreateEmployee: encap_test.go:23: e2 is *test.Employee
--- PASS: TestCreateEmployee (0.00s)
*/
}

行为定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 第一种定义方式在实例对应方法被调用时,实例的成员会进行值复制
func (e Employee) String() string {
return fmt.Sprintf("ID:%s-Name:%s-Age:%d", e.Id, e.Name, e.Age)
}

func TestStructOperations(t *testing.T) {
e := Employee{"0", "Bob", 20}
t.Log(e.String())
/** 运行结果:
=== RUN TestStructOperations
TestStructOperations: encap_test.go:46: ID:0-Name:Bob-Age:20
--- PASS: TestStructOperations (0.00s)
*/
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 通常情况下为了避免内存拷贝我们使用第二种定义方式
func (e *Employee) String() string {
return fmt.Sprintf("ID:%s/Name:%s/Age:%d", e.Id, e.Name, e.Age)
}

func TestStructOperations(t *testing.T) {
e := Employee{"0", "Bob", 20}
t.Log(e.String())
/** 运行结果:
=== RUN TestStructOperations
TestStructOperations: encap_test.go:51: ID:0/Name:Bob/Age:20
--- PASS: TestStructOperations (0.00s)
*/
}

在Go语言中不管通过指针访问还是通过实例访问,都是一样的

那么这两种定义没有区别吗??

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (e *Employee) String() string {
fmt.Printf("Address is %x \n", unsafe.Pointer(&e.Name))
return fmt.Sprintf("ID:%s/Name:%s/Age:%d", e.Id, e.Name, e.Age)
}

func TestStructOperations(t *testing.T) {
e := Employee{"0", "Bob", 20}
fmt.Printf("Address is %x \n", unsafe.Pointer(&e.Name))
t.Log(e.String())
/** 运行结果:
=== RUN TestStructOperations
Address is c000060370
Address is c000060370
TestStructOperations: encap_test.go:54: ID:0/Name:Bob/Age:20
--- PASS: TestStructOperations (0.00s)
*/
}

可以发现两个地址一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (e Employee) String() string {
fmt.Printf("Address is %x \n", unsafe.Pointer(&e.Name))
return fmt.Sprintf("ID:%s-Name:%s-Age:%d", e.Id, e.Name, e.Age)
}

func TestStructOperations(t *testing.T) {
e := Employee{"0", "Bob", 20}
fmt.Printf("Address is %x \n", unsafe.Pointer(&e.Name))
t.Log(e.String())
/** 运行结果:
=== RUN TestStructOperations
Address is c000092370
Address is c0000923a0
TestStructOperations: encap_test.go:55: ID:0-Name:Bob-Age:20
--- PASS: TestStructOperations (0.00s)
*/
}

这时候两个地址不一致,说明结构体的数据被复制了,会造成开销

Go语言的相关接口

Java的接口与依赖:

Go的 Duck Type 式接口实现:

  • 接口为非入侵性,实现不依赖与接口定义
  • 所以接口的定义可以包含在接口使用者包内

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type Programmer interface {
WriteHelloWorld() string
}

type GoProgrammer struct {
}

func (g *GoProgrammer) WriteHelloWorld() string {
return "Hello World"
}

func TestClient(t *testing.T) {
var p Programmer
p = new(GoProgrammer)
t.Log(p.WriteHelloWorld())
/** 运行结果:
=== RUN TestClient
TestClient: interface_test.go:19: Hello World
--- PASS: TestClient (0.00s)
*/
}

接口变量:

自定义类型:

  • type IntConvertionFn func(n int) int
  • type MyPoint int

扩展和复用

复合:

  • Go不支持继承,可以通过复合的方式来复用

匿名类型嵌入:

它不是继承,如果我们把“内部 struct”看作父类,把“外部 struct” 看作子类,会发现如下问题:

  • 不支持子类替换
  • 子类并不是真正继承了父类的方法
    • 父类定义的方法无法访问子类的数据和方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type Pet struct {
}

func (p *Pet) Speak() {
fmt.Print("...")
}

func (p *Pet) SpeakTo(string string) {
p.Speak()
fmt.Println(" ", string)
}

type Dog struct {
p *Pet
}

func (d *Dog) Speak() {
fmt.Print("Wang!")
}

func (d *Dog) SpeakTo(string string) {
d.p.SpeakTo(string)
}

func TestDog(t *testing.T) {
dog := new(Dog)
dog.SpeakTo("Gao") // 没有打印 Wang! 需要改动 dog中SpeakTo方法
/** 运行结果:
=== RUN TestDog
... Gao
--- PASS: TestDog (0.00s)
*/
}

多态与空接口

多态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
type Code string // 自定义类型

type Programmer interface {
WriteHelloWorld() Code
}

type GoProgrammer struct {
}

type PhpProgrammer struct {
}

func (g *GoProgrammer) WriteHelloWorld() Code {
return "fmt.Println(\"Hello World\")"
}

func (p *PhpProgrammer) WriteHelloWorld() Code {
return "echo \"Hello World\""
}

func writeFirstProgram(p Programmer) {
fmt.Printf("%T %v\n", p, p.WriteHelloWorld())
}

func TestPolymorphism(t *testing.T) {
goProg := new(GoProgrammer)
phpProg := new(PhpProgrammer)
writeFirstProgram(goProg)
writeFirstProgram(phpProg)
/** 运行结果
=== RUN TestPolymorphism
*test.GoProgrammer fmt.Println("Hello World")
*test.PhpProgrammer echo "Hello World"
--- PASS: TestPolymorphism (0.00s)
*/
}

空接口与断言:

  • 空接口可以表示任何类型

  • 通过断言来将空接口转换为制定类型

    v, ok := p.(int) // ok=true 时为转换成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func DoSomething(p interface{}) {
// 如果传入的参数能被断言成一个整型
if i, ok := p.(int); ok {
fmt.Println("Integer", i)
return
}

// 如果传入的参数能被断言成一个字符型
if s, ok := p.(string); ok {
fmt.Println("String", s)
return
}

fmt.Println("Unknow Type")

// 也可以通过switch来判断
/*switch v := p.(type) {
case int:
fmt.Println("Integer", v)
case string:
fmt.Println("String", v)
default:
fmt.Println("Unknow Type")
}*/
}

func TestEmptyInterfaceAssertion(t *testing.T) {
DoSomething(10)
DoSomething("gaobinzhan")
/** 运行结果
=== RUN TestEmptyInterfaceAssertion
Integer 10
String gaobinzhan
--- PASS: TestEmptyInterfaceAssertion (0.00s)
*/
}

Go接口最佳实践:

Go的函数及可变参数和defer

函数是一等公民:

  • 可以有多个返回值
  • 所有参数都是值传递:slicemapchannel 会有传引用的错觉
  • 函数可以作为变量的值
  • 函数可以作为参数和返回值
1
2
3
4
5
6
7
8
9
func returnMultiValues() (int, int) {
// 返回两个值
return rand.Intn(10), rand.Intn(20)
}

func TestFn(t *testing.T) {
a, b := returnMultiValues()
t.Log(a, b) // 1 7
}

可变参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func Sum(ops ...int) int {
ret := 0
for _, op := range ops {
ret += op
}
return ret
}

func TestVarParams(t *testing.T) {
t.Log(Sum(1, 2, 3, 4))
t.Log(Sum(1, 2, 3, 4, 5))
/** 运行结果
=== RUN TestVarParams
TestVarParams: func_test.go:48: 10
TestVarParams: func_test.go:49: 15
--- PASS: TestVarParams (0.00s)
*/
}

defer:

在最后执行完执行,通常我们用于释放资源及释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func TestDefer(t *testing.T) {
defer func() {
t.Log("Clean resources")
}()
t.Log("Started")
// panic 手动触发宕机
panic("Fatal error") // defer 仍然会执行
/** 运行结果
=== RUN TestDefer
TestDefer: func_test.go:56: Started
TestDefer: func_test.go:54: Clean resources
--- FAIL: TestDefer (0.00s)
*/
}

Go字符串

字符串:

  • string是数据类型,不是引用或指针类型
  • string是只读的byte slice,len函数可以获取它所包含的byte数
  • string的byte数组可以存放任何数据
1
2
3
4
5
6
7
8
9
10
11
12
13
func TestStringInit(t *testing.T) {
var s string
t.Log(s) // 初始化为默认零值"" 空字符串

s = "hello"
t.Log(len(s)) // 5 5个byte
//s[1] = 3 // string是不可变的byte slice 不可以赋值

s = "\xE4\xB8\xA5" // 可以存储任何二进制数据

t.Log(s) // 严
t.Log(len(s)) // 3 为3个byte
}

Unicode UTF8:

  • Unicode是一种字符集(code point)
  • UTF8是unicode的存储实现(转换为字节序列的规则)

编码和存储:

字符 Unicode UTF-8 string/[]byte
“中” 0x4E2D 0xE4B8AD [0xE4,0xB8,0xAD]
1
2
3
4
5
6
7
8
9
10
11
12
func TestUnicode(t *testing.T) {
s := "中"
t.Log(len(s)) // 3 为3个byte

// 新的数据类型 rune 能够取出字符串的unicode
c := []rune(s)
t.Log(len(c)) // 1

// %x 输出十六进制
t.Logf("中 unicode %x", c[0]) // 中 unicode 4e2d
t.Logf("中 utf8 %x", s) // 中 utf8 e4b8ad
}

字符串遍历:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func TestStringToRange(t *testing.T) {
s := "PHP是世界上最好的语言"
for _, c := range s {
// 都用第一个参数
t.Logf("%[1]c %[1]x", c)
}
/** 运行结果:
=== RUN TestStringToRange
TestStringToRange: string_test.go:37: P 50
TestStringToRange: string_test.go:37: H 48
TestStringToRange: string_test.go:37: P 50
TestStringToRange: string_test.go:37: 是 662f
TestStringToRange: string_test.go:37: 世 4e16
TestStringToRange: string_test.go:37: 界 754c
TestStringToRange: string_test.go:37: 上 4e0a
TestStringToRange: string_test.go:37: 最 6700
TestStringToRange: string_test.go:37: 好 597d
TestStringToRange: string_test.go:37: 的 7684
TestStringToRange: string_test.go:37: 语 8bed
TestStringToRange: string_test.go:37: 言 8a00
--- PASS: TestStringToRange (0.00s)
*/
}

常用字符串函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func TestStringFn(t *testing.T) {
s := "A,B,C"
// 字符串切割
parts := strings.Split(s, ",")
for _, part := range parts {
t.Log(part)
}
// 字符串拼接
t.Log(strings.Join(parts, "-"))
/** 运行结果:
=== RUN TestStringFn
TestStringFn: string_fun_test.go:12: A
TestStringFn: string_fun_test.go:12: B
TestStringFn: string_fun_test.go:12: C
TestStringFn: string_fun_test.go:14: A-B-C
--- PASS: TestStringFn (0.00s)
*/
}

数据类型转换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func TestStrConv(t *testing.T) {
// 整型转字符串
s := strconv.Itoa(10)
t.Log("str" + s)

// 字符串转整型对错误值需要做一个判断
if i, err := strconv.Atoi("10"); err == nil {
t.Log(10 + i)
}
/** 运行结果:
=== RUN TestStrConv
TestStrConv: string_fun_test.go:29: str10
TestStrConv: string_fun_test.go:33: 20
--- PASS: TestStrConv (0.00s)
*/
}