Mercurial > gemma
comparison pkg/imports/queue.go @ 5490:5f47eeea988d logging
Use own logging package.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 20 Sep 2021 17:45:39 +0200 |
parents | 4f0869b85038 |
children | 35966741e45e |
comparison
equal
deleted
inserted
replaced
5488:a726a92ea5c9 | 5490:5f47eeea988d |
---|---|
17 "context" | 17 "context" |
18 "database/sql" | 18 "database/sql" |
19 "encoding/json" | 19 "encoding/json" |
20 "errors" | 20 "errors" |
21 "fmt" | 21 "fmt" |
22 "log" | |
23 "runtime/debug" | 22 "runtime/debug" |
24 "sort" | 23 "sort" |
25 "strings" | 24 "strings" |
26 "sync" | 25 "sync" |
27 "time" | 26 "time" |
29 "github.com/jackc/pgx/pgtype" | 28 "github.com/jackc/pgx/pgtype" |
30 | 29 |
31 "gemma.intevation.de/gemma/pkg/auth" | 30 "gemma.intevation.de/gemma/pkg/auth" |
32 "gemma.intevation.de/gemma/pkg/common" | 31 "gemma.intevation.de/gemma/pkg/common" |
33 "gemma.intevation.de/gemma/pkg/config" | 32 "gemma.intevation.de/gemma/pkg/config" |
| 33 "gemma.intevation.de/gemma/pkg/log" |
34 "gemma.intevation.de/gemma/pkg/pgxutils" | 34 "gemma.intevation.de/gemma/pkg/pgxutils" |
35 ) | 35 ) |
36 | 36 |
37 type ( | 37 type ( |
38 // Feedback is passed to the Do method of a Job to log | 38 // Feedback is passed to the Do method of a Job to log |
379 // LogImportKindNames logs a list of importer types registered | 379 // LogImportKindNames logs a list of importer types registered |
380 // to the global import queue. | 380 // to the global import queue. |
381 func LogImportKindNames() { | 381 func LogImportKindNames() { |
382 kinds := ImportKindNames() | 382 kinds := ImportKindNames() |
383 sort.Strings(kinds) | 383 sort.Strings(kinds) |
384 log.Printf("info: registered import kinds: %s", | 384 log.Infof("registered import kinds: %s", |
385 strings.Join(kinds, ", ")) | 385 strings.Join(kinds, ", ")) |
386 } | 386 } |
387 | 387 |
388 // HasImportKindName checks if the import queue supports a given kind. | 388 // HasImportKindName checks if the import queue supports a given kind. |
389 func HasImportKindName(kind string) bool { | 389 func HasImportKindName(kind string) bool { |
438 func (idj *idJob) nextDue() time.Time { | 438 func (idj *idJob) nextDue() time.Time { |
439 now := time.Now() | 439 now := time.Now() |
440 if idj.waitRetry.Status == pgtype.Present { | 440 if idj.waitRetry.Status == pgtype.Present { |
441 var d time.Duration | 441 var d time.Duration |
442 if err := idj.waitRetry.AssignTo(&d); err != nil { | 442 if err := idj.waitRetry.AssignTo(&d); err != nil { |
443 log.Printf("error: converting waitRetry failed: %v\n", err) | 443 log.Errorf("converting waitRetry failed: %v\n", err) |
444 } else { | 444 } else { |
445 now = now.Add(d) | 445 now = now.Add(d) |
446 } | 446 } |
447 } | 447 } |
448 return now | 448 return now |
460 if idj.waitRetry.Status != pgtype.Present { | 460 if idj.waitRetry.Status != pgtype.Present { |
461 return nil | 461 return nil |
462 } | 462 } |
463 d := new(time.Duration) | 463 d := new(time.Duration) |
464 if err := idj.waitRetry.AssignTo(d); err != nil { | 464 if err := idj.waitRetry.AssignTo(d); err != nil { |
465 log.Printf("error: converting waitRetry failed: %v\n", err) | 465 log.Errorf("converting waitRetry failed: %v\n", err) |
466 return nil | 466 return nil |
467 } | 467 } |
468 return d | 468 return d |
469 } | 469 } |
470 | 470 |
558 user, | 558 user, |
559 sendEmail, | 559 sendEmail, |
560 data).Scan(&id) | 560 data).Scan(&id) |
561 | 561 |
562 if err == nil && sync { | 562 if err == nil && sync { |
563 log.Printf("info: register wait for %d\n", id) | 563 log.Infof("register wait for %d\n", id) |
564 done = make(chan struct{}) | 564 done = make(chan struct{}) |
565 q.waiting[id] = done | 565 q.waiting[id] = done |
566 } | 566 } |
567 | 567 |
568 return err | 568 return err |
673 serialized, | 673 serialized, |
674 true) | 674 true) |
675 if err != nil { | 675 if err != nil { |
676 return nil, err | 676 return nil, err |
677 } | 677 } |
678 log.Printf("info: add review job %d\n", rID) | 678 log.Infof("add review job %d\n", rID) |
679 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) | 679 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) |
680 if err != nil && done != nil { | 680 if err != nil && done != nil { |
681 go func() { | 681 go func() { |
682 q.cmdCh <- func(q *importQueue) { | 682 q.cmdCh <- func(q *importQueue) { |
683 delete(q.waiting, rID) | 683 delete(q.waiting, rID) |
739 _, err := conn.ExecContext( | 739 _, err := conn.ExecContext( |
740 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) | 740 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) |
741 return err | 741 return err |
742 }) | 742 }) |
743 if err != nil { | 743 if err != nil { |
744 log.Printf("error: logging failed: %v\n", err) | 744 log.Errorf("logging failed: %v\n", err) |
745 } | 745 } |
746 } | 746 } |
747 | 747 |
748 func (lf logFeedback) Info(format string, args ...interface{}) { | 748 func (lf logFeedback) Info(format string, args ...interface{}) { |
749 lf.log("info", format, args...) | 749 lf.log("info", format, args...) |
866 for try := 1; ; try++ { | 866 for try := 1; ; try++ { |
867 var err error | 867 var err error |
868 if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries { | 868 if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries { |
869 return err | 869 return err |
870 } | 870 } |
871 log.Printf("warn: [try %d/%d] Storing state failed: %v (try again in %s).\n", | 871 log.Warnf("[try %d/%d] Storing state failed: %v (try again in %s).\n", |
872 try, maxTries, err, sleep) | 872 try, maxTries, err, sleep) |
873 | 873 |
874 time.Sleep(sleep) | 874 time.Sleep(sleep) |
875 if sleep < time.Minute { | 875 if sleep < time.Minute { |
876 if sleep *= 2; sleep > time.Minute { | 876 if sleep *= 2; sleep > time.Minute { |
933 func (q *importQueue) importLoop() { | 933 func (q *importQueue) importLoop() { |
934 config.WaitReady() | 934 config.WaitReady() |
935 // re-enqueue the jobs that are in state running. | 935 // re-enqueue the jobs that are in state running. |
936 // They were in progress when the server went down. | 936 // They were in progress when the server went down. |
937 if err := reEnqueueRunning(); err != nil { | 937 if err := reEnqueueRunning(); err != nil { |
938 log.Printf("error: re-enqueuing failed: %v", err) | 938 log.Errorf("re-enqueuing failed: %v", err) |
939 } | 939 } |
940 | 940 |
941 for { | 941 for { |
942 var idj *idJob | 942 var idj *idJob |
943 var err error | 943 var err error |
944 | 944 |
945 for { | 945 for { |
946 if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { | 946 if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { |
947 log.Printf("error: db: %v\n", err) | 947 log.Errorf("error: db: %v\n", err) |
948 } | 948 } |
949 if idj != nil { | 949 if idj != nil { |
950 break | 950 break |
951 } | 951 } |
952 select { | 952 select { |
955 | 955 |
956 case <-time.After(pollDuration): | 956 case <-time.After(pollDuration): |
957 } | 957 } |
958 } | 958 } |
959 | 959 |
960 log.Printf("info: starting import #%d\n", idj.id) | 960 log.Infof("starting import #%d\n", idj.id) |
961 | 961 |
962 jc := q.jobCreator(idj.kind) | 962 jc := q.jobCreator(idj.kind) |
963 if jc == nil { | 963 if jc == nil { |
964 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) | 964 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) |
965 continue | 965 continue |
976 // Unlock the dependencies. | 976 // Unlock the dependencies. |
977 q.unlockDependencies(jc) | 977 q.unlockDependencies(jc) |
978 // Unlock waiting. | 978 // Unlock waiting. |
979 q.cmdCh <- func(q *importQueue) { | 979 q.cmdCh <- func(q *importQueue) { |
980 if w := q.waiting[idj.id]; w != nil { | 980 if w := q.waiting[idj.id]; w != nil { |
981 log.Printf("info: unlock waiting %d\n", idj.id) | 981 log.Infof("unlock waiting %d\n", idj.id) |
982 if retry { | 982 if retry { |
983 w <- struct{}{} | 983 w <- struct{}{} |
984 } else { | 984 } else { |
985 close(w) | 985 close(w) |
986 } | 986 } |
1048 default: | 1048 default: |
1049 state = "pending" | 1049 state = "pending" |
1050 } | 1050 } |
1051 if !remove { | 1051 if !remove { |
1052 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { | 1052 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { |
1053 log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) | 1053 log.Errorf("setting state of job %d failed: %v\n", idj.id, err) |
1054 } | 1054 } |
1055 log.Printf("info: import #%d finished: %s\n", idj.id, state) | 1055 log.Infof("info: import #%d finished: %s\n", idj.id, state) |
1056 } | 1056 } |
1057 if idj.sendEmail { | 1057 if idj.sendEmail { |
1058 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) | 1058 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) |
1059 } | 1059 } |
1060 | 1060 |
1066 idj.waitRetryPointer(), | 1066 idj.waitRetryPointer(), |
1067 idj.user, idj.sendEmail, | 1067 idj.user, idj.sendEmail, |
1068 idj.data, | 1068 idj.data, |
1069 false) | 1069 false) |
1070 if err != nil { | 1070 if err != nil { |
1071 log.Printf("error: retry enqueue failed: %v\n", err) | 1071 log.Errorf("retry enqueue failed: %v\n", err) |
1072 } else { | 1072 } else { |
1073 log.Printf("info: re-enqueued job with id %d\n", nid) | 1073 log.Infof("re-enqueued job with id %d\n", nid) |
1074 } | 1074 } |
1075 } | 1075 } |
1076 if remove { | 1076 if remove { |
1077 if err := deleteJob(ctx, idj.id); err != nil { | 1077 if err := deleteJob(ctx, idj.id); err != nil { |
1078 log.Printf("error: deleting job %d failed: %v\n", idj.id, err) | 1078 log.Errorf("deleting job %d failed: %v\n", idj.id, err) |
1079 } | 1079 } |
1080 } | 1080 } |
1081 }(jc, idj) | 1081 }(jc, idj) |
1082 } | 1082 } |
1083 } | 1083 } |