Go

goroutine vs tokio

February 16, 2022
Go, Goroutine, Tokio
Concurrent

Reddit讨论贴

Go uses a different strategy for blocking systemcalls. It does not run them on a threadpool - it moves all the other goroutines that are queued to run on the current thread to a new worker thread, then runs the blocking systemcall on the current thread. This minimizes context switching.

You can do this in tokio as well, using task::block_in_place. If I change your code to use that instead of tokio::fs, it gets a lot closer to the go numbers. Note that using block_in_place is not without caveats, and it only works on the multi-threaded runtime, not the single-threaded one. That’s why it’s not used in the implementation of tokio::fs.

...

go runtime chan

February 11, 2022
Go, Runtime, Chan
Chan

src/runtime/chan.go:

// Invariants:
//  At least one of c.sendq and c.recvq is empty,
//  except for the case of an unbuffered channel with a single goroutine
//  blocked on it for both sending and receiving using a select statement,
//  in which case the length of c.sendq and c.recvq is limited only by the
//  size of the select statement.
//
// For buffered channels, also:
//  c.qcount > 0 implies that c.recvq is empty.
//  c.qcount < c.dataqsiz implies that c.sendq is empty.

// 在文件开头,说明了几个不变量:
//  c.sendq和c.recvq中至少有一个是空的,
//  除非,一个无缓冲管道在一个goroutine里阻塞了,这个管道的发送和接收都使用了一个select语句,这时
//  c.sendq和c.recvq的长度被select语句限制。
// 
// 对于缓冲管道,同样地:
//  c.qcount > 0 表明c.recvq是空的。
//  c.qcount < c.dataqsiz 表明c.sendq是空的。

// 实际的chan类型
type hchan struct {
	qcount   uint           // total data in the queue - 队列里的数据总数量
	dataqsiz uint           // size of the circular queue - 循环队列的大小,make时传进来的值
	buf      unsafe.Pointer // points to an array of dataqsiz elements - dataqsiz元素组成的数组的指针
	elemsize uint16 // 元素大小
	closed   uint32 // 是否关闭
	elemtype *_type // element type - 元素类型
	sendx    uint   // send index - 发送索引
	recvx    uint   // receive index - 接收索引
	recvq    waitq  // list of recv waiters - 等待接收者列表,表明这个管道的接收者;一个链表,里面的每个元素代表一个g;
	sendq    waitq  // list of send waiters - 等待发送者列表,编码这个管道的发送者

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex // 保护chan里的所有字段,以及阻塞在本管道里的sudog;当持有这个锁时,不要改变其它G的状态,因为在栈收缩时可能引起死锁。
}

type waitq struct {
	first *sudog
	last  *sudog
}

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel. - 代表了一个在等待列表的g
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
// - sudog是必须的,因为g和同步对象关系是多对多。一个g可以在多个等待列表里,因此一个g对应有多个sudog;
// 多个g可以等待同一个同步对象,因此一个对象会对应多个sudog。
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
// - sudog从一个特殊池子里分配,使用acquireSudog分配和releaseSudog释放它们。
type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.
    // - 以下字段由hchan.lock来保护。

	g *g // 代表的g

	next *sudog // 链表中的下一个
	prev *sudog // 链表中的上一个
	elem unsafe.Pointer // data element (may point to stack) - 数据元素,可能是指向栈的指针

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.
    // - 以下字段永远不会被并发访问。
    // 对于管道,waitlink只会被g访问。
    // 对于信号量,所有字段(包括上面的)只有在持有semaRoot锁时才能被访问

	acquiretime int64 // 获取时间
	releasetime int64 // 释放时间
	ticket      uint32 // 票据

	// isSelect indicates g is participating in a select, so
	// g.selectDone must be CAS'd to win the wake-up race.
	isSelect bool // 表明g是否参与到了一个select里,从而使得g.selectDone必须CAS地去赢得唤醒竞赛

	// success indicates whether communication over channel c
	// succeeded. It is true if the goroutine was awoken because a
	// value was delivered over channel c, and false if awoken
	// because c was closed.
	success bool // 表明管道的通信是否成功了,如果goroutine因为一个值被管道传送到来而唤醒即为成功

	parent   *sudog // semaRoot binary tree - 根信号量二叉树
	waitlink *sudog // g.waiting list or semaRoot - g的等待列表或semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel - 所属管道
}

