
Arrow Record Distributed Result Batches #544

Merged: 27 commits into snowflakedb:master, Feb 10, 2022

Conversation

@ddl-giuliocapolino (Contributor) commented Feb 8, 2022

Description

For performance and ease-of-use purposes, users may need to access data in arrow.Record format directly from queries, rather than receiving it as sql.Rows and having to convert it manually. This PR covers the implementation and tests for getting these records via a new API endpoint called GetResultBatches. We use ResultBatch objects that can independently fetch chunks of data in arrow.Record format. GetResultBatches returns these objects in the correct order, allowing the user to compose the full data in Arrow format, in parallel or not. Below is a sample client request to fetch the data:

// request distributed result batches via the context
rbCtx := sf.WithDistributedResultBatches(ctx)
d := sf.SnowflakeDriver{}
db, err := d.Open(dsn)
if err != nil {
	return nil, err
}
snowflakeDb := db.(sf.SnowflakeConnection)

// run the query with the batch-enabled context
rows, err := snowflakeDb.QueryContext(rbCtx, query, []driver.NamedValue{})
if err != nil {
	return nil, err
}

// retrieve the ordered ResultBatch objects instead of materialized rows
batches, err := rows.(sf.SnowflakeRows).GetBatches()
if err != nil {
	return nil, err
}

For a client consuming data in arrow.Record format, this can yield more than a 10x speedup in retrieving usable data, and it gives the client the ability to manage memory consumption independently.
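As a rough illustration of the parallel composition described above, the batches can be fanned out across goroutines. The following is a minimal sketch, not part of the PR: it assumes the batches slice from the snippet above, that Fetch returns the batch's records as (*[]array.Record, error) (the getter-style signature the review below converges on), and imports of sync and the Arrow array package.

// Sketch: compose the full result set in order while fetching batches in parallel.
var wg sync.WaitGroup
records := make([][]array.Record, len(batches)) // index i preserves batch order
for i, b := range batches {
	wg.Add(1)
	go func(i int, b *sf.ResultBatch) {
		defer wg.Done()
		recs, err := b.Fetch() // each batch downloads its own chunk independently
		if err != nil {
			return // real code would collect and surface the error
		}
		records[i] = *recs
	}(i, b)
}
wg.Wait()
// records now holds the complete query result in Arrow format, in order.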

Checklist

  • Code compiles correctly
  • Run make fmt to fix inconsistent formats
  • Run make lint to get lint errors and fix all of them
  • Created tests which fail without the change (if possible)
  • All tests passing
  • Extended the README / documentation, if necessary

ddl-giuliocapolino and others added 23 commits December 8, 2021 10:34

  • …w_record_channel
  • WithArrowRecordsChan - channel in context for Arrow Records
  • …_converter
  • Converter for Snowflake arrow.Record type
  • …ork-repo
  • ddl: update go.mod and publish fork
  • …ributed-batches
  • withDistributedBatches refactor
  • …-dominodatalab-module
  • Reverting dominodatalab module
@sfc-gh-jbahk (Contributor) left a comment:

This looks great; thank you for your work. I had a couple questions that I'd love to hear your answers on before we proceed.

rows.go Outdated
@@ -27,6 +27,11 @@ var (
maxChunkDownloaderErrorCounter = 5
)

// SnowflakeRows provides an API for methods exposed to the clients
type SnowflakeRows interface {
Contributor:

While this makes sense, I think we can consolidate this into SnowflakeResult since this object can and should be able to return both a query ID and its status as well. Asking the user to convert this into separate interfaces seems cumbersome for both sides in terms of usability & maintenance.

Contributor Author:

Sounds good. It felt a bit strange to have SnowflakeResult contain an endpoint related to data, since Result usually refers to an exec query that does not return any data, but I agree it's better to expose only one interface. Related to that, would you prefer that the implementation of GetBatches() ([]*ResultBatch, error) by snowflakeResult returns an error, or returns an empty array (and nil)?

Contributor Author:

I pushed it out with the former (returns an error), but happy to change it to an empty array if you'd rather have that!

Contributor:

Completely understandable - however, this SnowflakeResult object was meant to be detached from the snowflakeResult (related to Exec) and to denote a literal result. Perhaps a confusing misnomer on our end.

I think the return value seems fine as is for now. I'll let you know if it needs changing or change it myself.

chunk_test.go Outdated
@@ -387,3 +388,44 @@ func TestWithStreamDownloader(t *testing.T) {
}
})
}

func TestWithDistributedResultBatches(t *testing.T) {
Contributor:

This test succinctly demonstrates the API pathway, but is there another test that can be added to demonstrate that these batches are indeed fetched in a distributed manner? 100 rows is not a lot and would probably be fetched within one chunk. This test doesn't demonstrate to me that these batches can be called independently.

More importantly, with these changes, do you see your memory issues improved? Solved?

Contributor Author:

Sounds good - I will have a first PR iteration with the logic changes, and a second focusing only on this test, so that we can iterate on the former first! In terms of memory, it's fully up to the user to handle the workers as they wish - in our case we optimized the client side for parallel downloading while releasing the records as soon as they've been read and written, and we have not seen any memory issues (queried all the way up to 10GB with very little memory pressure but really good performance).

Contributor Author:

^ changed it to 3000 rows, which is approximately 6 ResultBatch objects (ran it a few times and, on average, it's 6 chunks of data)

Contributor:

The test seems better, but is there a way to explicitly illustrate these batches being fetched in different worker nodes? Something like multiple goroutines, perhaps? Maybe there are some other ways to demonstrate this, but at face value this test doesn't tell me much.

Contributor Author:

Just updated the test with clear download workers - it now also checks that the metadata received (rowcount) is correct :)
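The worker pattern that test exercises might look roughly like the following helper (hypothetical, not the actual chunk_test.go code; it assumes a batches slice from GetBatches, the record-returning Fetch signature settled on below, and imports of sync and the Arrow array package):

// countRowsConcurrently fetches every batch in its own goroutine, the way
// independent download workers would, and tallies rows so the caller can
// compare the total against the query's rowcount metadata.
func countRowsConcurrently(batches []*ResultBatch) (int64, error) {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		total    int64
		firstErr error
	)
	for _, b := range batches {
		wg.Add(1)
		go func(b *ResultBatch) { // one download worker per batch
			defer wg.Done()
			recs, err := b.Fetch() // assumed signature: (*[]array.Record, error)
			if err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
				return
			}
			var n int64
			for _, rec := range *recs {
				n += rec.NumRows()
				rec.Release() // release each record as soon as it has been counted
			}
			mu.Lock()
			total += n
			mu.Unlock()
		}(b)
	}
	wg.Wait()
	return total, firstErr
}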

}

// Fetch returns an array of records representing a chunk in the query
func (rb *ResultBatch) Fetch() error {
Contributor:

I'm curious as to the design here: shouldn't this return the actual array.Record itself (along with an error), instead of storing the record as an accessible object within the ResultBatch struct?

Contributor Author:

My initial rationale was to avoid a double memory burden, but since the object contains []*array.Record, it won't hurt to have it twice: the user can Fetch it but also re-use it in the future if necessary (double benefit). Changing it now.

Contributor:

Does the object need to contain the record? Can it just be a product of the Fetch() function and nothing else? A close analogy would be returning a row object after a query, but you'd need to query the rows again in order to receive the same rows again; the driver won't do that for you.

Contributor Author:

In this case we do need the Rec on the object because we are leveraging the downloadChunkHelper, which only returns an error. It's analogous to how rows are downloaded in chunks within the downloadChunkHelper and then applied to the scd.Chunks object, without explicitly being returned. I don't think we can omit that without a major refactor of how the download workflow happens, unfortunately. However, we can make Rec non-public (rec) if you'd rather not have it exposed by the object - we thought it'd be nice for a user to have a pointer to it within the object itself in case it is needed (it's a cheap extra functionality), but it's up to you if you'd rather have it lowercase!

Contributor:

Thanks for the quick turnaround. I think the rec object might best be left private so that users can't control it, with Fetch acting as a getter (as you've done).

It looks overall very good to me - I'm going to go over these changes today in a design review and come back with final feedback.

Contributor Author:

Sounds good. I will change it to private later this evening, along with the clearer test. Consider it done for your design review!
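The shape this thread converges on - records kept in an unexported field, with Fetch acting as the getter - might look roughly like this sketch (field and helper names are assumed, not the merged code):

// ResultBatch keeps its records private so users cannot control them directly.
type ResultBatch struct {
	rec *[]array.Record           // populated by the chunk download helper
	idx int                       // position of this batch's chunk in the result set
	scd *snowflakeChunkDownloader // used to download the chunk on demand
}

// Fetch downloads the batch's chunk on first use and acts as a getter afterwards.
func (rb *ResultBatch) Fetch() (*[]array.Record, error) {
	if rb.rec != nil {
		return rb.rec, nil // already downloaded; just return the records
	}
	// The download helper stores the records on rb.rec and only returns an error,
	// which is why the struct carries the rec field at all (hypothetical method name).
	if err := rb.downloadChunkHelper(); err != nil {
		return nil, err
	}
	return rb.rec, nil
}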

@sfc-gh-jbahk (Contributor) left a comment:

We had a design review yesterday and there were a couple of points --

  • the ultimate point we wanted to reach with distributed fetching was being able to serialize these ResultBatches so that workers across different machines can pick them up independently

  • the fact that we require the chunkdownloader as a member in the struct hinders this and requires some refactoring, which we (Snowflake) will get to

  • this has a very good use case in and of itself, but we envision having to reserve this API/pathway, so we'd like to merge it with a couple of nomenclature tweaks

@@ -34,6 +35,7 @@ type chunkDownloader interface {
getRowType() []execResponseRowType
setNextChunkDownloader(downloader chunkDownloader)
getNextChunkDownloader() chunkDownloader
getResultBatches() []*ResultBatch
Contributor:

getArrowBatches() []*ArrowBatch

chunk_test.go Outdated
@@ -387,3 +389,78 @@ func TestWithStreamDownloader(t *testing.T) {
}
})
}

func TestWithDistributedResultBatches(t *testing.T) {
Contributor:

TestWithArrowBatches()

@@ -630,3 +683,43 @@ func copyChunkStream(body io.Reader, rows chan<- []*string) error {
}
return nil
}

// ResultBatch object represents a chunk of data, or subset of rows, retrievable in array.Record format
type ResultBatch struct {
Contributor:

ArrowBatch

@@ -245,6 +259,33 @@ func getChunk(
return newRetryHTTP(ctx, scd.sc.rest.Client, http.NewRequest, u, headers, timeout).execute()
}

func (scd *snowflakeChunkDownloader) startDistributedBatches() error {
Contributor:

startArrowBatches()

result.go Outdated
type SnowflakeResult interface {
GetQueryID() string
GetStatus() queryStatus
GetBatches() ([]*ResultBatch, error)
Contributor:

GetArrowBatches()

util.go Outdated
@@ -23,6 +23,7 @@ const (
fileStreamFile contextKey = "STREAMING_PUT_FILE"
fileTransferOptions contextKey = "FILE_TRANSFER_OPTIONS"
enableHigherPrecision contextKey = "ENABLE_HIGHER_PRECISION"
distributedResultBatches contextKey = "DISTRIBUTED_RESULT_BATCH"
Contributor:

arrowBatches

util.go Outdated
@@ -83,6 +84,12 @@ func WithHigherPrecision(ctx context.Context) context.Context {
return context.WithValue(ctx, enableHigherPrecision, true)
}

// WithDistributedResultBatches returns a context that allows users to retrieve
// array.Record download workers upon querying
func WithDistributedResultBatches(ctx context.Context) context.Context {
Contributor:

WithArrowBatches()
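Taken together, the rename suggestions above yield roughly this public surface (a sketch assembled from the review snippets, not the final merged code):

// ArrowBatch replaces ResultBatch; the batch mechanics are unchanged.
type ArrowBatch struct{ /* unexported rec, chunk downloader, etc. */ }

// SnowflakeResult with the renamed batch accessor.
type SnowflakeResult interface {
	GetQueryID() string
	GetStatus() queryStatus
	GetArrowBatches() ([]*ArrowBatch, error) // formerly GetBatches
}

// WithArrowBatches replaces WithDistributedResultBatches.
func WithArrowBatches(ctx context.Context) context.Context {
	return context.WithValue(ctx, arrowBatches, true)
}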

@sfc-gh-jbahk (Contributor):

Local tests pass successfully. Merging.

@sfc-gh-jbahk merged commit 1bd84af into snowflakedb:master Feb 10, 2022
@ddl-giuliocapolino (Contributor Author):

Thanks @sfc-gh-jbahk !

agam pushed a commit to sigmacomputing/gosnowflake that referenced this pull request Feb 15, 2022
* arrow record channel in context impl + unit test

* iteration on comments 1

* changed mutex map struct to sync.map

* final iteration on comments

* added converter for snowflake records

* added converter tests

* releasing record as soon as possible

* fix value has non-zero nanoseconds error

* iteration on comments

* withDistributedBatches refactor

* reverting withArrowRecordChan changes

* adjusting go.mod for forking

* first iteration on comments

* final iteration on comments

* reverting module change

* revert exposure of SnowflakeRows, QueryContext, and change signature of Fetch

* improved chunk_test to query more than 1 chunk

* improved chunk_test to show download workers, added metadata, and made Rec private

* nomenclature tweaks
@stoffeastrom:

Hi 👋

I was playing around with the latest release where this is included, but I can't seem to access the arrow batches? This seems to be completely internal. The example from @ddl-giuliocapolino seems outdated.

rbCtx := sf.WithDistributedResultBatches(ctx)
d := sf.SnowflakeDriver{}
db, err := d.Open(dsn)
if err != nil {
	return nil, err
}
snowflakeDb := db.(sf.SnowflakeConnection)

rows, err := snowflakeDb.QueryContext(rbCtx, query, []driver.NamedValue{})
if err != nil {
	return nil, err
}

batches, err := rows.(sf.SnowflakeRows).GetBatches()
if err != nil {
	return nil, err
}

This seems to suggest it could be accessed through SnowflakeConnection, but this interface only has

type SnowflakeConnection interface {
	GetQueryStatus(ctx context.Context, queryID string) (*SnowflakeQueryStatus, error)
}

but I might be misinterpreting things :D

agam added a commit to sigmacomputing/gosnowflake that referenced this pull request Feb 22, 2022
* Refactor & Lint (snowflakedb#525)

* move function calls

* fix lint

* SNOW-530866: Bumped up GoLang connector PATCH version from 1.6.5 to 1.6.6 (snowflakedb#529)

* SNOW-534077 Multi-Statement Missing Result IDs (snowflakedb#534)

* SNOW-440362 Implement UUIDv4 (snowflakedb#541)

* implement uuid

* unexport rfc

* SNOW-535399 Fix GetQueryStatus Bug (snowflakedb#539)

* change errorcode to string

* fix emtpy query status

* fix null pointer dereference

* SNOW-521578 PUT Memory Enhancements (snowflakedb#527)

* modify block size for azure

* fix incorrect source

* open file v read

* move chunk out of loop

* Arrow Record Result Batches (snowflakedb#544)

* arrow record channel in context impl + unit test

* iteration on comments 1

* changed mutex map struct to sync.map

* final iteration on comments

* added converter for snowflake records

* added converter tests

* releasing record as soon as possible

* fix value has non-zero nanoseconds error

* iteration on comments

* withDistributedBatches refactor

* reverting withArrowRecordChan changes

* adjusting go.mod for forking

* first iteration on comments

* final iteration on comments

* reverting module change

* revert exposure of SnowflakeRows, QueryContext, and change signature of Fetch

* improved chunk_test to query more than 1 chunk

* improved chunk_test to show download workers, added metadata, and made Rec private

* nomenclature tweaks

* Formatting & Lint (snowflakedb#545)

* Fix Select 1 Bug (snowflakedb#511)

Make port optional

Co-authored-by: sfc-gh-jbahk <[email protected]>

* SNOW-526255 Fix Time Scale for Arrow (snowflakedb#547)

* fix time scale issue

* add timezone in ltz

* remove high precision context

* Allow Client to specify ClientTimeout in the DSN (snowflakedb#484)

* add failing test

* add fix

* SNOW-544029: Bumped up GoLang connector PATCH version from 1.6.6 to 1.6.7 (snowflakedb#548) (snowflakedb#549)

* Fixes after syncing with upstream.

* Modify instructions to reflect the process I followed this time.

Co-authored-by: sfc-gh-jbahk <[email protected]>
Co-authored-by: Kiran Dama <[email protected]>
Co-authored-by: ddl-giuliocapolino <[email protected]>
Co-authored-by: yiksanchan <[email protected]>
Co-authored-by: Greg Owen <[email protected]>
Co-authored-by: Agam Brahma <[email protected]>
@stoffeastrom:

@sfc-gh-jbahk Pls see my comment above. Can we get an example of how to use arrow batches directly?

@sfc-gh-jbahk (Contributor):

Hi @stoffeastrom, this is an internal feature and we have not publicly documented it for this reason.

I also don't actively develop the go driver anymore - please tag @sfc-gh-cshi for future inquiries.
