Mercurial > gemma
view pkg/controllers/uploadedimports.go @ 5123:eeb45e3e0a5a queued-stage-done
Added mechanism to have sync import jobs on import queue.
Review jobs are now sync with a controller waiting for 20 secs before returning.
If all reviews return earlier the controller exits earlier, too.
If one or more decisions took longer they are run in background till they are
decided and the controller returns an error message for these imports
that the process is still running.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 26 Mar 2020 22:24:45 +0100 |
parents | f9bb06f2dbe3 |
children | 5f47eeea988d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "fmt" "log" "net/http" "os" "strconv" "time" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/imports" mw "gemma.intevation.de/gemma/pkg/middleware" "gemma.intevation.de/gemma/pkg/misc" ) const maxUploadSize = 25 * 1024 * 1024 // badUploadParameterError is a local type to find bad request parameters. type badUploadParameterError string func importWaterwayProfiles() http.HandlerFunc { return uploadedImport( imports.WPJobKind, "wp.csv", func(req *http.Request, dir string) (imports.Job, error) { url := req.FormValue("url") if url == "" { return nil, badUploadParameterError("missing 'url' parameter") } featureType := req.FormValue("feature-type") if featureType == "" { return nil, badUploadParameterError("missing 'feature-type' parameter") } sortBy := req.FormValue("sort-by") var precision *float64 if p := req.FormValue("precision"); p != "" { v, err := strconv.ParseFloat(p, 64) if err != nil { return nil, badUploadParameterError( fmt.Sprintf("Invalid 'precision' parameter: %v", err)) } precision = &v } user := req.FormValue("user") password := req.FormValue("password") return &imports.WaterwayProfiles{ Dir: dir, URL: url, FeatureType: featureType, SortBy: sortBy, Precision: precision, User: user, Password: password, }, nil }, ) } func importApprovedGaugeMeasurements() http.HandlerFunc { return uploadedImport( imports.AGMJobKind, "agm.csv", func(req *http.Request, dir string) (imports.Job, error) { originator := req.FormValue("originator") if originator == 
"" { return nil, badUploadParameterError("missing 'originator' parameter") } return &imports.ApprovedGaugeMeasurements{ Dir: dir, Originator: originator, }, nil }, ) } func importUploadedBottleneck() http.HandlerFunc { return uploadedImport( imports.UBNJobKind, "data.xml", func(req *http.Request, dir string) (imports.Job, error) { var tolerance float64 if t := req.FormValue("tolerance"); t != "" { v, err := strconv.ParseFloat(t, 64) if err != nil { return nil, badUploadParameterError( fmt.Sprintf("Invalid 'tolerance' parameter: %v", err)) } tolerance = v } return &imports.UploadedBottleneck{Dir: dir, Tolerance: tolerance}, nil }, ) } func importUploadedFairwayAvailability() http.HandlerFunc { return uploadedImport( imports.UFAJobKind, "data.xml", func(_ *http.Request, dir string) (imports.Job, error) { return &imports.UploadedFairwayAvailability{Dir: dir}, nil }, ) } func importUploadedGaugeMeasurement() http.HandlerFunc { return uploadedImport( imports.UGMJobKind, "data.xml", func(_ *http.Request, dir string) (imports.Job, error) { return &imports.UploadedGaugeMeasurement{Dir: dir}, nil }, ) } func importUploadedStretchShape() http.HandlerFunc { return uploadedImport( imports.STSHJobKind, "stretch.zip", func(_ *http.Request, dir string) (imports.Job, error) { return &imports.StretchShape{Dir: dir}, nil }, ) } func (bup badUploadParameterError) Error() string { return string(bup) } func uploadedImport( kind imports.JobKind, fname string, create func(*http.Request, string) (imports.Job, error), ) http.HandlerFunc { return func(rw http.ResponseWriter, req *http.Request) { dir, err := misc.StoreUploadedFile( req, string(kind), fname, maxUploadSize) if err != nil { log.Printf("error: %v\n", err) http.Error(rw, "error: "+err.Error(), http.StatusInternalServerError) return } job, err := create(req, dir) if err != nil { if err2 := os.RemoveAll(dir); err2 != nil { log.Printf("warn: %v\n", err2) } if err2, ok := err.(badUploadParameterError); ok { http.Error(rw, 
string(err2), http.StatusBadRequest) return } log.Printf("error: %v\n", err) http.Error(rw, "error: "+err.Error(), http.StatusInternalServerError) return } serialized, err := common.ToJSONString(job) if err != nil { log.Printf("error: %v\n", err) http.Error(rw, "error: "+err.Error(), http.StatusInternalServerError) return } session, _ := auth.GetSession(req) sendEmail := req.FormValue("send-email") != "" jobID, err := imports.AddJob( kind, time.Time{}, // due nil, // trys nil, // retry wait session.User, sendEmail, serialized) if err != nil { log.Printf("error: %v\n", err) http.Error(rw, "error: "+err.Error(), http.StatusInternalServerError) return } log.Printf("info: added import #%d to queue\n", jobID) result := struct { ID int64 `json:"id"` }{ ID: jobID, } mw.SendJSON(rw, http.StatusCreated, &result) } }