Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Expose next_tx_offset of the Replay decoder #1899

Merged
merged 2 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 62 additions & 6 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,23 @@ use anyhow::{anyhow, Context};
use core::{cell::RefCell, ops::RangeBounds};
use parking_lot::{Mutex, RwLock};
use spacetimedb_commitlog::payload::{txdata, Txdata};
use spacetimedb_durability::TxOffset;
use spacetimedb_lib::db::auth::StAccess;
use spacetimedb_lib::{Address, Identity};
use spacetimedb_primitives::{ColList, ConstraintId, IndexId, SequenceId, TableId};
use spacetimedb_sats::{bsatn, buffer::BufReader, AlgebraicValue, ProductValue};
use spacetimedb_schema::schema::{IndexSchema, SequenceSchema, TableSchema};
use spacetimedb_snapshot::ReconstructedSnapshot;
use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository};
use spacetimedb_table::{
indexes::RowPointer,
table::{RowRef, Table},
MemoryUsage,
};
use std::time::{Duration, Instant};
use std::{borrow::Cow, sync::Arc};
use std::{
path::PathBuf,
time::{Duration, Instant},
};
use thiserror::Error;

pub type Result<T> = std::result::Result<T, DBError>;
Expand Down Expand Up @@ -223,6 +227,49 @@ impl Locking {
Ok(datastore)
}

/// Take a snapshot of this [`Locking`] datastore's [`CommittedState`]
/// and store it in `repo`.
///
/// On success, returns:
///
/// - `None` if the committed state is empty
/// (i.e. no transactions have been committed yet)
/// and therefore no snapshot was created
///
/// - or `Some` path to the newly created snapshot directory
///
/// Returns an error if [`SnapshotRepository::create_snapshot`] returns an
/// error.
pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result<Option<PathBuf>> {
let maybe_offset_and_path = Self::take_snapshot_internal(&self.committed_state, repo)?;
Ok(maybe_offset_and_path.map(|(_, path)| path))
}

pub(crate) fn take_snapshot_internal(
committed_state: &RwLock<CommittedState>,
repo: &SnapshotRepository,
) -> Result<Option<(TxOffset, PathBuf)>> {
let mut committed_state = committed_state.write();
let Some(tx_offset) = committed_state.next_tx_offset.checked_sub(1) else {
return Ok(None);
};

log::info!(
"Capturing snapshot of database {:?} at TX offset {}",
repo.database_identity(),
tx_offset,
);

let CommittedState {
ref mut tables,
ref blob_store,
..
} = *committed_state;
let snapshot_dir = repo.create_snapshot(tables.values_mut(), blob_store, tx_offset)?;

Ok(Some((tx_offset, snapshot_dir)))
}

/// Returns a list over all the currently connected clients,
/// reading from the `st_clients` system table.
pub fn connected_clients<'a>(
Expand Down Expand Up @@ -277,9 +324,18 @@ impl Tx for Locking {
}

impl TxDatastore for Locking {
type Iter<'a> = Iter<'a> where Self: 'a;
type IterByColEq<'a, 'r> = IterByColRange<'a, &'r AlgebraicValue> where Self: 'a;
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>> = IterByColRange<'a, R> where Self: 'a;
type Iter<'a>
= Iter<'a>
where
Self: 'a;
type IterByColEq<'a, 'r>
= IterByColRange<'a, &'r AlgebraicValue>
where
Self: 'a;
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>>
= IterByColRange<'a, R>
where
Self: 'a;

fn iter_tx<'a>(&'a self, ctx: &'a ExecutionContext, tx: &'a Self::Tx, table_id: TableId) -> Result<Self::Iter<'a>> {
tx.iter(ctx, table_id)
Expand Down Expand Up @@ -719,7 +775,7 @@ impl<F> Replay<F> {
f(&mut visitor)
}

pub(crate) fn next_tx_offset(&self) -> u64 {
pub fn next_tx_offset(&self) -> u64 {
self.committed_state.read_arc().next_tx_offset
}
}
Expand Down
49 changes: 21 additions & 28 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,37 +133,30 @@ impl SnapshotWorker {
}

fn take_snapshot(committed_state: &RwLock<CommittedState>, snapshot_repo: &SnapshotRepository) {
let mut committed_state = committed_state.write();
let Some(tx_offset) = committed_state.next_tx_offset.checked_sub(1) else {
log::info!("SnapshotWorker::take_snapshot: refusing to take snapshot at tx_offset -1");
return;
};
log::info!(
"Capturing snapshot of database {:?} at TX offset {}",
snapshot_repo.database_identity(),
tx_offset,
);

let start_time = std::time::Instant::now();
match Locking::take_snapshot_internal(committed_state, snapshot_repo) {
Err(e) => {
log::error!(
"Error capturing snapshot of database {:?}: {e:?}",
snapshot_repo.database_identity()
);
}

let CommittedState {
ref mut tables,
ref blob_store,
..
} = *committed_state;
Ok(None) => {
log::warn!(
"SnapshotWorker::take_snapshot: refusing to take snapshot of database {} at TX offset -1",
snapshot_repo.database_identity()
);
}

if let Err(e) = snapshot_repo.create_snapshot(tables.values_mut(), blob_store, tx_offset) {
log::error!(
"Error capturing snapshot of database {:?}: {e:?}",
snapshot_repo.database_identity()
);
} else {
log::info!(
"Captured snapshot of database {:?} at TX offset {} in {:?}",
snapshot_repo.database_identity(),
tx_offset,
start_time.elapsed()
);
Ok(Some((tx_offset, _path))) => {
log::info!(
"Captured snapshot of database {:?} at TX offset {} in {:?}",
snapshot_repo.database_identity(),
tx_offset,
start_time.elapsed()
);
}
}
}
}
Expand Down
Loading