-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwriter.go
175 lines (143 loc) · 4.42 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package outbox
import (
"context"
"database/sql"
"fmt"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/jackc/pgx/v5"
"github.com/nikolayk812/pgx-outbox/types"
)
// Writer writes outbox messages to a single outbox table.
// To write messages to multiple outbox tables, create multiple Writer instances.
// An outbox message must be written in the same transaction as business entities,
// hence the tx argument which supports both pgx.Tx and *sql.Tx.
// Implementations must be safe for concurrent use by multiple goroutines.
type Writer interface {
// Write writes the message to the outbox table.
// It returns the ID of the newly inserted message.
Write(ctx context.Context, tx Tx, message types.Message) (int64, error)
// WriteBatch writes multiple messages to the outbox table.
// It returns the IDs of the newly inserted messages.
// It returns an error if any of the messages fail to write.
WriteBatch(ctx context.Context, tx pgx.Tx, messages []types.Message) ([]int64, error)
}
type writer struct {
table string
usePreparedBatch bool
}
func NewWriter(table string, opts ...WriteOption) (Writer, error) {
if table == "" {
return nil, ErrTableEmpty
}
w := &writer{
table: table,
usePreparedBatch: true,
}
for _, opt := range opts {
opt(w)
}
return w, nil
}
// Write returns an error if
// - tx is nil or unsupported
// - message is invalid
// - write operation fails.
func (w *writer) Write(ctx context.Context, tx Tx, message types.Message) (int64, error) {
if tx == nil {
return 0, ErrTxNil
}
if err := message.Validate(); err != nil {
return 0, fmt.Errorf("message.Validate: %w", err)
}
ib := sq.StatementBuilder.PlaceholderFormat(sq.Dollar).
Insert(w.table).
Columns("broker", "topic", "metadata", "payload").
Values(message.Broker, message.Topic, message.Metadata, string(message.Payload)).
Suffix("RETURNING id")
query, args, err := ib.ToSql()
if err != nil {
return 0, fmt.Errorf("ib.ToSql: %w", err)
}
row, err := queryRow(ctx, tx, query, args...)
if err != nil {
return 0, fmt.Errorf("queryRow: %w", err)
}
var id int64
if err := row.Scan(&id); err != nil {
return 0, fmt.Errorf("row.Scan: %w", err)
}
return id, nil
}
// WriteBatch uses batching feature of the pgx driver,
// by default it uses prepared statements for batch writes, but it can be disabled using WithDisablePreparedBatch option.
// WriteBatch returns an error if
// - tx is nil
// - any message is invalid
// - write operation fails.
//
//nolint:cyclop
func (w *writer) WriteBatch(ctx context.Context, tx pgx.Tx, messages []types.Message) (_ []int64, txErr error) {
if tx == nil {
return nil, ErrTxNil
}
if len(messages) == 0 {
return nil, nil
}
if err := types.Messages(messages).Validate(); err != nil {
return nil, fmt.Errorf("messages.Validate: %w", err)
}
if len(messages) == 1 {
id, err := w.Write(ctx, tx, messages[0])
if err != nil {
return nil, fmt.Errorf("w.Write: %w", err)
}
return []int64{id}, nil
}
query := fmt.Sprintf("INSERT INTO %s (broker, topic, metadata, payload) VALUES ($1, $2, $3, $4) RETURNING id", w.table)
if w.usePreparedBatch {
prepareStatementName := fmt.Sprintf("%s_write_batch_%d", w.table, time.Now().UnixMilli())
_, err := tx.Prepare(ctx, prepareStatementName, query)
if err != nil {
return nil, fmt.Errorf("tx.Prepare: %w", err)
}
query = prepareStatementName
}
batch := &pgx.Batch{}
for _, message := range messages {
batch.Queue(query,
message.Broker, message.Topic, message.Metadata, string(message.Payload))
}
br := tx.SendBatch(ctx, batch)
defer func() {
if err := br.Close(); err != nil {
txErr = fmt.Errorf("br.Close: %w", err)
}
}()
// Collect all returned IDs
ids := make([]int64, 0, len(messages))
for range messages {
row := br.QueryRow()
var id int64
if err := row.Scan(&id); err != nil {
return nil, fmt.Errorf("row.Scan: %w", err)
}
ids = append(ids, id)
}
return ids, nil
}
// Tx is a transaction interface to support both and pgx.Tx and *sql.Tx.
// As pgx.Tx and *sql.Tx do not have common method signatures, this is empty interface.
type Tx interface{}
func queryRow(ctx context.Context, tx Tx, q string, args ...interface{}) (pgx.Row, error) {
var row pgx.Row
switch t := tx.(type) {
case *sql.Tx:
row = t.QueryRowContext(ctx, q, args...)
case pgx.Tx:
row = t.QueryRow(ctx, q, args...)
default:
return nil, fmt.Errorf("%w: %T", ErrTxUnsupportedType, tx)
}
return row, nil
}