Mercurial > gemma
changeset 1193:58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Fri, 16 Nov 2018 18:35:09 +0100 |
parents | 3afa71405b87 |
children | 7db850de0952 |
files | pkg/controllers/importqueue.go pkg/imports/queue.go pkg/imports/sr.go schema/gemma.sql |
diffstat | 4 files changed, 65 insertions(+), 7 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go Fri Nov 16 17:51:44 2018 +0100 +++ b/pkg/controllers/importqueue.go Fri Nov 16 18:35:09 2018 +0100 @@ -292,7 +292,16 @@ const ( isPendingSQL = ` -SELECT state = 'pending'::waterway.import_state WHERE id = $1` +SELECT state = 'pending'::waterway.import_state, kind WHERE id = $1` + + reviewSQL = ` +UPDATE waterway.imports SET state = $1::waterway.import_state +WHERE id = $2` + + deleteImportDataSQL = `SELECT waterway.del_import($1)` + + deleteImportTrackSQL = ` +DELETE FROM waterway.track_imports WHERE import_id = $1` ) func reviewImport( @@ -313,8 +322,9 @@ defer tx.Rollback() var pending bool + var kind string - err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending) + err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind) switch { case err == sql.ErrNoRows: err = JSONError{ @@ -333,9 +343,23 @@ } if state == "accepted" { - // TODO: Call a stored procedure to do the grunt stuff - // inside the database. + if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil { + if err = jc.StageDone(tx, ctx, id); err != nil { + return + } + } + + if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { + return + } } else { + if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil { + return + } + } + + if _, err = tx.ExecContext(ctx, reviewSQL, state, id); err != nil { + return } if err = tx.Commit(); err != nil {
--- a/pkg/imports/queue.go Fri Nov 16 17:51:44 2018 +0100 +++ b/pkg/imports/queue.go Fri Nov 16 18:35:09 2018 +0100 @@ -45,6 +45,7 @@ JobCreator interface { Create(kind JobKind, data string) (Job, error) Depends() []string + StageDone(*sql.Tx, context.Context, int64) error } idJob struct { @@ -141,6 +142,10 @@ q.creators[kind] = jc } +func FindJobCreator(kind JobKind) JobCreator { + return iqueue.jobCreator(kind) +} + func RegisterJobCreator(kind JobKind, jc JobCreator) { log.Printf("info: register import job creator for kind '%s'\n", kind) iqueue.registerJobCreator(kind, jc)
--- a/pkg/imports/sr.go Fri Nov 16 17:51:44 2018 +0100 +++ b/pkg/imports/sr.go Fri Nov 16 18:35:09 2018 +0100 @@ -63,6 +63,10 @@ type srJobCreator struct{} +func init() { + RegisterJobCreator(SRJobKind, srJobCreator{}) +} + func (srJobCreator) Create(_ JobKind, data string) (Job, error) { return SoundingResult(data), nil } @@ -75,11 +79,20 @@ } } -func init() { - RegisterJobCreator(SRJobKind, srJobCreator{}) +func (srJobCreator) StageDone( + tx *sql.Tx, + ctx context.Context, + id int64, +) error { + _, err := tx.ExecContext(ctx, srStageDoneSQL, id) + return err } const ( + srStageDoneSQL = ` +UPDATE waterway.sounding_results SET staging_done = true +WHERE id = $1` + checkDepthReferenceSQL = ` SELECT true FROM depth_references WHERE depth_reference = $1`
--- a/schema/gemma.sql Fri Nov 16 17:51:44 2018 +0100 +++ b/schema/gemma.sql Fri Nov 16 18:35:09 2018 +0100 @@ -549,10 +549,24 @@ UNIQUE (relation, key) ); +CREATE FUNCTION waterway.del_import(imp_id int) RETURNS void AS +$$ +DECLARE + tmp RECORD; +BEGIN + FOR tmp IN + SELECT * FROM waterway.track_imports WHERE import_id = imp_id + LOOP + EXECUTE format('DELETE FROM %s WHERE id = $1', tmp.relation) USING tmp.key; + END LOOP; +END; +$$ +LANGUAGE plpgsql; + CREATE FUNCTION waterway.del_import() RETURNS trigger AS $$ BEGIN - EXECUTE format('DELETE FROM %I WHERE id = $1', OLD.relation) USING OLD.key; + EXECUTE format('DELETE FROM %s WHERE id = $1', OLD.relation) USING OLD.key; END; $$ LANGUAGE plpgsql; @@ -560,4 +574,6 @@ CREATE TRIGGER delete_import AFTER DELETE ON waterway.track_imports FOR EACH ROW EXECUTE PROCEDURE waterway.del_import(); + + COMMIT;