
SNOW-1896153: Non deterministic errors with concurrent context aware queries in v1.12.1 #1292

Open
niger-prequel opened this issue Jan 24, 2025 · 14 comments
Labels: bug (Erroneous or unexpected behaviour), status-triage_done (Initial triage done, will be further handled by the driver team)

Comments

@niger-prequel

  1. What version of GO driver are you using?

1.12.1 and 1.12.0

  2. What operating system and processor architecture are you using?

Debian Bullseye x86

  3. What version of GO are you using?

go1.23.3

  4. Server version:

9.1.0

  5. What did you do?

We are loading data into Snowflake via the Go driver. Our strategy is to use PUT... SQL commands to upload Parquet files to an internal stage, then a COPY INTO... statement to publish the data. We leverage the database/sql abstraction, so generally our code looks like:

package example

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "sync"
    "time"

    _ "github.com/snowflakedb/gosnowflake" // Snowflake driver
)

func UploadFilesAndCopy(db *sql.DB, stageName, tableName string, files []string) error {
    // Create a context that will be used for all operations
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    // A channel that will hold the file paths we want to upload
    fileCh := make(chan string)

    // We'll use a WaitGroup to ensure all goroutines finish
    var wg sync.WaitGroup

    // Start a fixed number of worker goroutines to process the file uploads
    workerCount := 5
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for file := range fileCh {
                putQuery := fmt.Sprintf(
                    "PUT file://%s @%s AUTO_COMPRESS=TRUE OVERWRITE=TRUE",
                    file, stageName,
                )
                if _, err := db.ExecContext(ctx, putQuery); err != nil {
                    log.Printf("failed to PUT file %q to stage %q: %v", file, stageName, err)
                }
            }
        }()
    }

    // Send file names into the channel for workers to pick up
    go func() {
        defer close(fileCh) // Close once we're done sending
        for _, file := range files {
            fileCh <- file
        }
    }()

    // Wait for all workers to finish uploading
    wg.Wait()

    // Now that all files are in the stage, run the COPY INTO command
    copyQuery := fmt.Sprintf(`COPY INTO %s FROM @%s FILE_FORMAT = (TYPE = PARQUET)`, tableName, stageName)
    if _, err := db.ExecContext(ctx, copyQuery); err != nil {
        return fmt.Errorf("failed to COPY INTO %s: %w", tableName, err)
    }

    return nil
}

We're experiencing issues with both 1.12.0 and 1.12.1. On 1.12.0, if context cancellation occurs, other running queries will fail with:

level=error msg="error: 000605: Identified SQL statement is not currently executing." func="gosnowflake.(*snowflakeConn).queryContextInternal" file="connection.go:410"

On 1.12.1 this goes away and we get the correct "context canceled" error message. However, we start experiencing non-deterministic errors where the PUT commands will sometimes fail with error: 000605: Identified SQL statement is not currently executing. No context has been canceled in this case.

Going through the release notes and looking at the PRs for what changed, it seems like #1248 may have introduced some kind of data race into the driver. We were able to stop these errors across our fleet by not sharing a context between the goroutines and instead creating a new child context for each spawned worker.

  6. What did you expect to see?

I expect the PUT queries to work concurrently as they did on 1.12.0, and the context-cancellation error message to reflect the behavior of 1.12.1.

  7. Can you set logging to DEBUG and collect the logs?

Not right now; this happens in production environments where it's against our policy to collect these logs.

@niger-prequel niger-prequel added the bug Erroneous or unexpected behaviour label Jan 24, 2025
@github-actions github-actions bot changed the title Non deterministic errors with concurrent context aware queries in v1.12.1 SNOW-1896153: Non deterministic errors with concurrent context aware queries in v1.12.1 Jan 24, 2025
@sfc-gh-dszmolka sfc-gh-dszmolka self-assigned this Jan 27, 2025
@sfc-gh-dszmolka sfc-gh-dszmolka added the status-triage Issue is under initial triage label Jan 27, 2025
@sfc-gh-dszmolka
Contributor

hi - thank you for letting us know about this issue and for the example. Will look into it.

@sfc-gh-dszmolka
Contributor

I'm trying to reproduce the issue the following way:

  1. Create a stage and table, really simple:
create stage mystage;
create table testtable (contents VARIANT);
  2. Downloaded userdata{1..5}.parquet from https://github.com/Teradata/kylo/tree/master/samples/sample-data/parquet as test input; each is a 1,000-row Parquet file. I don't think the file format necessarily matters here, but at this stage I try to mimic that as well.

  3. Using the UploadFilesAndCopy example you sent (thank you again!), I put some minimal 'connect to Snowflake' capability around it, and the repro script now looks like:

package main 

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "os"
    "sync"
    "time"

    _ "github.com/snowflakedb/gosnowflake" // Snowflake driver
)

func UploadFilesAndCopy(db *sql.DB, stageName, tableName string, files []string) error {
    // Create a context that will be used for all operations
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
    defer cancel()

    // A channel that will hold the file paths we want to upload
    fileCh := make(chan string)

    // We'll use a WaitGroup to ensure all goroutines finish
    var wg sync.WaitGroup

    // Start a fixed number of worker goroutines to process the file uploads
    workerCount := 5
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for file := range fileCh {
                putQuery := fmt.Sprintf(
                    "PUT /* UploadFilesAndCopy */ file:///go/test/%s @%s AUTO_COMPRESS=TRUE OVERWRITE=TRUE",
                    file, stageName,
                )
                if _, err := db.ExecContext(ctx, putQuery); err != nil {
                    log.Printf("failed to PUT file %q to stage %q: %v\n", file, stageName, err)
                }
            }
        }()
    }

    // Send file names into the channel for workers to pick up
    go func() {
        defer close(fileCh) // Close once we're done sending
        for _, file := range files {
            fileCh <- file
        }
    }()

    // Wait for all workers to finish uploading
    wg.Wait()

    // Now that all files are in the stage, run the COPY INTO command
    copyQuery := fmt.Sprintf(`COPY INTO %s FROM @%s /* UploadFilesAndCopy */ FILE_FORMAT = (TYPE = PARQUET)`, tableName, stageName)
    if _, err := db.ExecContext(ctx, copyQuery); err != nil {
        return fmt.Errorf("failed to COPY INTO %s: %w", tableName, err)
    }

    return nil
}

func Cleanup(db *sql.DB) error {
    ctx := context.Background()
    queries := [2]string {
        "RM /* Cleanup() */ @GO1292.PUBLIC.MYSTAGE",
        "TRUNCATE TABLE /* Cleanup() */ GO1292.PUBLIC.TESTTABLE",
    }
    for _, query := range queries {
        log.Printf("==> Cleanup: %s\n", query)
        _, err := db.ExecContext(ctx, query)
        if err != nil {
            log.Printf("==> Cleanup: failed to execute query due to error: %s\n", err)
            return err
        }
    }
    return nil
}

func main() {
    dsn := os.Getenv("TEST_DSN")
    if dsn == "" {
        log.Fatalf("==> Please set TEST_DSN envvar to an actual full DSN to connect with gosnowflake.\n")
    }

    db, err := sql.Open("snowflake", dsn)
    if err != nil {
        log.Fatalf("failed to connect. %v, err: %v", dsn, err)
    }
    defer db.Close()

    ctx := context.Background()
    conn, err := db.Conn(ctx)
    if err != nil {
        log.Fatalf("failed to connect, err: %v", err)
    }
    defer conn.Close()

    filesToUpload := []string{ "userdata1.parquet", "userdata2.parquet", "userdata3.parquet", "userdata4.parquet", "userdata5.parquet" }
    log.Printf("==> UploadFilesAndCopy start\n")
    err = UploadFilesAndCopy(db, "GO1292.PUBLIC.MYSTAGE", "GO1292.PUBLIC.TESTTABLE", filesToUpload)
    if err != nil {
        log.Printf("==> UploadFilesAndCopy failed: %s\n", err)
    }
    err = Cleanup(db)
    if err != nil {
        log.Printf("==> Cleanup failed: %s\n", err)
    }
}
  4. Then I ran this in 2 x 100 iterations and looked for the error you saw.
     I did not see any, so either this needs even more iterations, or I'm doing something that doesn't represent what you're doing.

As of next step, would it be possible to please

  1. Check the above repro program: is it representative of what and how you're doing? If not, can you please point out the differences? Or if you send a similar, runnable repro which does reproduce the issue, that would of course be very helpful.
  2. If there's any input-data-specific aspect to the issue (e.g. number of Parquet files to put, size of the Parquet files, etc.), that would probably be important to know.
  3. Whether the issue is perhaps cloud-specific; I would like to attempt the repro on the same Snowflake deployment you're using. For the above test, I used AWS EU Frankfurt.

Thank you in advance !

@sfc-gh-dszmolka sfc-gh-dszmolka added the status-information_needed Additional information is required from the reporter label Jan 27, 2025
@niger-prequel
Author

Check above repro program, if that's representative to what and how you're doing? If not, can you please point out the differences ? Or if you send a similar, runnable repro which does reproduce the issue, that would be of course very helpful.

Generally representative. The biggest differences I can spot are:

  • We spawn a goroutine per file we upload, not a fixed set of workers.
  • after opening the db, we call db.SetMaxOpenConns(1)
  • we have client_session_keep_alive=TRUE in our dsn
  • we have AUTO_COMPRESS=FALSE
  • stage is a temp stage
  • it is possible for us to have completely unrelated queries (different tables and stages) running on other goroutines from the same db instance.
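For concreteness, here is a minimal sketch of how two of the client-side settings listed above combine. The credentials and account are placeholders, and the DSN follows the gosnowflake user:password@account/database/schema?params layout:

```go
package main

import (
	"fmt"
	"net/url"
)

// buildDSN assembles a gosnowflake-style DSN carrying the
// client_session_keep_alive session parameter mentioned above.
// All identity values passed in are placeholders.
func buildDSN(user, pass, account, database, schema string) string {
	params := url.Values{}
	params.Set("client_session_keep_alive", "TRUE")
	return fmt.Sprintf("%s:%s@%s/%s/%s?%s", user, pass, account, database, schema, params.Encode())
}

func main() {
	dsn := buildDSN("user", "secret", "myaccount", "GO1292", "PUBLIC")
	fmt.Println(dsn)
	// After opening the pool, the reporter additionally serializes all
	// queries onto a single connection (and therefore a single session):
	//   db, _ := sql.Open("snowflake", dsn)
	//   db.SetMaxOpenConns(1)
}
```

SetMaxOpenConns(1) is relevant to the report because it forces every concurrent goroutine to queue on one connection, so all queries share one Snowflake session.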

If there's any input data specific aspect to the issue (e.g. number of Parquet files to put, size of the Parquet files to put, etc), that would be probably important to know.

As far as I know there isn't anything data-specific. We've seen this error when uploading only 1 file and when uploading many. We could tell it was version-related because changing only the dependency from 1.12.1 to 1.12.0 eliminated error: 000605: Identified SQL statement is not currently executing entirely on PUT commands, and vice versa for the same error on context cancellations. We isolated deploys to just toggling these versions.

If the issue is perhaps cloud-specific; I would like to attempt the repro on the same Snowflake deployment you're using. For the above test, I used AWS EU Frankfurt.

  • We've seen this occur on our test instances on GCP us-central1. However we've also seen it in production on customer snowflake instances that are not on GCP but on AWS or Azure in the US (not sure on regions).

@sfc-gh-dszmolka
Contributor

thank you so much for the added details! A wild idea maybe, but if you could perhaps provide me with a numerical session_id in which you know the issue definitely happened, I can try to look it up and see what queries were running and, hopefully, what happened with them.

session_id is something like 34072897429472, so it shouldn't expose anything sensitive or PII here (I'm also taking a gamble and hoping the session_id is unique enough across the whole of Snowflake to find your session, so I won't need any 'closer' id from you like the account). Thank you in advance if this is possible!

If not, it's not a problem, I'll keep trying.

@niger-prequel
Author

I'll try to reproduce it in our nightly CI and give you something to work with tomorrow if I can. It happens non-deterministically, so I'll have to keep running the battery until I get it to happen.

@niger-prequel
Author

@sfc-gh-dszmolka we saw it last night in CI for session 251775374914758

@sfc-gh-dszmolka
Contributor

thank you @niger-prequel , let me see what I can find

@sfc-gh-dszmolka
Contributor

Found this session yesterday - it was thankfully unique, so I found the relevant Snowflake account, deployment, everything.

  • I did not see the originally reported error message Identified SQL statement is not currently executing in this session
  • in fact, I did not see this error message for any of the sessions created in this account since 2025-JAN-01.

Would you be able to share a queryId which resulted in the error message you originally reported?

  • interestingly, in the session you shared, there were two identical PUTs launched for the same file about ~150ms apart; if you wish to check for yourself in query history, these are the queryIds ending with af4db6 and afb01a in the session you shared.
  • I observed the same behaviour in my repros; each file PUT was duplicated, each with its own genuine and unique queryId.
  • this happens in the local repros even with OVERWRITE=FALSE, so I'm not sure whether this behaviour is specific to the cloud provider hosting your (and my) repro account, or even relevant to the issue, so for now I'm setting it aside

In addition, and based on your earlier comment, I ran more repros, which I've now put into https://github.com/sfc-gh-dszmolka/repro-for-gosnowflake-1292 instead of quoting here. I'm now trying to log the sessionId too.
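One way to log the session id from the client side is to ask the server directly. CURRENT_SESSION() is plain Snowflake SQL; the *sql.Conn below is assumed to come from an already-opened gosnowflake pool, which this sketch does not create:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// sessionQuery returns the numeric session id of the connection it runs on,
// which is what server-side query history is keyed by.
const sessionQuery = "SELECT CURRENT_SESSION()"

// logSessionID fetches the session id over a specific connection. Using a
// *sql.Conn (rather than *sql.DB) matters here: with a pool, consecutive
// queries may land on different connections and therefore different sessions.
func logSessionID(ctx context.Context, conn *sql.Conn) (string, error) {
	var id string
	if err := conn.QueryRowContext(ctx, sessionQuery).Scan(&id); err != nil {
		return "", err
	}
	return id, nil
}

func main() {
	fmt.Println(sessionQuery)
}
```

Pinning the query to a *sql.Conn is the design choice that makes the logged session id trustworthy, since database/sql otherwise hands each query to whichever pooled connection is free.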

  • maybe my repro attempt is wrong, maybe not, but I observed that when running the originally shared UploadFilesAndCopy with its original definition
// Start a fixed number of worker goroutines to process the file uploads
    workerCount := 5
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() {
..
        }()
    }

after the temp stage + temp table are created in SessionX, and the first file is PUT also in SessionX, all the subsequent wg.Add calls lead to the subsequent PUTs executing in their own different sessions, seemingly in parallel.

  • if I try to do the same but without the for cycle, all the queries including all the PUTs are executed in the same session, seemingly sequentially.
  • using db.SetMaxOpenConns(1) results in the script being blocked, seemingly forever, never able to execute even the very first query (which, for me, is creating the stage). Using 0 or even 2 works and the script executes as expected.
  • in literally hundreds of runs with both versions, with ('parallel') and without ('sequential') the for cycle, I could not once hit the Identified SQL statement is not currently executing issue.

A queryId which hit this error message would be helpful. Also, if you're able to create a minimal reproduction which leads to the error for you and which I could execute on my end, that could eliminate all the possible differences and be greatly helpful. If you're up for it, I can add you as a collaborator on the above repo, or of course any other means of sharing code is absolutely okay.

@niger-prequel
Author

@sfc-gh-dszmolka I apologize, I assumed earlier that any error in our nightly CI was the error noted in this issue. That was not true. I've waited for our CI to surface this exact issue, and it did this morning. The error from the driver was Snowflake statement is not executing: Driver Error: 000605: Identified SQL statement is not currently executing and the session ID was 251775377707022.

@sfc-gh-dszmolka
Contributor

thank you so much @niger-prequel , will look up that sessionId in the next coming days and see what happened inside it.

@sfc-gh-dszmolka
Contributor

This is super weird. I'm seeing the 20 queries issued in session 251775377707022, but none of them failed on the Snowflake end.

Since no queryId was provided for the query that failed with the 000605: Identified SQL statement is not currently executing error, nor can I reproduce the issue on my end (with the above reproducer), I'm not sure how to check this further without asking for more input.

Would it be possible for you to share verbose (DEBUG level) gosnowflake logs of the issue happening, in a more private setting, where you don't need to expose them for the whole world to see? I'm thinking you could create an official case with us, have it routed to me, and I can work with you further on the case, where you can privately share logs - if that's allowed by your policy.
If that's a possible path for you (sharing DEBUG level driver logs in a private setting), please create a case with us, mentioning this issue or even this comment, and ask for it to get to me. Normally we don't work like this, and cases are always taken by a support agent with the appropriate skillset, but I think we can make an exception this once.

If that's not possible, would you be able to provide a runnable reproducer (not a snippet) which I can run on my end in loops, in the hope that it fails for me too? This would perhaps be the most helpful of all the options.

Thank you in advance !

@niger-prequel
Author

@sfc-gh-dszmolka I am setting the log level to debug for gosnowflake and I'll monitor for the next time this error appears. Luckily, I can get this to fail in our nightlies every so often, and that environment isn't particularly sensitive, so I can probably share the debug logs here directly if that is simpler.

If that's not possible, would you be able to provide a runnable reproducer (not a snippet) which I can run on my end in loops in hope it fails for me too ? This would be the most helpful perhaps, of all other options.

What's hard about this is that our platform is a bit complex. There are a number of different pipelines executing concurrently on that same instance, and how those pipelines are configured is dynamic, based on state in our system and the query engine. We support over a dozen databases and their drivers, so the code also isn't unique to the Snowflake driver; there is a lot of conditional logic depending on which driver we're using. There are also a number of layers of abstraction, to facilitate dynamic pipeline generation, between the *sql.DB instance and how it actually gets used. It's not really a static set of code I can just rip out and simply hand to you.

@sfc-gh-dszmolka
Contributor

Understand the situation, thank you! When sharing logs, our recommendation is to please make sure nothing sensitive is shared over the public internet, so either sanitize them, or upload them to a private GH repository where you can grant my user access, or really any other way of sharing a file that doesn't end up sharing it with everybody.

@niger-prequel
Author

niger-prequel commented Feb 20, 2025

@sfc-gh-dszmolka I've attached debug logs. I redacted anything we'd consider sensitive, but again, there isn't any production data in this Snowflake account, so we should be fine. The debug logs do seem to reinforce my hunch that this has to do with context cancellation. To reiterate, we managed to solve this in prod by generating a new child context before every use of ExecContext() or QueryContext(), like:

func execWithChildContext(ctx context.Context, db *sql.DB, query string) error {
	// forces a new child context to isolate this query from concurrent queries
	newCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	_, err := db.ExecContext(newCtx, query)
	return err
}

Using db.ExecContext(ctx, query) directly with the shared ctx is what causes this error to non-deterministically emerge.

debug_snowflake_logs.txt

@sfc-gh-dszmolka sfc-gh-dszmolka removed the status-information_needed Additional information is required from the reporter label Feb 25, 2025
@sfc-gh-dszmolka sfc-gh-dszmolka added status-triage_done Initial triage done, will be further handled by the driver team and removed status-triage Issue is under initial triage labels Mar 6, 2025