view cmd/meshmigrate/main.go @ 5707:d5cafd49bed8 sr-v2

Used unused parameter.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 20 Feb 2024 21:25:08 +0100
parents a3a975ea93ca
children
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

// meshmigrate can be used to migrate mesh indices from one version to another.
package main

import (
	"crypto/sha1"
	"database/sql"
	"encoding/hex"
	"flag"
	"fmt"
	"log"
	"os"
	"regexp"
	"strconv"
	"strings"

	"gemma.intevation.de/gemma/pkg/mesh"
	"github.com/jackc/pgx"
	"github.com/jackc/pgx/stdlib"
)

// credentials bundles the settings needed to connect to the database.
// The fields are filled from the command line flags in main.
type credentials struct {
	host, database string // server host name and name of the database
	port           uint   // TCP port of the database server
	user, password string // login role and its password
}

// openDB returns a database/sql handle for the database described by c.
//
// The base configuration is parsed from "sslmode=prefer"; host, port,
// user, password and database are then taken from the credentials.
// An error is returned if the base configuration cannot be parsed or if
// the configured port does not fit into 16 bits.
func (c *credentials) openDB() (*sql.DB, error) {
	// The port is kept as uint but the connection config stores a uint16.
	// Reject out-of-range values instead of silently truncating them.
	const maxPort = 1<<16 - 1
	if c.port > maxPort {
		return nil, fmt.Errorf("invalid port %d: must be <= %d", c.port, maxPort)
	}

	cc, err := pgx.ParseConnectionString("sslmode=prefer")
	if err != nil {
		return nil, err
	}
	cc.Host = c.host
	cc.Port = uint16(c.port)
	cc.User = c.user
	cc.Password = c.password
	cc.Database = c.database
	return stdlib.OpenDB(cc), nil
}

// fetchSQL is the common prefix of the query selecting the sounding
// results that carry a mesh index. It ends in "AND " so the caller has
// to append at least one further condition.
const fetchSQL = `
SELECT
	id,
	coalesce(mesh_index_version, 1) AS mesh_index_version,
	mesh_index
FROM waterway.sounding_results
WHERE mesh_index IS NOT NULL AND `

// updateSQL stores a mesh index for one sounding result.
// Arguments: $1 index version, $2 index data, $3 checksum, $4 id.
const updateSQL = `
UPDATE waterway.sounding_results SET
	mesh_index_version = $1,
	mesh_index = $2,
	mesh_checksum = $3
WHERE id = $4`

// restore writes mesh index backups found in the current working
// directory back into the database.
//
// Only directory entries named mesh-<id>-v<version>.idx.gz are
// considered. For each of them the file content, its hex encoded SHA-1
// checksum and the version taken from the file name are stored for the
// sounding result with the id taken from the file name. With dry set
// the files are still read and logged but no UPDATE is executed.
func restore(creds *credentials, dry bool) error {
	db, err := creds.openDB()
	if err != nil {
		return err
	}
	defer db.Close()

	cwd, err := os.Getwd()
	if err != nil {
		return err
	}
	log.Println(cwd)

	files, err := os.ReadDir(cwd)
	if err != nil {
		return err
	}
	log.Println(len(files))

	// Group 1 is the sounding result id, group 2 the index version.
	backupName := regexp.MustCompile(`^mesh-(\d+)-v(\d+)\.idx\.gz$`)

	for _, file := range files {
		fname := file.Name()
		groups := backupName.FindStringSubmatch(fname)
		if groups == nil {
			// Not a backup file.
			continue
		}

		meshID, err := strconv.ParseInt(groups[1], 10, 64)
		if err != nil {
			return err
		}
		meshVersion, err := strconv.ParseInt(groups[2], 10, 64)
		if err != nil {
			return err
		}

		blob, err := os.ReadFile(fname)
		if err != nil {
			return err
		}
		digest := sha1.Sum(blob)
		checksum := hex.EncodeToString(digest[:])

		log.Printf("restore mesh %d version %d\n", meshID, meshVersion)
		if !dry {
			if _, err := db.Exec(updateSQL, meshVersion, blob, checksum, meshID); err != nil {
				return err
			}
		}
	}
	return nil
}

