view pkg/mesh/cache.go @ 5718:3d497077f888 uploadwg

Implemented direct file upload as an alternative import method for WG. For testing and data corrections, it is useful to be able to import waterway gauges data directly by uploading an XML file.
author Sascha Wilde <wilde@sha-bang.de>
date Thu, 18 Apr 2024 19:23:19 +0200
parents 03dfbe675842
children 1ea1d3ef2258
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package mesh

import (
	"context"
	"database/sql"
	"sync"
	"time"
)

type (
	// cacheKey identifies a cached mesh index by the date and the
	// bottleneck it was requested for.
	// NOTE(review): time.Time is compared with == when used as a map
	// key, so the same instant with a different location or a
	// monotonic reading yields a different key -- confirm that
	// callers pass normalized dates.
	cacheKey struct {
		date       time.Time
		bottleneck string
	}

	// cacheEntry is one cached mesh index plus the bookkeeping
	// needed to validate and expire it.
	cacheEntry struct {
		// checksum is the mesh_checksum read from the database
		// when the entry was created.
		checksum string
		// tree is the deserialized mesh index.
		tree     *STRTree
		// access is the time of the last load or cache hit.
		access   time.Time
	}

	// Cache holds STRTrees for a defined amount of time in memory
	// before they are released.
	// The embedded mutex guards entries; the map is created lazily
	// on the first call of get.
	Cache struct {
		sync.Mutex
		entries map[cacheKey]*cacheEntry
	}
)

const (
	// cleanupCacheSleep is the pause between two passes of the
	// background cleanup.
	cleanupCacheSleep = 6 * time.Minute
	// maxCacheAge is how long an entry may stay without being
	// accessed before a cleanup pass removes it.
	maxCacheAge       = 5 * time.Minute
	// maxCacheEntries is the number of entries at which get starts
	// evicting the least recently accessed one before inserting.
	maxCacheEntries   = 4
)

const (
	// directMeshSQL selects the serialized mesh index and its
	// version (1 if NULL) of the sounding result with id $1.
	directMeshSQL = `
SELECT mesh_index, coalesce(mesh_index_version, 1) AS mesh_index_version
FROM waterway.sounding_results
WHERE id = $1
`
	// fetchMeshSQL selects checksum, serialized mesh index and
	// version (1 if NULL) for bottleneck $1 and date $2. Rows
	// without checksum or mesh index are ignored.
	fetchMeshSQL = `
SELECT mesh_checksum, mesh_index, coalesce(mesh_index_version, 1) AS mesh_index_version
FROM waterway.sounding_results
WHERE bottleneck_id = $1 AND date_info = $2::date
  AND mesh_checksum IS NOT NULL AND mesh_index IS NOT NULL
`
	// checkMeshSQL works like fetchMeshSQL but yields NULL instead
	// of the mesh index if the stored checksum equals $3, so an
	// unchanged mesh is not transferred again. It does not return
	// the stored checksum itself.
	checkMeshSQL = `
SELECT CASE
  WHEN mesh_checksum = $3 THEN NULL
  ELSE mesh_index
  END,
  coalesce(mesh_index_version, 1) AS mesh_index_version
FROM waterway.sounding_results
WHERE bottleneck_id = $1 AND date_info = $2::date
  AND mesh_checksum IS NOT NULL AND mesh_index IS NOT NULL
`
)

// cache is the package-level cache instance served by FromCache.
// Its zero value is ready to use.
var cache Cache

// background loops forever: it sleeps for cleanupCacheSleep and then
// runs cleanup. It never returns; get starts it in its own goroutine
// when the entries map is created.
func (c *Cache) background() {
	for {
		time.Sleep(cleanupCacheSleep)
		c.cleanup()
	}
}

// cleanup removes every entry whose last access lies more than
// maxCacheAge in the past. The cache is locked for the whole pass.
func (c *Cache) cleanup() {
	c.Lock()
	defer c.Unlock()

	deadline := time.Now().Add(-maxCacheAge)
	for key, entry := range c.entries {
		if !entry.access.Before(deadline) {
			// Accessed recently enough, keep it.
			continue
		}
		delete(c.entries, key)
	}
}