// 新建
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 { // 管道的元素大小不能太大
		throw("makechan: invalid channel element type")
	}
    // const hchanSize uintptr = 96
	if hchanSize%maxAlign != 0 || elem.align > maxAlign { // 对齐检查
		throw("makechan: bad alignment")
	}

    // 元素大小乘以管道大小,计算出来所需内存大小
	mem, overflow := math.MulUintptr(elem.size, uintptr(size)) 
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers. -- 元素没有包含指针
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers. -- 元素包含指针
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan) // 初始化锁

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

// 发送
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.
    // - src是在我们的栈上,dst是另一个栈上的槽

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.size) // 移动src到dst
}

// 接收 -- 请看源码

// 关闭
func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 { // 已关闭的chan,如果再次关闭会panic
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1 // 设为关闭

	var glist gList

    // 先释放接收者,再释放发送者

	// release all readers
	for {
		sg := c.recvq.dequeue() // 逐个出队sudog
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem) // 清理元素
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp) // 把关联的g存到glist里
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop() // 逐个处理g
		gp.schedlink = 0
		goready(gp, 3) // 因为我们已经释放了这些g所关联的chan,所以让这些g进入ready状态,准备运行 -- Mark gp ready to run.
	}
}

src/runtime/type.go:

...

go work

February 10, 2022
Go, Work
Workspace

go1.18将要推出workspace模式,此举是为了方便在本地开发多个不同module时的依赖管理。

命令说明:

$ go help work
Go workspace provides access to operations on workspaces.

Note that support for workspaces is built into many other commands, not
just 'go work'.

See 'go help modules' for information about Go\'s module system of which
workspaces are a part.

A workspace is specified by a go.work file that specifies a set of
module directories with the "use" directive. These modules are used as
root modules by the go command for builds and related operations.  A
workspace that does not specify modules to be used cannot be used to do
builds from local modules.

go.work files are line-oriented. Each line holds a single directive,
made up of a keyword followed by arguments. For example:

        go 1.18

        use ../foo/bar
        use ./baz

        replace example.com/foo v1.2.3 => example.com/bar v1.4.5

The leading keyword can be factored out of adjacent lines to create a block,
like in Go imports.

        use (
          ../foo/bar
          ./baz
        )

The use directive specifies a module to be included in the workspace\'s
set of main modules. The argument to the use directive is the directory
containing the module\'s go.mod file.

The go directive specifies the version of Go the file was written at. It
is possible there may be future changes in the semantics of workspaces
that could be controlled by this version, but for now the version
specified has no effect.

The replace directive has the same syntax as the replace directive in a
go.mod file and takes precedence over replaces in go.mod files.  It is
primarily intended to override conflicting replaces in different workspace
modules.

To determine whether the go command is operating in workspace mode, use
the "go env GOWORK" command. This will specify the workspace file being
used.

Usage:

        go work <command> [arguments]

The commands are:

        edit        edit go.work from tools or scripts
        init        initialize workspace file
        sync        sync workspace build list to modules
        use         add modules to workspace file

Use "go help work <command>" for more information about a command.

使用use指令指定包含在workspace里的module集。use指令后紧接着的是包含了模块的go.mod文件的目录–相对go.work的目录。

...

Go快速入门

January 25, 2022
Go
Learn

源码 #

// 所有代码都需要放到包里
package color

// 导入其它包
import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"
)

// 枚举
type Color int

// 常量
const (
	Red   Color = 1 // 红
	Blue  Color = 2 // 蓝
	Green Color = 3 // 绿
)

