Add "expires" stuff to message cache migration

This commit is contained in:
binwiederhier 2023-01-09 16:21:00 -05:00
parent 3aba7404fc
commit a6564fb43c
3 changed files with 156 additions and 53 deletions

View file

@ -215,6 +215,22 @@ const (
CREATE INDEX IF NOT EXISTS idx_expires ON messages (expires);
CREATE INDEX IF NOT EXISTS idx_attachment_expires ON messages (attachment_expires);
`
migrate9To10UpdateMessageExpiryQuery = `UPDATE messages SET expires = time + ?`
)
var (
migrations = map[int]func(db *sql.DB, cacheDuration time.Duration) error{
0: migrateFrom0,
1: migrateFrom1,
2: migrateFrom2,
3: migrateFrom3,
4: migrateFrom4,
5: migrateFrom5,
6: migrateFrom6,
7: migrateFrom7,
8: migrateFrom8,
9: migrateFrom9,
}
)
type messageCache struct {
@ -224,12 +240,12 @@ type messageCache struct {
}
// newSqliteCache creates a SQLite file-backed cache
func newSqliteCache(filename, startupQueries string, batchSize int, batchTimeout time.Duration, nop bool) (*messageCache, error) {
func newSqliteCache(filename, startupQueries string, cacheDuration time.Duration, batchSize int, batchTimeout time.Duration, nop bool) (*messageCache, error) {
db, err := sql.Open("sqlite3", filename)
if err != nil {
return nil, err
}
if err := setupCacheDB(db, startupQueries); err != nil {
if err := setupDB(db, startupQueries, cacheDuration); err != nil {
return nil, err
}
var queue *util.BatchingQueue[*message]
@ -247,13 +263,13 @@ func newSqliteCache(filename, startupQueries string, batchSize int, batchTimeout
// newMemCache creates an in-memory cache
func newMemCache() (*messageCache, error) {
return newSqliteCache(createMemoryFilename(), "", 0, 0, false)
return newSqliteCache(createMemoryFilename(), "", 0, 0, 0, false)
}
// newNopCache creates an in-memory cache that discards all messages;
// it is always empty and can be used if caching is entirely disabled
func newNopCache() (*messageCache, error) {
return newSqliteCache(createMemoryFilename(), "", 0, 0, true)
return newSqliteCache(createMemoryFilename(), "", 0, 0, 0, true)
}
// createMemoryFilename creates a unique memory filename to use for the SQLite backend.
@ -637,7 +653,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
return messages, nil
}
func setupCacheDB(db *sql.DB, startupQueries string) error {
func setupDB(db *sql.DB, startupQueries string, cacheDuration time.Duration) error {
// Run startup queries
if startupQueries != "" {
if _, err := db.Exec(startupQueries); err != nil {
@ -669,28 +685,18 @@ func setupCacheDB(db *sql.DB, startupQueries string) error {
// Do migrations
if schemaVersion == currentSchemaVersion {
return nil
} else if schemaVersion == 0 {
return migrateFrom0(db)
} else if schemaVersion == 1 {
return migrateFrom1(db)
} else if schemaVersion == 2 {
return migrateFrom2(db)
} else if schemaVersion == 3 {
return migrateFrom3(db)
} else if schemaVersion == 4 {
return migrateFrom4(db)
} else if schemaVersion == 5 {
return migrateFrom5(db)
} else if schemaVersion == 6 {
return migrateFrom6(db)
} else if schemaVersion == 7 {
return migrateFrom7(db)
} else if schemaVersion == 8 {
return migrateFrom8(db)
} else if schemaVersion == 9 {
return migrateFrom9(db)
} else if schemaVersion > currentSchemaVersion {
return fmt.Errorf("unexpected schema version: version %d is higher than current version %d", schemaVersion, currentSchemaVersion)
}
return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
for i := schemaVersion; i < currentSchemaVersion; i++ {
fn, ok := migrations[i]
if !ok {
return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1)
} else if err := fn(db, cacheDuration); err != nil {
return err
}
}
return nil
}
func setupNewCacheDB(db *sql.DB) error {
@ -706,7 +712,7 @@ func setupNewCacheDB(db *sql.DB) error {
return nil
}
func migrateFrom0(db *sql.DB) error {
func migrateFrom0(db *sql.DB, _ time.Duration) error {
log.Info("Migrating cache database schema: from 0 to 1")
if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
return err
@ -717,10 +723,10 @@ func migrateFrom0(db *sql.DB) error {
if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
return err
}
return migrateFrom1(db)
return nil
}
func migrateFrom1(db *sql.DB) error {
func migrateFrom1(db *sql.DB, _ time.Duration) error {
log.Info("Migrating cache database schema: from 1 to 2")
if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
return err
@ -728,10 +734,10 @@ func migrateFrom1(db *sql.DB) error {
if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
return err
}
return migrateFrom2(db)
return nil
}
func migrateFrom2(db *sql.DB) error {
func migrateFrom2(db *sql.DB, _ time.Duration) error {
log.Info("Migrating cache database schema: from 2 to 3")
if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil {
return err
@ -739,10 +745,10 @@ func migrateFrom2(db *sql.DB) error {
if _, err := db.Exec(updateSchemaVersion, 3); err != nil {
return err
}
return migrateFrom3(db)
return nil
}
func migrateFrom3(db *sql.DB) error {
func migrateFrom3(db *sql.DB, _ time.Duration) error {
log.Info("Migrating cache database schema: from 3 to 4")
if _, err := db.Exec(migrate3To4AlterMessagesTableQuery); err != nil {
return err
@ -750,10 +756,10 @@ func migrateFrom3(db *sql.DB) error {
if _, err := db.Exec(updateSchemaVersion, 4); err != nil {
return err
}
return migrateFrom4(db)
return nil
}
func migrateFrom4(db *sql.DB) error {
func migrateFrom4(db *sql.DB, _ time.Duration) error {
log.Info("Migrating cache database schema: from 4 to 5")
if _, err := db.Exec(migrate4To5AlterMessagesTableQuery); err != nil {
return err
@ -761,10 +767,10 @@ func migrateFrom4(db *sql.DB) error {
if _, err := db.Exec(updateSchemaVersion, 5); err != nil {
return err
}
return migrateFrom5(db)
return nil
}
func migrateFrom5(db *sql.DB) error {
func migrateFrom5(db *sql.DB, _ time.Duration) error {
log.Info("Migrating cache database schema: from 5 to 6")
if _, err := db.Exec(migrate5To6AlterMessagesTableQuery); err != nil {
return err
@ -772,10 +778,10 @@ func migrateFrom5(db *sql.DB) error {
if _, err := db.Exec(updateSchemaVersion, 6); err != nil {
return err
}
return migrateFrom6(db)
return nil
}
func migrateFrom6(db *sql.DB) error {
func migrateFrom6(db *sql.DB, _ time.Duration) error {
log.Info("Migrating cache database schema: from 6 to 7")
if _, err := db.Exec(migrate6To7AlterMessagesTableQuery); err != nil {
return err
@ -783,10 +789,10 @@ func migrateFrom6(db *sql.DB) error {
if _, err := db.Exec(updateSchemaVersion, 7); err != nil {
return err
}
return migrateFrom7(db)
return nil
}
func migrateFrom7(db *sql.DB) error {
func migrateFrom7(db *sql.DB, _ time.Duration) error {
log.Info("Migrating cache database schema: from 7 to 8")
if _, err := db.Exec(migrate7To8AlterMessagesTableQuery); err != nil {
return err
@ -794,10 +800,10 @@ func migrateFrom7(db *sql.DB) error {
if _, err := db.Exec(updateSchemaVersion, 8); err != nil {
return err
}
return migrateFrom8(db)
return nil
}
func migrateFrom8(db *sql.DB) error {
func migrateFrom8(db *sql.DB, _ time.Duration) error {
log.Info("Migrating cache database schema: from 8 to 9")
if _, err := db.Exec(migrate8To9AlterMessagesTableQuery); err != nil {
return err
@ -805,10 +811,10 @@ func migrateFrom8(db *sql.DB) error {
if _, err := db.Exec(updateSchemaVersion, 9); err != nil {
return err
}
return migrateFrom9(db)
return nil
}
func migrateFrom9(db *sql.DB) error {
func migrateFrom9(db *sql.DB, cacheDuration time.Duration) error {
log.Info("Migrating cache database schema: from 9 to 10")
tx, err := db.Begin()
if err != nil {
@ -818,7 +824,9 @@ func migrateFrom9(db *sql.DB) error {
if _, err := tx.Exec(migrate9To10AlterMessagesTableQuery); err != nil {
return err
}
// FIXME add logic to set "expires" column
if _, err := tx.Exec(migrate9To10UpdateMessageExpiryQuery, int64(cacheDuration.Seconds())); err != nil {
return err
}
if _, err := tx.Exec(updateSchemaVersion, 10); err != nil {
return err
}

View file

@ -459,12 +459,108 @@ func TestSqliteCache_Migration_From1(t *testing.T) {
require.Equal(t, 11, len(messages))
}
func TestSqliteCache_Migration_From9(t *testing.T) {
// This primarily tests the awkward migration that introduces the "expires" column.
// The migration logic has to update the column, using the existing "cache-duration" value.
filename := newSqliteTestCacheFile(t)
db, err := sql.Open("sqlite3", filename)
require.Nil(t, err)
// Create "version 8" schema
_, err = db.Exec(`
BEGIN;
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mid TEXT NOT NULL,
time INT NOT NULL,
topic TEXT NOT NULL,
message TEXT NOT NULL,
title TEXT NOT NULL,
priority INT NOT NULL,
tags TEXT NOT NULL,
click TEXT NOT NULL,
icon TEXT NOT NULL,
actions TEXT NOT NULL,
attachment_name TEXT NOT NULL,
attachment_type TEXT NOT NULL,
attachment_size INT NOT NULL,
attachment_expires INT NOT NULL,
attachment_url TEXT NOT NULL,
sender TEXT NOT NULL,
encoding TEXT NOT NULL,
published INT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
version INT NOT NULL
);
INSERT INTO schemaVersion (id, version) VALUES (1, 9);
COMMIT;
`)
require.Nil(t, err)
// Insert a bunch of messages
insertQuery := `
INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding, published)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
for i := 0; i < 10; i++ {
_, err = db.Exec(
insertQuery,
fmt.Sprintf("abcd%d", i),
time.Now().Unix(),
"mytopic",
fmt.Sprintf("some message %d", i),
"", // title
0, // priority
"", // tags
"", // click
"", // icon
"", // actions
"", // attachment_name
"", // attachment_type
0, // attachment_size
0, // attachment_type
"", // attachment_url
"9.9.9.9", // sender
"", // encoding
1, // published
)
require.Nil(t, err)
}
// Create cache to trigger migration
cacheDuration := 17 * time.Hour
c, err := newSqliteCache(filename, "", cacheDuration, 0, 0, false)
checkSchemaVersion(t, c.db)
// Check version
rows, err := db.Query(`SELECT version FROM main.schemaVersion WHERE id = 1`)
require.Nil(t, err)
require.True(t, rows.Next())
var version int
require.Nil(t, rows.Scan(&version))
require.Equal(t, currentSchemaVersion, version)
messages, err := c.Messages("mytopic", sinceAllMessages, false)
require.Nil(t, err)
require.Equal(t, 10, len(messages))
for _, m := range messages {
require.True(t, m.Expires > time.Now().Add(cacheDuration-5*time.Second).Unix())
require.True(t, m.Expires < time.Now().Add(cacheDuration+5*time.Second).Unix())
}
}
func TestSqliteCache_StartupQueries_WAL(t *testing.T) {
filename := newSqliteTestCacheFile(t)
startupQueries := `pragma journal_mode = WAL;
pragma synchronous = normal;
pragma temp_store = memory;`
db, err := newSqliteCache(filename, startupQueries, 0, 0, false)
db, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
require.Nil(t, err)
require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
require.FileExists(t, filename)
@ -475,7 +571,7 @@ pragma temp_store = memory;`
func TestSqliteCache_StartupQueries_None(t *testing.T) {
filename := newSqliteTestCacheFile(t)
startupQueries := ""
db, err := newSqliteCache(filename, startupQueries, 0, 0, false)
db, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
require.Nil(t, err)
require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
require.FileExists(t, filename)
@ -486,7 +582,7 @@ func TestSqliteCache_StartupQueries_None(t *testing.T) {
func TestSqliteCache_StartupQueries_Fail(t *testing.T) {
filename := newSqliteTestCacheFile(t)
startupQueries := `xx error`
_, err := newSqliteCache(filename, startupQueries, 0, 0, false)
_, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
require.Error(t, err)
}
@ -538,7 +634,7 @@ func TestMemCache_NopCache(t *testing.T) {
}
func newSqliteTestCache(t *testing.T) *messageCache {
c, err := newSqliteCache(newSqliteTestCacheFile(t), "", 0, 0, false)
c, err := newSqliteCache(newSqliteTestCacheFile(t), "", time.Hour, 0, 0, false)
if err != nil {
t.Fatal(err)
}
@ -550,7 +646,7 @@ func newSqliteTestCacheFile(t *testing.T) string {
}
func newSqliteTestCacheFromFile(t *testing.T, filename, startupQueries string) *messageCache {
c, err := newSqliteCache(filename, startupQueries, 0, 0, false)
c, err := newSqliteCache(filename, startupQueries, time.Hour, 0, 0, false)
if err != nil {
t.Fatal(err)
}

View file

@ -41,7 +41,6 @@ import (
purge accounts that were not logged int o in X
reset daily Limits for users
Make sure account endpoints make sense for admins
add logic to set "expires" column (this is gonna be dirty)
UI:
- Align size of message bar and upgrade banner
- flicker of upgrade banner
@ -199,7 +198,7 @@ func createMessageCache(conf *Config) (*messageCache, error) {
if conf.CacheDuration == 0 {
return newNopCache()
} else if conf.CacheFile != "" {
return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
return newSqliteCache(conf.CacheFile, conf.CacheStartupQueries, conf.CacheDuration, conf.CacheBatchSize, conf.CacheBatchTimeout, false)
}
return newMemCache()
}