Mercurial > gemma
annotate pkg/imports/queue.go @ 5124:6910c1cad1fb queued-stage-done
Moved misplaced log line around.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Fri, 27 Mar 2020 00:40:52 +0100 |
parents | eeb45e3e0a5a |
children | adf7b9f1273b |
rev | line source |
---|---|
1017
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
1 // This is Free Software under GNU Affero General Public License v >= 3.0 |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
2 // without warranty, see README.md and license for details. |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
3 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
4 // SPDX-License-Identifier: AGPL-3.0-or-later |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
5 // License-Filename: LICENSES/AGPL-3.0.txt |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
6 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
7 // Copyright (C) 2018 by via donau |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
8 // – Österreichische Wasserstraßen-Gesellschaft mbH |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
9 // Software engineering by Intevation GmbH |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
10 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
11 // Author(s): |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
13 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
14 package imports |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
15 |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
16 import ( |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
17 "context" |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
18 "database/sql" |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
19 "encoding/json" |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
20 "fmt" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
21 "log" |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
22 "runtime/debug" |
3246
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
23 "sort" |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
24 "strings" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
25 "sync" |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
26 "time" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
27 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
28 "github.com/jackc/pgx/pgtype" |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
29 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
30 "gemma.intevation.de/gemma/pkg/auth" |
2187
7c83b5277c1c
Import queue: Removed boilerplate code to deserialize jobs from JSON by making it part of the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2168
diff
changeset
|
31 "gemma.intevation.de/gemma/pkg/common" |
1000
14425e35e3c2
Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
998
diff
changeset
|
32 "gemma.intevation.de/gemma/pkg/config" |
4180
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
33 "gemma.intevation.de/gemma/pkg/pgxutils" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
34 ) |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
35 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
36 type ( |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
37 // Feedback is passed to the Do method of a Job to log |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
38 // informations, warnings or errors. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
39 Feedback interface { |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
40 // Info logs informations. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
41 Info(fmt string, args ...interface{}) |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
42 // Warn logs warnings. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
43 Warn(fmt string, args ...interface{}) |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
44 // Error logs errors. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
45 Error(fmt string, args ...interface{}) |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
46 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
47 |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
48 // UnchangedError may be issued by Do of a Job to indicate |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
49 // That the database has not changed. |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
50 UnchangedError string |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
51 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
52 // Job is the central abstraction of an import job |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
53 // run by the import queue. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
54 Job interface { |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
55 // Do is called to do the actual import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
56 // Bind transactions to ctx and conn, please- |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
57 // id is the number of the import job. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
58 // feedback can be used to log the import process. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
59 // If no error is return the import is assumed to |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
60 // be successfull. The non-error return value is |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
61 // serialized as a JSON string into the database as |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
62 // a summary to the import to be used by the review process. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
63 Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error) |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
64 // CleanUp is called to clean up ressources hold by the import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
65 // It is called whether the import succeeded or not. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
66 CleanUp() error |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
67 } |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
68 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
69 FeedbackJob interface { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
70 Job |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
71 CreateFeedback(int64) Feedback |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
72 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
73 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
74 // JobKind is the type of an import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
75 // Choose a unique name for every import. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
76 JobKind string |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
77 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
78 // JobCreator is used to bring a job to life as it is stored |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
79 // in pure meta-data form to the database. |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
80 JobCreator interface { |
5104
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
81 // Description is the long name of the import. |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
82 Description() string |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
83 // Create build the actual job. |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
84 Create() Job |
3219
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
85 // Depends returns two lists of ressources locked by this type of import. |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
86 // Imports are run concurrently if they have disjoint sets |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
87 // of dependencies. |
3219
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
88 // The first list are locked exclusively. |
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
89 // The second allows multiple read users but only one writing one. |
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
90 Depends() [2][]string |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
91 // StageDone is called if an import is positively reviewed |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
92 // (state = accepted). This can be used to finalize the imported |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
93 // data to move it e.g from the staging area. |
5034
59a99655f34d
Added feedback support for StageDone.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5033
diff
changeset
|
94 StageDone(context.Context, *sql.Tx, int64, Feedback) error |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
95 // AutoAccept indicates that imports of this kind |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
96 // don't need a review. |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
97 AutoAccept() bool |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
98 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
99 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
100 JobRemover interface { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
101 JobCreator |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
102 RemoveJob() bool |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
103 } |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
104 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
105 idJob struct { |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
106 id int64 |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
107 kind JobKind |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
108 user string |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
109 waitRetry pgtype.Interval |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
110 triesLeft sql.NullInt64 |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
111 sendEmail bool |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
112 data string |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
113 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
114 ) |
987
3841509f6e9e
Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
979
diff
changeset
|
115 |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
116 const ( |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
117 pollDuration = time.Second * 10 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
118 runExclusive = -66666 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
119 ) |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
120 |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
121 const ( |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
122 ReviewJobSuffix = "#review" |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
123 reviewJobRetries = 10 |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
124 reviewJobWait = time.Minute |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
125 ) |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
126 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
127 type importQueue struct { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
128 cmdCh chan func(*importQueue) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
129 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
130 creatorsMu sync.Mutex |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
131 creators map[JobKind]JobCreator |
3221
899914a18d7e
Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3219
diff
changeset
|
132 usedDeps map[string]int |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
133 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
134 waiting map[int64]chan struct{} |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
135 } |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
136 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
137 var iqueue = importQueue{ |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
138 cmdCh: make(chan func(*importQueue)), |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
139 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
140 creators: map[JobKind]JobCreator{}, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
141 usedDeps: map[string]int{}, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
142 waiting: make(map[int64]chan struct{}), |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
143 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
144 |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
145 var ( |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
146 // ImportStateNames is a list of the states a job can be in. |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
147 ImportStateNames = []string{ |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
148 "queued", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
149 "running", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
150 "failed", |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
151 "unchanged", |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
152 "pending", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
153 "accepted", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
154 "declined", |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
155 "reviewed", |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
156 } |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
157 ) |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
158 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
159 const ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
160 queueUser = "sys_admin" |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
161 |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
162 reEnqueueRunningSQL = ` |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
163 UPDATE import.imports SET |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
164 state = 'queued'::import_state, |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
165 changed = CURRENT_TIMESTAMP |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
166 WHERE state = 'running'::import_state` |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
167 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
168 insertJobSQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
169 INSERT INTO import.imports ( |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
170 kind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
171 due, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
172 trys_left, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
173 retry_wait, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
174 username, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
175 send_email, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
176 data |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
177 ) VALUES ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
178 $1, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
179 COALESCE($2, CURRENT_TIMESTAMP), |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
180 $3, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
181 $4, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
182 $5, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
183 $6, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
184 $7 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
185 ) RETURNING id` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
186 |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
187 // Select oldest queued job but prioritize review jobs |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
188 selectJobSQL = ` |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
189 SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `') |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
190 id, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
191 kind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
192 trys_left, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
193 retry_wait, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
194 username, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
195 send_email, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
196 data |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
197 FROM import.imports |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
198 WHERE |
1718
8ddbedf296d7
Re-scheduled imports: be a bit more tolerant about start time.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1708
diff
changeset
|
199 due <= CURRENT_TIMESTAMP + interval '5 seconds' AND |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
200 state = 'queued'::import_state AND |
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
201 kind = ANY($1) |
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
202 ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
203 LIMIT 1` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
204 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
205 updateStateSQL = ` |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
206 UPDATE import.imports SET |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
207 state = $1::import_state, |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
208 changed = CURRENT_TIMESTAMP |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
209 WHERE id = $2` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
210 |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
211 updateStateSummarySQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
212 UPDATE import.imports SET |
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
213 state = $1::import_state, |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
214 changed = CURRENT_TIMESTAMP, |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
215 summary = $2 |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
216 WHERE id = $3` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
217 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
218 deleteJobSQL = ` |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
219 DELETE FROM import.imports WHERE id = $1` |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
220 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
221 logMessageSQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
222 INSERT INTO import.import_logs ( |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
223 import_id, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
224 kind, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
225 msg |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
226 ) VALUES ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
227 $1, |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
228 $2::log_type, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
229 $3 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
230 )` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
231 ) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
232 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
233 func init() { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
234 go iqueue.importLoop() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
235 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
236 |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
237 // Error makes UnchangedError an error. |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
238 func (ue UnchangedError) Error() string { |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
239 return string(ue) |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
240 } |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
241 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
242 type reviewedJobCreator struct { |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
243 jobCreator JobCreator |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
244 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
245 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
246 func (*reviewedJobCreator) AutoAccept() bool { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
247 return true |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
248 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
249 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
250 func (*reviewedJobCreator) RemoveJob() bool { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
251 return true |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
252 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
253 |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
254 func (rjc *reviewedJobCreator) Depends() [2][]string { |
5102
8cc5b08ffc2b
End endless recursion.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5101
diff
changeset
|
255 return rjc.jobCreator.Depends() |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
256 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
257 |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
258 func (rjc *reviewedJobCreator) Description() string { |
5120
22899babe85d
Catch another endless recursin call.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5119
diff
changeset
|
259 return rjc.jobCreator.Description() + ReviewJobSuffix |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
260 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
261 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
262 func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
263 return nil |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
264 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
265 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
266 type reviewedJob struct { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
267 ID int64 `json:"id"` |
5103
a11705203f3f
Nmae json field in reviewed job correctly.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5102
diff
changeset
|
268 Accepted bool `json:"accepted"` |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
269 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
270 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
271 func (*reviewedJobCreator) Create() Job { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
272 return new(reviewedJob) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
273 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
274 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
275 func (*reviewedJob) CleanUp() error { return nil } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
276 |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
277 func (r *reviewedJob) CreateFeedback(int64) Feedback { |
5119
5f62ac3db148
Don't write "import #42 started" into imports logs and remove the supress code to prevent this in review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5117
diff
changeset
|
278 return logFeedback(r.ID) |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
279 } |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
280 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
281 func (rj *reviewedJob) Do( |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
282 ctx context.Context, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
283 importID int64, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
284 conn *sql.Conn, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
285 feedback Feedback, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
286 ) (interface{}, error) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
287 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
288 tx, err := conn.BeginTx(ctx, nil) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
289 if err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
290 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
291 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
292 defer tx.Rollback() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
293 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
294 var signer string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
295 if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
296 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
297 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
298 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
299 var user, kind string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
300 if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
301 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
302 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
303 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
304 jc := FindJobCreator(JobKind(kind)) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
305 if jc == nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
306 return nil, fmt.Errorf("no job creator found for '%s'", kind) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
307 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
308 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
309 importFeedback := logFeedback(rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
310 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
311 if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
312 userTx, err := conn.BeginTx(ctx, nil) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
313 if err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
314 return err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
315 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
316 defer userTx.Rollback() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
317 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
318 if rj.Accepted { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
319 err = jc.StageDone(ctx, userTx, rj.ID, importFeedback) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
320 } else { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
321 _, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
322 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
323 if err == nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
324 err = userTx.Commit() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
325 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
326 return err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
327 }); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
328 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
329 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
330 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
331 // Remove the import track |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
332 if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
333 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
334 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
335 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
336 var state string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
337 if rj.Accepted { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
338 state = "accepted" |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
339 } else { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
340 state = "declined" |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
341 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
342 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
343 if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
344 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
345 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
346 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
347 importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
348 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
349 return nil, tx.Commit() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
350 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
351 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
352 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
353 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
354 defer q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
355 q.creators[kind] = jc |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
356 q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc} |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
357 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
358 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
359 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
360 // FindJobCreator looks up a JobCreator in the global import queue. |
1193
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
361 func FindJobCreator(kind JobKind) JobCreator { |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
362 return iqueue.jobCreator(kind) |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
363 } |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
364 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
365 // ImportKindNames is a list of the names of the imports the |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
366 // global import queue supports. |
1495
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
367 func ImportKindNames() []string { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
368 return iqueue.importKindNames() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
369 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
370 |
3246
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
371 // LogImportKindNames logs a list of importer types registered |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
372 // to the global import queue. |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
373 func LogImportKindNames() { |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
374 kinds := ImportKindNames() |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
375 sort.Strings(kinds) |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
376 log.Printf("info: registered import kinds: %s", |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
377 strings.Join(kinds, ", ")) |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
378 } |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
379 |
1694
4a2fad8f57de
Imports: Resolved golint issues unrelated to exported symbols commenting.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
1657
diff
changeset
|
380 // HasImportKindName checks if the import queue supports a given kind. |
1627
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
381 func HasImportKindName(kind string) bool { |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
382 return iqueue.hasImportKindName(kind) |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
383 } |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
384 |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
385 // |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
386 func (q *importQueue) hasImportKindName(kind string) bool { |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
387 q.creatorsMu.Lock() |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
388 defer q.creatorsMu.Unlock() |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
389 return q.creators[JobKind(kind)] != nil |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
390 } |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
391 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
392 // RegisterJobCreator adds a JobCreator to the global import queue. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
393 // This a good candidate to be called in a init function for |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
394 // a particular JobCreator. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
395 func RegisterJobCreator(kind JobKind, jc JobCreator) { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
396 iqueue.registerJobCreator(kind, jc) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
397 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
398 |
1495
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
399 func (q *importQueue) importKindNames() []string { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
400 q.creatorsMu.Lock() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
401 defer q.creatorsMu.Unlock() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
402 names := make([]string, len(q.creators)) |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
403 var i int |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
404 for kind := range q.creators { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
405 names[i] = string(kind) |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
406 i++ |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
407 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
408 // XXX: Consider using sort.Strings to make output deterministic. |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
409 return names |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
410 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
411 |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
412 func (idj *idJob) nextRetry(feedback Feedback) bool { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
413 switch { |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
414 case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid: |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
415 return false |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
416 case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid: |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
417 return true |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
418 case idj.triesLeft.Valid: |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
419 if idj.triesLeft.Int64 < 1 { |
5114
da26076ffafe
More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5113
diff
changeset
|
420 feedback.Warn("no retries left") |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
421 } else { |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
422 idj.triesLeft.Int64-- |
5114
da26076ffafe
More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5113
diff
changeset
|
423 feedback.Info("failed but will retry") |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
424 return true |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
425 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
426 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
427 return false |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
428 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
429 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
430 func (idj *idJob) nextDue() time.Time { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
431 now := time.Now() |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
432 if idj.waitRetry.Status == pgtype.Present { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
433 var d time.Duration |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
434 if err := idj.waitRetry.AssignTo(&d); err != nil { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
435 log.Printf("error: converting waitRetry failed: %v\n", err) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
436 } else { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
437 now = now.Add(d) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
438 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
439 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
440 return now |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
441 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
442 |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
443 func (idj *idJob) triesLeftPointer() *int { |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
444 if !idj.triesLeft.Valid { |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
445 return nil |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
446 } |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
447 t := int(idj.triesLeft.Int64) |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
448 return &t |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
449 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
450 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
451 func (idj *idJob) waitRetryPointer() *time.Duration { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
452 if idj.waitRetry.Status != pgtype.Present { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
453 return nil |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
454 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
455 d := new(time.Duration) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
456 if err := idj.waitRetry.AssignTo(d); err != nil { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
457 log.Printf("error: converting waitRetry failed: %v\n", err) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
458 return nil |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
459 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
460 return d |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
461 } |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
462 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
463 func (q *importQueue) lockDependencies(jc JobCreator) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
464 deps := jc.Depends() |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
465 q.creatorsMu.Lock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
466 defer q.creatorsMu.Unlock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
467 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
468 q.usedDeps[d] = runExclusive |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
469 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
470 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
471 q.usedDeps[d]++ |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
472 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
473 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
474 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
475 func (q *importQueue) unlockDependencies(jc JobCreator) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
476 deps := jc.Depends() |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
477 q.creatorsMu.Lock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
478 defer q.creatorsMu.Unlock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
479 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
480 q.usedDeps[d] = 0 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
481 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
482 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
483 q.usedDeps[d]-- |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
484 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
485 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
486 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
487 func (q *importQueue) jobCreator(kind JobKind) JobCreator { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
488 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
489 defer q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
490 return q.creators[kind] |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
491 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
492 |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
493 func (q *importQueue) addJob( |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
494 kind JobKind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
495 due time.Time, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
496 triesLeft *int, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
497 waitRetry *time.Duration, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
498 user string, |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
499 sendEmail bool, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
500 data string, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
501 sync bool, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
502 ) (int64, chan struct{}, error) { |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
503 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
504 var id int64 |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
505 if due.IsZero() { |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
506 due = time.Now() |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
507 } |
4084
350a24c92848
Deliver times from import queue in UTC.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3246
diff
changeset
|
508 due = due.UTC() |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
509 |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
510 var tl sql.NullInt64 |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
511 if triesLeft != nil { |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
512 tl = sql.NullInt64{Int64: int64(*triesLeft), Valid: true} |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
513 } |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
514 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
515 var wr pgtype.Interval |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
516 if waitRetry != nil { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
517 if err := wr.Set(*waitRetry); err != nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
518 return 0, nil, err |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
519 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
520 } else { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
521 wr = pgtype.Interval{Status: pgtype.Null} |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
522 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
523 |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
524 errCh := make(chan error) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
525 var done chan struct{} |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
526 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
527 q.cmdCh <- func(q *importQueue) { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
528 ctx := context.Background() |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
529 errCh <- auth.RunAs(ctx, user, func(conn *sql.Conn) error { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
530 err := conn.QueryRowContext( |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
531 ctx, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
532 insertJobSQL, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
533 string(kind), |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
534 due, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
535 tl, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
536 &wr, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
537 user, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
538 sendEmail, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
539 data).Scan(&id) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
540 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
541 if err == nil && sync { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
542 log.Printf("info: register wait for %d\n", id) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
543 done = make(chan struct{}) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
544 q.waiting[id] = done |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
545 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
546 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
547 return err |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
548 }) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
549 } |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
550 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
551 return id, done, <-errCh |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
552 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
553 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
554 // AddJob adds a job to the global import queue to be executed |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
555 // as soon as possible after due. |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
556 // This is gone in a separate Go routine |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
557 // so this will not block. |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
558 func AddJob( |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
559 kind JobKind, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
560 due time.Time, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
561 triesLeft *int, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
562 waitRetry *time.Duration, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
563 user string, |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
564 sendEmail bool, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
565 data string, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
566 ) (int64, error) { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
567 id, _, err := iqueue.addJob( |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
568 kind, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
569 due, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
570 triesLeft, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
571 waitRetry, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
572 user, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
573 sendEmail, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
574 data, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
575 false) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
576 return id, err |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
577 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
578 |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
579 const ( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
580 isPendingSQL = ` |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
581 SELECT |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
582 state = 'pending'::import_state, |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
583 kind |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
584 FROM import.imports |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
585 WHERE id = $1` |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
586 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
587 selectUserSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
588 SELECT username from import.imports WHERE ID = $1` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
589 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
590 selectUserKindSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
591 SELECT username, kind from import.imports WHERE ID = $1` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
592 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
593 reviewSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
594 UPDATE import.imports SET |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
595 state = $1::import_state, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
596 changed = CURRENT_TIMESTAMP, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
597 signer = $2 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
598 WHERE id = $3` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
599 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
600 deleteImportDataSQL = `SELECT import.del_import($1)` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
601 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
602 deleteImportTrackSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
603 DELETE FROM import.track_imports WHERE import_id = $1` |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
604 ) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
605 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
606 func (q *importQueue) decideImportTx( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
607 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
608 tx *sql.Tx, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
609 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
610 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
611 reviewer string, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
612 ) (chan struct{}, error) { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
613 var ( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
614 pending bool |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
615 kind string |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
616 ) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
617 |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
618 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
619 case err == sql.ErrNoRows: |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
620 return nil, fmt.Errorf("cannot find import #%d", id) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
621 case err != nil: |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
622 return nil, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
623 case !pending: |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
624 return nil, fmt.Errorf("#%d is not pending", id) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
625 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
626 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
627 jc := q.jobCreator(JobKind(kind)) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
628 if jc == nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
629 return nil, fmt.Errorf("no job creator for kind '%s'", kind) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
630 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
631 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
632 r := &reviewedJob{ |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
633 ID: id, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
634 Accepted: accepted, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
635 } |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
636 serialized, err := common.ToJSONString(r) |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
637 if err != nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
638 return nil, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
639 } |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
640 |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
641 // Try a little harder to persist the decision. |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
642 tries := reviewJobRetries |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
643 wait := reviewJobWait |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
644 |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
645 rID, done, err := q.addJob( |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
646 JobKind(kind+ReviewJobSuffix), |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
647 time.Now(), |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
648 &tries, |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
649 &wait, |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
650 reviewer, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
651 false, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
652 serialized, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
653 true) |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
654 if err != nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
655 return nil, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
656 } |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
657 log.Printf("info: add review job %d\n", rID) |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
658 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
659 if err != nil && done != nil { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
660 go func() { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
661 q.cmdCh <- func(q *importQueue) { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
662 delete(q.waiting, rID) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
663 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
664 }() |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
665 done = nil |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
666 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
667 return done, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
668 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
669 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
670 func (q *importQueue) decideImport( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
671 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
672 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
673 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
674 reviewer string, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
675 ) error { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
676 if ctx == nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
677 ctx = context.Background() |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
678 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
679 |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
680 var done chan struct{} |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
681 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
682 if err := auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
683 tx, err := conn.BeginTx(ctx, nil) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
684 if err != nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
685 return err |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
686 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
687 defer tx.Rollback() |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
688 done, err = q.decideImportTx(ctx, tx, id, accepted, reviewer) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
689 if err == nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
690 err = tx.Commit() |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
691 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
692 return err |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
693 }); err != nil { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
694 return err |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
695 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
696 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
697 <-done |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
698 return nil |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
699 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
700 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
701 func DecideImport( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
702 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
703 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
704 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
705 reviewer string, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
706 ) error { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
707 return iqueue.decideImport(ctx, id, accepted, reviewer) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
708 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
709 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
710 type logFeedback int64 |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
711 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
712 func (lf logFeedback) log(kind, format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
713 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
714 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
715 _, err := conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
716 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
717 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
718 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
719 if err != nil { |
1760
22148eb0f986
More on harmonizing logging.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1758
diff
changeset
|
720 log.Printf("error: logging failed: %v\n", err) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
721 } |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
722 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
723 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
724 func (lf logFeedback) Info(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
725 lf.log("info", format, args...) |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
726 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
727 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
728 func (lf logFeedback) Warn(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
729 lf.log("warn", format, args...) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
730 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
731 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
732 func (lf logFeedback) Error(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
733 lf.log("error", format, args...) |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
734 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
735 |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
736 func survive(fn func() error) func() error { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
737 return func() (err error) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
738 defer func() { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
739 if p := recover(); p != nil { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
740 err = fmt.Errorf("%v: %s", p, string(debug.Stack())) |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
741 } |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
742 }() |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
743 return fn() |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
744 } |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
745 } |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
746 |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
747 func reEnqueueRunning() error { |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
748 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
749 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
750 _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
751 return err |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
752 }) |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
753 } |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
754 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
755 func (q *importQueue) fetchJob() (*idJob, error) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
756 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
757 var which []string |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
758 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
759 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
760 nextCreator: |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
761 for kind, jc := range q.creators { |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
762 deps := jc.Depends() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
763 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
764 if q.usedDeps[d] != 0 { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
765 continue nextCreator |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
766 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
767 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
768 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
769 if q.usedDeps[d] == runExclusive { |
3221
899914a18d7e
Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3219
diff
changeset
|
770 continue nextCreator |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
771 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
772 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
773 which = append(which, string(kind)) |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
774 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
775 q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
776 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
777 if len(which) == 0 { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
778 return nil, sql.ErrNoRows |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
779 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
780 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
781 var kinds pgtype.TextArray |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
782 if err := kinds.Set(which); err != nil { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
783 return nil, err |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
784 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
785 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
786 var ji idJob |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
787 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
788 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
789 tx, err := conn.BeginTx(ctx, nil) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
790 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
791 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
792 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
793 defer tx.Rollback() |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
794 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
795 &ji.id, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
796 &ji.kind, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
797 &ji.triesLeft, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
798 &ji.waitRetry, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
799 &ji.user, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
800 &ji.sendEmail, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
801 &ji.data, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
802 ); err != nil { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
803 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
804 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
805 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
806 if err == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
807 err = tx.Commit() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
808 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
809 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
810 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
811 switch { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
812 case err == sql.ErrNoRows: |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
813 return nil, nil |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
814 case err != nil: |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
815 return nil, err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
816 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
817 return &ji, nil |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
818 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
819 |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
820 func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
821 // As it is important to keep the persistent model |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
822 // in sync with the in-memory model try harder to store |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
823 // the state. |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
824 const maxTries = 10 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
825 var sleep = time.Second |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
826 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
827 for try := 1; ; try++ { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
828 var err error |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
829 if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
830 return err |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
831 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
832 log.Printf("warn: [try %d/%d] Storing state failed: %v (try again in %s).\n", |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
833 try, maxTries, err, sleep) |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
834 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
835 time.Sleep(sleep) |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
836 if sleep < time.Minute { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
837 if sleep *= 2; sleep > time.Minute { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
838 sleep = time.Minute |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
839 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
840 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
841 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
842 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
843 |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
844 func updateStateSummary( |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
845 ctx context.Context, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
846 id int64, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
847 state string, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
848 summary interface{}, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
849 ) error { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
850 var s sql.NullString |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
851 if summary != nil { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
852 var b strings.Builder |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
853 if err := json.NewEncoder(&b).Encode(summary); err != nil { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
854 return err |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
855 } |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
856 s = sql.NullString{String: b.String(), Valid: true} |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
857 } |
2823
b150e5b37afe
Import queue: As it is important to keep the in-memory and the persistent model in sync try harder to store the final state change if it fails.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2801
diff
changeset
|
858 |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
859 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
860 _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
861 return err |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
862 }) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
863 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
864 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
865 func deleteJob(ctx context.Context, id int64) error { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
866 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
867 _, err := conn.ExecContext(ctx, deleteJobSQL, id) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
868 return err |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
869 }) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
870 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
871 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
872 func errorAndFail(id int64, format string, args ...interface{}) error { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
873 ctx := context.Background() |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
874 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
875 tx, err := conn.BeginTx(ctx, nil) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
876 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
877 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
878 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
879 defer tx.Rollback() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
880 _, err = conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
881 ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
882 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
883 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
884 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
885 _, err = conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
886 ctx, updateStateSQL, "failed", id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
887 if err == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
888 err = tx.Commit() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
889 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
890 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
891 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
892 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
893 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
894 func (q *importQueue) importLoop() { |
1000
14425e35e3c2
Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
998
diff
changeset
|
895 config.WaitReady() |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
896 // re-enqueue the jobs that are in state running. |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
897 // They where in progess when the server went down. |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
898 if err := reEnqueueRunning(); err != nil { |
1751
c3a6aaf926c3
Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1740
diff
changeset
|
899 log.Printf("error: re-enqueuing failed: %v", err) |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
900 } |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
901 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
902 for { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
903 var idj *idJob |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
904 var err error |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
905 |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
906 for { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
907 if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { |
1751
c3a6aaf926c3
Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1740
diff
changeset
|
908 log.Printf("error: db: %v\n", err) |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
909 } |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
910 if idj != nil { |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
911 break |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
912 } |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
913 select { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
914 case cmd := <-q.cmdCh: |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
915 cmd(q) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
916 |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
917 case <-time.After(pollDuration): |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
918 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
919 } |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
920 |
1751
c3a6aaf926c3
Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1740
diff
changeset
|
921 log.Printf("info: starting import #%d\n", idj.id) |
987
3841509f6e9e
Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
979
diff
changeset
|
922 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
923 jc := q.jobCreator(idj.kind) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
924 if jc == nil { |
1010
8f23ec811afb
Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1005
diff
changeset
|
925 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
926 continue |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
927 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
928 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
929 // Lock dependencies. |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
930 q.lockDependencies(jc) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
931 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
932 go func(jc JobCreator, idj *idJob) { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
933 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
934 defer func() { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
935 // Unlock the dependencies. |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
936 q.unlockDependencies(jc) |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
937 // Unlock waiting. |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
938 q.cmdCh <- func(q *importQueue) { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
939 if w := q.waiting[idj.id]; w != nil { |
5124
6910c1cad1fb
Moved misplaced log line around.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5123
diff
changeset
|
940 log.Printf("info: unlock waiting %d\n", idj.id) |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
941 close(w) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
942 delete(q.waiting, idj.id) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
943 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
944 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
945 }() |
1010
8f23ec811afb
Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1005
diff
changeset
|
946 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
947 job := jc.Create() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
948 if err := common.FromJSONString(idj.data, job); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
949 errorAndFail(idj.id, "failed to create job for import #%d: %v", |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
950 idj.id, err) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
951 return |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
952 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
953 |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
954 var feedback Feedback |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
955 if fc, ok := job.(FeedbackJob); ok { |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
956 feedback = fc.CreateFeedback(idj.id) |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
957 } else { |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
958 feedback = logFeedback(idj.id) |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
959 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
960 |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
961 ctx := context.Background() |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
962 var summary interface{} |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
963 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
964 errDo := survive(func() error { |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
965 return auth.RunAs(ctx, idj.user, |
1168
930fdd8b474f
Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1138
diff
changeset
|
966 func(conn *sql.Conn) error { |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
967 var err error |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
968 summary, err = job.Do(ctx, idj.id, conn, feedback) |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
969 return err |
1168
930fdd8b474f
Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1138
diff
changeset
|
970 }) |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
971 })() |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
972 |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
973 var unchanged, retry bool |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
974 if v, ok := errDo.(UnchangedError); ok { |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
975 feedback.Info("unchanged: %s", v.Error()) |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
976 unchanged = true |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
977 } else if errDo != nil { |
4180
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
978 feedback.Error("error in import: %v", |
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
979 pgxutils.ReadableError{Err: errDo}) |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
980 retry = idj.nextRetry(feedback) |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
981 } |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
982 |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
983 var errCleanup error |
2801
054f5d61452d
Import queue: Cleanup up debris after an import in all cases if we are not going to retry it.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2187
diff
changeset
|
984 if !retry { // cleanup debris |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
985 if errCleanup = survive(job.CleanUp)(); errCleanup != nil { |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
986 feedback.Error("error cleanup: %v", errCleanup) |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
987 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
988 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
989 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
990 var remove bool |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
991 if remover, ok := jc.(JobRemover); ok { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
992 remove = remover.RemoveJob() |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
993 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
994 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
995 var state string |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
996 switch { |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
997 case unchanged: |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
998 state = "unchanged" |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
999 case errDo != nil || errCleanup != nil: |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1000 state = "failed" |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
1001 case jc.AutoAccept(): |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
1002 state = "accepted" |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
1003 default: |
1190
e3de65179889
The imort queue has now six states:
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1189
diff
changeset
|
1004 state = "pending" |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1005 } |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1006 if !remove { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1007 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1008 log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1009 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1010 log.Printf("info: import #%d finished: %s\n", idj.id, state) |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1011 } |
1646
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
1012 if idj.sendEmail { |
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
1013 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) |
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
1014 } |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1015 |
5115
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
1016 if retry { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
1017 nid, _, err := q.addJob( |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1018 idj.kind, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
1019 idj.nextDue(), |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
1020 idj.triesLeftPointer(), |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
1021 idj.waitRetryPointer(), |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
1022 idj.user, idj.sendEmail, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
1023 idj.data, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
1024 false) |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1025 if err != nil { |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1026 log.Printf("error: retry enqueue failed: %v\n", err) |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1027 } else { |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1028 log.Printf("info: re-enqueued job with id %d\n", nid) |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1029 } |
5115
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
1030 } |
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
1031 if remove { |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1032 if err := deleteJob(ctx, idj.id); err != nil { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1033 log.Printf("error: deleting job %d failed: %v\n", idj.id, err) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1034 } |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1035 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1036 }(jc, idj) |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
1037 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
1038 } |