// 函数
func NewCar(
	name string,
	rate int,
) *Car {
	return &Car{
		name: name,
		rate: rate,
	}
}

// 类型
type Car struct {
	// 类型字段
	name string // 首字母小写,非导出,只能包内使用
	rate int
}

// 类型方法
func (car *Car) String() string { // 首字母大写,导出,可供其它包使用
	return "[Car] name: " + car.name + ", rate: " + strconv.Itoa(car.rate) + "."
}

func (car *Car) Run(
	ctx context.Context, // 使用ctx实现超时控制
) {
	// 定时器,每隔rate秒执行一次
	ticker := time.NewTicker(time.Duration(car.rate) * time.Second)
	defer ticker.Stop() // defer语句,在方法退出前执行,做收尾工作

	// for range ticker.C { // 循环,遍历chan
	// 	fmt.Printf("%s\n", car)
	// }

	for {
		select {
		case <-ticker.C:
			{ // 代码块,让逻辑更聚合,更清晰
				timesMutex.Lock()
				count := 1
				if v, ok := times[car.name]; ok {
					count = v + 1
				}
				times[car.name] = count
				timesMutex.Unlock()
			}

			fmt.Printf("%s\n", car)

		case <-ctx.Done():
			return
		}
	}
}

// 接口
type Runner interface {
	Run(ctx context.Context)
}

// 变量
var (
	// 确保*Car实现了Runner接口
	_ Runner = (*Car)(nil)

	timesMutex = new(sync.RWMutex)       // 读写锁,唯一写,多个读,读时无写
	times      = make(map[string]int, 2) // 记录Car Run的次数;在声明时初始化,并配置容量
)

测试 #

package color

import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"
)

func TestCar(t *testing.T) {
	// 超时控制
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	// 并发执行
	wg := new(sync.WaitGroup)

	for _, car := range []Runner{ // 遍历切片
		NewCar("lanbo", 2),
		NewCar("boshi", 3),
	} {
		wg.Add(1) // 记录一个
		go func(car Runner) {
			defer wg.Done() // 完成一个

			t.Run(car.(*Car).name, func(t *testing.T) { // 对接口断言,获得具体类型
				car.Run(ctx)
			})
		}(car)
	}

	// 等上面均完成
	wg.Wait()

	timesMutex.RLock()
	fmt.Printf("times: %+v\n", times)
	timesMutex.RUnlock()
}

执行 #

编译:go build

...

burn cpu use golang

December 9, 2021
Go
Cpu

虚假的 burn #

package main

func fakeBurn() {
 for {

 }
}

真正的 burn #

package main

import (
 "flag"
 "fmt"
 "runtime"
 "time"
)

var (
 numBurn        int
 updateInterval int
)

func cpuBurn() {
 for {
  for i := 0; i < 2147483647; i++ {
  }

  // Gosched yields the processor, allowing other goroutines to run. It does not suspend the current goroutine, so execution resumes automatically.
  // Gosched让当前goroutine让出处理器,从而使得其它goroutine可以运行。它不会挂起/暂停当前的goroutine,它会自动恢复执行。
  runtime.Gosched()
 }
}

func init() {
 flag.IntVar(&numBurn, "n", 0, "number of cores to burn (0 = all)")
 flag.IntVar(&updateInterval, "u", 10, "seconds between updates (0 = don't update)")
 flag.Parse()
 if numBurn <= 0 {
  numBurn = runtime.NumCPU()
 }
}

func main() {
 runtime.GOMAXPROCS(numBurn)
 fmt.Printf("Burning %d CPUs/cores\n", numBurn)
 for i := 0; i < numBurn; i++ {
  go cpuBurn()
 }

 // 一直执行,区别是其中一个会定期打印,另一个不会打印
 if updateInterval > 0 {
  t := time.Tick(time.Duration(updateInterval) * time.Second)
  for secs := updateInterval; ; secs += updateInterval {
   <-t
   fmt.Printf("%d seconds\n", secs)
  }
 } else {
  select {} // wait forever
 }
}

