-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgrpc_example_test.go
137 lines (118 loc) · 2.89 KB
/
grpc_example_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package example
import (
"context"
"fmt"
"io"
"net"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
v1 "sample-go-apps/grpc-example/v1"
)
// DialerFunc is the dialer signature handed to grpc.WithDialer in this file.
// It is used to be able to connect to a local, in-memory server.
//
// NOTE(review): the string and time.Duration arguments are presumably the
// target address and the dial timeout; the bufconn-backed dialer built in
// newTestObsServer ignores both.
type DialerFunc func(string, time.Duration) (net.Conn, error)
// TestTraceObserverRoundTrip runs the single round-trip scenario in test
// 50 times back to back, sequentially, on the same *testing.T.
func TestTraceObserverRoundTrip(t *testing.T) {
	const rounds = 50
	for remaining := rounds; remaining > 0; remaining-- {
		test(t)
	}
}
// test performs one round trip: it starts a fresh in-memory server, has a
// client send 5 spans through the server's dialer, and then waits up to two
// seconds for the server to report that 5 spans were received. The server is
// closed when the function returns.
func test(t *testing.T) {
	const spanCount = 5

	srv := newTestObsServer(t)
	defer srv.Close()

	createAndRunClient(srv.dialer, spanCount)
	srv.expectServer.WaitForSpans(t, spanCount, 2*time.Second)
}
// createAndRunClient dials "bufnet" through dialer, opens a RecordSpan stream
// and sends count spans on it.
//
// The function has no error result, so failures are reported on stdout. A
// failed dial or a failed stream creation ends the call early instead of
// going on to use a nil connection or a nil stream.
func createAndRunClient(dialer DialerFunc, count int) {
	conn, err := grpc.Dial("bufnet",
		grpc.WithDialer(dialer),
		grpc.WithInsecure(),
	)
	if err != nil {
		fmt.Println("error dialing trace observer", err)
		return
	}
	defer func() {
		// This is the key sleep here
		// Deferred calls run last-in-first-out, so the CloseSend registered
		// below has already run when this sleep starts; the connection is
		// closed 500ms after that.
		// NOTE(review): presumably this gives the server time to read the
		// spans before the connection goes away - confirm.
		time.Sleep(500 * time.Millisecond)
		_ = conn.Close() // best effort: nothing useful to do with a close error here
	}()

	serviceClient := v1.NewIngestServiceClient(conn)
	spanClient, err := serviceClient.RecordSpan(context.Background())
	if err != nil {
		fmt.Println("error opening span stream", err)
		return
	}
	defer func() {
		if err := spanClient.CloseSend(); err != nil {
			fmt.Println("error closing trace observer sender")
		}
	}()

	for i := 0; i < count; i++ {
		span := v1.Span{TraceId: "something"}
		if err := spanClient.Send(&span); err != nil {
			fmt.Println("error sending span")
		}
	}
}
// expectServer is the service implementation registered on the in-memory
// gRPC server. It signals every received span on a channel so that a test can
// wait until an expected number of spans has arrived.
type expectServer struct {
	// spansReceivedChan gets one value per message read by RecordSpan and is
	// drained by WaitForSpans.
	spansReceivedChan chan struct{}
}
// RecordSpan reads messages from stream in a loop and sends one signal on
// spansReceivedChan per message read; the message content is discarded.
// When Recv reports io.EOF the method returns nil; any other receive error is
// returned unchanged.
func (s *expectServer) RecordSpan(stream v1.IngestService_RecordSpanServer) error {
	for {
		_, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			return nil
		case recvErr != nil:
			return recvErr
		}
		// Blocks while the channel is full, until a receiver takes a value.
		s.spansReceivedChan <- struct{}{}
	}
}
// testObsServer is a server and the information needed to connect to it.
type testObsServer struct {
	// expectServer is the registered service implementation; embedding it
	// promotes its methods (such as WaitForSpans) onto testObsServer.
	*expectServer
	// server is the gRPC server that Close stops.
	server *grpc.Server
	// conn is the client connection that Close closes.
	conn *grpc.ClientConn
	// dialer connects to the server; test passes it to createAndRunClient.
	dialer DialerFunc
}
// Close tears the test server down: the client connection is closed first
// (its error is deliberately discarded), then the gRPC server is stopped.
func (srv *testObsServer) Close() {
	clientConn, grpcServer := srv.conn, srv.server

	_ = clientConn.Close()
	grpcServer.Stop()
}
// newTestObsServer creates an in-memory gRPC server.
//
// It registers a fresh expectServer (with room for 10 buffered span signals)
// on a new grpc.Server, serves it on a bufconn listener with a 1 MiB buffer,
// and synchronously dials a client connection to it. The returned value
// carries the service, the server, that connection and the dialer used to
// reach the listener; callers are expected to call Close on it.
//
// If the connection cannot be created the server is stopped again and the
// test is failed with t.Fatal.
func newTestObsServer(t *testing.T) testObsServer {
	t.Helper()

	grpcServer := grpc.NewServer()
	s := &expectServer{
		spansReceivedChan: make(chan struct{}, 10),
	}
	v1.RegisterIngestServiceServer(grpcServer, s)

	lis := bufconn.Listen(1024 * 1024)
	// Serve runs until the server is stopped; its return value is not
	// inspected because there is no test to report it to by then.
	go grpcServer.Serve(lis)

	// The dialer ignores its address and timeout arguments and always
	// connects to the in-memory listener.
	bufDialer := func(string, time.Duration) (net.Conn, error) {
		return lis.Dial()
	}
	conn, err := grpc.Dial("bufnet",
		grpc.WithDialer(bufDialer),
		grpc.WithInsecure(),
		grpc.WithBlock(), // create the connection synchronously
	)
	if err != nil {
		// Do not leave the serving goroutine and listener behind when the
		// test is aborted.
		grpcServer.Stop()
		t.Fatal("failure to create ClientConn", err)
	}

	return testObsServer{
		expectServer: s,
		server:       grpcServer,
		conn:         conn,
		dialer:       bufDialer,
	}
}
// WaitForSpans waits until the expected number of spans have been received,
// or the timeout is reached.
//
// Each value taken from spansReceivedChan counts as one span. If secTimeout
// (a time.Duration, despite the name) elapses first, the test is marked as
// failed via t.Errorf and the method returns. When expected is zero or
// negative there is nothing to wait for and the method returns immediately
// without consuming anything from the channel.
func (s *expectServer) WaitForSpans(t *testing.T, expected int, secTimeout time.Duration) {
	t.Helper()

	if expected <= 0 {
		return
	}

	// A one-shot timer is used rather than a ticker: it does not panic on a
	// non-positive duration, and it is released as soon as the wait ends.
	timeout := time.NewTimer(secTimeout)
	defer timeout.Stop()

	var rcvd int
	for rcvd < expected {
		select {
		case <-s.spansReceivedChan:
			rcvd++
		case <-timeout.C:
			t.Errorf("Did not receive expected spans before timeout - expected %d but got %d", expected, rcvd)
			return
		}
	}
}