comparison pkg/imports/queue.go @ 5490:5f47eeea988d logging

Use own logging package.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 20 Sep 2021 17:45:39 +0200
parents 4f0869b85038
children 35966741e45e
comparison
equal deleted inserted replaced
5488:a726a92ea5c9 5490:5f47eeea988d
17 "context" 17 "context"
18 "database/sql" 18 "database/sql"
19 "encoding/json" 19 "encoding/json"
20 "errors" 20 "errors"
21 "fmt" 21 "fmt"
22 "log"
23 "runtime/debug" 22 "runtime/debug"
24 "sort" 23 "sort"
25 "strings" 24 "strings"
26 "sync" 25 "sync"
27 "time" 26 "time"
29 "github.com/jackc/pgx/pgtype" 28 "github.com/jackc/pgx/pgtype"
30 29
31 "gemma.intevation.de/gemma/pkg/auth" 30 "gemma.intevation.de/gemma/pkg/auth"
32 "gemma.intevation.de/gemma/pkg/common" 31 "gemma.intevation.de/gemma/pkg/common"
33 "gemma.intevation.de/gemma/pkg/config" 32 "gemma.intevation.de/gemma/pkg/config"
33 "gemma.intevation.de/gemma/pkg/log"
34 "gemma.intevation.de/gemma/pkg/pgxutils" 34 "gemma.intevation.de/gemma/pkg/pgxutils"
35 ) 35 )
36 36
37 type ( 37 type (
38 // Feedback is passed to the Do method of a Job to log 38 // Feedback is passed to the Do method of a Job to log
379 // LogImportKindNames logs a list of importer types registered 379 // LogImportKindNames logs a list of importer types registered
380 // to the global import queue. 380 // to the global import queue.
381 func LogImportKindNames() { 381 func LogImportKindNames() {
382 kinds := ImportKindNames() 382 kinds := ImportKindNames()
383 sort.Strings(kinds) 383 sort.Strings(kinds)
384 log.Printf("info: registered import kinds: %s", 384 log.Infof("registered import kinds: %s",
385 strings.Join(kinds, ", ")) 385 strings.Join(kinds, ", "))
386 } 386 }
387 387
388 // HasImportKindName checks if the import queue supports a given kind. 388 // HasImportKindName checks if the import queue supports a given kind.
389 func HasImportKindName(kind string) bool { 389 func HasImportKindName(kind string) bool {
438 func (idj *idJob) nextDue() time.Time { 438 func (idj *idJob) nextDue() time.Time {
439 now := time.Now() 439 now := time.Now()
440 if idj.waitRetry.Status == pgtype.Present { 440 if idj.waitRetry.Status == pgtype.Present {
441 var d time.Duration 441 var d time.Duration
442 if err := idj.waitRetry.AssignTo(&d); err != nil { 442 if err := idj.waitRetry.AssignTo(&d); err != nil {
443 log.Printf("error: converting waitRetry failed: %v\n", err) 443 log.Errorf("converting waitRetry failed: %v\n", err)
444 } else { 444 } else {
445 now = now.Add(d) 445 now = now.Add(d)
446 } 446 }
447 } 447 }
448 return now 448 return now
460 if idj.waitRetry.Status != pgtype.Present { 460 if idj.waitRetry.Status != pgtype.Present {
461 return nil 461 return nil
462 } 462 }
463 d := new(time.Duration) 463 d := new(time.Duration)
464 if err := idj.waitRetry.AssignTo(d); err != nil { 464 if err := idj.waitRetry.AssignTo(d); err != nil {
465 log.Printf("error: converting waitRetry failed: %v\n", err) 465 log.Errorf("converting waitRetry failed: %v\n", err)
466 return nil 466 return nil
467 } 467 }
468 return d 468 return d
469 } 469 }
470 470
558 user, 558 user,
559 sendEmail, 559 sendEmail,
560 data).Scan(&id) 560 data).Scan(&id)
561 561
562 if err == nil && sync { 562 if err == nil && sync {
563 log.Printf("info: register wait for %d\n", id) 563 log.Infof("register wait for %d\n", id)
564 done = make(chan struct{}) 564 done = make(chan struct{})
565 q.waiting[id] = done 565 q.waiting[id] = done
566 } 566 }
567 567
568 return err 568 return err
673 serialized, 673 serialized,
674 true) 674 true)
675 if err != nil { 675 if err != nil {
676 return nil, err 676 return nil, err
677 } 677 }
678 log.Printf("info: add review job %d\n", rID) 678 log.Infof("add review job %d\n", rID)
679 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) 679 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
680 if err != nil && done != nil { 680 if err != nil && done != nil {
681 go func() { 681 go func() {
682 q.cmdCh <- func(q *importQueue) { 682 q.cmdCh <- func(q *importQueue) {
683 delete(q.waiting, rID) 683 delete(q.waiting, rID)
739 _, err := conn.ExecContext( 739 _, err := conn.ExecContext(
740 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) 740 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
741 return err 741 return err
742 }) 742 })
743 if err != nil { 743 if err != nil {
744 log.Printf("error: logging failed: %v\n", err) 744 log.Errorf("logging failed: %v\n", err)
745 } 745 }
746 } 746 }
747 747
748 func (lf logFeedback) Info(format string, args ...interface{}) { 748 func (lf logFeedback) Info(format string, args ...interface{}) {
749 lf.log("info", format, args...) 749 lf.log("info", format, args...)
866 for try := 1; ; try++ { 866 for try := 1; ; try++ {
867 var err error 867 var err error
868 if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries { 868 if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries {
869 return err 869 return err
870 } 870 }
871 log.Printf("warn: [try %d/%d] Storing state failed: %v (try again in %s).\n", 871 log.Warnf("[try %d/%d] Storing state failed: %v (try again in %s).\n",
872 try, maxTries, err, sleep) 872 try, maxTries, err, sleep)
873 873
874 time.Sleep(sleep) 874 time.Sleep(sleep)
875 if sleep < time.Minute { 875 if sleep < time.Minute {
876 if sleep *= 2; sleep > time.Minute { 876 if sleep *= 2; sleep > time.Minute {
933 func (q *importQueue) importLoop() { 933 func (q *importQueue) importLoop() {
934 config.WaitReady() 934 config.WaitReady()
935 // re-enqueue the jobs that are in state running. 935 // re-enqueue the jobs that are in state running.
936 // They were in progress when the server went down. 936 // They were in progress when the server went down.
937 if err := reEnqueueRunning(); err != nil { 937 if err := reEnqueueRunning(); err != nil {
938 log.Printf("error: re-enqueuing failed: %v", err) 938 log.Errorf("re-enqueuing failed: %v", err)
939 } 939 }
940 940
941 for { 941 for {
942 var idj *idJob 942 var idj *idJob
943 var err error 943 var err error
944 944
945 for { 945 for {
946 if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { 946 if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
947 log.Printf("error: db: %v\n", err) 947 log.Errorf("error: db: %v\n", err)
948 } 948 }
949 if idj != nil { 949 if idj != nil {
950 break 950 break
951 } 951 }
952 select { 952 select {
955 955
956 case <-time.After(pollDuration): 956 case <-time.After(pollDuration):
957 } 957 }
958 } 958 }
959 959
960 log.Printf("info: starting import #%d\n", idj.id) 960 log.Infof("starting import #%d\n", idj.id)
961 961
962 jc := q.jobCreator(idj.kind) 962 jc := q.jobCreator(idj.kind)
963 if jc == nil { 963 if jc == nil {
964 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) 964 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind)
965 continue 965 continue
976 // Unlock the dependencies. 976 // Unlock the dependencies.
977 q.unlockDependencies(jc) 977 q.unlockDependencies(jc)
978 // Unlock waiting. 978 // Unlock waiting.
979 q.cmdCh <- func(q *importQueue) { 979 q.cmdCh <- func(q *importQueue) {
980 if w := q.waiting[idj.id]; w != nil { 980 if w := q.waiting[idj.id]; w != nil {
981 log.Printf("info: unlock waiting %d\n", idj.id) 981 log.Infof("unlock waiting %d\n", idj.id)
982 if retry { 982 if retry {
983 w <- struct{}{} 983 w <- struct{}{}
984 } else { 984 } else {
985 close(w) 985 close(w)
986 } 986 }
1048 default: 1048 default:
1049 state = "pending" 1049 state = "pending"
1050 } 1050 }
1051 if !remove { 1051 if !remove {
1052 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { 1052 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
1053 log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) 1053 log.Errorf("setting state of job %d failed: %v\n", idj.id, err)
1054 } 1054 }
1055 log.Printf("info: import #%d finished: %s\n", idj.id, state) 1055 log.Infof("info: import #%d finished: %s\n", idj.id, state)
1056 } 1056 }
1057 if idj.sendEmail { 1057 if idj.sendEmail {
1058 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) 1058 go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
1059 } 1059 }
1060 1060
1066 idj.waitRetryPointer(), 1066 idj.waitRetryPointer(),
1067 idj.user, idj.sendEmail, 1067 idj.user, idj.sendEmail,
1068 idj.data, 1068 idj.data,
1069 false) 1069 false)
1070 if err != nil { 1070 if err != nil {
1071 log.Printf("error: retry enqueue failed: %v\n", err) 1071 log.Errorf("retry enqueue failed: %v\n", err)
1072 } else { 1072 } else {
1073 log.Printf("info: re-enqueued job with id %d\n", nid) 1073 log.Infof("re-enqueued job with id %d\n", nid)
1074 } 1074 }
1075 } 1075 }
1076 if remove { 1076 if remove {
1077 if err := deleteJob(ctx, idj.id); err != nil { 1077 if err := deleteJob(ctx, idj.id); err != nil {
1078 log.Printf("error: deleting job %d failed: %v\n", idj.id, err) 1078 log.Errorf("deleting job %d failed: %v\n", idj.id, err)
1079 } 1079 }
1080 } 1080 }
1081 }(jc, idj) 1081 }(jc, idj)
1082 } 1082 }
1083 } 1083 }