Go实现AOP

January 17, 2021
Go
Aop, Proxy

AOP #

面向切面编程(AOP: Aspect Oriented Program)。

划分,重复,复用 #

我们知道,面向对象的特点是继承、多态和封装。而封装就要求将功能分散到不同的对象中去,这在软件设计中往往称为职责分配。实际上也就是说,让不同的类设计不同的方法。这样代码就分散到一个个的类中去了。这样做的好处是降低了代码的复杂程度,使类可重用。

出现的问题:

但是人们也发现,在分散代码的同时,也增加了代码的重复性。什么意思呢?比如说,我们在两个类中,可能都需要在每个方法中做日志。按面向对象的设计方法,我们就必须在两个类的方法中都加入日志的内容。也许他们是完全相同的,但就是因为面向对象的设计让类与类之间无法联系,而不能将这些重复的代码统一起来。

想法1:

也许有人会说,那好办啊,我们可以将这段代码写在一个独立的类独立的方法里,然后再在这两个类中调用。但是,这样一来,这两个类跟我们上面提到的独立的类就有耦合了,它的改变会影响这两个类。

那么,有没有什么办法,能让我们在需要的时候,随意地加入代码呢?

这种在运行时,动态地将代码切入到类的指定方法、指定位置上的编程思想就是面向切面的编程。

一般而言,我们管切入到指定类指定方法的代码片段称为切面,而切入到哪些类、哪些方法则叫切入点。

有了AOP,我们就可以把几个类共有的代码,抽取到一个切片中,等到需要时再切入对象中去,从而改变其原有的行为。

OOP从横向上区分出一个个的类来,而AOP则从纵向上向对象中加入特定的代码。

从技术上来说,AOP基本上是通过代理机制实现的。

AOP

Go实现AOP – 层间代理 #

假设有store,从数据库获取数据,其中有方法IUserStore.GetByID,传入id参数,返回用户信息:

type IUserStore interface {
        GetByID(ctx context.Context, id int) (User, error)
}

另外有service,刚好有用户id并且需要拿到用户信息,于是依赖了上述IUserStore:

type IUserSrv interface {
        CheckUser(ctx context.Context, id int) error // 获取用户信息,然后检查用户某些属性
} 

type userImpl struct {
        userStore IUserStore
}

func (impl userImpl) CheckUser(ctx context.Context, id int) error {
        user, err := impl.userStore.GetByID(ctx, id)
        if err != nil {
                return err
        }

        // 使用user数据做一些操作
        _ = user
}

上面所描述的是一个最简单的情况,如果我们要在userImpl.CheckUser里对impl.userStore.GetByID方法调用添加耗时统计,依然十分简单。

func (impl userImpl) CheckUser(ctx context.Context, id int) error {
        begin := time.Now()
        user, err := impl.userStore.GetByID(ctx, id)
        if err != nil {
                return err
        }
        fmt.Println(time.Since(begin)) // 统计耗时

        // 使用user数据做一些操作
        _ = user
}

但是,如果方法里调用的类似impl.userStore.GetByID的方法非常之多,逻辑非常之复杂时,这样一个一个的添加,必然非常麻烦、非常累。

...

go ctx

December 18, 2020
Go
Ctx

ctx #

1.why

goroutine号称百万之众,互相之间盘根错节,难以管理控制。为此,必须提供一种机制来管理控制它们。

各自为战 #

package main

import (
    "fmt"
    "time"
)

func main() {
    // start first
    go func() {
        fmt.Println(1)
    }()

    // start second
    go func() {
        fmt.Println(2)
    }()

    time.Sleep(time.Second)
}

万法归一 #

package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := new(sync.WaitGroup)

    // start first
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println(1)
    }()

    // start second
    wg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Println(2)
    }()

    wg.Wait()
}

可以看到使用waitgroup可以控制多个goroutine必须互相等待,直到最后一个完成才会全部完成。

...