Mercurial > gemma
annotate pkg/imports/queue.go @ 5123:eeb45e3e0a5a queued-stage-done
Added mechanism to have sync import jobs on import queue.
Review jobs are now sync with a controller waiting for 20 secs before returning.
If all reviews return earlier the controller extists earlier, too.
If one or more decisions took longer they are run in background till they are
decided and the the controller returns a error message for these imports
that the process is st still running.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 26 Mar 2020 22:24:45 +0100 |
parents | 0b6b62d247e8 |
children | 6910c1cad1fb |
rev | line source |
---|---|
1017
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
1 // This is Free Software under GNU Affero General Public License v >= 3.0 |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
2 // without warranty, see README.md and license for details. |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
3 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
4 // SPDX-License-Identifier: AGPL-3.0-or-later |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
5 // License-Filename: LICENSES/AGPL-3.0.txt |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
6 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
7 // Copyright (C) 2018 by via donau |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
8 // – Österreichische Wasserstraßen-Gesellschaft mbH |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
9 // Software engineering by Intevation GmbH |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
10 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
11 // Author(s): |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
13 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
14 package imports |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
15 |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
16 import ( |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
17 "context" |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
18 "database/sql" |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
19 "encoding/json" |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
20 "fmt" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
21 "log" |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
22 "runtime/debug" |
3246
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
23 "sort" |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
24 "strings" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
25 "sync" |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
26 "time" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
27 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
28 "github.com/jackc/pgx/pgtype" |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
29 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
30 "gemma.intevation.de/gemma/pkg/auth" |
2187
7c83b5277c1c
Import queue: Removed boilerplate code to deserialize jobs from JSON by making it part of the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2168
diff
changeset
|
31 "gemma.intevation.de/gemma/pkg/common" |
1000
14425e35e3c2
Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
998
diff
changeset
|
32 "gemma.intevation.de/gemma/pkg/config" |
4180
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
33 "gemma.intevation.de/gemma/pkg/pgxutils" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
34 ) |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
35 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
36 type ( |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
37 // Feedback is passed to the Do method of a Job to log |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
38 // informations, warnings or errors. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
39 Feedback interface { |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
40 // Info logs informations. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
41 Info(fmt string, args ...interface{}) |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
42 // Warn logs warnings. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
43 Warn(fmt string, args ...interface{}) |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
44 // Error logs errors. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
45 Error(fmt string, args ...interface{}) |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
46 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
47 |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
48 // UnchangedError may be issued by Do of a Job to indicate |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
49 // That the database has not changed. |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
50 UnchangedError string |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
51 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
52 // Job is the central abstraction of an import job |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
53 // run by the import queue. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
54 Job interface { |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
55 // Do is called to do the actual import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
56 // Bind transactions to ctx and conn, please- |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
57 // id is the number of the import job. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
58 // feedback can be used to log the import process. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
59 // If no error is return the import is assumed to |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
60 // be successfull. The non-error return value is |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
61 // serialized as a JSON string into the database as |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
62 // a summary to the import to be used by the review process. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
63 Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error) |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
64 // CleanUp is called to clean up ressources hold by the import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
65 // It is called whether the import succeeded or not. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
66 CleanUp() error |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
67 } |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
68 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
69 FeedbackJob interface { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
70 Job |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
71 CreateFeedback(int64) Feedback |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
72 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
73 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
74 // JobKind is the type of an import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
75 // Choose a unique name for every import. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
76 JobKind string |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
77 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
78 // JobCreator is used to bring a job to life as it is stored |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
79 // in pure meta-data form to the database. |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
80 JobCreator interface { |
5104
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
81 // Description is the long name of the import. |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
82 Description() string |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
83 // Create build the actual job. |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
84 Create() Job |
3219
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
85 // Depends returns two lists of ressources locked by this type of import. |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
86 // Imports are run concurrently if they have disjoint sets |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
87 // of dependencies. |
3219
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
88 // The first list are locked exclusively. |
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
89 // The second allows multiple read users but only one writing one. |
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
90 Depends() [2][]string |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
91 // StageDone is called if an import is positively reviewed |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
92 // (state = accepted). This can be used to finalize the imported |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
93 // data to move it e.g from the staging area. |
5034
59a99655f34d
Added feedback support for StageDone.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5033
diff
changeset
|
94 StageDone(context.Context, *sql.Tx, int64, Feedback) error |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
95 // AutoAccept indicates that imports of this kind |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
96 // don't need a review. |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
97 AutoAccept() bool |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
98 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
99 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
100 JobRemover interface { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
101 JobCreator |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
102 RemoveJob() bool |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
103 } |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
104 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
105 idJob struct { |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
106 id int64 |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
107 kind JobKind |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
108 user string |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
109 waitRetry pgtype.Interval |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
110 triesLeft sql.NullInt64 |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
111 sendEmail bool |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
112 data string |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
113 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
114 ) |
987
3841509f6e9e
Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
979
diff
changeset
|
115 |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
116 const ( |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
117 pollDuration = time.Second * 10 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
118 runExclusive = -66666 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
119 ) |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
120 |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
121 const ( |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
122 ReviewJobSuffix = "#review" |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
123 reviewJobRetries = 10 |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
124 reviewJobWait = time.Minute |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
125 ) |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
126 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
127 type importQueue struct { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
128 cmdCh chan func(*importQueue) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
129 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
130 creatorsMu sync.Mutex |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
131 creators map[JobKind]JobCreator |
3221
899914a18d7e
Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3219
diff
changeset
|
132 usedDeps map[string]int |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
133 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
134 waiting map[int64]chan struct{} |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
135 } |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
136 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
137 var iqueue = importQueue{ |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
138 cmdCh: make(chan func(*importQueue)), |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
139 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
140 creators: map[JobKind]JobCreator{}, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
141 usedDeps: map[string]int{}, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
142 waiting: make(map[int64]chan struct{}), |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
143 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
144 |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
145 var ( |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
146 // ImportStateNames is a list of the states a job can be in. |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
147 ImportStateNames = []string{ |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
148 "queued", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
149 "running", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
150 "failed", |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
151 "unchanged", |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
152 "pending", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
153 "accepted", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
154 "declined", |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
155 "reviewed", |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
156 } |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
157 ) |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
158 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
159 const ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
160 queueUser = "sys_admin" |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
161 |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
162 reEnqueueRunningSQL = ` |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
163 UPDATE import.imports SET |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
164 state = 'queued'::import_state, |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
165 changed = CURRENT_TIMESTAMP |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
166 WHERE state = 'running'::import_state` |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
167 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
168 insertJobSQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
169 INSERT INTO import.imports ( |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
170 kind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
171 due, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
172 trys_left, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
173 retry_wait, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
174 username, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
175 send_email, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
176 data |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
177 ) VALUES ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
178 $1, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
179 COALESCE($2, CURRENT_TIMESTAMP), |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
180 $3, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
181 $4, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
182 $5, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
183 $6, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
184 $7 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
185 ) RETURNING id` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
186 |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
187 // Select oldest queued job but prioritize review jobs |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
188 selectJobSQL = ` |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
189 SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `') |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
190 id, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
191 kind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
192 trys_left, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
193 retry_wait, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
194 username, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
195 send_email, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
196 data |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
197 FROM import.imports |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
198 WHERE |
1718
8ddbedf296d7
Re-scheduled imports: be a bit more tolerant about start time.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1708
diff
changeset
|
199 due <= CURRENT_TIMESTAMP + interval '5 seconds' AND |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
200 state = 'queued'::import_state AND |
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
201 kind = ANY($1) |
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
202 ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
203 LIMIT 1` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
204 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
205 updateStateSQL = ` |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
206 UPDATE import.imports SET |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
207 state = $1::import_state, |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
208 changed = CURRENT_TIMESTAMP |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
209 WHERE id = $2` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
210 |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
211 updateStateSummarySQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
212 UPDATE import.imports SET |
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
213 state = $1::import_state, |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
214 changed = CURRENT_TIMESTAMP, |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
215 summary = $2 |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
216 WHERE id = $3` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
217 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
218 deleteJobSQL = ` |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
219 DELETE FROM import.imports WHERE id = $1` |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
220 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
221 logMessageSQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
222 INSERT INTO import.import_logs ( |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
223 import_id, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
224 kind, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
225 msg |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
226 ) VALUES ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
227 $1, |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
228 $2::log_type, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
229 $3 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
230 )` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
231 ) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
232 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
233 func init() { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
234 go iqueue.importLoop() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
235 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
236 |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
237 // Error makes UnchangedError an error. |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
238 func (ue UnchangedError) Error() string { |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
239 return string(ue) |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
240 } |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
241 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
242 type reviewedJobCreator struct { |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
243 jobCreator JobCreator |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
244 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
245 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
246 func (*reviewedJobCreator) AutoAccept() bool { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
247 return true |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
248 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
249 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
250 func (*reviewedJobCreator) RemoveJob() bool { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
251 return true |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
252 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
253 |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
254 func (rjc *reviewedJobCreator) Depends() [2][]string { |
5102
8cc5b08ffc2b
End endless recursion.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5101
diff
changeset
|
255 return rjc.jobCreator.Depends() |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
256 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
257 |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
258 func (rjc *reviewedJobCreator) Description() string { |
5120
22899babe85d
Catch another endless recursin call.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5119
diff
changeset
|
259 return rjc.jobCreator.Description() + ReviewJobSuffix |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
260 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
261 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
262 func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
263 return nil |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
264 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
265 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
266 type reviewedJob struct { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
267 ID int64 `json:"id"` |
5103
a11705203f3f
Nmae json field in reviewed job correctly.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5102
diff
changeset
|
268 Accepted bool `json:"accepted"` |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
269 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
270 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
271 func (*reviewedJobCreator) Create() Job { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
272 return new(reviewedJob) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
273 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
274 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
275 func (*reviewedJob) CleanUp() error { return nil } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
276 |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
277 func (r *reviewedJob) CreateFeedback(int64) Feedback { |
5119
5f62ac3db148
Don't write "import #42 started" into imports logs and remove the supress code to prevent this in review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5117
diff
changeset
|
278 return logFeedback(r.ID) |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
279 } |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
280 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
281 func (rj *reviewedJob) Do( |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
282 ctx context.Context, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
283 importID int64, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
284 conn *sql.Conn, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
285 feedback Feedback, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
286 ) (interface{}, error) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
287 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
288 tx, err := conn.BeginTx(ctx, nil) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
289 if err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
290 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
291 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
292 defer tx.Rollback() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
293 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
294 var signer string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
295 if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
296 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
297 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
298 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
299 var user, kind string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
300 if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
301 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
302 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
303 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
304 jc := FindJobCreator(JobKind(kind)) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
305 if jc == nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
306 return nil, fmt.Errorf("no job creator found for '%s'", kind) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
307 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
308 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
309 importFeedback := logFeedback(rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
310 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
311 if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
312 userTx, err := conn.BeginTx(ctx, nil) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
313 if err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
314 return err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
315 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
316 defer userTx.Rollback() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
317 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
318 if rj.Accepted { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
319 err = jc.StageDone(ctx, userTx, rj.ID, importFeedback) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
320 } else { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
321 _, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
322 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
323 if err == nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
324 err = userTx.Commit() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
325 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
326 return err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
327 }); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
328 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
329 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
330 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
331 // Remove the import track |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
332 if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
333 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
334 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
335 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
336 var state string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
337 if rj.Accepted { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
338 state = "accepted" |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
339 } else { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
340 state = "declined" |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
341 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
342 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
343 if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
344 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
345 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
346 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
347 importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
348 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
349 return nil, tx.Commit() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
350 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
351 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
352 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
353 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
354 defer q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
355 q.creators[kind] = jc |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
356 q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc} |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
357 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
358 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
359 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
360 // FindJobCreator looks up a JobCreator in the global import queue. |
1193
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
361 func FindJobCreator(kind JobKind) JobCreator { |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
362 return iqueue.jobCreator(kind) |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
363 } |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
364 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
365 // ImportKindNames is a list of the names of the imports the |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
366 // global import queue supports. |
1495
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
367 func ImportKindNames() []string { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
368 return iqueue.importKindNames() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
369 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
370 |
3246
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
371 // LogImportKindNames logs a list of importer types registered |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
372 // to the global import queue. |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
373 func LogImportKindNames() { |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
374 kinds := ImportKindNames() |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
375 sort.Strings(kinds) |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
376 log.Printf("info: registered import kinds: %s", |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
377 strings.Join(kinds, ", ")) |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
378 } |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
379 |
1694
4a2fad8f57de
Imports: Resolved golint issues unrelated to exported symbols commenting.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
1657
diff
changeset
|
380 // HasImportKindName checks if the import queue supports a given kind. |
1627
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
381 func HasImportKindName(kind string) bool { |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
382 return iqueue.hasImportKindName(kind) |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
383 } |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
384 |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
385 // |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
386 func (q *importQueue) hasImportKindName(kind string) bool { |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
387 q.creatorsMu.Lock() |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
388 defer q.creatorsMu.Unlock() |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
389 return q.creators[JobKind(kind)] != nil |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
390 } |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
391 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
392 // RegisterJobCreator adds a JobCreator to the global import queue. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
393 // This a good candidate to be called in a init function for |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
394 // a particular JobCreator. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
395 func RegisterJobCreator(kind JobKind, jc JobCreator) { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
396 iqueue.registerJobCreator(kind, jc) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
397 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
398 |
1495
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
399 func (q *importQueue) importKindNames() []string { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
400 q.creatorsMu.Lock() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
401 defer q.creatorsMu.Unlock() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
402 names := make([]string, len(q.creators)) |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
403 var i int |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
404 for kind := range q.creators { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
405 names[i] = string(kind) |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
406 i++ |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
407 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
408 // XXX: Consider using sort.Strings to make output deterministic. |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
409 return names |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
410 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
411 |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
412 func (idj *idJob) nextRetry(feedback Feedback) bool { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
413 switch { |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
414 case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid: |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
415 return false |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
416 case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid: |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
417 return true |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
418 case idj.triesLeft.Valid: |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
419 if idj.triesLeft.Int64 < 1 { |
5114
da26076ffafe
More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5113
diff
changeset
|
420 feedback.Warn("no retries left") |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
421 } else { |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
422 idj.triesLeft.Int64-- |
5114
da26076ffafe
More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5113
diff
changeset
|
423 feedback.Info("failed but will retry") |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
424 return true |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
425 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
426 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
427 return false |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
428 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
429 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
430 func (idj *idJob) nextDue() time.Time { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
431 now := time.Now() |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
432 if idj.waitRetry.Status == pgtype.Present { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
433 var d time.Duration |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
434 if err := idj.waitRetry.AssignTo(&d); err != nil { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
435 log.Printf("error: converting waitRetry failed: %v\n", err) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
436 } else { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
437 now = now.Add(d) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
438 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
439 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
440 return now |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
441 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
442 |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
443 func (idj *idJob) triesLeftPointer() *int { |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
444 if !idj.triesLeft.Valid { |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
445 return nil |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
446 } |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
447 t := int(idj.triesLeft.Int64) |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
448 return &t |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
449 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
450 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
451 func (idj *idJob) waitRetryPointer() *time.Duration { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
452 if idj.waitRetry.Status != pgtype.Present { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
453 return nil |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
454 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
455 d := new(time.Duration) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
456 if err := idj.waitRetry.AssignTo(d); err != nil { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
457 log.Printf("error: converting waitRetry failed: %v\n", err) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
458 return nil |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
459 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
460 return d |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
461 } |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
462 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
463 func (q *importQueue) lockDependencies(jc JobCreator) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
464 deps := jc.Depends() |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
465 q.creatorsMu.Lock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
466 defer q.creatorsMu.Unlock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
467 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
468 q.usedDeps[d] = runExclusive |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
469 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
470 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
471 q.usedDeps[d]++ |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
472 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
473 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
474 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
475 func (q *importQueue) unlockDependencies(jc JobCreator) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
476 deps := jc.Depends() |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
477 q.creatorsMu.Lock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
478 defer q.creatorsMu.Unlock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
479 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
480 q.usedDeps[d] = 0 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
481 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
482 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
483 q.usedDeps[d]-- |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
484 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
485 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
486 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
487 func (q *importQueue) jobCreator(kind JobKind) JobCreator { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
488 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
489 defer q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
490 return q.creators[kind] |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
491 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
492 |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
493 func (q *importQueue) addJob( |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
494 kind JobKind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
495 due time.Time, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
496 triesLeft *int, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
497 waitRetry *time.Duration, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
498 user string, |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
499 sendEmail bool, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
500 data string, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
501 sync bool, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
502 ) (int64, chan struct{}, error) { |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
503 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
504 var id int64 |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
505 if due.IsZero() { |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
506 due = time.Now() |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
507 } |
4084
350a24c92848
Deliver times from import queue in UTC.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3246
diff
changeset
|
508 due = due.UTC() |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
509 |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
510 var tl sql.NullInt64 |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
511 if triesLeft != nil { |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
512 tl = sql.NullInt64{Int64: int64(*triesLeft), Valid: true} |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
513 } |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
514 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
515 var wr pgtype.Interval |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
516 if waitRetry != nil { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
517 if err := wr.Set(*waitRetry); err != nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
518 return 0, nil, err |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
519 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
520 } else { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
521 wr = pgtype.Interval{Status: pgtype.Null} |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
522 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
523 |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
524 errCh := make(chan error) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
525 var done chan struct{} |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
526 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
527 q.cmdCh <- func(q *importQueue) { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
528 ctx := context.Background() |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
529 errCh <- auth.RunAs(ctx, user, func(conn *sql.Conn) error { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
530 err := conn.QueryRowContext( |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
531 ctx, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
532 insertJobSQL, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
533 string(kind), |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
534 due, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
535 tl, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
536 &wr, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
537 user, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
538 sendEmail, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
539 data).Scan(&id) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
540 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
541 if err == nil && sync { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
542 log.Printf("info: register wait for %d\n", id) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
543 done = make(chan struct{}) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
544 q.waiting[id] = done |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
545 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
546 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
547 return err |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
548 }) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
549 } |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
550 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
551 return id, done, <-errCh |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
552 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
553 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
554 // AddJob adds a job to the global import queue to be executed |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
555 // as soon as possible after due. |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
556 // This is gone in a separate Go routine |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
557 // so this will not block. |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
558 func AddJob( |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
559 kind JobKind, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
560 due time.Time, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
561 triesLeft *int, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
562 waitRetry *time.Duration, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
563 user string, |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
564 sendEmail bool, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
565 data string, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
566 ) (int64, error) { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
567 id, _, err := iqueue.addJob( |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
568 kind, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
569 due, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
570 triesLeft, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
571 waitRetry, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
572 user, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
573 sendEmail, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
574 data, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
575 false) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
576 return id, err |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
577 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
578 |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
579 const ( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
580 isPendingSQL = ` |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
581 SELECT |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
582 state = 'pending'::import_state, |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
583 kind |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
584 FROM import.imports |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
585 WHERE id = $1` |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
586 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
587 selectUserSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
588 SELECT username from import.imports WHERE ID = $1` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
589 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
590 selectUserKindSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
591 SELECT username, kind from import.imports WHERE ID = $1` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
592 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
593 reviewSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
594 UPDATE import.imports SET |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
595 state = $1::import_state, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
596 changed = CURRENT_TIMESTAMP, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
597 signer = $2 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
598 WHERE id = $3` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
599 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
600 deleteImportDataSQL = `SELECT import.del_import($1)` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
601 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
602 deleteImportTrackSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
603 DELETE FROM import.track_imports WHERE import_id = $1` |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
604 ) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
605 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
606 func (q *importQueue) decideImportTx( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
607 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
608 tx *sql.Tx, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
609 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
610 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
611 reviewer string, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
612 ) (chan struct{}, error) { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
613 var ( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
614 pending bool |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
615 kind string |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
616 ) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
617 |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
618 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
619 case err == sql.ErrNoRows: |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
620 return nil, fmt.Errorf("cannot find import #%d", id) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
621 case err != nil: |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
622 return nil, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
623 case !pending: |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
624 return nil, fmt.Errorf("#%d is not pending", id) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
625 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
626 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
627 jc := q.jobCreator(JobKind(kind)) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
628 if jc == nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
629 return nil, fmt.Errorf("no job creator for kind '%s'", kind) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
630 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
631 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
632 r := &reviewedJob{ |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
633 ID: id, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
634 Accepted: accepted, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
635 } |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
636 serialized, err := common.ToJSONString(r) |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
637 if err != nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
638 return nil, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
639 } |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
640 |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
641 // Try a little harder to persist the decision. |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
642 tries := reviewJobRetries |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
643 wait := reviewJobWait |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
644 |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
645 rID, done, err := q.addJob( |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
646 JobKind(kind+ReviewJobSuffix), |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
647 time.Now(), |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
648 &tries, |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
649 &wait, |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
650 reviewer, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
651 false, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
652 serialized, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
653 true) |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
654 if err != nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
655 return nil, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
656 } |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
657 log.Printf("info: add review job %d\n", rID) |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
658 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
659 if err != nil && done != nil { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
660 go func() { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
661 q.cmdCh <- func(q *importQueue) { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
662 delete(q.waiting, rID) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
663 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
664 }() |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
665 done = nil |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
666 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
667 return done, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
668 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
669 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
670 func (q *importQueue) decideImport( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
671 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
672 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
673 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
674 reviewer string, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
675 ) error { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
676 if ctx == nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
677 ctx = context.Background() |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
678 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
679 |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
680 var done chan struct{} |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
681 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
682 if err := auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
683 tx, err := conn.BeginTx(ctx, nil) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
684 if err != nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
685 return err |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
686 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
687 defer tx.Rollback() |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
688 done, err = q.decideImportTx(ctx, tx, id, accepted, reviewer) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
689 if err == nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
690 err = tx.Commit() |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
691 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
692 return err |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
693 }); err != nil { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
694 return err |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
695 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
696 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
697 <-done |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
698 return nil |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
699 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
700 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
701 func DecideImport( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
702 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
703 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
704 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
705 reviewer string, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
706 ) error { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
707 return iqueue.decideImport(ctx, id, accepted, reviewer) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
708 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
709 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
710 type logFeedback int64 |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
711 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
712 func (lf logFeedback) log(kind, format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
713 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
714 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
715 _, err := conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
716 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
717 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
718 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
719 if err != nil { |
1760
22148eb0f986
More on harmonizing logging.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1758
diff
changeset
|
720 log.Printf("error: logging failed: %v\n", err) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
721 } |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
722 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
723 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
724 func (lf logFeedback) Info(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
725 lf.log("info", format, args...) |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
726 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
727 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
728 func (lf logFeedback) Warn(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
729 lf.log("warn", format, args...) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
730 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
731 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
732 func (lf logFeedback) Error(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
733 lf.log("error", format, args...) |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
734 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
735 |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
736 func survive(fn func() error) func() error { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
737 return func() (err error) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
738 defer func() { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
739 if p := recover(); p != nil { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
740 err = fmt.Errorf("%v: %s", p, string(debug.Stack())) |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
741 } |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
742 }() |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
743 return fn() |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
744 } |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
745 } |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
746 |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
747 func reEnqueueRunning() error { |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
748 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
749 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
750 _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
751 return err |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
752 }) |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
753 } |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
754 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
755 func (q *importQueue) fetchJob() (*idJob, error) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
756 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
757 var which []string |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
758 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
759 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
760 nextCreator: |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
761 for kind, jc := range q.creators { |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
762 deps := jc.Depends() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
763 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
764 if q.usedDeps[d] != 0 { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
765 continue nextCreator |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
766 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
767 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
768 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
769 if q.usedDeps[d] == runExclusive { |
3221
899914a18d7e
Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3219
diff
changeset
|
770 continue nextCreator |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
771 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
772 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
773 which = append(which, string(kind)) |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
774 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
775 q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
776 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
777 if len(which) == 0 { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
778 return nil, sql.ErrNoRows |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
779 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
780 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
781 var kinds pgtype.TextArray |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
782 if err := kinds.Set(which); err != nil { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
783 return nil, err |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
784 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
785 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
786 var ji idJob |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
787 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
788 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
789 tx, err := conn.BeginTx(ctx, nil) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
790 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
791 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
792 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
793 defer tx.Rollback() |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
794 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
795 &ji.id, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
796 &ji.kind, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
797 &ji.triesLeft, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
798 &ji.waitRetry, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
799 &ji.user, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
800 &ji.sendEmail, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
801 &ji.data, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
802 ); err != nil { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
803 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
804 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
805 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
806 if err == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
807 err = tx.Commit() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
808 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
809 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
810 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
811 switch { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
812 case err == sql.ErrNoRows: |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
813 return nil, nil |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
814 case err != nil: |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
815 return nil, err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
816 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
817 return &ji, nil |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
818 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
819 |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
820 func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
821 // As it is important to keep the persistent model |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
822 // in sync with the in-memory model try harder to store |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
823 // the state. |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
824 const maxTries = 10 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
825 var sleep = time.Second |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
826 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
827 for try := 1; ; try++ { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
828 var err error |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
829 if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
830 return err |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
831 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
832 log.Printf("warn: [try %d/%d] Storing state failed: %v (try again in %s).\n", |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
833 try, maxTries, err, sleep) |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
834 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
835 time.Sleep(sleep) |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
836 if sleep < time.Minute { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
837 if sleep *= 2; sleep > time.Minute { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
838 sleep = time.Minute |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
839 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
840 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
841 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
842 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
843 |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
844 func updateStateSummary( |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
845 ctx context.Context, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
846 id int64, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
847 state string, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
848 summary interface{}, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
849 ) error { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
850 var s sql.NullString |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
851 if summary != nil { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
852 var b strings.Builder |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
853 if err := json.NewEncoder(&b).Encode(summary); err != nil { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
854 return err |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
855 } |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
856 s = sql.NullString{String: b.String(), Valid: true} |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
857 } |
2823
b150e5b37afe
Import queue: As it is important to keep the in-memory and the persistent model in sync try harder to store the final state change if it fails.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2801
diff
changeset
|
858 |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
859 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
860 _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
861 return err |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
862 }) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
863 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
864 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
865 func deleteJob(ctx context.Context, id int64) error { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
866 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
867 _, err := conn.ExecContext(ctx, deleteJobSQL, id) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
868 return err |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
869 }) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
870 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
871 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
872 func errorAndFail(id int64, format string, args ...interface{}) error { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
873 ctx := context.Background() |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
874 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
875 tx, err := conn.BeginTx(ctx, nil) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
876 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
877 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
878 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
879 defer tx.Rollback() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
880 _, err = conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
881 ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
882 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
883 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
884 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
885 _, err = conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
886 ctx, updateStateSQL, "failed", id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
887 if err == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
888 err = tx.Commit() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
889 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
890 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
891 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
892 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
893 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
894 func (q *importQueue) importLoop() { |
1000
14425e35e3c2
Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
998
diff
changeset
|
895 config.WaitReady() |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
896 // re-enqueue the jobs that are in state running. |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
897 // They where in progess when the server went down. |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
898 if err := reEnqueueRunning(); err != nil { |
1751
c3a6aaf926c3
Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1740
diff
changeset
|
899 log.Printf("error: re-enqueuing failed: %v", err) |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
900 } |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
901 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
902 for { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
903 var idj *idJob |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
904 var err error |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
905 |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
906 for { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
907 if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { |
1751
c3a6aaf926c3
Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1740
diff
changeset
|
908 log.Printf("error: db: %v\n", err) |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
909 } |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
910 if idj != nil { |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
911 break |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
912 } |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
913 select { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
914 case cmd := <-q.cmdCh: |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
915 cmd(q) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
916 |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
917 case <-time.After(pollDuration): |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
918 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
919 } |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
920 |
1751
c3a6aaf926c3
Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1740
diff
changeset
|
921 log.Printf("info: starting import #%d\n", idj.id) |
987
3841509f6e9e
Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
979
diff
changeset
|
922 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
923 jc := q.jobCreator(idj.kind) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
924 if jc == nil { |
1010
8f23ec811afb
Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1005
diff
changeset
|
925 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
926 continue |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
927 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
928 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
929 // Lock dependencies. |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
930 q.lockDependencies(jc) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
931 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
932 go func(jc JobCreator, idj *idJob) { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
933 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
934 defer func() { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
935 // Unlock the dependencies. |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
936 q.unlockDependencies(jc) |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
937 // Unlock waiting. |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
938 q.cmdCh <- func(q *importQueue) { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
939 log.Printf("unlock waiting %d\n", idj.id) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
940 if w := q.waiting[idj.id]; w != nil { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
941 close(w) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
942 delete(q.waiting, idj.id) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
943 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
944 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
945 }() |
1010
8f23ec811afb
Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1005
diff
changeset
|
946 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
947 job := jc.Create() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
948 if err := common.FromJSONString(idj.data, job); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
949 errorAndFail(idj.id, "failed to create job for import #%d: %v", |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
950 idj.id, err) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
951 return |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
952 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
953 |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
954 var feedback Feedback |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
955 if fc, ok := job.(FeedbackJob); ok { |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
956 feedback = fc.CreateFeedback(idj.id) |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
957 } else { |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
958 feedback = logFeedback(idj.id) |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
959 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
960 |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
961 ctx := context.Background() |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
962 var summary interface{} |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
963 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
964 errDo := survive(func() error { |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
965 return auth.RunAs(ctx, idj.user, |
1168
930fdd8b474f
Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1138
diff
changeset
|
966 func(conn *sql.Conn) error { |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
967 var err error |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
968 summary, err = job.Do(ctx, idj.id, conn, feedback) |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
969 return err |
1168
930fdd8b474f
Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1138
diff
changeset
|
970 }) |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
971 })() |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
972 |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
973 var unchanged, retry bool |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
974 if v, ok := errDo.(UnchangedError); ok { |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
975 feedback.Info("unchanged: %s", v.Error()) |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
976 unchanged = true |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
977 } else if errDo != nil { |
4180
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
978 feedback.Error("error in import: %v", |
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
979 pgxutils.ReadableError{Err: errDo}) |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
980 retry = idj.nextRetry(feedback) |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
981 } |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
982 |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
983 var errCleanup error |
2801
054f5d61452d
Import queue: Cleanup up debris after an import in all cases if we are not going to retry it.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2187
diff
changeset
|
984 if !retry { // cleanup debris |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
985 if errCleanup = survive(job.CleanUp)(); errCleanup != nil { |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
986 feedback.Error("error cleanup: %v", errCleanup) |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
987 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
988 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
989 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
990 var remove bool |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
991 if remover, ok := jc.(JobRemover); ok { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
992 remove = remover.RemoveJob() |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
993 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
994 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
995 var state string |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
996 switch { |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
997 case unchanged: |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
998 state = "unchanged" |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
999 case errDo != nil || errCleanup != nil: |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1000 state = "failed" |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
1001 case jc.AutoAccept(): |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
1002 state = "accepted" |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
1003 default: |
1190
e3de65179889
The imort queue has now six states:
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1189
diff
changeset
|
1004 state = "pending" |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1005 } |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1006 if !remove { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1007 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1008 log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1009 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1010 log.Printf("info: import #%d finished: %s\n", idj.id, state) |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1011 } |
1646
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
1012 if idj.sendEmail { |
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
1013 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) |
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
1014 } |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1015 |
5115
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
1016 if retry { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
1017 nid, _, err := q.addJob( |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1018 idj.kind, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
1019 idj.nextDue(), |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
1020 idj.triesLeftPointer(), |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
1021 idj.waitRetryPointer(), |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
1022 idj.user, idj.sendEmail, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
1023 idj.data, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
1024 false) |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1025 if err != nil { |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1026 log.Printf("error: retry enqueue failed: %v\n", err) |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1027 } else { |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1028 log.Printf("info: re-enqueued job with id %d\n", nid) |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1029 } |
5115
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
1030 } |
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
1031 if remove { |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1032 if err := deleteJob(ctx, idj.id); err != nil { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1033 log.Printf("error: deleting job %d failed: %v\n", idj.id, err) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1034 } |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1035 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1036 }(jc, idj) |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
1037 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
1038 } |