Query errors with riverdatabasesql, Bun, and pgdriver #655

Open
NathanBaulch opened this issue Oct 24, 2024 · 10 comments

@NathanBaulch
Contributor

I'm completely new to River and immediately ran into the following issue when playing with the examples against Pg 16.4:

msg="Elector: Error attempting to elect" err="ERROR: cannot cast type bigint to interval (SQLSTATE=42846)"

The offending LeaderAttemptElect SQL attempts to cast an integer time.Duration parameter to a Pg interval, which is not allowed. Weird that nobody else appears to have hit the same issue; I wonder where I've gone wrong!?

Anyway, I'm currently working around this by patching all 4 instances of now() + $2 in the river source with now() + make_interval(secs => $2), as recommended in this issue: sqlc-dev/sqlc#429 (comment)

@bgentry
Contributor

bgentry commented Oct 24, 2024

@NathanBaulch I’m not sure how this could have slipped past our testing 🤔 can you confirm that you’re using the dbsql driver, as well as the underlying db driver you’re using (pgx or other)?

@NathanBaulch
Contributor Author

OK, that was a bit of a rabbit hole, but it turns out the bun sql driver my project uses doesn't prepare statements before execution (for alleged perf reasons). Preparing is required in order to infer that the @ttl parameter should be interpreted as an interval (oid 1186); otherwise the underlying int64 is sent.

Is there any interest in supporting non-preparing drivers? I'd be happy to send a PR that replaces ::interval with make_interval(...).

@bgentry
Contributor

bgentry commented Oct 25, 2024

Ah, it seems like you're using Bun's own pgdriver which I don't think we've tried, so it seems likely to be an issue specific to that. We have a fair bit of test coverage for the drivers including the LeaderAttemptElect query, but that uses pgx under the hood via its stdlib database/sql package.

Have you tried Bun with pgx, or is that not an option for some reason?

@bgentry changed the title from "cannot cast type bigint to interval" to "Query errors with riverdatabasesql, Bun, and pgdriver" on Oct 25, 2024

@NathanBaulch
Contributor Author

I've switched to pgx for now, hopefully no side effects in my project! I guess this issue can serve as a warning to other Bun users.

@tan-apruv

tan-apruv commented Jan 1, 2025

Exact same issue with riverdatabasesql.

time=2025-01-01T16:08:07.106+05:00 level=ERROR msg="Elector: Error attempting to elect" attempt=190 client_id=MacBook-Pro_local_2024_12_31T14_54_23_558092 err="ERROR: cannot cast type bigint to interval (SQLSTATE=42846)" sleep_duration=8m51.567188925s

Any help or guidance is highly appreciated

Note: I am using gorm as my ORM in most of the project

@brandur
Contributor

brandur commented Jan 4, 2025

Are either of you guys able to provide some kind of basic repro that demonstrates the error?

If all we need to do is replace an ::interval with a make_interval, that seems super plausible to do, but it would be nice if we had something to check that it's actually working in the test suite because otherwise this'll probably regress fast.

@NathanBaulch
Contributor Author

NathanBaulch commented Jan 27, 2025

Sure thing, sorry about the delay.
Here's a standalone unit test that uses https://github.com/testcontainers/testcontainers-go to provision a clean DB.

func Test_Bun_Driver_LeaderAttemptElect(t *testing.T) {
	ctx := context.Background()
	pg := must(postgres.Run(ctx, "postgres:alpine", postgres.BasicWaitStrategies()))
	defer testcontainers.TerminateContainer(pg)
	c := must(pgdriver.NewDriver().OpenConnector(pg.MustConnectionString(ctx, "sslmode=disable")))
	db := sql.OpenDB(c)
	defer db.Close()
	driver := riverdatabasesql.New(db)
	migrator := must(rivermigrate.New(driver, nil))
	must(migrator.Migrate(ctx, rivermigrate.DirectionUp, nil))
	must(driver.GetExecutor().LeaderAttemptElect(ctx, &riverdriver.LeaderElectParams{LeaderID: "foo", TTL: time.Second}))
	// BOOM!
}

// must panics on a non-nil error and otherwise returns val.
func must[T any](val T, err error) T {
	if err != nil {
		panic(err)
	}
	return val
}

FWIW, go.mod:

require (
	github.com/riverqueue/river v0.15.0
	github.com/riverqueue/river/riverdriver v0.15.0
	github.com/riverqueue/river/riverdriver/riverdatabasesql v0.15.0
	github.com/testcontainers/testcontainers-go v0.35.0
	github.com/testcontainers/testcontainers-go/modules/postgres v0.35.0
	github.com/uptrace/bun/driver/pgdriver v1.2.9
)

@NathanBaulch
Contributor Author

