Arrow Record Distributed Result Batches #544

Merged
merged 27 commits on Feb 10, 2022

Commits (27)
9c26d05
arrow record channel in context impl + unit test
ddl-giuliocapolino Dec 8, 2021
a907324
iteration on comments 1
ddl-giuliocapolino Dec 11, 2021
857f8b6
changed mutex map struct to sync.map
ddl-giuliocapolino Dec 13, 2021
a52ba51
final iteration on comments
ddl-giuliocapolino Dec 27, 2021
fce1c82
Merge pull request #1 from dominodatalab/ddl-giuliocapolino.with_arro…
ddl-giuliocapolino Dec 27, 2021
4d0efe7
added converter for snowflake records
ddl-giuliocapolino Dec 15, 2021
ee807bf
added converter tests
ddl-giuliocapolino Dec 16, 2021
8943664
releasing record as soon as possible
ddl-giuliocapolino Dec 16, 2021
bb07f34
fix value has non-zero nanoseconds error
ddl-giuliocapolino Dec 20, 2021
6dae32a
iteration on comments
ddl-giuliocapolino Dec 28, 2021
f56da50
Merge pull request #2 from dominodatalab/ddl-giuliocapolino.add_arrow…
ddl-giuliocapolino Dec 28, 2021
ccfbe40
Merge branch 'master' into master
ddl-giuliocapolino Jan 12, 2022
4874c89
withDistributedBatches refactor
ddl-giuliocapolino Jan 18, 2022
701e083
Merge branch 'master' into ddl-giuliocapolino.with-distributed-batches
ddl-giuliocapolino Jan 20, 2022
b18a4de
reverting withArrowRecordChan changes
ddl-giuliocapolino Jan 20, 2022
5023beb
adjusting go.mod for forking
ddl-giuliocapolino Jan 21, 2022
8b89936
Merge pull request #5 from dominodatalab/ddl-giuliocapolino.publish-f…
ddl-giuliocapolino Jan 24, 2022
82fbc2f
first iteration on comments
ddl-giuliocapolino Jan 26, 2022
37799e5
final iteration on comments
ddl-giuliocapolino Feb 8, 2022
d39e2e6
Merge pull request #3 from dominodatalab/ddl-giuliocapolino.with-dist…
ddl-giuliocapolino Feb 8, 2022
aedd242
Merge remote-tracking branch 'upstream/master'
ddl-giuliocapolino Feb 8, 2022
da0f03b
reverting module change
ddl-giuliocapolino Feb 8, 2022
f4e0b06
Merge pull request #7 from dominodatalab/ddl-giuliocapolino.reverting…
ddl-giuliocapolino Feb 8, 2022
d9a0df1
revert exposure of SnowflakeRows, QueryContext, and change signature …
ddl-giuliocapolino Feb 8, 2022
3756cf8
improved chunk_test to query more than 1 chunk
ddl-giuliocapolino Feb 8, 2022
b5a4331
improved chunk_test to show download workers, added metadata, and mad…
ddl-giuliocapolino Feb 10, 2022
ed3fe29
nomenclature tweaks
ddl-giuliocapolino Feb 10, 2022
22 changes: 22 additions & 0 deletions arrow_chunk.go
@@ -7,6 +7,7 @@ import (
"encoding/base64"
"io"

"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
)
@@ -52,6 +53,27 @@ func (arc *arrowResultChunk) decodeArrowChunk(rowType []execResponseRowType, hig
}
}

func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader) (*[]array.Record, error) {
var records []array.Record

for {
rawRecord, err := arc.reader.Read()
if err == io.EOF {
break
} else if err != nil {
return nil, err
}
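// convert the raw IPC record using the result's column metadata, then release it;
// the converted record is retained below so it remains valid for the caller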
record, err := arrowToRecord(rawRecord, scd.RowSet.RowType)
rawRecord.Release()
if err != nil {
return nil, err
}
record.Retain()
records = append(records, record)
}
return &records, nil
}

// Build arrow chunk based on RowSet of base64
func buildFirstArrowChunk(rowsetBase64 string) arrowResultChunk {
rowSetBytes, err := base64.StdEncoding.DecodeString(rowsetBase64)
80 changes: 80 additions & 0 deletions chunk_downloader.go
@@ -18,6 +18,7 @@ import (
"sync"
"time"

"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
)
@@ -34,6 +35,7 @@ type chunkDownloader interface {
getRowType() []execResponseRowType
setNextChunkDownloader(downloader chunkDownloader)
getNextChunkDownloader() chunkDownloader
getResultBatches() []*ResultBatch
}

Contributor (on getResultBatches): getArrowBatches() []*ArrowBatch

type snowflakeChunkDownloader struct {
Expand All @@ -55,9 +57,11 @@ type snowflakeChunkDownloader struct {
ChunksFinalErrors []*chunkError
ChunksMutex *sync.Mutex
DoneDownloadCond *sync.Cond
FirstBatch *ResultBatch
NextDownloader chunkDownloader
Qrmk string
QueryResultFormat string
ResultBatches []*ResultBatch
RowSet rowSetType
FuncDownload func(context.Context, *snowflakeChunkDownloader, int)
FuncDownloadHelper func(context.Context, *snowflakeChunkDownloader, int) error
@@ -88,6 +92,9 @@ func (scd *snowflakeChunkDownloader) nextResultSet() error {
}

func (scd *snowflakeChunkDownloader) start() error {
if usesDistributedBatches(scd.ctx) {
return scd.startDistributedBatches()
}
scd.CurrentChunkSize = len(scd.RowSet.JSON) // cache the size
scd.CurrentIndex = -1 // initial chunks idx
scd.CurrentChunkIndex = -1 // initial chunk
@@ -231,6 +238,13 @@ func (scd *snowflakeChunkDownloader) getRowType() []execResponseRowType {
return scd.RowSet.RowType
}

func (scd *snowflakeChunkDownloader) getResultBatches() []*ResultBatch {
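// the first batch comes from the inline base64 row set and is only prepended when it was actually decoded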
if scd.FirstBatch.Rec == nil {
return scd.ResultBatches
}
return append([]*ResultBatch{scd.FirstBatch}, scd.ResultBatches...)
}

func getChunk(
ctx context.Context,
scd *snowflakeChunkDownloader,
@@ -245,6 +259,33 @@ func getChunk(
return newRetryHTTP(ctx, scd.sc.rest.Client, http.NewRequest, u, headers, timeout).execute()
}

Contributor (on startDistributedBatches): startArrowBatches()

func (scd *snowflakeChunkDownloader) startDistributedBatches() error {

var err error
chunkMetaLen := len(scd.ChunkMetas)
firstArrowChunk := buildFirstArrowChunk(scd.RowSet.RowSetBase64)
scd.FirstBatch = &ResultBatch{
idx: 0,
scd: scd,
funcDownloadHelper: scd.FuncDownloadHelper,
}
// decode first chunk if possible
if firstArrowChunk.allocator != nil {
scd.FirstBatch.Rec, err = firstArrowChunk.decodeArrowBatch(scd)
if err != nil {
return err
}
}
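// the remaining chunks are only registered here; each one is downloaded and decoded
// lazily when its ResultBatch.Fetch is called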
scd.ResultBatches = make([]*ResultBatch, chunkMetaLen)
for i := 0; i < chunkMetaLen; i++ {
scd.ResultBatches[i] = &ResultBatch{
idx: i,
scd: scd,
funcDownloadHelper: scd.FuncDownloadHelper,
}
}
return nil
}

/* largeResultSetReader is a reader that wraps the large result set with leading and tailing brackets. */
type largeResultSetReader struct {
status int
@@ -380,6 +421,12 @@ func decodeChunk(scd *snowflakeChunkDownloader, idx int, bufStream *bufio.Reader
int(scd.totalUncompressedSize()),
memory.NewGoAllocator(),
}
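// with distributed batches enabled, store the decoded Arrow records on the chunk's
// ResultBatch instead of materializing string rows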
if usesDistributedBatches(scd.ctx) {
if scd.ResultBatches[idx].Rec, err = arc.decodeArrowBatch(scd); err != nil {
return err
}
return nil
}
highPrec := higherPrecisionEnabled(scd.ctx)
respd, err = arc.decodeArrowChunk(scd.RowSet.RowType, highPrec)
if err != nil {
@@ -512,6 +559,10 @@ func (scd *streamChunkDownloader) getRowType() []execResponseRowType {
return scd.RowSet.RowType
}

func (scd *streamChunkDownloader) getResultBatches() []*ResultBatch {
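// the streaming chunk downloader does not produce distributed result batches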
return nil
}

func useStreamDownloader(ctx context.Context) bool {
val := ctx.Value(streamChunkDownload)
if val == nil {
@@ -630,3 +681,32 @@ func copyChunkStream(body io.Reader, rows chan<- []*string) error {
}
return nil
}

Contributor (on ResultBatch): ArrowBatch

// ResultBatch object represents a chunk of data, or subset of rows, retrievable in array.Record format
type ResultBatch struct {
Rec *[]array.Record
idx int
scd *snowflakeChunkDownloader
funcDownloadHelper func(context.Context, *snowflakeChunkDownloader, int) error
}

// Fetch returns an array of records representing a chunk in the query
func (rb *ResultBatch) Fetch() (*[]array.Record, error) {
// chunk has already been downloaded
if rb.Rec != nil {
return rb.Rec, nil
}
if err := rb.funcDownloadHelper(context.Background(), rb.scd, rb.idx); err != nil {
return nil, err
}
return rb.Rec, nil
}

func usesDistributedBatches(ctx context.Context) bool {
val := ctx.Value(distributedResultBatches)
if val == nil {
return false
}
a, ok := val.(bool)
return a && ok
}
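Taken together, the pieces above give callers direct access to each chunk as Arrow records: WithDistributedResultBatches enables the mode on the query context, getResultBatches exposes one ResultBatch per chunk, and Fetch downloads and decodes a chunk on first use. A minimal sequential consumer sketch, assuming a batches slice has already been obtained from a query (as in the test below); the countRows helper is illustrative and not part of this change:

// countRows walks every batch, downloading each chunk on first access via Fetch,
// and releases each record once its rows have been counted.
func countRows(batches []*ResultBatch) (int64, error) {
	var total int64
	for _, b := range batches {
		recs, err := b.Fetch() // first call downloads and decodes; later calls return the cached records
		if err != nil {
			return 0, err
		}
		for _, rec := range *recs {
			total += rec.NumRows()
			rec.Release() // drop the reference retained during decoding
		}
	}
	return total, nil
}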
42 changes: 42 additions & 0 deletions chunk_test.go
@@ -5,6 +5,7 @@ package gosnowflake
import (
"bytes"
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"io"
@@ -387,3 +388,44 @@ func TestWithStreamDownloader(t *testing.T) {
}
})
}

Review thread on TestWithDistributedResultBatches:

Contributor: This test succinctly demonstrates the API pathway, but is there another test that can be added to demonstrate that these batches are indeed fetched in a distributed manner? 100 rows is not a lot and would probably be fetched within one chunk. This test doesn't demonstrate to me that these batches can be called independently. More importantly, with these changes, do you see your memory issues improved? Solved?

Contributor Author: Sounds good - I will have a first PR iteration with the logic changes, and a second focusing only on this test, so that we can iterate on the former first. In terms of memory, it is fully up to the user to handle the workers as they wish. In our case we optimized the client side for parallel downloading while releasing the records as soon as they have been read and written, and we have not seen any memory issues (we queried all the way up to 10 GB with very little memory pressure and very good performance).

Contributor Author: ^ Changed it to 3000 rows, which is approximately 6 ResultBatch objects (ran it a few times; on average it is 6 chunks of data).

Contributor: The test seems better, but is there a way to explicitly illustrate these batches being fetched by different workers? Something like multiple goroutines, perhaps? There may be other ways to demonstrate this, but at face value this test doesn't tell me much.

Contributor Author: Just updated the test with clear download workers - it now also checks that the received metadata (row count) is correct :)

Contributor (on the test name): TestWithArrowBatches()

func TestWithDistributedResultBatches(t *testing.T) {
ctx := WithDistributedResultBatches(context.Background())
numrows := 3000 // approximately 6 ResultBatch objects
config, err := ParseDSN(dsn)
if err != nil {
t.Error(err)
}
sc, err := buildSnowflakeConn(ctx, *config)
if err != nil {
t.Error(err)
}
if err = authenticateWithConfig(sc); err != nil {
t.Error(err)
}

query := fmt.Sprintf(selectRandomGenerator, numrows)
rows, err := sc.QueryContext(ctx, query, []driver.NamedValue{})
if err != nil {
t.Error(err)
}
defer rows.Close()
batches, err := rows.(*snowflakeRows).GetBatches()
if err != nil {
t.Error(err)
}

cnt := 0
for _, b := range batches {
_, err := b.Fetch()
if err != nil {
t.Error(err)
}
for _, r := range *b.Rec {
cnt += int(r.NumRows())
}
}
if cnt != numrows {
t.Errorf("number of rows didn't match. expected: %v, got: %v", numrows, cnt)
}
}
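For reference, the distributed consumption pattern raised in the review thread above (independent workers fetching batches concurrently and releasing records as soon as they have been consumed) could look roughly like the sketch below. The worker-pool shape, the atomic counter, and the fetchConcurrently name are illustrative assumptions layered on the API from this PR, not code from the change; it assumes the standard library sync and sync/atomic packages are imported.

// fetchConcurrently downloads batches with a fixed pool of worker goroutines,
// sums the row counts, and releases each record immediately after it is read.
func fetchConcurrently(batches []*ResultBatch, workers int) (int64, error) {
	var total int64
	var wg sync.WaitGroup
	idxCh := make(chan int)
	errCh := make(chan error, len(batches))
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range idxCh {
				recs, err := batches[i].Fetch() // each worker downloads its own chunks independently
				if err != nil {
					errCh <- err
					continue
				}
				for _, rec := range *recs {
					atomic.AddInt64(&total, rec.NumRows())
					rec.Release() // free the record as soon as it has been consumed
				}
			}
		}()
	}
	for i := range batches {
		idxCh <- i
	}
	close(idxCh)
	wg.Wait()
	close(errCh)
	for err := range errCh {
		return 0, err // surface the first error encountered by any worker
	}
	return total, nil
}

A caller would obtain the batches the same way the test does (GetBatches on the returned rows) and could hand sub-slices of them to separate processes instead of goroutines when spreading the work across machines.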