view contrib/gmaggregate/main.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in the SR export to extract information that was formerly in the meta JSON but is now only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSE
//
// Copyright (C) 2021 by via donau
//   - Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

//go:generate ragel -Z -G2 -o matcher.go matcher.rl
//go:generate go fmt matcher.go

package main

import (
	"container/heap"
	"context"
	"database/sql"
	"encoding/csv"
	"flag"
	"fmt"
	"log"
	"os"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	_ "github.com/jackc/pgx/v4/stdlib"
)

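// The SQL used by the two phases: selecting the old gm import logs,
// staging the filtered lines in the scratch table filtered_logs and
// moving them back into import.import_logs.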
const (
	selectOldGMLogsSQL = `
  SELECT
    lo.import_id,
    lo.time,
    lo.kind,
    lo.msg
  FROM import.imports im
  JOIN import.import_logs lo
    ON lo.import_id = im.id
  WHERE im.kind = 'gm'
  ORDER BY lo.import_id`

	createFilteredLogsSQL = `
  CREATE TABLE filtered_logs (
    import_id integer                  NOT NULL,
    time      timestamp with time zone NOT NULL,
    kind      log_type                 NOT NULL,
    msg       text                     NOT NULL
  )`

	insertFilteredLogsSQL = `
  INSERT INTO filtered_logs (import_id, time, kind, msg)
    VALUES ($1, $2, $3::log_type, $4)`

	deleteOldGMLogsSQL = `
  DELETE FROM import.import_logs WHERE import_id IN (
    SELECT import_id FROM filtered_logs)`

	copyDataSQL = `
  INSERT INTO import.import_logs (import_id, time, kind, msg)
  SELECT import_id, time, kind, msg
  FROM filtered_logs`

	dropFilteredLogsSQL = `DROP TABLE filtered_logs`
)

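// phases is a bit set of the processing phases to run.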
type phases int

const (
	nonePhase   phases = 0
	filterPhase phases = 1 << iota
	transferPhase
)

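// gauge accumulates the findings for a single gauge while the
// log lines of one import are matched.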
type gauge struct {
	gid           string
	unknown       bool
	assumeZPG     bool
	ignMeasCodes  []string
	rescaleErrors []string
	missingValues []string
	assumeCM      int
	badValues     int
	measurements  int
	predictions   int
}

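// aggregator condenses the log lines of one import into summary lines.
// Its match method is not defined in this file; it is generated from
// matcher.rl by ragel (see the go:generate directives above).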
type aggregator struct {
	current string
	hold    *line

	lastGauge *gauge
	gauges    []*gauge

	stack [4]string
}

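// line is a single import log entry.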
type line struct {
	time time.Time
	kind string
	msg  string
}

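// importLines carries all log lines of one import; seq is the
// position in the input stream used to restore the output order.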
type importLines struct {
	seq   int
	id    int64
	lines []line
}

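// processor restores the input order of the concurrently aggregated
// imports with a heap guarded by a condition variable.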
type processor struct {
	cond       *sync.Cond
	aggregated []*importLines
	nextOutSeq int
	done       bool
}

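// writer abstracts the output sink for the filtered logs:
// a CSV file or the filtered_logs database table.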
type writer interface {
	prepare(context.Context, *sql.Conn) error
	write(*importLines)
	finish()
	error() error
}

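// csvWriter writes the filtered logs to a CSV file. row buffers the
// four columns import_id, time, kind and msg.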
type csvWriter struct {
	err  error
	file *os.File
	out  *csv.Writer
	row  [1 + 1 + 1 + 1]string
}

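// sqlWriter stores the filtered logs in the filtered_logs table
// within a single transaction.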
type sqlWriter struct {
	err  error
	ctx  context.Context
	tx   *sql.Tx
	stmt *sql.Stmt
}

func (ps phases) has(p phases) bool {
	return ps&p == p
}

func parsePhases(s string) (phases, error) {
	ps := nonePhase
	for _, x := range strings.Split(s, ",") {
		switch strings.ToLower(strings.TrimSpace(x)) {
		case "transfer":
			ps |= transferPhase
		case "filter":
			ps |= filterPhase
		default:
			return nonePhase, fmt.Errorf("invalid phase '%s'", x)
		}
	}
	return ps, nil
}

func (g *gauge) getAssumeZPG() bool               { return g.assumeZPG }
func (g *gauge) getUnknown() bool                 { return g.unknown }
func (g *gauge) getIgnoredMeasureCodes() []string { return g.ignMeasCodes }
func (g *gauge) getRescaleErrors() []string       { return g.rescaleErrors }
func (g *gauge) getMissingValues() []string       { return g.missingValues }
func (g *gauge) getAssumeCM() int                 { return g.assumeCM }
func (g *gauge) getBadValues() int                { return g.badValues }
func (g *gauge) getPredictions() int              { return g.predictions }
func (g *gauge) getMeasurements() int             { return g.measurements }
func (g *gauge) nothingChanged() bool             { return g.measurements == 0 && g.predictions == 0 }

func (agg *aggregator) reset() {
	agg.current = ""
	agg.hold = nil
	agg.lastGauge = nil
	agg.gauges = nil
}

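// find returns the gauge named name, creating it on demand. The last
// hit is cached as log lines for the same gauge tend to come in runs.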
func (agg *aggregator) find(name string) *gauge {
	if agg.lastGauge != nil && name == agg.lastGauge.gid {
		return agg.lastGauge
	}
	for _, g := range agg.gauges {
		if g.gid == name {
			agg.lastGauge = g
			return g
		}
	}
	g := &gauge{gid: name}
	agg.gauges = append(agg.gauges, g)
	agg.lastGauge = g
	return g
}

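// extend appends needle to haystack if it is not already contained.
// It is not referenced in this file and is presumably called from
// the generated matcher code.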
func extend(haystack []string, needle string) []string {
	for _, straw := range haystack {
		if straw == needle {
			return haystack
		}
	}
	return append(haystack, needle)
}

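// logBool writes one log line listing all gauges for which access
// reports true, prefixed with header.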
func (agg *aggregator) logBool(
	access func(*gauge) bool,
	header string,
	log func(string),
) {
	var sb strings.Builder
	for _, g := range agg.gauges {
		if access(g) {
			if sb.Len() == 0 {
				sb.WriteString(header)
			} else {
				sb.WriteString(", ")
			}
			sb.WriteString(g.gid)
		}
	}
	if sb.Len() > 0 {
		log(sb.String())
	}
}

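// logInt writes one log line grouping the gauges by the count
// reported by access, e.g. "header 2 (A, B); 5 (C)".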
func (agg *aggregator) logInt(
	access func(*gauge) int,
	header string,
	log func(string),
) {
	gs := make([]*gauge, 0, len(agg.gauges))
	for _, g := range agg.gauges {
		if access(g) > 0 {
			gs = append(gs, g)
		}
	}

	if len(gs) == 0 {
		return
	}

	sort.SliceStable(gs, func(i, j int) bool {
		return access(gs[i]) < access(gs[j])
	})

	var sb strings.Builder
	var last int

	for _, g := range gs {
		if c := access(g); c != last {
			if sb.Len() == 0 {
				sb.WriteString(header)
			} else {
				sb.WriteString("); ")
			}
			sb.WriteString(strconv.Itoa(c))
			sb.WriteString(" (")
			last = c
		} else {
			sb.WriteString(", ")
		}
		sb.WriteString(g.gid)
	}

	sb.WriteByte(')')
	log(sb.String())
}

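// logString writes one log line listing each gauge together with its
// collected details, e.g. "header A (x; y), B (z)".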
func (agg *aggregator) logString(
	access func(*gauge) []string,
	header string,
	log func(string),
) {
	var sb strings.Builder
	for _, g := range agg.gauges {
		if s := access(g); len(s) > 0 {
			if sb.Len() == 0 {
				sb.WriteString(header)
			} else {
				sb.WriteString(", ")
			}
			sb.WriteString(g.gid)
			sb.WriteString(" (")
			for i, v := range s {
				if i > 0 {
					sb.WriteString("; ")
				}
				sb.WriteString(v)
			}
			sb.WriteByte(')')
		}
	}
	if sb.Len() > 0 {
		log(sb.String())
	}
}

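// aggregate appends the summary lines for the collected gauges to out.
// The timestamps are spaced one millisecond apart after last so the
// generated lines keep a stable order; a held-back line, if any, is
// appended at the end.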
func (agg *aggregator) aggregate(out []line, last time.Time) []line {

	// Guarantee that new lines have timestamps after the already
	// put out lines.
	if n := len(out); n > 0 && !out[n-1].time.Before(last) {
		last = out[n-1].time.Add(time.Millisecond)
	}

	log := func(kind, msg string) {
		out = append(out, line{last, kind, msg})
		last = last.Add(time.Millisecond)
	}

	infoLog := func(msg string) { log("info", msg) }
	warnLog := func(msg string) { log("warn", msg) }
	errLog := func(msg string) { log("error", msg) }

	agg.logBool(
		(*gauge).getUnknown,
		"Cannot find following gauges: ",
		warnLog)

	agg.logBool(
		(*gauge).getAssumeZPG,
		"'Reference_code' not specified. Assuming 'ZPG': ",
		warnLog)

	agg.logInt(
		(*gauge).getAssumeCM,
		"'Unit' not specified. Assuming 'cm': ",
		warnLog)

	agg.logInt(
		(*gauge).getBadValues,
		"Ignored measurements with value -99999: ",
		warnLog)

	agg.logString(
		(*gauge).getMissingValues,
		"Missing mandatory values: ",
		warnLog)

	agg.logString(
		(*gauge).getRescaleErrors,
		"Cannot convert units: ",
		errLog)

	agg.logString(
		(*gauge).getIgnoredMeasureCodes,
		"Ignored measure codes: ",
		warnLog)

	agg.logInt(
		(*gauge).getPredictions,
		"New predictions: ",
		infoLog)

	agg.logInt(
		(*gauge).getMeasurements,
		"New measurements: ",
		infoLog)

	agg.logBool(
		(*gauge).nothingChanged,
		"No changes for: ",
		infoLog)

	if agg.hold != nil {
		agg.hold.time = last
		out = append(out, *agg.hold)
	}
	return out
}

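// run is the worker loop: it sorts the lines of each import by time,
// drops the lines consumed by the matcher, appends the aggregated
// summary and hands the result to the processor.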
func (agg *aggregator) run(
	wg *sync.WaitGroup,
	logs <-chan *importLines,
	pr *processor,
) {
	defer wg.Done()
	for l := range logs {
		// Sort by time here rather than in the database to take
		// advantage of the concurrent workers.
		lines := l.lines
		sort.Slice(lines, func(i, j int) bool {
			return lines[i].time.Before(lines[j].time)
		})

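		// Filter in place: out shares the backing array of lines.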
		out := lines[:0:len(lines)]
		for i := range lines {
			line := &lines[i]
			if !agg.match(line.msg, line) {
				out = append(out, *line)
			}
		}
		l.lines = agg.aggregate(out, lines[len(lines)-1].time)
		pr.consume(l)
		agg.reset()
	}
}

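// timeFormat matches the textual format PostgreSQL uses for
// timestamp with time zone values.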
const timeFormat = "2006-01-02 15:04:05.999999-07"

func newCSVWriter(filename string) (*csvWriter, error) {

	f, err := os.Create(filename)
	if err != nil {
		return nil, err
	}

	return &csvWriter{
		file: f,
		out:  csv.NewWriter(f),
	}, nil
}

func (cw *csvWriter) prepare(context.Context, *sql.Conn) error {
	return nil
}

func (cw *csvWriter) error() error { return cw.err }

func (cw *csvWriter) write(entry *importLines) {
	if cw.err != nil {
		return
	}
	row := cw.row[:]
	row[0] = strconv.FormatInt(entry.id, 10)
	for _, l := range entry.lines {
		row[1] = l.time.Format(timeFormat)
		row[2] = l.kind
		row[3] = l.msg
		if cw.err = cw.out.Write(row); cw.err != nil {
			log.Printf("error: Writing to CSV file failed: %v\n", cw.err)
			return
		}
	}
}

func (cw *csvWriter) finish() {
	cw.out.Flush()
	if err := cw.out.Error(); err != nil {
		log.Printf("error: flushing CSV file failed: %v\n", err)
	}
	if err := cw.file.Close(); err != nil {
		log.Printf("Closing CSV file failed: %v\n", err)
	}
}

func (sw *sqlWriter) prepare(ctx context.Context, conn *sql.Conn) error {

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return err
	}

	if _, err := tx.ExecContext(ctx, createFilteredLogsSQL); err != nil {
		tx.Rollback()
		return fmt.Errorf("cannot create new log table: %v\n", err)
	}

	stmt, err := tx.PrepareContext(ctx, insertFilteredLogsSQL)
	if err != nil {
		tx.Rollback()
		return err
	}

	sw.ctx = ctx
	sw.tx = tx
	sw.stmt = stmt
	return nil
}

func (sw *sqlWriter) error() error { return sw.err }

func (sw *sqlWriter) write(entry *importLines) {
	if sw.err != nil {
		return
	}
	for _, l := range entry.lines {
		if _, sw.err = sw.stmt.ExecContext(
			sw.ctx,
			entry.id,
			l.time,
			l.kind,
			l.msg,
		); sw.err != nil {
			log.Printf("error: writing log line to db failed: %v\n", sw.err)
			return
		}
	}
}

func (sw *sqlWriter) finish() {
	if err := sw.stmt.Close(); err != nil {
		log.Printf("error: close stmt failed: %v\n", err)
	}
	if sw.err == nil {
		if err := sw.tx.Commit(); err != nil {
			log.Printf("error: Commiting transaction failed: %v\n", err)
		}
	} else if err := sw.tx.Rollback(); err != nil {
		log.Printf("error: Rollback transaction failed: %v\n", err)
	}
}

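// Push, Pop, Len, Less and Swap implement heap.Interface over the
// aggregated imports, ordered by their input sequence number.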
func (pr *processor) Push(x interface{}) {
	pr.aggregated = append(pr.aggregated, x.(*importLines))
}

func (pr *processor) Pop() interface{} {
	n := len(pr.aggregated)
	x := pr.aggregated[n-1]
	pr.aggregated[n-1] = nil
	pr.aggregated = pr.aggregated[:n-1]
	return x
}

func (pr *processor) Len() int { return len(pr.aggregated) }

func (pr *processor) Less(i, j int) bool {
	return pr.aggregated[i].seq < pr.aggregated[j].seq
}

func (pr *processor) Swap(i, j int) {
	pr.aggregated[i], pr.aggregated[j] = pr.aggregated[j], pr.aggregated[i]
}

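// consume takes an aggregated import from a worker and hands it
// over to the draining goroutine.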
func (pr *processor) consume(l *importLines) {
	pr.cond.L.Lock()
	heap.Push(pr, l)
	pr.cond.L.Unlock()
	pr.cond.Signal()
}

func (pr *processor) quit() {
	pr.cond.L.Lock()
	pr.done = true
	pr.cond.L.Unlock()
	pr.cond.Signal()
}

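// drain writes the aggregated imports in their original input order,
// waiting until the next expected sequence number surfaces on top of
// the heap. After quit has been called the rest is flushed in heap
// order.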
func (pr *processor) drain(write func(*importLines)) {

	for {
		pr.cond.L.Lock()
		for !pr.done &&
			(len(pr.aggregated) == 0 || pr.aggregated[0].seq != pr.nextOutSeq) {
			pr.cond.Wait()
		}
		if pr.done {
			for len(pr.aggregated) > 0 {
				write(heap.Pop(pr).(*importLines))
			}
			pr.cond.L.Unlock()
			return
		}
		l := heap.Pop(pr).(*importLines)
		pr.nextOutSeq++
		pr.cond.L.Unlock()
		write(l)
	}
}

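// filterPhase streams the old gm import logs from the database,
// aggregates them with worker concurrent aggregators and writes
// the condensed logs through wr.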
func (pr *processor) filterPhase(db *sql.DB, worker int, wr writer) error {

	log.Println("filter phase started")

	ctx := context.Background()

	con1, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	defer con1.Close()

	con2, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	defer con2.Close()

	tx, err := con1.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
	if err != nil {
		return err
	}
	defer tx.Rollback()

	if err := wr.prepare(ctx, con2); err != nil {
		return err
	}
	defer wr.finish()

	logs := make(chan *importLines)
	var wg sync.WaitGroup

	for i := 0; i < worker; i++ {
		wg.Add(1)
		go new(aggregator).run(&wg, logs, pr)
	}

	writeDone := make(chan struct{})

	go func() {
		defer close(writeDone)
		pr.drain(wr.write)
	}()

	log.Println("Querying for old logs started. (Can take a while.)")
	rows, err := tx.QueryContext(ctx, selectOldGMLogsSQL)
	if err != nil {
		return err
	}
	defer rows.Close()

	log.Println("Querying done. (Maybe restart the gemma server, now?)")

	var (
		count    int64
		current  *importLines
		seq      int
		l        line
		importID int64
		start    = time.Now()
		last     = start
	)

	log.Println("Filtering started.")
	for rows.Next() {
		if err := rows.Scan(&importID, &l.time, &l.kind, &l.msg); err != nil {
			return err
		}

		if current == nil || importID != current.id {
			if current != nil {
				logs <- current
			}
			current = &importLines{
				seq: seq,
				id:  importID,
			}
			seq++
		}
		current.lines = append(current.lines, l)

		if count++; count%1_000_000 == 0 {
			now := time.Now()
			diff := now.Sub(last)
			log.Printf("lines: %d rate: %.2f lines/s\n",
				count,
				1_000_000/diff.Seconds())
			last = now
		}
	}
	if current != nil && len(current.lines) > 0 {
		logs <- current
	}
	close(logs)
	wg.Wait()

	pr.quit()

	<-writeDone

	rate := float64(count) / time.Since(start).Seconds()
	log.Printf("lines: %d rate: %.2f lines/s imports: %d\n",
		count, rate, seq)
	return nil
}

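// transferPhase replaces the old gm import logs with the content of
// the filtered_logs table and drops that table afterwards.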
func (pr *processor) transferPhase(db *sql.DB) error {
	log.Println("Transfer phase started.")
	ctx := context.Background()
	conn, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	defer conn.Close()
	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	for _, stmt := range []string{
		deleteOldGMLogsSQL,
		copyDataSQL,
		dropFilteredLogsSQL,
	} {
		if _, err := tx.ExecContext(ctx, stmt); err != nil {
			return err
		}
	}
	return tx.Commit()
}

func newProcessor() *processor {
	return &processor{
		cond: sync.NewCond(new(sync.Mutex)),
	}
}

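// process connects to the database, sets up the requested writer
// and runs the selected phases.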
func process(
	host, dbname string, port int,
	worker int,
	csvFile string,
	ps phases,
) error {

	p := newProcessor()
	var wr writer

	if csvFile != "" {
		var err error
		if wr, err = newCSVWriter(csvFile); err != nil {
			return fmt.Errorf("error: Cannot create CSV file: %v", err)
		}
	} else {
		wr = new(sqlWriter)
	}

	dsn := fmt.Sprintf("host=%s dbname=%s port=%d", host, dbname, port)
	db, err := sql.Open("pgx", dsn)
	if err != nil {
		return err
	}
	defer db.Close()

	if ps.has(filterPhase) {
		if err := p.filterPhase(db, worker, wr); err != nil {
			return err
		}
	}
	if ps.has(transferPhase) {
		if err := p.transferPhase(db); err != nil {
			return err
		}
	}

	return nil
}

func main() {
	var (
		host   = flag.String("h", "/var/run/postgresql", "database host")
		dbname = flag.String("d", "gemma", "database name")
		port   = flag.Int("p", 5432, "database port")
		worker = flag.Int("w", runtime.NumCPU(), "number of aggregation workers")
		csv    = flag.String("c", "", "CSV file to write instead of the database")
		phases = flag.String("phases", "filter,transfer", "phases to run: filter and/or transfer")
	)

	flag.Parse()

	ps, err := parsePhases(*phases)
	if err != nil {
		log.Fatalf("error: %v\n", err)
	}

	start := time.Now()
	if err := process(*host, *dbname, *port, *worker, *csv, ps); err != nil {
		log.Fatalf("error: %v\n", err)
	}
	log.Printf("time took: %s\n", time.Since(start))
}