Note that a fix for this might also address #566.

@brandur
Contributor

brandur commented Jan 29, 2025

Thanks!

Okay so I spent some time on this, and I was able to get it working for a single SQL operation (QueueCreateOrSetUpdatedAt in this case), which demonstrates that this is at least probably possible.

Partial diff:

diff --git a/driver_test.go b/driver_test.go
index c9e0203..4c28144 100644
--- a/driver_test.go
+++ b/driver_test.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
 	"github.com/jackc/pgx/v5/stdlib"
 	"github.com/stretchr/testify/require"
 
@@ -67,6 +68,45 @@ func TestDriverRiverPgxV5(t *testing.T) {
 		})
 }
 
+func TestDriverRiverPgxV5_QueryExecModeSimpleProtocol(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+
+	dbPool := riverinternaltest.TestDB(ctx, t)
+
+	config := dbPool.Config().Copy()
+	config.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
+
+	dbPool, err := pgxpool.NewWithConfig(ctx, config)
+	require.NoError(t, err)
+
+	riverdrivertest.Exercise(ctx, t,
+		func(ctx context.Context, t *testing.T) riverdriver.Driver[pgx.Tx] {
+			t.Helper()
+
+			// TODO: Can this be merged with the pool above?
+			dbPool := riverinternaltest.TestDB(ctx, t)
+
+			config := dbPool.Config().Copy()
+			config.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeSimpleProtocol
+
+			dbPool, err := pgxpool.NewWithConfig(ctx, config)
+			require.NoError(t, err)
+
+			return riverpgxv5.New(dbPool)
+		},
+		func(ctx context.Context, t *testing.T) riverdriver.Executor {
+			t.Helper()
+
+			tx, err := dbPool.Begin(ctx)
+			require.NoError(t, err)
+			t.Cleanup(func() { _ = tx.Rollback(ctx) })
+
+			return riverpgxv5.New(nil).UnwrapExecutor(tx)
+		})
+}
+
 func BenchmarkDriverRiverPgxV5_Executor(b *testing.B) {
 	const (
 		clientID = "test-client-id"
diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go
index ae179c3..186a324 100644
--- a/riverdriver/river_driver_interface.go
+++ b/riverdriver/river_driver_interface.go
@@ -442,7 +442,7 @@ type NotifyManyParams struct {
 }
 
 type QueueCreateOrSetUpdatedAtParams struct {
-	Metadata  []byte
+	Metadata  *string
 	Name      string
 	PausedAt  *time.Time
 	UpdatedAt *time.Time
diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go
index 1b02010..bfd512f 100644
--- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go
+++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go
@@ -20,7 +20,7 @@ INSERT INTO river_queue(
     updated_at
 ) VALUES (
     now(),
-    coalesce($1::jsonb, '{}'::jsonb),
+    jsonb(coalesce($1::text, '{}')),
     $2::text,
     coalesce($3::timestamptz, NULL),
     coalesce($4::timestamptz, now())
@@ -31,7 +31,7 @@ RETURNING name, created_at, metadata, paused_at, updated_at
 `
 
 type QueueCreateOrSetUpdatedAtParams struct {
-	Metadata  string
+	Metadata  *string
 	Name      string
 	PausedAt  *time.Time
 	UpdatedAt *time.Time
diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml b/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml
index f8ebbee..67f48af 100644
--- a/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml
+++ b/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml
@@ -45,6 +45,12 @@ sql:
           - db_type: "pg_catalog.interval"
             go_type: "time.Duration"
 
+          - db_type: "text"
+            go_type:
+              type: "string"
+              pointer: true
+            nullable: true
+
           - db_type: "timestamptz"
             go_type: "time.Time"
 
diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go
index 9d70913..1213337 100644
--- a/riverdriver/riverdatabasesql/river_database_sql_driver.go
+++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go
@@ -770,7 +770,7 @@ func (e *Executor) PGAdvisoryXactLock(ctx context.Context, key int64) (*struct{}
 
 func (e *Executor) QueueCreateOrSetUpdatedAt(ctx context.Context, params *riverdriver.QueueCreateOrSetUpdatedAtParams) (*rivertype.Queue, error) {
 	queue, err := dbsqlc.New().QueueCreateOrSetUpdatedAt(ctx, e.dbtx, &dbsqlc.QueueCreateOrSetUpdatedAtParams{
-		Metadata:  valutil.ValOrDefault(string(params.Metadata), "{}"),
+		Metadata:  params.Metadata,
 		Name:      params.Name,
 		PausedAt:  params.PausedAt,
 		UpdatedAt: params.UpdatedAt,
diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
index 0c4da1a..a7ae7ea 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
@@ -15,7 +15,7 @@ INSERT INTO river_queue(
     updated_at
 ) VALUES (
     now(),
-    coalesce(@metadata::jsonb, '{}'::jsonb),
+    jsonb(coalesce(sqlc.narg('metadata')::text, '{}')),
     @name::text,
     coalesce(sqlc.narg('paused_at')::timestamptz, NULL),
     coalesce(sqlc.narg('updated_at')::timestamptz, now())
diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go
index 6351e85..94ae63e 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql.go
@@ -21,7 +21,7 @@ INSERT INTO river_queue(
     updated_at
 ) VALUES (
     now(),
-    coalesce($1::jsonb, '{}'::jsonb),
+    jsonb(coalesce($1::text, '{}')),
     $2::text,
     coalesce($3::timestamptz, NULL),
     coalesce($4::timestamptz, now())
@@ -32,7 +32,7 @@ RETURNING name, created_at, metadata, paused_at, updated_at
 `
 
 type QueueCreateOrSetUpdatedAtParams struct {
-	Metadata  []byte
+	Metadata  *string
 	Name      string
 	PausedAt  *time.Time
 	UpdatedAt *time.Time
diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml b/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml
index 17ff029..8c6cd75 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml
@@ -36,6 +36,12 @@ sql:
           - db_type: "pg_catalog.interval"
             go_type: "time.Duration"
 
+          - db_type: "text"
+            go_type:
+              type: "string"
+              pointer: true
+            nullable: true
+
           - db_type: "timestamptz"
             go_type: "time.Time"

Probably the most important part is the change to the SQL operation:

--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_queue.sql
@@ -15,7 +15,7 @@ INSERT INTO river_queue(
     updated_at
 ) VALUES (
     now(),
-    coalesce(@metadata::jsonb, '{}'::jsonb),
+    jsonb(coalesce(sqlc.narg('metadata')::text, '{}')),
     @name::text,
     coalesce(sqlc.narg('paused_at')::timestamptz, NULL),
     coalesce(sqlc.narg('updated_at')::timestamptz, now())

I'll tell you though, getting this working in a way that both the Pgx and database/sql drivers will accept was like pulling teeth, and took quite a few iterations. The pattern would be similar for other SQL operations, but there's probably 50 other callsites that need fixup, so this would be a pretty substantial project.

I'd never touched QueryExecModeSimpleProtocol before, but given how limited its capabilities are, I'd hazard a guess that it was never meant to be used substantially or for anything too complex.

@NathanBaulch
Contributor Author

NathanBaulch commented Jan 31, 2025

Another important issue with switching from the Bun driver to pgx is that my app can no longer use pgdriver.NewListener for its own internal pub/sub needs, since this function casts the provided DB driver to bun.Driver. I basically have no choice but to create two DB drivers in parallel with their own separate connection pools.

On a related note, using riverdatabasesql (with underlying pgx stdlib wrapper) means that the River client doesn't support listeners. If I could stick with the Bun driver then I'd be able to easily create a custom River driver that supports listeners, something like:

func NewBunDriver(db *bun.DB) riverdriver.Driver[bun.IDB] {
	return &bunDriver{Driver: riverdatabasesql.New(db.DB), db: db}
}

type bunDriver struct {
	*riverdatabasesql.Driver
	db *bun.DB
}

func (b *bunDriver) SupportsListener() bool { return true }

func (b *bunDriver) GetListener() riverdriver.Listener {
	return &bunListener{Listener: pgdriver.NewListener(b.db)}
}

func (b *bunDriver) UnwrapExecutor(tx bun.IDB) riverdriver.ExecutorTx {
	return b.Driver.UnwrapExecutor(tx.(bun.Tx).Tx)
}

type bunListener struct { *pgdriver.Listener }

func (b *bunListener) Connect(ctx context.Context) error {
	// bun will connect on first listen, even with no channels
	return b.Listener.Listen(ctx)
}

func (b *bunListener) WaitForNotification(ctx context.Context) (*riverdriver.Notification, error) {
	channel, payload, err := b.Receive(ctx)
	if err != nil {
		return nil, err
	}
	return &riverdriver.Notification{Topic: channel, Payload: payload}, nil
}

func (b *bunListener) Listen(ctx context.Context, topic string) error { return b.Listener.Listen(ctx, topic) }
func (b *bunListener) Unlisten(ctx context.Context, topic string) error { return b.Listener.Unlisten(ctx, topic) }
func (b *bunListener) Ping(context.Context) error { return nil } // bun runs its own ping loop
func (b *bunListener) Close(context.Context) error { return b.Listener.Close() }

This driver also extracts the *sql.Tx from bun.IDB so I can create a river.Client[bun.IDB] that seamlessly works with Bun transactions.


4 participants