// process migrates stored mesh indices to index version to.
//
// It selects all sounding results that have a mesh index with a version
// lower than to, optionally restricted to the given ids and to at most
// limit rows (a limit below zero means no limit). Every non-empty index
// is decoded, re-serialized for the target version and, unless dry is
// set, written back together with its new version and the hex encoded
// SHA-1 checksum of the new data.
//
// With backup set the original data is written to
// mesh-<id>-v<version>.idx.gz and the migrated data to
// migrated-mesh-<id>-v<version>.idx.gz, both relative to the current
// working directory. Backups are written in dry runs, too.
//
// The first error encountered aborts the run and is returned.
func process(
	creds *credentials,
	to int,
	ids []int64,
	limit int,
	dry, backup bool,
) error {
	db, err := creds.openDB()
	if err != nil {
		return err
	}
	defer db.Close()

	// Only integers are formatted into the statement, so building the
	// query by concatenation cannot inject foreign SQL.
	query := fetchSQL + fmt.Sprintf("coalesce(mesh_index_version, 1) < %d", to)

	if len(ids) > 0 {
		query += " AND " + idsFilter(ids)
	}

	if limit > -1 {
		query += fmt.Sprintf(" LIMIT %d", limit)
	}

	rows, err := db.Query(query)
	if err != nil {
		return err
	}
	defer rows.Close()

	var totalIn, totalOut int64
	migrated := 0

	for rows.Next() {
		var id int64
		var version int
		var data []byte
		if err := rows.Scan(&id, &version, &data); err != nil {
			return err
		}
		if len(data) == 0 {
			log.Printf("mesh %d is empty\n", id)
			continue
		}
		log.Printf("processing mesh %d\n", id)
		totalIn += int64(len(data))
		migrated++
		if backup {
			// Keep the untouched original so it can be restored later.
			fname := fmt.Sprintf("mesh-%d-v%d.idx.gz", id, version)
			if err := os.WriteFile(fname, data, 0666); err != nil {
				return fmt.Errorf("mesh %d: writing backup: %w", id, err)
			}
		}
		src := new(mesh.STRTree)
		if err := src.FromBytes(data, version); err != nil {
			return fmt.Errorf("mesh %d: decoding version %d: %w", id, version, err)
		}
		src.OptimizeForSerialization(to)
		out, newVersion, err := src.Bytes(to)
		if err != nil {
			return fmt.Errorf("mesh %d: encoding version %d: %w", id, to, err)
		}
		totalOut += int64(len(out))
		if backup {
			fname := fmt.Sprintf("migrated-mesh-%d-v%d.idx.gz", id, newVersion)
			if err := os.WriteFile(fname, out, 0666); err != nil {
				return fmt.Errorf("mesh %d: writing migrated backup: %w", id, err)
			}
		}
		if dry {
			continue
		}
		digest := sha1.Sum(out)
		checksum := hex.EncodeToString(digest[:])
		if _, err := db.Exec(updateSQL, newVersion, out, checksum, id); err != nil {
			return fmt.Errorf("mesh %d: updating: %w", id, err)
		}
	}
	// rows.Next returns false on errors as well as at the end of the
	// result set, so the iteration error has to be checked explicitly.
	if err := rows.Err(); err != nil {
		return err
	}
	log.Printf("migrated meshes: %d\n", migrated)
	log.Printf("in: %d (%.2f MB)\n", totalIn, float64(totalIn)/(1024*1024))
	log.Printf("out: %d (%.2f MB)\n", totalOut, float64(totalOut)/(1024*1024))
	// Without any input the ratio is undefined (0/0 would print NaN).
	if totalIn > 0 {
		log.Printf("ratio: %.2f%%\n", float64(totalOut)/float64(totalIn)*100)
	}

	return nil
}

// idsFilter renders ids as the SQL condition "id IN (a,b,...)".
// For an empty slice the empty string is returned.
func idsFilter(ids []int64) string {
	if len(ids) == 0 {
		return ""
	}
	decimals := make([]string, len(ids))
	for i := range ids {
		decimals[i] = strconv.FormatInt(ids[i], 10)
	}
	return "id IN (" + strings.Join(decimals, ",") + ")"
}

// check terminates the program via log.Fatalf if err is not nil.
// A nil error is a no-op.
func check(err error) {
	if err == nil {
		return
	}
	log.Fatalf("error: %v\n", err)
}

// toIDs parses a comma separated list of decimal ids.
// White space around the individual entries is ignored. The empty
// string yields a nil slice. The first entry that is not a valid
// 64 bit integer aborts parsing and its error is returned.
func toIDs(s string) ([]int64, error) {
	if len(s) == 0 {
		return nil, nil
	}
	fields := strings.Split(s, ",")
	ids := make([]int64, 0, len(fields))
	for _, field := range fields {
		id, err := strconv.ParseInt(strings.TrimSpace(field), 10, 64)
		if err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	return ids, nil
}

// checkMode reports an error unless mode is one of the supported
// operation modes "migrate" or "restore".
func checkMode(mode string) error {
	if mode == "migrate" || mode == "restore" {
		return nil
	}
	return fmt.Errorf("unknown mode %q", mode)
}

// main parses the command line flags and runs the selected operation
// mode: "migrate" converts stored mesh indices to the requested version,
// "restore" writes backups from the working directory back into the
// database. Any error terminates the program via check.
func main() {
	var (
		creds  credentials
		idsS   string
		limit  int
		dry    bool
		backup bool
		to     int
		mode   string
	)
	flag.StringVar(&creds.host, "host", "localhost", "host of the database server")
	flag.UintVar(&creds.port, "port", 5432, "port of the database server")
	flag.StringVar(&creds.database, "database", "gemma", "database name")
	flag.StringVar(&creds.user, "user", "gemma", "database user")
	flag.StringVar(&creds.password, "password", "gemma", "database user password")
	flag.IntVar(&limit, "limit", -1, "maximum number of meshes to migrate (-1: no limit)")
	flag.StringVar(&idsS, "ids", "", "filter ids (empty: no ids)")
	flag.BoolVar(&dry, "dry", false, "do a dry run")
	flag.BoolVar(&backup, "backup", true, "store backup in file system")
	flag.IntVar(&to, "to", 2, "version to store")
	flag.StringVar(&mode, "mode", "migrate", "operation mode: migrate, restore")
	flag.Parse()

	ids, err := toIDs(idsS)
	check(err)
	// Reject unknown modes before touching the database.
	check(checkMode(mode))

	switch mode {
	case "migrate":
		check(process(&creds, to, ids, limit, dry, backup))
	case "restore":
		check(restore(&creds, dry))
	}
}