// FromCache fetches the STRTree for the given bottleneck and date
// from the package-level cache. The cache queries the database
// through conn to load the mesh or to check that a cached one is
// still current. It returns (nil, nil) if the database has no
// matching mesh.
func FromCache(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string, date time.Time,
) (*STRTree, error) {
	return cache.get(ctx, conn, bottleneck, date)
}

// FetchMeshDirectly reads the serialized mesh index of the sounding
// result with the given id inside the transaction tx and decodes it
// into a fresh STRTree. The cache is not involved.
// Errors from the query or from decoding are returned unchanged.
func FetchMeshDirectly(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) (*STRTree, error) {
	var (
		raw          []byte
		indexVersion int
	)

	row := tx.QueryRowContext(ctx, directMeshSQL, id)
	if err := row.Scan(&raw, &indexVersion); err != nil {
		return nil, err
	}

	result := new(STRTree)
	if err := result.FromBytes(raw, indexVersion); err != nil {
		return nil, err
	}
	return result, nil
}

// get returns the mesh index for the given bottleneck and date.
//
// A cached tree is served if its checksum still matches the one
// stored in the database; otherwise the mesh is (re)loaded through
// conn and the cache is updated. If the database has no matching
// row, get returns (nil, nil) and drops a cached entry for the key.
// Errors from the database or from decoding are returned unchanged.
//
// The cache lock is held for the whole call, including the database
// round trips.
func (c *Cache) get(
	ctx context.Context,
	conn *sql.Conn,
	bottleneck string, date time.Time,
) (*STRTree, error) {
	c.Lock()
	defer c.Unlock()

	// Start background cleanup lazily.
	if c.entries == nil {
		c.entries = map[cacheKey]*cacheEntry{}
		go c.background()
	}

	key := cacheKey{date, bottleneck}
	entry := c.entries[key]

	var data []byte
	var checksum string
	var version int

	if entry != nil {
		// Check if we are not outdated. The query only ships the
		// mesh if the checksum differs, so a hit stays cheap.
		var changed []byte
		err := conn.QueryRowContext(
			ctx, checkMeshSQL, bottleneck, date, entry.checksum).Scan(&changed, &version)
		switch {
		case err == sql.ErrNoRows:
			// The mesh is gone, don't keep its tree in memory.
			delete(c.entries, key)
			return nil, nil
		case err != nil:
			return nil, err
		}
		if changed == nil { // we are still current
			entry.access = time.Now()
			return entry.tree, nil
		}
		// Outdated: the check does not tell us the new checksum,
		// so fall through to a regular fetch which delivers
		// checksum and mesh together. Without the new checksum the
		// entry would be reloaded on every following call.
	}

	// Fetch from database.
	err := conn.QueryRowContext(
		ctx, fetchMeshSQL, bottleneck, date).Scan(&checksum, &data, &version)
	switch {
	case err == sql.ErrNoRows:
		// No-op if nothing was cached for the key.
		delete(c.entries, key)
		return nil, nil
	case err != nil:
		return nil, err
	}

	tree := new(STRTree)

	if err := tree.FromBytes(data, version); err != nil {
		return nil, err
	}

	now := time.Now()

	if entry != nil {
		// Refresh the existing entry in place.
		entry.checksum = checksum
		entry.tree = tree
		entry.access = now
		return tree, nil
	}

	for len(c.entries) >= maxCacheEntries {
		// Evict the entry that is accessed the longest time ago.
		// The first entry seen seeds the search, so a victim is
		// always found, even if no access time is strictly before
		// now. Otherwise this loop could spin forever.
		var (
			oldestKey cacheKey
			oldest    time.Time
			found     bool
		)
		for k, v := range c.entries {
			if !found || v.access.Before(oldest) {
				oldestKey, oldest, found = k, v.access, true
			}
		}
		delete(c.entries, oldestKey)
	}

	c.entries[key] = &cacheEntry{
		checksum: checksum,
		tree:     tree,
		access:   now,
	}

	return tree, nil
}