changeset 1193:58acc343b1b6

Implemented the db stuff of the review process. Needs testing.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 16 Nov 2018 18:35:09 +0100
parents 3afa71405b87
children 7db850de0952
files pkg/controllers/importqueue.go pkg/imports/queue.go pkg/imports/sr.go schema/gemma.sql
diffstat 4 files changed, 65 insertions(+), 7 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go	Fri Nov 16 17:51:44 2018 +0100
+++ b/pkg/controllers/importqueue.go	Fri Nov 16 18:35:09 2018 +0100
@@ -292,7 +292,16 @@
 
 const (
 	isPendingSQL = `
-SELECT state = 'pending'::waterway.import_state WHERE id = $1`
+SELECT state = 'pending'::waterway.import_state, kind WHERE id = $1`
+
+	reviewSQL = `
+UPDATE waterway.imports SET state = $1::waterway.import_state
+WHERE id = $2`
+
+	deleteImportDataSQL = `SELECT waterway.del_import($1)`
+
+	deleteImportTrackSQL = `
+DELETE FROM waterway.track_imports WHERE import_id = $1`
 )
 
 func reviewImport(
@@ -313,8 +322,9 @@
 	defer tx.Rollback()
 
 	var pending bool
+	var kind string
 
-	err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending)
+	err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind)
 	switch {
 	case err == sql.ErrNoRows:
 		err = JSONError{
@@ -333,9 +343,23 @@
 	}
 
 	if state == "accepted" {
-		// TODO: Call a stored procedure to do the grunt stuff
-		//       inside the database.
+		if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil {
+			if err = jc.StageDone(tx, ctx, id); err != nil {
+				return
+			}
+		}
+
+		if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
+			return
+		}
 	} else {
+		if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil {
+			return
+		}
+	}
+
+	if _, err = tx.ExecContext(ctx, reviewSQL, state, id); err != nil {
+		return
 	}
 
 	if err = tx.Commit(); err != nil {
--- a/pkg/imports/queue.go	Fri Nov 16 17:51:44 2018 +0100
+++ b/pkg/imports/queue.go	Fri Nov 16 18:35:09 2018 +0100
@@ -45,6 +45,7 @@
 	JobCreator interface {
 		Create(kind JobKind, data string) (Job, error)
 		Depends() []string
+		StageDone(*sql.Tx, context.Context, int64) error
 	}
 
 	idJob struct {
@@ -141,6 +142,10 @@
 	q.creators[kind] = jc
 }
 
+func FindJobCreator(kind JobKind) JobCreator {
+	return iqueue.jobCreator(kind)
+}
+
 func RegisterJobCreator(kind JobKind, jc JobCreator) {
 	log.Printf("info: register import job creator for kind '%s'\n", kind)
 	iqueue.registerJobCreator(kind, jc)
--- a/pkg/imports/sr.go	Fri Nov 16 17:51:44 2018 +0100
+++ b/pkg/imports/sr.go	Fri Nov 16 18:35:09 2018 +0100
@@ -63,6 +63,10 @@
 
 type srJobCreator struct{}
 
+func init() {
+	RegisterJobCreator(SRJobKind, srJobCreator{})
+}
+
 func (srJobCreator) Create(_ JobKind, data string) (Job, error) {
 	return SoundingResult(data), nil
 }
@@ -75,11 +79,20 @@
 	}
 }
 
-func init() {
-	RegisterJobCreator(SRJobKind, srJobCreator{})
+func (srJobCreator) StageDone(
+	tx *sql.Tx,
+	ctx context.Context,
+	id int64,
+) error {
+	_, err := tx.ExecContext(ctx, srStageDoneSQL, id)
+	return err
 }
 
 const (
+	srStageDoneSQL = `
+UPDATE waterway.sounding_results SET staging_done = true
+WHERE id = $1`
+
 	checkDepthReferenceSQL = `
 SELECT true FROM depth_references WHERE depth_reference = $1`
 
--- a/schema/gemma.sql	Fri Nov 16 17:51:44 2018 +0100
+++ b/schema/gemma.sql	Fri Nov 16 18:35:09 2018 +0100
@@ -549,10 +549,24 @@
     UNIQUE (relation, key)
 );
 
+CREATE FUNCTION waterway.del_import(imp_id int) RETURNS void AS
+$$
+DECLARE
+    tmp RECORD;
+BEGIN
+    FOR tmp IN
+        SELECT * FROM waterway.track_imports WHERE import_id = imp_id
+    LOOP
+        EXECUTE format('DELETE FROM %s WHERE id = $1', tmp.relation) USING tmp.key;
+    END LOOP;
+END;
+$$
+LANGUAGE plpgsql;
+
 CREATE FUNCTION waterway.del_import() RETURNS trigger AS
 $$
 BEGIN
-    EXECUTE format('DELETE FROM %I WHERE id = $1', OLD.relation) USING OLD.key;
+    EXECUTE format('DELETE FROM %s WHERE id = $1', OLD.relation) USING OLD.key;
 END;
 $$
 LANGUAGE plpgsql;
@@ -560,4 +574,6 @@
 CREATE TRIGGER delete_import AFTER DELETE ON waterway.track_imports
    FOR EACH ROW EXECUTE PROCEDURE waterway.del_import();
 
+
+
 COMMIT;