view pkg/imports/queue.go @ 1130:42617bba8709

Go from 50cm to 10cm stepwidth in calculating the contorlines.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 07 Nov 2018 17:54:31 +0100
parents a244b18cb916
children a5069da2f0b7
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"runtime/debug"
	"sync"
	"time"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/config"
)

type (
	Feedback interface {
		Info(fmt string, args ...interface{})
		Warn(fmt string, args ...interface{})
		Error(fmt string, args ...interface{})
	}

	Job interface {
		Do(*sql.Conn, Feedback) error
		CleanUp() error
	}

	JobKind string

	JobCreator func(kind JobKind, data string) (Job, error)

	idJob struct {
		id   int64
		kind JobKind
		user string
		data string
	}
)

const pollDuration = time.Second * 10

var (
	signalChan = make(chan struct{})

	creatorsMu sync.Mutex
	creators   = map[JobKind]JobCreator{}
)

const (
	queueUser = "sys_admin"

	reEnqueueRunningSQL = `
UPDATE waterway.imports SET state = 'queued'::waterway.import_state
WHERE state = 'running'::waterway.import_state
`

	insertJobSQL = `
INSERT INTO waterway.imports (
  kind,
  username,
  data
) VALUES (
  $1,
  $2,
  $3
) RETURNING id`

	selectJobSQL = `
SELECT
  id,
  kind,
  username,
  data
FROM waterway.imports 
WHERE state = 'queued'::waterway.import_state AND enqueued IN (
  SELECT min(enqueued)
  FROM waterway.imports 
  WHERE state = 'queued'::waterway.import_state
)
LIMIT 1
`
	updateStateSQL = `
UPDATE waterway.imports SET state = $1::waterway.import_state
WHERE id = $2
`
	logMessageSQL = `
INSERT INTO waterway.import_logs (
  import_id,
  kind,
  msg
) VALUES (
  $1,
  $2::waterway.log_type,
  $3
)`
)

func init() {
	go importLoop()
}

func RegisterJobCreator(kind JobKind, jc JobCreator) {
	log.Printf("info: register import job creator for kind '%s'\n", kind)
	creatorsMu.Lock()
	defer creatorsMu.Unlock()
	creators[kind] = jc
}

func jobCreator(kind JobKind) JobCreator {
	creatorsMu.Lock()
	defer creatorsMu.Unlock()
	return creators[kind]
}

func AddJob(kind JobKind, user, data string) (int64, error) {
	ctx := context.Background()
	var id int64
	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		return conn.QueryRowContext(ctx, insertJobSQL, string(kind), user, data).Scan(&id)
	})
	if err == nil {
		select {
		case signalChan <- struct{}{}:
		default:
		}
	}
	return id, err
}

type logFeedback int64

func (lf logFeedback) log(kind, format string, args ...interface{}) {
	ctx := context.Background()
	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(
			ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
		return err
	})
	if err != nil {
		log.Printf("logging failed: %v\n", err)
	}
}

func (lf logFeedback) Info(format string, args ...interface{}) {
	lf.log("info", format, args...)
}

func (lf logFeedback) Warn(format string, args ...interface{}) {
	lf.log("warn", format, args...)
}

func (lf logFeedback) Error(format string, args ...interface{}) {
	lf.log("error", format, args...)
}

func survive(fn func() error) func() error {
	return func() error {
		errCh := make(chan error)
		go func() {
			defer func() {
				if err := recover(); err != nil {
					errCh <- fmt.Errorf("%v: %s",
						err, string(debug.Stack()))
				}
			}()
			errCh <- fn()
		}()
		return <-errCh
	}
}

func reEnqueueRunning() error {
	ctx := context.Background()
	return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
		return err
	})
}

func fetchJob() (*idJob, error) {
	var ji idJob
	ctx := context.Background()
	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		if err = tx.QueryRowContext(ctx, selectJobSQL).Scan(
			&ji.id, &ji.kind, &ji.user, &ji.data); err != nil {
			return err
		}
		_, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
		if err == nil {
			err = tx.Commit()
		}
		return err
	})
	switch {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, err
	}
	return &ji, nil
}

func updateState(id int64, state string) error {
	ctx := context.Background()
	return auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		_, err := conn.ExecContext(ctx, updateStateSQL, state, id)
		return err
	})
}

func errorAndFail(id int64, format string, args ...interface{}) error {
	ctx := context.Background()
	err := auth.RunAs(queueUser, ctx, func(conn *sql.Conn) error {
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		defer tx.Rollback()
		_, err = conn.ExecContext(
			ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
		if err != nil {
			return err
		}
		_, err = conn.ExecContext(
			ctx, updateStateSQL, "failed", id)
		if err == nil {
			err = tx.Commit()
		}
		return err
	})
	return err
}

func importLoop() {
	config.WaitReady()
	// re-enqueue the jobs that are in state running.
	// They where in progess when the server went down.
	if err := reEnqueueRunning(); err != nil {
		log.Printf("re-enqueuing failed: %v", err)
	}

	for {
		var idj *idJob
		var err error

		for {
			if idj, err = fetchJob(); err != nil {
				log.Printf("db error: %v\n", err)
			}
			if idj != nil {
				break
			}
			select {
			case <-signalChan:
			case <-time.After(pollDuration):
			}
		}

		log.Printf("starting import #%d\n", idj.id)

		jc := jobCreator(idj.kind)
		if jc == nil {
			errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind)
			continue
		}

		job, err := jc(idj.kind, idj.data)
		if err != nil {
			errorAndFail(idj.id, "failed to create job for import #%d: %v",
				idj.id, err)
			continue
		}

		feedback := logFeedback(idj.id)

		feedback.Info("import #%d started\n", idj.id)

		errDo := survive(func() error {
			return auth.RunAs(idj.user, context.Background(),
				func(conn *sql.Conn) error { return job.Do(conn, feedback) })
		})()
		if errDo != nil {
			feedback.Error("error do: %v\n", errDo)
		}
		errCleanup := survive(job.CleanUp)()
		if errCleanup != nil {
			feedback.Error("error cleanup: %v\n", errCleanup)
		}

		var state string
		if errDo != nil || errCleanup != nil {
			state = "failed"
		} else {
			state = "successful"
		}
		if err := updateState(idj.id, state); err != nil {
			log.Printf("setting state of job %d failed: %v\n", idj.id, err)
		}
		log.Printf("import #%d finished: %s\n", idj.id, state)
	}
}