annotate pkg/imports/queue.go @ 5124:6910c1cad1fb queued-stage-done

Moved misplaced log line around.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 27 Mar 2020 00:40:52 +0100
parents eeb45e3e0a5a
children adf7b9f1273b
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1017
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
1 // This is Free Software under GNU Affero General Public License v >= 3.0
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
2 // without warranty, see README.md and license for details.
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
3 //
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
4 // SPDX-License-Identifier: AGPL-3.0-or-later
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
5 // License-Filename: LICENSES/AGPL-3.0.txt
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
6 //
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
7 // Copyright (C) 2018 by via donau
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
8 // – Österreichische Wasserstraßen-Gesellschaft mbH
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
9 // Software engineering by Intevation GmbH
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
10 //
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
11 // Author(s):
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de>
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
13
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
14 package imports
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
15
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
16 import (
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
17 "context"
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
18 "database/sql"
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
19 "encoding/json"
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
20 "fmt"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
21 "log"
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
22 "runtime/debug"
3246
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
23 "sort"
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
24 "strings"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
25 "sync"
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
26 "time"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
27
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
28 "github.com/jackc/pgx/pgtype"
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
29
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
30 "gemma.intevation.de/gemma/pkg/auth"
2187
7c83b5277c1c Import queue: Removed boilerplate code to deserialize jobs from JSON by making it part of the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2168
diff changeset
31 "gemma.intevation.de/gemma/pkg/common"
1000
14425e35e3c2 Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 998
diff changeset
32 "gemma.intevation.de/gemma/pkg/config"
4180
91cb4a7b1b13 Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents: 4084
diff changeset
33 "gemma.intevation.de/gemma/pkg/pgxutils"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
34 )
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
35
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
36 type (
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
37 // Feedback is passed to the Do method of a Job to log
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
38 // informations, warnings or errors.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
39 Feedback interface {
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
40 // Info logs informations.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
41 Info(fmt string, args ...interface{})
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
42 // Warn logs warnings.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
43 Warn(fmt string, args ...interface{})
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
44 // Error logs errors.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
45 Error(fmt string, args ...interface{})
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
46 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
47
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
48 // UnchangedError may be issued by Do of a Job to indicate
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
49 // That the database has not changed.
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
50 UnchangedError string
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
51
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
52 // Job is the central abstraction of an import job
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
53 // run by the import queue.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
54 Job interface {
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
55 // Do is called to do the actual import.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
56 // Bind transactions to ctx and conn, please-
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
57 // id is the number of the import job.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
58 // feedback can be used to log the import process.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
59 // If no error is return the import is assumed to
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
60 // be successfull. The non-error return value is
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
61 // serialized as a JSON string into the database as
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
62 // a summary to the import to be used by the review process.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
63 Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error)
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
64 // CleanUp is called to clean up ressources hold by the import.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
65 // It is called whether the import succeeded or not.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
66 CleanUp() error
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
67 }
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
68
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
69 FeedbackJob interface {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
70 Job
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
71 CreateFeedback(int64) Feedback
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
72 }
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
73
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
74 // JobKind is the type of an import.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
75 // Choose a unique name for every import.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
76 JobKind string
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
77
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
78 // JobCreator is used to bring a job to life as it is stored
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
79 // in pure meta-data form to the database.
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
80 JobCreator interface {
5104
cb736582e8fc Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5103
diff changeset
81 // Description is the long name of the import.
cb736582e8fc Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5103
diff changeset
82 Description() string
cb736582e8fc Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5103
diff changeset
83 // Create build the actual job.
cb736582e8fc Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5103
diff changeset
84 Create() Job
3219
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
85 // Depends returns two lists of ressources locked by this type of import.
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
86 // Imports are run concurrently if they have disjoint sets
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
87 // of dependencies.
3219
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
88 // The first list are locked exclusively.
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
89 // The second allows multiple read users but only one writing one.
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
90 Depends() [2][]string
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
91 // StageDone is called if an import is positively reviewed
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
92 // (state = accepted). This can be used to finalize the imported
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
93 // data to move it e.g from the staging area.
5034
59a99655f34d Added feedback support for StageDone.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5033
diff changeset
94 StageDone(context.Context, *sql.Tx, int64, Feedback) error
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
95 // AutoAccept indicates that imports of this kind
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
96 // don't need a review.
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
97 AutoAccept() bool
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
98 }
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
99
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
100 JobRemover interface {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
101 JobCreator
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
102 RemoveJob() bool
5110
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
103 }
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
104
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
105 idJob struct {
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
106 id int64
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
107 kind JobKind
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
108 user string
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
109 waitRetry pgtype.Interval
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
110 triesLeft sql.NullInt64
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
111 sendEmail bool
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
112 data string
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
113 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
114 )
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
115
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
116 const (
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
117 pollDuration = time.Second * 10
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
118 runExclusive = -66666
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
119 )
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
120
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
121 const (
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
122 ReviewJobSuffix = "#review"
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
123 reviewJobRetries = 10
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
124 reviewJobWait = time.Minute
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
125 )
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
126
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
127 type importQueue struct {
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
128 cmdCh chan func(*importQueue)
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
129
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
130 creatorsMu sync.Mutex
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
131 creators map[JobKind]JobCreator
3221
899914a18d7e Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3219
diff changeset
132 usedDeps map[string]int
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
133
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
134 waiting map[int64]chan struct{}
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
135 }
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
136
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
137 var iqueue = importQueue{
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
138 cmdCh: make(chan func(*importQueue)),
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
139
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
140 creators: map[JobKind]JobCreator{},
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
141 usedDeps: map[string]int{},
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
142 waiting: make(map[int64]chan struct{}),
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
143 }
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
144
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
145 var (
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
146 // ImportStateNames is a list of the states a job can be in.
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
147 ImportStateNames = []string{
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
148 "queued",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
149 "running",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
150 "failed",
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
151 "unchanged",
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
152 "pending",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
153 "accepted",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
154 "declined",
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
155 "reviewed",
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
156 }
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
157 )
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
158
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
159 const (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
160 queueUser = "sys_admin"
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
161
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
162 reEnqueueRunningSQL = `
4748
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
163 UPDATE import.imports SET
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
164 state = 'queued'::import_state,
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
165 changed = CURRENT_TIMESTAMP
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
166 WHERE state = 'running'::import_state`
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
167
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
168 insertJobSQL = `
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
169 INSERT INTO import.imports (
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
170 kind,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
171 due,
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
172 trys_left,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
173 retry_wait,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
174 username,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
175 send_email,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
176 data
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
177 ) VALUES (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
178 $1,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
179 COALESCE($2, CURRENT_TIMESTAMP),
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
180 $3,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
181 $4,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
182 $5,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
183 $6,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
184 $7
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
185 ) RETURNING id`
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
186
5122
0b6b62d247e8 Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents: 5121
diff changeset
187 // Select oldest queued job but prioritize review jobs
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
188 selectJobSQL = `
5122
0b6b62d247e8 Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents: 5121
diff changeset
189 SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `')
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
190 id,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
191 kind,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
192 trys_left,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
193 retry_wait,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
194 username,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
195 send_email,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
196 data
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
197 FROM import.imports
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
198 WHERE
1718
8ddbedf296d7 Re-scheduled imports: be a bit more tolerant about start time.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1708
diff changeset
199 due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
5122
0b6b62d247e8 Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents: 5121
diff changeset
200 state = 'queued'::import_state AND
0b6b62d247e8 Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents: 5121
diff changeset
201 kind = ANY($1)
0b6b62d247e8 Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents: 5121
diff changeset
202 ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
203 LIMIT 1`
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
204
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
205 updateStateSQL = `
4748
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
206 UPDATE import.imports SET
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
207 state = $1::import_state,
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
208 changed = CURRENT_TIMESTAMP
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
209 WHERE id = $2`
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
210
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
211 updateStateSummarySQL = `
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
212 UPDATE import.imports SET
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
213 state = $1::import_state,
4748
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
214 changed = CURRENT_TIMESTAMP,
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
215 summary = $2
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
216 WHERE id = $3`
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
217
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
218 deleteJobSQL = `
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
219 DELETE FROM import.imports WHERE id = $1`
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
220
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
221 logMessageSQL = `
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
222 INSERT INTO import.import_logs (
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
223 import_id,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
224 kind,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
225 msg
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
226 ) VALUES (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
227 $1,
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
228 $2::log_type,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
229 $3
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
230 )`
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
231 )
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
232
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
233 func init() {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
234 go iqueue.importLoop()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
235 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
236
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
237 // Error makes UnchangedError an error.
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
238 func (ue UnchangedError) Error() string {
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
239 return string(ue)
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
240 }
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
241
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
242 type reviewedJobCreator struct {
5101
1b0b13e70bc1 Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5100
diff changeset
243 jobCreator JobCreator
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
244 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
245
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
246 func (*reviewedJobCreator) AutoAccept() bool {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
247 return true
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
248 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
249
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
250 func (*reviewedJobCreator) RemoveJob() bool {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
251 return true
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
252 }
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
253
5101
1b0b13e70bc1 Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5100
diff changeset
254 func (rjc *reviewedJobCreator) Depends() [2][]string {
5102
8cc5b08ffc2b End endless recursion.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5101
diff changeset
255 return rjc.jobCreator.Depends()
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
256 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
257
5101
1b0b13e70bc1 Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5100
diff changeset
258 func (rjc *reviewedJobCreator) Description() string {
5120
22899babe85d Catch another endless recursin call.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5119
diff changeset
259 return rjc.jobCreator.Description() + ReviewJobSuffix
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
260 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
261
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
262 func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
263 return nil
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
264 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
265
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
266 type reviewedJob struct {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
267 ID int64 `json:"id"`
5103
a11705203f3f Nmae json field in reviewed job correctly.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5102
diff changeset
268 Accepted bool `json:"accepted"`
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
269 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
270
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
271 func (*reviewedJobCreator) Create() Job {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
272 return new(reviewedJob)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
273 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
274
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
275 func (*reviewedJob) CleanUp() error { return nil }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
276
5110
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
277 func (r *reviewedJob) CreateFeedback(int64) Feedback {
5119
5f62ac3db148 Don't write "import #42 started" into imports logs and remove the supress code to prevent this in review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5117
diff changeset
278 return logFeedback(r.ID)
5110
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
279 }
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
280
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
281 func (rj *reviewedJob) Do(
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
282 ctx context.Context,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
283 importID int64,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
284 conn *sql.Conn,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
285 feedback Feedback,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
286 ) (interface{}, error) {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
287
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
288 tx, err := conn.BeginTx(ctx, nil)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
289 if err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
290 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
291 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
292 defer tx.Rollback()
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
293
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
294 var signer string
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
295 if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
296 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
297 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
298
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
299 var user, kind string
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
300 if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
301 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
302 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
303
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
304 jc := FindJobCreator(JobKind(kind))
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
305 if jc == nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
306 return nil, fmt.Errorf("no job creator found for '%s'", kind)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
307 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
308
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
309 importFeedback := logFeedback(rj.ID)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
310
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
311 if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
312 userTx, err := conn.BeginTx(ctx, nil)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
313 if err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
314 return err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
315 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
316 defer userTx.Rollback()
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
317
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
318 if rj.Accepted {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
319 err = jc.StageDone(ctx, userTx, rj.ID, importFeedback)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
320 } else {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
321 _, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
322 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
323 if err == nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
324 err = userTx.Commit()
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
325 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
326 return err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
327 }); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
328 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
329 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
330
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
331 // Remove the import track
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
332 if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
333 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
334 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
335
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
336 var state string
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
337 if rj.Accepted {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
338 state = "accepted"
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
339 } else {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
340 state = "declined"
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
341 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
342
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
343 if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
344 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
345 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
346
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
347 importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
348
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
349 return nil, tx.Commit()
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
350 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
351
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
352 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
353 q.creatorsMu.Lock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
354 defer q.creatorsMu.Unlock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
355 q.creators[kind] = jc
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
356 q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc}
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
357
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
358 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
359
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
360 // FindJobCreator looks up a JobCreator in the global import queue.
1193
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
361 func FindJobCreator(kind JobKind) JobCreator {
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
362 return iqueue.jobCreator(kind)
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
363 }
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
364
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
365 // ImportKindNames is a list of the names of the imports the
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
366 // global import queue supports.
1495
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
367 func ImportKindNames() []string {
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
368 return iqueue.importKindNames()
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
369 }
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
370
3246
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
371 // LogImportKindNames logs a list of importer types registered
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
372 // to the global import queue.
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
373 func LogImportKindNames() {
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
374 kinds := ImportKindNames()
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
375 sort.Strings(kinds)
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
376 log.Printf("info: registered import kinds: %s",
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
377 strings.Join(kinds, ", "))
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
378 }
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
379
1694
4a2fad8f57de Imports: Resolved golint issues unrelated to exported symbols commenting.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 1657
diff changeset
380 // HasImportKindName checks if the import queue supports a given kind.
1627
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
381 func HasImportKindName(kind string) bool {
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
382 return iqueue.hasImportKindName(kind)
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
383 }
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
384
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
385 //
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
386 func (q *importQueue) hasImportKindName(kind string) bool {
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
387 q.creatorsMu.Lock()
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
388 defer q.creatorsMu.Unlock()
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
389 return q.creators[JobKind(kind)] != nil
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
390 }
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
391
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
392 // RegisterJobCreator adds a JobCreator to the global import queue.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
393 // This a good candidate to be called in a init function for
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
394 // a particular JobCreator.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
395 func RegisterJobCreator(kind JobKind, jc JobCreator) {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
396 iqueue.registerJobCreator(kind, jc)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
397 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
398
1495
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
399 func (q *importQueue) importKindNames() []string {
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
400 q.creatorsMu.Lock()
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
401 defer q.creatorsMu.Unlock()
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
402 names := make([]string, len(q.creators))
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
403 var i int
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
404 for kind := range q.creators {
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
405 names[i] = string(kind)
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
406 i++
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
407 }
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
408 // XXX: Consider using sort.Strings to make output deterministic.
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
409 return names
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
410 }
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
411
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
412 func (idj *idJob) nextRetry(feedback Feedback) bool {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
413 switch {
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
414 case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid:
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
415 return false
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
416 case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid:
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
417 return true
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
418 case idj.triesLeft.Valid:
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
419 if idj.triesLeft.Int64 < 1 {
5114
da26076ffafe More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5113
diff changeset
420 feedback.Warn("no retries left")
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
421 } else {
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
422 idj.triesLeft.Int64--
5114
da26076ffafe More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5113
diff changeset
423 feedback.Info("failed but will retry")
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
424 return true
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
425 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
426 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
427 return false
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
428 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
429
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
430 func (idj *idJob) nextDue() time.Time {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
431 now := time.Now()
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
432 if idj.waitRetry.Status == pgtype.Present {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
433 var d time.Duration
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
434 if err := idj.waitRetry.AssignTo(&d); err != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
435 log.Printf("error: converting waitRetry failed: %v\n", err)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
436 } else {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
437 now = now.Add(d)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
438 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
439 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
440 return now
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
441 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
442
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
443 func (idj *idJob) triesLeftPointer() *int {
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
444 if !idj.triesLeft.Valid {
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
445 return nil
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
446 }
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
447 t := int(idj.triesLeft.Int64)
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
448 return &t
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
449 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
450
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
451 func (idj *idJob) waitRetryPointer() *time.Duration {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
452 if idj.waitRetry.Status != pgtype.Present {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
453 return nil
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
454 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
455 d := new(time.Duration)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
456 if err := idj.waitRetry.AssignTo(d); err != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
457 log.Printf("error: converting waitRetry failed: %v\n", err)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
458 return nil
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
459 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
460 return d
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
461 }
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
462
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
463 func (q *importQueue) lockDependencies(jc JobCreator) {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
464 deps := jc.Depends()
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
465 q.creatorsMu.Lock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
466 defer q.creatorsMu.Unlock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
467 for _, d := range deps[0] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
468 q.usedDeps[d] = runExclusive
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
469 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
470 for _, d := range deps[1] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
471 q.usedDeps[d]++
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
472 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
473 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
474
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
475 func (q *importQueue) unlockDependencies(jc JobCreator) {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
476 deps := jc.Depends()
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
477 q.creatorsMu.Lock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
478 defer q.creatorsMu.Unlock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
479 for _, d := range deps[0] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
480 q.usedDeps[d] = 0
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
481 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
482 for _, d := range deps[1] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
483 q.usedDeps[d]--
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
484 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
485 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
486
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
487 func (q *importQueue) jobCreator(kind JobKind) JobCreator {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
488 q.creatorsMu.Lock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
489 defer q.creatorsMu.Unlock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
490 return q.creators[kind]
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
491 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
492
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
493 func (q *importQueue) addJob(
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
494 kind JobKind,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
495 due time.Time,
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
496 triesLeft *int,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
497 waitRetry *time.Duration,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
498 user string,
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
499 sendEmail bool,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
500 data string,
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
501 sync bool,
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
502 ) (int64, chan struct{}, error) {
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
503
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
504 var id int64
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
505 if due.IsZero() {
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
506 due = time.Now()
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
507 }
4084
350a24c92848 Deliver times from import queue in UTC.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3246
diff changeset
508 due = due.UTC()
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
509
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
510 var tl sql.NullInt64
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
511 if triesLeft != nil {
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
512 tl = sql.NullInt64{Int64: int64(*triesLeft), Valid: true}
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
513 }
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
514
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
515 var wr pgtype.Interval
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
516 if waitRetry != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
517 if err := wr.Set(*waitRetry); err != nil {
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
518 return 0, nil, err
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
519 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
520 } else {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
521 wr = pgtype.Interval{Status: pgtype.Null}
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
522 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
523
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
524 errCh := make(chan error)
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
525 var done chan struct{}
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
526
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
527 q.cmdCh <- func(q *importQueue) {
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
528 ctx := context.Background()
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
529 errCh <- auth.RunAs(ctx, user, func(conn *sql.Conn) error {
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
530 err := conn.QueryRowContext(
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
531 ctx,
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
532 insertJobSQL,
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
533 string(kind),
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
534 due,
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
535 tl,
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
536 &wr,
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
537 user,
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
538 sendEmail,
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
539 data).Scan(&id)
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
540
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
541 if err == nil && sync {
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
542 log.Printf("info: register wait for %d\n", id)
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
543 done = make(chan struct{})
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
544 q.waiting[id] = done
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
545 }
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
546
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
547 return err
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
548 })
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
549 }
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
550
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
551 return id, done, <-errCh
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
552 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
553
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
554 // AddJob adds a job to the global import queue to be executed
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
555 // as soon as possible after due.
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
556 // This is gone in a separate Go routine
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
557 // so this will not block.
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
558 func AddJob(
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
559 kind JobKind,
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
560 due time.Time,
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
561 triesLeft *int,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
562 waitRetry *time.Duration,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
563 user string,
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
564 sendEmail bool,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
565 data string,
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
566 ) (int64, error) {
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
567 id, _, err := iqueue.addJob(
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
568 kind,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
569 due,
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
570 triesLeft,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
571 waitRetry,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
572 user,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
573 sendEmail,
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
574 data,
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
575 false)
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
576 return id, err
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
577 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
578
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
579 const (
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
580 isPendingSQL = `
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
581 SELECT
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
582 state = 'pending'::import_state,
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
583 kind
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
584 FROM import.imports
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
585 WHERE id = $1`
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
586
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
587 selectUserSQL = `
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
588 SELECT username from import.imports WHERE ID = $1`
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
589
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
590 selectUserKindSQL = `
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
591 SELECT username, kind from import.imports WHERE ID = $1`
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
592
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
593 reviewSQL = `
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
594 UPDATE import.imports SET
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
595 state = $1::import_state,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
596 changed = CURRENT_TIMESTAMP,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
597 signer = $2
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
598 WHERE id = $3`
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
599
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
600 deleteImportDataSQL = `SELECT import.del_import($1)`
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
601
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
602 deleteImportTrackSQL = `
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
603 DELETE FROM import.track_imports WHERE import_id = $1`
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
604 )
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
605
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
606 func (q *importQueue) decideImportTx(
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
607 ctx context.Context,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
608 tx *sql.Tx,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
609 id int64,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
610 accepted bool,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
611 reviewer string,
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
612 ) (chan struct{}, error) {
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
613 var (
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
614 pending bool
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
615 kind string
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
616 )
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
617
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
618 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); {
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
619 case err == sql.ErrNoRows:
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
620 return nil, fmt.Errorf("cannot find import #%d", id)
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
621 case err != nil:
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
622 return nil, err
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
623 case !pending:
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
624 return nil, fmt.Errorf("#%d is not pending", id)
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
625 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
626
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
627 jc := q.jobCreator(JobKind(kind))
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
628 if jc == nil {
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
629 return nil, fmt.Errorf("no job creator for kind '%s'", kind)
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
630 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
631
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
632 r := &reviewedJob{
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
633 ID: id,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
634 Accepted: accepted,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
635 }
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
636 serialized, err := common.ToJSONString(r)
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
637 if err != nil {
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
638 return nil, err
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
639 }
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
640
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
641 // Try a little harder to persist the decision.
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
642 tries := reviewJobRetries
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
643 wait := reviewJobWait
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
644
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
645 rID, done, err := q.addJob(
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
646 JobKind(kind+ReviewJobSuffix),
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
647 time.Now(),
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
648 &tries,
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
649 &wait,
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
650 reviewer,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
651 false,
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
652 serialized,
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
653 true)
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
654 if err != nil {
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
655 return nil, err
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
656 }
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
657 log.Printf("info: add review job %d\n", rID)
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
658 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
659 if err != nil && done != nil {
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
660 go func() {
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
661 q.cmdCh <- func(q *importQueue) {
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
662 delete(q.waiting, rID)
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
663 }
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
664 }()
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
665 done = nil
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
666 }
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
667 return done, err
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
668 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
669
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
670 func (q *importQueue) decideImport(
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
671 ctx context.Context,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
672 id int64,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
673 accepted bool,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
674 reviewer string,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
675 ) error {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
676 if ctx == nil {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
677 ctx = context.Background()
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
678 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
679
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
680 var done chan struct{}
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
681
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
682 if err := auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
683 tx, err := conn.BeginTx(ctx, nil)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
684 if err != nil {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
685 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
686 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
687 defer tx.Rollback()
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
688 done, err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
689 if err == nil {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
690 err = tx.Commit()
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
691 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
692 return err
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
693 }); err != nil {
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
694 return err
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
695 }
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
696
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
697 <-done
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
698 return nil
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
699 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
700
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
701 func DecideImport(
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
702 ctx context.Context,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
703 id int64,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
704 accepted bool,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
705 reviewer string,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
706 ) error {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
707 return iqueue.decideImport(ctx, id, accepted, reviewer)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
708 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
709
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
710 type logFeedback int64
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
711
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
712 func (lf logFeedback) log(kind, format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
713 ctx := context.Background()
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
714 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
715 _, err := conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
716 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
717 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
718 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
719 if err != nil {
1760
22148eb0f986 More on harmonizing logging.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1758
diff changeset
720 log.Printf("error: logging failed: %v\n", err)
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
721 }
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
722 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
723
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
724 func (lf logFeedback) Info(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
725 lf.log("info", format, args...)
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
726 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
727
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
728 func (lf logFeedback) Warn(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
729 lf.log("warn", format, args...)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
730 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
731
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
732 func (lf logFeedback) Error(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
733 lf.log("error", format, args...)
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
734 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
735
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
736 func survive(fn func() error) func() error {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
737 return func() (err error) {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
738 defer func() {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
739 if p := recover(); p != nil {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
740 err = fmt.Errorf("%v: %s", p, string(debug.Stack()))
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
741 }
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
742 }()
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
743 return fn()
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
744 }
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
745 }
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
746
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
747 func reEnqueueRunning() error {
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
748 ctx := context.Background()
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
749 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
750 _, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
751 return err
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
752 })
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
753 }
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
754
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
755 func (q *importQueue) fetchJob() (*idJob, error) {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
756
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
757 var which []string
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
758
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
759 q.creatorsMu.Lock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
760 nextCreator:
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
761 for kind, jc := range q.creators {
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
762 deps := jc.Depends()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
763 for _, d := range deps[0] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
764 if q.usedDeps[d] != 0 {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
765 continue nextCreator
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
766 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
767 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
768 for _, d := range deps[1] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
769 if q.usedDeps[d] == runExclusive {
3221
899914a18d7e Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3219
diff changeset
770 continue nextCreator
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
771 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
772 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
773 which = append(which, string(kind))
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
774 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
775 q.creatorsMu.Unlock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
776
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
777 if len(which) == 0 {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
778 return nil, sql.ErrNoRows
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
779 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
780
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
781 var kinds pgtype.TextArray
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
782 if err := kinds.Set(which); err != nil {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
783 return nil, err
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
784 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
785
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
786 var ji idJob
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
787 ctx := context.Background()
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
788 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
789 tx, err := conn.BeginTx(ctx, nil)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
790 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
791 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
792 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
793 defer tx.Rollback()
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
794 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
795 &ji.id,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
796 &ji.kind,
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
797 &ji.triesLeft,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
798 &ji.waitRetry,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
799 &ji.user,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
800 &ji.sendEmail,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
801 &ji.data,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
802 ); err != nil {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
803 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
804 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
805 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
806 if err == nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
807 err = tx.Commit()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
808 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
809 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
810 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
811 switch {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
812 case err == sql.ErrNoRows:
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
813 return nil, nil
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
814 case err != nil:
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
815 return nil, err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
816 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
817 return &ji, nil
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
818 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
819
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
820 func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
821 // As it is important to keep the persistent model
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
822 // in sync with the in-memory model try harder to store
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
823 // the state.
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
824 const maxTries = 10
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
825 var sleep = time.Second
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
826
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
827 for try := 1; ; try++ {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
828 var err error
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
829 if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
830 return err
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
831 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
832 log.Printf("warn: [try %d/%d] Storing state failed: %v (try again in %s).\n",
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
833 try, maxTries, err, sleep)
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
834
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
835 time.Sleep(sleep)
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
836 if sleep < time.Minute {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
837 if sleep *= 2; sleep > time.Minute {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
838 sleep = time.Minute
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
839 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
840 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
841 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
842 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
843
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
844 func updateStateSummary(
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
845 ctx context.Context,
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
846 id int64,
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
847 state string,
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
848 summary interface{},
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
849 ) error {
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
850 var s sql.NullString
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
851 if summary != nil {
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
852 var b strings.Builder
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
853 if err := json.NewEncoder(&b).Encode(summary); err != nil {
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
854 return err
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
855 }
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
856 s = sql.NullString{String: b.String(), Valid: true}
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
857 }
2823
b150e5b37afe Import queue: As it is important to keep the in-memory and the persistent model in sync try harder to store the final state change if it fails.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2801
diff changeset
858
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
859 return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
860 _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
861 return err
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
862 })
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
863 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
864
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
865 func deleteJob(ctx context.Context, id int64) error {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
866 return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
867 _, err := conn.ExecContext(ctx, deleteJobSQL, id)
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
868 return err
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
869 })
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
870 }
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
871
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
872 func errorAndFail(id int64, format string, args ...interface{}) error {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
873 ctx := context.Background()
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
874 return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
875 tx, err := conn.BeginTx(ctx, nil)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
876 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
877 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
878 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
879 defer tx.Rollback()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
880 _, err = conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
881 ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
882 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
883 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
884 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
885 _, err = conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
886 ctx, updateStateSQL, "failed", id)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
887 if err == nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
888 err = tx.Commit()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
889 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
890 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
891 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
892 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
893
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
894 func (q *importQueue) importLoop() {
1000
14425e35e3c2 Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 998
diff changeset
895 config.WaitReady()
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
896 // re-enqueue the jobs that are in state running.
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
897 // They where in progess when the server went down.
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
898 if err := reEnqueueRunning(); err != nil {
1751
c3a6aaf926c3 Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1740
diff changeset
899 log.Printf("error: re-enqueuing failed: %v", err)
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
900 }
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
901
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
902 for {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
903 var idj *idJob
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
904 var err error
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
905
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
906 for {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
907 if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
1751
c3a6aaf926c3 Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1740
diff changeset
908 log.Printf("error: db: %v\n", err)
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
909 }
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
910 if idj != nil {
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
911 break
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
912 }
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
913 select {
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
914 case cmd := <-q.cmdCh:
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
915 cmd(q)
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
916
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
917 case <-time.After(pollDuration):
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
918 }
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
919 }
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
920
1751
c3a6aaf926c3 Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1740
diff changeset
921 log.Printf("info: starting import #%d\n", idj.id)
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
922
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
923 jc := q.jobCreator(idj.kind)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
924 if jc == nil {
1010
8f23ec811afb Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1005
diff changeset
925 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
926 continue
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
927 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
928
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
929 // Lock dependencies.
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
930 q.lockDependencies(jc)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
931
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
932 go func(jc JobCreator, idj *idJob) {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
933
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
934 defer func() {
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
935 // Unlock the dependencies.
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
936 q.unlockDependencies(jc)
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
937 // Unlock waiting.
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
938 q.cmdCh <- func(q *importQueue) {
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
939 if w := q.waiting[idj.id]; w != nil {
5124
6910c1cad1fb Moved misplaced log line around.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5123
diff changeset
940 log.Printf("info: unlock waiting %d\n", idj.id)
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
941 close(w)
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
942 delete(q.waiting, idj.id)
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
943 }
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
944 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
945 }()
1010
8f23ec811afb Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1005
diff changeset
946
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
947 job := jc.Create()
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
948 if err := common.FromJSONString(idj.data, job); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
949 errorAndFail(idj.id, "failed to create job for import #%d: %v",
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
950 idj.id, err)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
951 return
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
952 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
953
5110
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
954 var feedback Feedback
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
955 if fc, ok := job.(FeedbackJob); ok {
5110
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
956 feedback = fc.CreateFeedback(idj.id)
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
957 } else {
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
958 feedback = logFeedback(idj.id)
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
959 }
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
960
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
961 ctx := context.Background()
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
962 var summary interface{}
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
963
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
964 errDo := survive(func() error {
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
965 return auth.RunAs(ctx, idj.user,
1168
930fdd8b474f Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1138
diff changeset
966 func(conn *sql.Conn) error {
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
967 var err error
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
968 summary, err = job.Do(ctx, idj.id, conn, feedback)
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
969 return err
1168
930fdd8b474f Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1138
diff changeset
970 })
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
971 })()
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
972
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
973 var unchanged, retry bool
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
974 if v, ok := errDo.(UnchangedError); ok {
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
975 feedback.Info("unchanged: %s", v.Error())
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
976 unchanged = true
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
977 } else if errDo != nil {
4180
91cb4a7b1b13 Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents: 4084
diff changeset
978 feedback.Error("error in import: %v",
91cb4a7b1b13 Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents: 4084
diff changeset
979 pgxutils.ReadableError{Err: errDo})
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
980 retry = idj.nextRetry(feedback)
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
981 }
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
982
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
983 var errCleanup error
2801
054f5d61452d Import queue: Cleanup up debris after an import in all cases if we are not going to retry it.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2187
diff changeset
984 if !retry { // cleanup debris
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
985 if errCleanup = survive(job.CleanUp)(); errCleanup != nil {
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
986 feedback.Error("error cleanup: %v", errCleanup)
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
987 }
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
988 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
989
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
990 var remove bool
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
991 if remover, ok := jc.(JobRemover); ok {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
992 remove = remover.RemoveJob()
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
993 }
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
994
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
995 var state string
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
996 switch {
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
997 case unchanged:
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
998 state = "unchanged"
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
999 case errDo != nil || errCleanup != nil:
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
1000 state = "failed"
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
1001 case jc.AutoAccept():
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
1002 state = "accepted"
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
1003 default:
1190
e3de65179889 The imort queue has now six states:
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1189
diff changeset
1004 state = "pending"
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
1005 }
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
1006 if !remove {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
1007 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
1008 log.Printf("error: setting state of job %d failed: %v\n", idj.id, err)
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
1009 }
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
1010 log.Printf("info: import #%d finished: %s\n", idj.id, state)
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
1011 }
1646
a0982c38eac0 Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1642
diff changeset
1012 if idj.sendEmail {
a0982c38eac0 Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1642
diff changeset
1013 go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
a0982c38eac0 Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1642
diff changeset
1014 }
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
1015
5115
bb5459faadb7 Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5114
diff changeset
1016 if retry {
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
1017 nid, _, err := q.addJob(
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
1018 idj.kind,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
1019 idj.nextDue(),
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
1020 idj.triesLeftPointer(),
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
1021 idj.waitRetryPointer(),
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
1022 idj.user, idj.sendEmail,
5123
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
1023 idj.data,
eeb45e3e0a5a Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5122
diff changeset
1024 false)
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
1025 if err != nil {
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
1026 log.Printf("error: retry enqueue failed: %v\n", err)
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
1027 } else {
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
1028 log.Printf("info: re-enqueued job with id %d\n", nid)
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
1029 }
5115
bb5459faadb7 Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5114
diff changeset
1030 }
bb5459faadb7 Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5114
diff changeset
1031 if remove {
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
1032 if err := deleteJob(ctx, idj.id); err != nil {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
1033 log.Printf("error: deleting job %d failed: %v\n", idj.id, err)
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
1034 }
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
1035 }
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
1036 }(jc, idj)
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
1037 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
1038 }