# HG changeset patch # User Sascha Wilde # Date 1648824473 -7200 # Node ID 7ed9e32706d06f3be4e6a0278074b0d6087cb56a # Parent c1bd5f8eaf9abe760932baf5954f45ed4ceb5c24# Parent 41d7bbc9eac1481363ccdbc798682e4021a923f5 Merged delault diff -r c1bd5f8eaf9a -r 7ed9e32706d0 .hgtags --- a/.hgtags Mon Feb 14 12:06:48 2022 +0100 +++ b/.hgtags Fri Apr 01 16:47:53 2022 +0200 @@ -41,3 +41,5 @@ 79155213c4da9ad4d381e1b39d768d9137a5f827 v5.5 79155213c4da9ad4d381e1b39d768d9137a5f827 v5.5 d4fb695f8437f85d107dbcc10406bb9485529abb v5.5 +f1fb8c4f65878c2e11aeb5a62c7d5630e4f52d8e v5.5.1 +0bc838acff6ca3e76350d26389edccbe455aedf1 v5.5.2 diff -r c1bd5f8eaf9a -r 7ed9e32706d0 client/package.json --- a/client/package.json Mon Feb 14 12:06:48 2022 +0100 +++ b/client/package.json Fri Apr 01 16:47:53 2022 +0200 @@ -1,6 +1,6 @@ { "name": "gemmajs", - "version": "5.5.1-dev", + "version": "5.5.3-dev", "license": "AGPL-3.0-or-later", "repository": { "type": "hg", diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/controllers/importqueue.go --- a/pkg/controllers/importqueue.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/controllers/importqueue.go Fri Apr 01 16:47:53 2022 +0200 @@ -26,6 +26,7 @@ "time" "github.com/gorilla/mux" + "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" @@ -59,20 +60,24 @@ FROM import.imports WHERE ` + // XXX: Consider not only supporting 'sr' for log message parsing. selectExportSQL = ` SELECT imports.id AS id, state::varchar, enqueued, changed, - kind, + imports.kind, username, (SELECT country FROM users.list_users lu WHERE lu.username = import.imports.username) AS country, signer, EXISTS(SELECT 1 FROM import.import_logs - WHERE kind = 'warn'::log_type and import_id = imports.id) AS has_warnings, - data + WHERE kind = 'warn'::log_type and import_id = id) AS has_warnings, + data, + CASE WHEN kind = ANY($1) THEN ARRAY(SELECT msg FROM import.import_logs WHERE import_id = id) + ELSE NULL + END AS msgs FROM import.imports WHERE ` @@ -108,7 +113,12 @@ args []interface{} } -func buildFilters(projection string, req *http.Request) (*filledStmt, *filledStmt, *filledStmt, error) { +func buildFilters(projection string, req *http.Request, args ...interface{}) ( + *filledStmt, + *filledStmt, + *filledStmt, + error, +) { var l, a, b filterAnd @@ -122,11 +132,11 @@ } // Always filter review jobs. They are only for internal use. - cond(` NOT kind LIKE '%%` + imports.ReviewJobSuffix + `'`) + cond(` NOT imports.kind LIKE '%%` + imports.ReviewJobSuffix + `'`) if query := req.FormValue("query"); query != "" { query = "%" + query + "%" - cond(` (kind ILIKE $%d OR username ILIKE $%d OR signer ILIKE $%d OR `+ + cond(` (imports.kind ILIKE $%d OR username ILIKE $%d OR signer ILIKE $%d OR `+ `id IN (SELECT import_id FROM import.import_logs WHERE msg ILIKE $%d)) `, query, query, query, query) } @@ -146,7 +156,7 @@ if ks := req.FormValue("kinds"); ks != "" { kinds := toTextArray(ks, imports.ImportKindNames()) - cond(" kind = ANY($%d) ", kinds) + cond(" imports.kind = ANY($%d) ", kinds) } if idss := req.FormValue("ids"); idss != "" { @@ -192,6 +202,10 @@ fa := &filledStmt{} fb := &filledStmt{} + fl.args = append(fl.args, args...) + fa.args = append(fa.args, args...) + fb.args = append(fb.args, args...) + fa.stmt.WriteString(selectEnqueuedSQL) fb.stmt.WriteString(selectEnqueuedSQL) @@ -259,7 +273,26 @@ func exportImports(rw http.ResponseWriter, req *http.Request) { - list, _, _, err := buildFilters(selectExportSQL, req) + type LogLoader interface{ LoadingLogs() bool } + + var lls []string + + imports.All(func(k imports.JobKind, jc imports.JobCreator) { + if ll, ok := jc.(LogLoader); ok && ll.LoadingLogs() { + lls = append(lls, string(k)) + } + }) + + var loaders pgtype.TextArray + if err := loaders.Set(lls); err != nil { + http.Error( + rw, fmt.Sprintf("error: %v", err), + http.StatusInternalServerError) + return + } + + list, _, _, err := buildFilters(selectExportSQL, req, &loaders) + if err != nil { http.Error(rw, "error: "+err.Error(), http.StatusBadRequest) return @@ -287,6 +320,8 @@ return } + start := time.Now() + conn := mw.GetDBConn(req) ctx := req.Context() var rows *sql.Rows @@ -296,74 +331,104 @@ } defer rows.Close() - stringString := func(s sql.NullString) string { - if s.Valid { - return s.String - } - return "" + // Extract some meta infos from the import. + type Description interface { + Description([]string) (string, error) } - // Extract some meta infos from the import. - type Description interface { - Description() (string, error) + type dataset struct { + id int64 + state string + enqueued time.Time + changed time.Time + kind string + user string + country string + signer sql.NullString + warnings bool + data string + msgs pgtype.TextArray } - for rows.Next() { - var ( - id int64 - state string - enqueued time.Time - changed time.Time - kind string - user string - country string - signer sql.NullString - warnings bool - data string - description string - ) - if err = rows.Scan( - &id, - &state, - &enqueued, - &changed, - &kind, - &user, - &country, - &signer, - &warnings, - &data, - ); err != nil { - return - } + // Log unsupported description interfaces per kind only once. + unsupported := make(map[string]bool) + + store := func(ds *dataset) error { + + var description string // Do some introspection on the job to be more verbose. - if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil { + if jc := imports.FindJobCreator(imports.JobKind(ds.kind)); jc != nil { job := jc.Create() - if err := common.FromJSONString(data, job); err != nil { + if err := common.FromJSONString(ds.data, job); err != nil { log.Errorf("%v\n", err) } else if desc, ok := job.(Description); ok { - if description, err = desc.Description(); err != nil { - log.Errorf("%v\n", err) + var ms []string + if ds.msgs.Status == pgtype.Present { + if err := ds.msgs.AssignTo(&ms); err != nil { + return err + } + } + if description, err = desc.Description(ms); err != nil { + return err + } + description = strings.Replace(description, ",", "|", -1) + } else { + if !unsupported[ds.kind] { + unsupported[ds.kind] = true + log.Debugf("%s: description not supported\n", ds.kind) } } } - record[0] = strconv.FormatInt(id, 10) - record[1] = kind - record[2] = enqueued.UTC().Format(common.TimeFormat) - record[3] = changed.UTC().Format(common.TimeFormat) - record[4] = user - record[5] = country - record[6] = stringString(signer) - record[7] = state - record[8] = strconv.FormatBool(warnings) - record[9] = strings.Replace(description, ",", "|", -1) + var signer string + if ds.signer.Valid { + signer = ds.signer.String + } + + record[0] = strconv.FormatInt(ds.id, 10) + record[1] = ds.kind + record[2] = ds.enqueued.UTC().Format(common.TimeFormat) + record[3] = ds.changed.UTC().Format(common.TimeFormat) + record[4] = ds.user + record[5] = ds.country + record[6] = signer + record[7] = ds.state + record[8] = strconv.FormatBool(ds.warnings) + record[9] = description - if err := out.Write(record); err != nil { + return out.Write(record) + } + + for rows.Next() { + var curr dataset + + if err := rows.Scan( + &curr.id, + &curr.state, + &curr.enqueued, + &curr.changed, + &curr.kind, + &curr.user, + &curr.country, + &curr.signer, + &curr.warnings, + &curr.data, + &curr.msgs, + ); err != nil { log.Errorf("%v\n", err) return } + + if err := store(&curr); err != nil { + log.Errorf("%v\n", err) + return + } + } + + if err := rows.Err(); err != nil { + log.Errorf("%v\n", err) + return } out.Flush() @@ -371,10 +436,7 @@ log.Errorf("%v\n", err) } - if err = rows.Err(); err != nil { - log.Errorf("%v\n", err) - return - } + log.Debugf("Export took: %v\n", time.Since(start)) } func listImports(req *http.Request) (jr mw.JSONResult, err error) { diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/bn.go --- a/pkg/imports/bn.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/bn.go Fri Apr 01 16:47:53 2022 +0200 @@ -160,7 +160,7 @@ ) // Description gives a short info about relevant facts of this import. -func (bn *Bottleneck) Description() (string, error) { +func (bn *Bottleneck) Description([]string) (string, error) { return bn.URL, nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/dma.go --- a/pkg/imports/dma.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/dma.go Fri Apr 01 16:47:53 2022 +0200 @@ -42,7 +42,7 @@ } // Description gives a short info about relevant facts of this import. -func (dma *DistanceMarksAshore) Description() (string, error) { +func (dma *DistanceMarksAshore) Description([]string) (string, error) { return dma.URL + "|" + dma.FeatureType, nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/dmv.go --- a/pkg/imports/dmv.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/dmv.go Fri Apr 01 16:47:53 2022 +0200 @@ -37,7 +37,7 @@ } // Description gives a short info about relevant facts of this import. -func (dmv *DistanceMarksVirtual) Description() (string, error) { +func (dmv *DistanceMarksVirtual) Description([]string) (string, error) { return dmv.URL, nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/dsr.go --- a/pkg/imports/dsr.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/dsr.go Fri Apr 01 16:47:53 2022 +0200 @@ -30,7 +30,7 @@ } // Description gives a short info about relevant facts of this import. -func (dsr *DeleteSoundingResult) Description() (string, error) { +func (dsr *DeleteSoundingResult) Description([]string) (string, error) { return dsr.BottleneckID + "|" + dsr.Date.Format(common.DateFormat), nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/fa.go --- a/pkg/imports/fa.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/fa.go Fri Apr 01 16:47:53 2022 +0200 @@ -160,7 +160,7 @@ ) // Description gives a short info about relevant facts of this import. -func (fa *FairwayAvailability) Description() (string, error) { +func (fa *FairwayAvailability) Description([]string) (string, error) { return fa.URL, nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/fd.go --- a/pkg/imports/fd.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/fd.go Fri Apr 01 16:47:53 2022 +0200 @@ -48,7 +48,7 @@ } // Description gives a short info about relevant facts of this import. -func (fd *FairwayDimension) Description() (string, error) { +func (fd *FairwayDimension) Description([]string) (string, error) { return strings.Join([]string{ fd.URL, fd.FeatureType, @@ -486,7 +486,7 @@ oldID, ).Scan(&fdid, &lat, &lon) }); err != nil { - feedback.Error(pgxutils.ReadableError{Err: err}.Error() + + feedback.Error(pgxutils.ReadableError{Err: err}.Error()+ "- while tracking invalidation of: %d", oldID) continue } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/gm.go --- a/pkg/imports/gm.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/gm.go Fri Apr 01 16:47:53 2022 +0200 @@ -43,7 +43,7 @@ } // Description gives a short info about relevant facts of this import. -func (gm *GaugeMeasurement) Description() (string, error) { +func (gm *GaugeMeasurement) Description([]string) (string, error) { return gm.URL, nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/queue.go --- a/pkg/imports/queue.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/queue.go Fri Apr 01 16:47:53 2022 +0200 @@ -731,6 +731,16 @@ return iqueue.decideImport(ctx, id, accepted, reviewer) } +func (q *importQueue) All(fn func(JobKind, JobCreator)) { + q.creatorsMu.Lock() + defer q.creatorsMu.Unlock() + for k, v := range q.creators { + fn(k, v) + } +} + +func All(fn func(JobKind, JobCreator)) { iqueue.All(fn) } + type logFeedback int64 func (lf logFeedback) log(kind, format string, args ...interface{}) { diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/report.go --- a/pkg/imports/report.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/report.go Fri Apr 01 16:47:53 2022 +0200 @@ -87,7 +87,7 @@ // RequiresRoles enforces to be a sys_admin to run this . func (*Report) RequiresRoles() auth.Roles { return auth.Roles{"sys_admin"} } -func (r *Report) Description() (string, error) { return string(r.Name), nil } +func (r *Report) Description([]string) (string, error) { return string(r.Name), nil } func (*Report) CleanUp() error { return nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/sec.go --- a/pkg/imports/sec.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/sec.go Fri Apr 01 16:47:53 2022 +0200 @@ -35,7 +35,7 @@ } // Description gives a short info about relevant facts of this import. -func (sec *Section) Description() (string, error) { +func (sec *Section) Description([]string) (string, error) { return strings.Join([]string{ sec.Name, sec.ObjNam, diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/sr.go --- a/pkg/imports/sr.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/sr.go Fri Apr 01 16:47:53 2022 +0200 @@ -28,13 +28,16 @@ "os" "path" "path/filepath" + "regexp" "strconv" "strings" + "sync" "time" shp "github.com/jonas-p/go-shp" "gemma.intevation.de/gemma/pkg/common" + "gemma.intevation.de/gemma/pkg/log" "gemma.intevation.de/gemma/pkg/mesh" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/wkb" @@ -104,6 +107,9 @@ func (srJobCreator) Create() Job { return new(SoundingResult) } +// LoadingLogs ensures that log lines are loaded when import is exported. +func (srJobCreator) LoadingLogs() bool { return true } + func (srJobCreator) Depends() [2][]string { return [2][]string{ {"sounding_results", "sounding_results_iso_areas", @@ -239,23 +245,81 @@ ` ) +var ( + bottleneckRe, + surveyTypeRe, + dateRe, + negateZRe *regexp.Regexp + recoverRegsOnce sync.Once +) + +func compileRecoverRegs() { + bottleneckRe = regexp.MustCompile(`Bottleneck:\s*(.+)\s*$`) + surveyTypeRe = regexp.MustCompile(`Processing as\s+([^\s]+)\s+beam scan.`) + dateRe = regexp.MustCompile(`Survey date:\s*(\d{4}-\d{2}-\d{2})`) + negateZRe = regexp.MustCompile(`Z values will be negated\.`) +} + // Description gives a short info about relevant facts of this import. -func (sr *SoundingResult) Description() (string, error) { +func (sr *SoundingResult) Description(msgs []string) (string, error) { + + recoverRegsOnce.Do(compileRecoverRegs) + + //log.Debugln(strings.Join(msgs, "\n") + "\n\n") + + var ( + bottleneck, st, date string + negZ bool + ) + + for _, msg := range msgs { + if m := bottleneckRe.FindStringSubmatch(msg); m != nil { + bottleneck = m[1] + continue + } + if m := surveyTypeRe.FindStringSubmatch(msg); m != nil { + st = m[1] + continue + } + if m := dateRe.FindStringSubmatch(msg); m != nil { + date = m[1] + continue + } + if negateZRe.MatchString(msg) { + negZ = true + } + } var descs []string if sr.Bottleneck != nil { descs = append(descs, *sr.Bottleneck) + } else if bottleneck != "" { + log.Debugf("bottleneck recovered: %s\n", bottleneck) + descs = append(descs, bottleneck) } + if sr.Date != nil { descs = append(descs, (*sr).Date.Format(common.DateFormat)) + } else if date != "" { + log.Debugf("date recovered: %s\n", date) + descs = append(descs, date) } + if sr.NegateZ != nil && *sr.NegateZ { descs = append(descs, "negateZ") + } else if negZ { + log.Debugln("negateZ recovered") + descs = append(descs, "negateZ") } + if sr.SurveyType != nil { descs = append(descs, string(*sr.SurveyType)) + } else if st != "" { + log.Debugf("survey type recovered: %s\n", st) + descs = append(descs, st) } + return strings.Join(descs, "|"), nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/statsupdate.go --- a/pkg/imports/statsupdate.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/statsupdate.go Fri Apr 01 16:47:53 2022 +0200 @@ -51,7 +51,7 @@ // RequiresRoles enforces to be a sys_admin to run this . func (*StatsUpdate) RequiresRoles() auth.Roles { return auth.Roles{"sys_admin"} } -func (su *StatsUpdate) Description() (string, error) { return su.Name, nil } +func (su *StatsUpdate) Description([]string) (string, error) { return su.Name, nil } func (*StatsUpdate) CleanUp() error { return nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/wa.go --- a/pkg/imports/wa.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/wa.go Fri Apr 01 16:47:53 2022 +0200 @@ -46,7 +46,7 @@ } // Description gives a short info about relevant facts of this import. -func (wa *WaterwayArea) Description() (string, error) { +func (wa *WaterwayArea) Description([]string) (string, error) { return wa.URL + "|" + wa.FeatureType, nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/wfsjob.go --- a/pkg/imports/wfsjob.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/wfsjob.go Fri Apr 01 16:47:53 2022 +0200 @@ -103,7 +103,7 @@ } // Description gives a short info about relevant facts of this import. -func (wfj *WFSFeatureJob) Description() (string, error) { +func (wfj *WFSFeatureJob) Description([]string) (string, error) { return wfj.URL + "|" + wfj.FeatureType, nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/wg.go --- a/pkg/imports/wg.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/wg.go Fri Apr 01 16:47:53 2022 +0200 @@ -41,7 +41,7 @@ } // Description gives a short info about relevant facts of this import. -func (wg *WaterwayGauge) Description() (string, error) { +func (wg *WaterwayGauge) Description([]string) (string, error) { return wg.URL, nil } diff -r c1bd5f8eaf9a -r 7ed9e32706d0 pkg/imports/wp.go --- a/pkg/imports/wp.go Mon Feb 14 12:06:48 2022 +0100 +++ b/pkg/imports/wp.go Fri Apr 01 16:47:53 2022 +0200 @@ -60,7 +60,7 @@ } // Description gives a short info about relevant facts of this import. -func (wp *WaterwayProfiles) Description() (string, error) { +func (wp *WaterwayProfiles) Description([]string) (string, error) { return wp.URL + "|" + wp.FeatureType, nil }