// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime_test
import (
"internal/testenv"
"math"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestChan(t *testing.T) {
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4))
N := 200
if testing.Short() {
N = 20
}
for chanCap := 0; chanCap < N; chanCap++ {
{
// Ensure that receive from empty chan blocks.
c := make(chan int, chanCap)
recv1 := false
go func() {
_ = <-c
recv1 = true
}()
recv2 := false
go func() {
_, _ = <-c
recv2 = true
}()
time.Sleep(time.Millisecond)
if recv1 || recv2 {
t.Fatalf("chan[%d]: receive from empty chan", chanCap)
}
// Ensure that non-blocking receive does not block.
select {
case _ = <-c:
t.Fatalf("chan[%d]: receive from empty chan", chanCap)
default:
}
select {
case _, _ = <-c:
t.Fatalf("chan[%d]: receive from empty chan", chanCap)
default:
}
c <- 0
c <- 0
}
{
// Ensure that send to full chan blocks.
c := make(chan int, chanCap)
for i := 0; i < chanCap; i++ {
c <- i
}
sent := uint32(0)
go func() {
c <- 0
atomic.StoreUint32(&sent, 1)
}()
time.Sleep(time.Millisecond)
if atomic.LoadUint32(&sent) != 0 {
t.Fatalf("chan[%d]: send to full chan", chanCap)
}
// Ensure that non-blocking send does not block.
select {
case c <- 0:
t.Fatalf("chan[%d]: send to full chan", chanCap)
default:
}
<-c
}
{
// Ensure that we receive 0 from closed chan.
c := make(chan int, chanCap)
for i := 0; i < chanCap; i++ {
c <- i
}
close(c)
for i := 0; i < chanCap; i++ {
v := <-c
if v != i {
t.Fatalf("chan[%d]: received %v, expected %v", chanCap, v, i)
}
}
if v := <-c; v != 0 {
t.Fatalf("chan[%d]: received %v, expected %v", chanCap, v, 0)
}
if v, ok := <-c; v != 0 || ok {
t.Fatalf("chan[%d]: received %v/%v, expected %v/%v", chanCap, v, ok, 0, false)
}
}
{
// Ensure that close unblocks receive.
c := make(chan int, chanCap)
done := make(chan bool)
go func() {
v, ok := <-c
done <- v == 0 && ok == false
}()
time.Sleep(time.Millisecond)
close(c)
if !<-done {
t.Fatalf("chan[%d]: received non zero from closed chan", chanCap)
}
}
{
// Send 100 integers,
// ensure that we receive them non-corrupted in FIFO order.
c := make(chan int, chanCap)
go func() {
for i := 0; i < 100; i++ {
c <- i
}
}()
for i := 0; i < 100; i++ {
v := <-c
if v != i {
t.Fatalf("chan[%d]: received %v, expected %v", chanCap, v, i)
}
}
// Same, but using recv2.
go func() {
for i := 0; i < 100; i++ {
c <- i
}
}()
for i := 0; i < 100; i++ {
v, ok := <-c
if !ok {
t.Fatalf("chan[%d]: receive failed, expected %v", chanCap, i)
}
if v != i {
t.Fatalf("chan[%d]: received %v, expected %v", chanCap, v, i)
}
}
// Send 1000 integers in 4 goroutines,
// ensure that we receive what we send.
const P = 4
const L = 1000
for p := 0; p < P; p++ {
go func() {
for i := 0; i < L; i++ {
c <- i
}
}()
}
done := make(chan map[int]int)
for p := 0; p < P; p++ {
go func() {
recv := make(map[int]int)
for i := 0; i < L; i++ {
v := <-c
recv[v] = recv[v] + 1
}
done <- recv
}()
}
recv := make(map[int]int)
for p := 0; p < P; p++ {
for k, v := range <-done {
recv[k] = recv[k] + v
}
}
if len(recv) != L {
t.Fatalf("chan[%d]: received %v values, expected %v", chanCap, len(recv), L)
}
for _, v := range recv {
if v != P {
t.Fatalf("chan[%d]: received %v values, expected %v", chanCap, v, P)
}
}
}
{
// Test len/cap.
c := make(chan int, chanCap)
if len(c) != 0 || cap(c) != chanCap {
t.Fatalf("chan[%d]: bad len/cap, expect %v/%v, got %v/%v", chanCap, 0, chanCap, len(c), cap(c))
}
for i := 0; i < chanCap; i++ {
c <- i
}
if len(c) != chanCap || cap(c) != chanCap {
t.Fatalf("chan[%d]: bad len/cap, expect %v/%v, got %v/%v", chanCap, chanCap, chanCap, len(c), cap(c))
}
}
}
}
func TestNonblockRecvRace(t *testing.T) {
n := 10000
if testing.Short() {
n = 100
}
for i := 0; i < n; i++ {
c := make(chan int, 1)
c <- 1
go func() {
select {
case <-c:
default:
t.Error("chan is not ready")
}
}()
close(c)
<-c
if t.Failed() {
return
}
}
}
// This test checks that select acts on the state of the channels at one
// moment in the execution, not over a smeared time window.
// In the test, one goroutine does:
// create c1, c2
// make c1 ready for receiving
// create second goroutine
// make c2 ready for receiving
// make c1 no longer ready for receiving (if possible)
// The second goroutine does a non-blocking select receiving from c1 and c2.
// From the time the second goroutine is created, at least one of c1 and c2
// is always ready for receiving, so the select in the second goroutine must
// always receive from one or the other. It must never execute the default case.
func TestNonblockSelectRace(t *testing.T) {
n := 100000
if testing.Short() {
n = 1000
}
done := make(chan bool, 1)
for i := 0; i < n; i++ {
c1 := make(chan int, 1)
c2 := make(chan int, 1)
c1 <- 1
go func() {
select {
case <-c1:
case <-c2:
default:
done <- false
return
}
done <- true
}()
c2 <- 1
select {
case <-c1:
default:
}
if !<-done {
t.Fatal("no chan is ready")
}
}
}
// Same as TestNonblockSelectRace, but close(c2) replaces c2 <- 1.
func TestNonblockSelectRace2(t *testing.T) {
n := 100000
if testing.Short() {
n = 1000
}
done := make(chan bool, 1)
for i := 0; i < n; i++ {
c1 := make(chan int, 1)
c2 := make(chan int)
c1 <- 1
go func() {
select {
case <-c1:
case <-c2:
default:
done <- false
return
}
done <- true
}()
close(c2)
select {
case <-c1:
default:
}
if !<-done {
t.Fatal("no chan is ready")
}
}
}
func TestSelfSelect(t *testing.T) {
// Ensure that send/recv on the same chan in select
// does not crash nor deadlock.
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(2))
for _, chanCap := range []int{0, 10} {
var wg sync.WaitGroup
wg.Add(2)
c := make(chan int, chanCap)
for p := 0; p < 2; p++ {
p := p
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
if p == 0 || i%2 == 0 {
select {
case c <- p:
case v := <-c:
if chanCap == 0 && v == p {
t.Errorf("self receive")
return
}
}
} else {
select {
case v := <-c:
if chanCap == 0 && v == p {
t.Errorf("self receive")
return
}
case c <- p:
}
}
}
}()
}
wg.Wait()
}
}
func TestSelectStress(t *testing.T) {
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(10))
var c [4]chan int
c[0] = make(chan int)
c[1] = make(chan int)
c[2] = make(chan int, 2)
c[3] = make(chan int, 3)
N := int(1e5)
if testing.Short() {
N /= 10
}
// There are 4 goroutines that send N values on each of the chans,
// + 4 goroutines that receive N values on each of the chans,
// + 1 goroutine that sends N values on each of the chans in a single select,
// + 1 goroutine that receives N values on each of the chans in a single select.
// All these sends, receives and selects interact chaotically at runtime,
// but we are careful that this whole construct does not deadlock.
var wg sync.WaitGroup
wg.Add(10)
for k := 0; k < 4; k++ {
k := k
go func() {
for i := 0; i < N; i++ {
c[k] <- 0
}
wg.Done()
}()
go func() {
for i := 0; i < N; i++ {
<-c[k]
}
wg.Done()
}()
}
go func() {
var n [4]int
c1 := c
for i := 0; i < 4*N; i++ {
select {
case c1[3] <- 0:
n[3]++
if n[3] == N {
c1[3] = nil
}
case c1[2] <- 0:
n[2]++
if n[2] == N {
c1[2] = nil
}
case c1[0] <- 0:
n[0]++
if n[0] == N {
c1[0] = nil
}
case c1[1] <- 0:
n[1]++
if n[1] == N {
c1[1] = nil
}
}
}
wg.Done()
}()
go func() {
var n [4]int
c1 := c
for i := 0; i < 4*N; i++ {
select {
case <-c1[0]:
n[0]++
if n[0] == N {
c1[0] = nil
}
case <-c1[1]:
n[1]++
if n[1] == N {
c1[1] = nil
}
case <-c1[2]:
n[2]++
if n[2] == N {
c1[2] = nil
}
case <-c1[3]:
n[3]++
if n[3] == N {
c1[3] = nil
}
}
}
wg.Done()
}()
wg.Wait()
}
func TestSelectFairness(t *testing.T) {
const trials = 10000
if runtime.GOOS == "linux" && runtime.GOARCH == "ppc64le" {
testenv.SkipFlaky(t, 22047)
}
c1 := make(chan byte, trials+1)
c2 := make(chan byte, trials+1)
for i := 0; i < trials+1; i++ {
c1 <- 1
c2 <- 2
}
c3 := make(chan byte)
c4 := make(chan byte)
out := make(chan byte)
done := make(chan byte)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
var b byte
select {
case b = <-c3:
case b = <-c4:
case b = <-c1:
case b = <-c2:
}
select {
case out <- b:
case <-done:
return
}
}
}()
cnt1, cnt2 := 0, 0
for i := 0; i < trials; i++ {
switch b := <-out; b {
case 1:
cnt1++
case 2:
cnt2++
default:
t.Fatalf("unexpected value %d on channel", b)
}
}
// If the select in the goroutine is fair,
// cnt1 and cnt2 should be about the same value.
// With 10,000 trials, the expected margin of error at
// a confidence level of five nines is 4.4172 / (2 * Sqrt(10000)).
r := float64(cnt1) / trials
e := math.Abs(r - 0.5)
t.Log(cnt1, cnt2, r, e)
if e > 4.4172/(2*math.Sqrt(trials)) {
t.Errorf("unfair select: in %d trials, results were %d, %d", trials, cnt1, cnt2)
}
close(done)
wg.Wait()
}
func TestChanSendInterface(t *testing.T) {
type mt struct{}
m := &mt{}
c := make(chan interface{}, 1)
c <- m
select {
case c <- m:
default:
}
select {
case c <- m:
case c <- &mt{}:
default:
}
}
func TestPseudoRandomSend(t *testing.T) {
n := 100
for _, chanCap := range []int{0, n} {
c := make(chan int, chanCap)
l := make([]int, n)
var m sync.Mutex
m.Lock()
go func() {
for i := 0; i < n; i++ {
runtime.Gosched()
l[i] = <-c
}
m.Unlock()
}()
for i := 0; i < n; i++ {
select {
case c <- 1:
case c <- 0:
}
}
m.Lock() // wait
n0 := 0
n1 := 0
for _, i := range l {
n0 += (i + 1) % 2
n1 += i
}
if n0 <= n/10 || n1 <= n/10 {
t.Errorf("Want pseudorandom, got %d zeros and %d ones (chan cap %d)", n0, n1, chanCap)
}
}
}
func TestMultiConsumer(t *testing.T) {
const nwork = 23
const niter = 271828
pn := []int{2, 3, 7, 11, 13, 17, 19, 23, 27, 31}
q := make(chan int, nwork*3)
r := make(chan int, nwork*3)
// workers
var wg sync.WaitGroup
for i := 0; i < nwork; i++ {
wg.Add(1)
go func(w int) {
for v := range q {
// mess with the fifo-ish nature of range
if pn[w%len(pn)] == v {
runtime.Gosched()
}
r <- v
}
wg.Done()
}(i)
}
// feeder & closer
expect := 0
go func() {
for i := 0; i < niter; i++ {
v := pn[i%len(pn)]
expect += v
q <- v
}
close(q) // no more work
wg.Wait() // workers done
close(r) // ... so there can be no more results
}()
// consume & check
n := 0
s := 0
for v := range r {
n++
s += v
}
if n != niter || s != expect {
t.Errorf("Expected sum %d (got %d) from %d iter (saw %d)",
expect, s, niter, n)
}
}
func TestShrinkStackDuringBlockedSend(t *testing.T) {
// make sure that channel operations still work when we are
// blocked on a channel send and we shrink the stack.
// NOTE: this test probably won't fail unless stack1.go:stackDebug
// is set to >= 1.
const n = 10
c := make(chan int)
done := make(chan struct{})
go func() {
for i := 0; i < n; i++ {
c <- i
// use lots of stack, briefly.
stackGrowthRecursive(20)
}
done <- struct{}{}
}()
for i := 0; i < n; i++ {
x := <-c
if x != i {
t.Errorf("bad channel read: want %d, got %d", i, x)
}
// Waste some time so sender can finish using lots of stack
// and block in channel send.
time.Sleep(1 * time.Millisecond)
// trigger GC which will shrink the stack of the sender.
runtime.GC()
}
<-done
}
func TestSelectDuplicateChannel(t *testing.T) {
// This test makes sure we can queue a G on
// the same channel multiple times.
c := make(chan int)
d := make(chan int)
e := make(chan int)
// goroutine A
go func() {
select {
case <-c:
case <-c:
case <-d:
}
e <- 9
}()
time.Sleep(time.Millisecond) // make sure goroutine A gets queued first on c
// goroutine B
go func() {
<-c
}()
time.Sleep(time.Millisecond) // make sure goroutine B gets queued on c before continuing
d <- 7 // wake up A, it dequeues itself from c. This operation used to corrupt c.recvq.
<-e // A tells us it's done
c <- 8 // wake up B. This operation used to fail because c.recvq was corrupted (it tries to wake up an already running G instead of B)
}
var selectSink interface{}
func TestSelectStackAdjust(t *testing.T) {
// Test that channel receive slots that contain local stack
// pointers are adjusted correctly by stack shrinking.
c := make(chan *int)
d := make(chan *int)
ready1 := make(chan bool)
ready2 := make(chan bool)
f := func(ready chan bool, dup bool) {
// Temporarily grow the stack to 10K.
stackGrowthRecursive((10 << 10) / (128 * 8))
// We're ready to trigger GC and stack shrink.
ready <- true
val := 42
var cx *int
cx = &val
var c2 chan *int
var d2 chan *int
if dup {
c2 = c
d2 = d
}
// Receive from d. cx won't be affected.
select {
case cx = <-c:
case <-c2:
case <-d:
case <-d2:
}
// Check that pointer in cx was adjusted correctly.
if cx != &val {
t.Error("cx no longer points to val")
} else if val != 42 {
t.Error("val changed")
} else {
*cx = 43
if val != 43 {
t.Error("changing *cx failed to change val")
}
}
ready <- true
}
go f(ready1, false)
go f(ready2, true)
// Let the goroutines get into the select.
<-ready1
<-ready2
time.Sleep(10 * time.Millisecond)
// Force concurrent GC a few times.
var before, after runtime.MemStats
runtime.ReadMemStats(&before)
for i := 0; i < 100; i++ {
selectSink = new([1 << 20]byte)
runtime.ReadMemStats(&after)
if after.NumGC-before.NumGC >= 2 {
goto done
}
}
t.Fatal("failed to trigger concurrent GC")
done:
selectSink = nil
// Wake selects.
close(d)
<-ready1
<-ready2
}
type struct0 struct{}
func BenchmarkMakeChan(b *testing.B) {
b.Run("Byte", func(b *testing.B) {
var x chan byte
for i := 0; i < b.N; i++ {
x = make(chan byte, 8)
}
close(x)
})
b.Run("Int", func(b *testing.B) {
var x chan int
for i := 0; i < b.N; i++ {
x = make(chan int, 8)
}
close(x)
})
b.Run("Ptr", func(b *testing.B) {
var x chan *byte
for i := 0; i < b.N; i++ {
x = make(chan *byte, 8)
}
close(x)
})
b.Run("Struct", func(b *testing.B) {
b.Run("0", func(b *testing.B) {
var x chan struct0
for i := 0; i < b.N; i++ {
x = make(chan struct0, 8)
}
close(x)
})
b.Run("32", func(b *testing.B) {
var x chan struct32
for i := 0; i < b.N; i++ {
x = make(chan struct32, 8)
}
close(x)
})
b.Run("40", func(b *testing.B) {
var x chan struct40
for i := 0; i < b.N; i++ {
x = make(chan struct40, 8)
}
close(x)
})
})
}
func BenchmarkChanNonblocking(b *testing.B) {
myc := make(chan int)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
select {
case <-myc:
default:
}
}
})
}
func BenchmarkSelectUncontended(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
myc1 := make(chan int, 1)
myc2 := make(chan int, 1)
myc1 <- 0
for pb.Next() {
select {
case <-myc1:
myc2 <- 0
case <-myc2:
myc1 <- 0
}
}
})
}
func BenchmarkSelectSyncContended(b *testing.B) {
myc1 := make(chan int)
myc2 := make(chan int)
myc3 := make(chan int)
done := make(chan int)
b.RunParallel(func(pb *testing.PB) {
go func() {
for {
select {
case myc1 <- 0:
case myc2 <- 0:
case myc3 <- 0:
case <-done:
return
}
}
}()
for pb.Next() {
select {
case <-myc1:
case <-myc2:
case <-myc3:
}
}
})
close(done)
}
func BenchmarkSelectAsyncContended(b *testing.B) {
procs := runtime.GOMAXPROCS(0)
myc1 := make(chan int, procs)
myc2 := make(chan int, procs)
b.RunParallel(func(pb *testing.PB) {
myc1 <- 0
for pb.Next() {
select {
case <-myc1:
myc2 <- 0
case <-myc2:
myc1 <- 0
}
}
})
}
func BenchmarkSelectNonblock(b *testing.B) {
myc1 := make(chan int)
myc2 := make(chan int)
myc3 := make(chan int, 1)
myc4 := make(chan int, 1)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
select {
case <-myc1:
default:
}
select {
case myc2 <- 0:
default:
}
select {
case <-myc3:
default:
}
select {
case myc4 <- 0:
default:
}
}
})
}
func BenchmarkChanUncontended(b *testing.B) {
const C = 100
b.RunParallel(func(pb *testing.PB) {
myc := make(chan int, C)
for pb.Next() {
for i := 0; i < C; i++ {
myc <- 0
}
for i := 0; i < C; i++ {
<-myc
}
}
})
}
func BenchmarkChanContended(b *testing.B) {
const C = 100
myc := make(chan int, C*runtime.GOMAXPROCS(0))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for i := 0; i < C; i++ {
myc <- 0
}
for i := 0; i < C; i++ {
<-myc
}
}
})
}
func benchmarkChanSync(b *testing.B, work int) {
const CallsPerSched = 1000
procs := 2
N := int32(b.N / CallsPerSched / procs * procs)
c := make(chan bool, procs)
myc := make(chan int)
for p := 0; p < procs; p++ {
go func() {
for {
i := atomic.AddInt32(&N, -1)
if i < 0 {
break
}
for g := 0; g < CallsPerSched; g++ {
if i%2 == 0 {
<-myc
localWork(work)
myc <- 0
localWork(work)
} else {
myc <- 0
localWork(work)
<-myc
localWork(work)
}
}
}
c <- true
}()
}
for p := 0; p < procs; p++ {
<-c
}
}
func BenchmarkChanSync(b *testing.B) {
benchmarkChanSync(b, 0)
}
func BenchmarkChanSyncWork(b *testing.B) {
benchmarkChanSync(b, 1000)
}
func benchmarkChanProdCons(b *testing.B, chanSize, localWork int) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, 2*procs)
myc := make(chan int, chanSize)
for p := 0; p < procs; p++ {
go func() {
foo := 0
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
for i := 0; i < localWork; i++ {
foo *= 2
foo /= 2
}
myc <- 1
}
}
myc <- 0
c <- foo == 42
}()
go func() {
foo := 0
for {
v := <-myc
if v == 0 {
break
}
for i := 0; i < localWork; i++ {
foo *= 2
foo /= 2
}
}
c <- foo == 42
}()
}
for p := 0; p < procs; p++ {
<-c
<-c
}
}
func BenchmarkChanProdCons0(b *testing.B) {
benchmarkChanProdCons(b, 0, 0)
}
func BenchmarkChanProdCons10(b *testing.B) {
benchmarkChanProdCons(b, 10, 0)
}
func BenchmarkChanProdCons100(b *testing.B) {
benchmarkChanProdCons(b, 100, 0)
}
func BenchmarkChanProdConsWork0(b *testing.B) {
benchmarkChanProdCons(b, 0, 100)
}
func BenchmarkChanProdConsWork10(b *testing.B) {
benchmarkChanProdCons(b, 10, 100)
}
func BenchmarkChanProdConsWork100(b *testing.B) {
benchmarkChanProdCons(b, 100, 100)
}
func BenchmarkSelectProdCons(b *testing.B) {
const CallsPerSched = 1000
procs := runtime.GOMAXPROCS(-1)
N := int32(b.N / CallsPerSched)
c := make(chan bool, 2*procs)
myc := make(chan int, 128)
myclose := make(chan bool)
for p := 0; p < procs; p++ {
go func() {
// Producer: sends to myc.
foo := 0
// Intended to not fire during benchmarking.
mytimer := time.After(time.Hour)
for atomic.AddInt32(&N, -1) >= 0 {
for g := 0; g < CallsPerSched; g++ {
// Model some local work.
for i := 0; i < 100; i++ {
foo *= 2
foo /= 2
}
select {
case myc <- 1:
case <-mytimer:
case <-myclose:
}
}
}
myc <- 0
c <- foo == 42
}()
go func() {
// Consumer: receives from myc.
foo := 0
// Intended to not fire during benchmarking.
mytimer := time.After(time.Hour)
loop:
for {
select {
case v := <-myc:
if v == 0 {
break loop
}
case <-mytimer:
case <-myclose:
}
// Model some local work.
for i := 0; i < 100; i++ {
foo *= 2
foo /= 2
}
}
c <- foo == 42
}()
}
for p := 0; p < procs; p++ {
<-c
<-c
}
}
func BenchmarkChanCreation(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
myc := make(chan int, 1)
myc <- 0
<-myc
}
})
}
func BenchmarkChanSem(b *testing.B) {
type Empty struct{}
myc := make(chan Empty, runtime.GOMAXPROCS(0))
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
myc <- Empty{}
<-myc
}
})
}
func BenchmarkChanPopular(b *testing.B) {
const n = 1000
c := make(chan bool)
var a []chan bool
var wg sync.WaitGroup
wg.Add(n)
for j := 0; j < n; j++ {
d := make(chan bool)
a = append(a, d)
go func() {
for i := 0; i < b.N; i++ {
select {
case <-c:
case <-d:
}
}
wg.Done()
}()
}
for i := 0; i < b.N; i++ {
for _, d := range a {
d <- true
}
}
wg.Wait()
}
var (
alwaysFalse = false
workSink = 0
)
func localWork(w int) {
foo := 0
for i := 0; i < w; i++ {
foo /= (foo + 1)
}
if alwaysFalse {
workSink += foo
}
}
|