Mercurial > gemma
annotate pkg/imports/queue.go @ 5122:0b6b62d247e8 queued-stage-done
Prioritize review jobs on selection
This reverts rev. 37784b70eea3 and instead moves review jobs forward
in the queue when fetching the next job to be run. Also optimized
index setup for filtering by state but not enqueued.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 26 Mar 2020 14:41:23 +0100 |
parents | 8bfc71eecde1 |
children | eeb45e3e0a5a |
rev | line source |
---|---|
1017
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
1 // This is Free Software under GNU Affero General Public License v >= 3.0 |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
2 // without warranty, see README.md and license for details. |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
3 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
4 // SPDX-License-Identifier: AGPL-3.0-or-later |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
5 // License-Filename: LICENSES/AGPL-3.0.txt |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
6 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
7 // Copyright (C) 2018 by via donau |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
8 // – Österreichische Wasserstraßen-Gesellschaft mbH |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
9 // Software engineering by Intevation GmbH |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
10 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
11 // Author(s): |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
13 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
14 package imports |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
15 |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
16 import ( |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
17 "context" |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
18 "database/sql" |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
19 "encoding/json" |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
20 "fmt" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
21 "log" |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
22 "runtime/debug" |
3246
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
23 "sort" |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
24 "strings" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
25 "sync" |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
26 "time" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
27 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
28 "github.com/jackc/pgx/pgtype" |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
29 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
30 "gemma.intevation.de/gemma/pkg/auth" |
2187
7c83b5277c1c
Import queue: Removed boilerplate code to deserialize jobs from JSON by making it part of the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2168
diff
changeset
|
31 "gemma.intevation.de/gemma/pkg/common" |
1000
14425e35e3c2
Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
998
diff
changeset
|
32 "gemma.intevation.de/gemma/pkg/config" |
4180
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
33 "gemma.intevation.de/gemma/pkg/pgxutils" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
34 ) |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
35 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
36 type ( |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
37 // Feedback is passed to the Do method of a Job to log |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
38 // informations, warnings or errors. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
39 Feedback interface { |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
40 // Info logs informations. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
41 Info(fmt string, args ...interface{}) |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
42 // Warn logs warnings. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
43 Warn(fmt string, args ...interface{}) |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
44 // Error logs errors. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
45 Error(fmt string, args ...interface{}) |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
46 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
47 |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
48 // UnchangedError may be issued by Do of a Job to indicate |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
49 // That the database has not changed. |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
50 UnchangedError string |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
51 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
52 // Job is the central abstraction of an import job |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
53 // run by the import queue. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
54 Job interface { |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
55 // Do is called to do the actual import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
56 // Bind transactions to ctx and conn, please- |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
57 // id is the number of the import job. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
58 // feedback can be used to log the import process. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
59 // If no error is return the import is assumed to |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
60 // be successfull. The non-error return value is |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
61 // serialized as a JSON string into the database as |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
62 // a summary to the import to be used by the review process. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
63 Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error) |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
64 // CleanUp is called to clean up ressources hold by the import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
65 // It is called whether the import succeeded or not. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
66 CleanUp() error |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
67 } |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
68 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
69 FeedbackJob interface { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
70 Job |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
71 CreateFeedback(int64) Feedback |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
72 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
73 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
74 // JobKind is the type of an import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
75 // Choose a unique name for every import. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
76 JobKind string |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
77 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
78 // JobCreator is used to bring a job to life as it is stored |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
79 // in pure meta-data form to the database. |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
80 JobCreator interface { |
5104
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
81 // Description is the long name of the import. |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
82 Description() string |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
83 // Create build the actual job. |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
84 Create() Job |
3219
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
85 // Depends returns two lists of ressources locked by this type of import. |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
86 // Imports are run concurrently if they have disjoint sets |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
87 // of dependencies. |
3219
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
88 // The first list are locked exclusively. |
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
89 // The second allows multiple read users but only one writing one. |
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
90 Depends() [2][]string |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
91 // StageDone is called if an import is positively reviewed |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
92 // (state = accepted). This can be used to finalize the imported |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
93 // data to move it e.g from the staging area. |
5034
59a99655f34d
Added feedback support for StageDone.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5033
diff
changeset
|
94 StageDone(context.Context, *sql.Tx, int64, Feedback) error |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
95 // AutoAccept indicates that imports of this kind |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
96 // don't need a review. |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
97 AutoAccept() bool |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
98 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
99 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
100 JobRemover interface { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
101 JobCreator |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
102 RemoveJob() bool |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
103 } |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
104 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
105 idJob struct { |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
106 id int64 |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
107 kind JobKind |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
108 user string |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
109 waitRetry pgtype.Interval |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
110 triesLeft sql.NullInt64 |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
111 sendEmail bool |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
112 data string |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
113 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
114 ) |
987
3841509f6e9e
Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
979
diff
changeset
|
115 |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
116 const ( |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
117 pollDuration = time.Second * 10 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
118 runExclusive = -66666 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
119 ) |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
120 |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
121 const ( |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
122 ReviewJobSuffix = "#review" |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
123 reviewJobRetries = 10 |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
124 reviewJobWait = time.Minute |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
125 ) |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
126 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
127 type importQueue struct { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
128 signalChan chan struct{} |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
129 creatorsMu sync.Mutex |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
130 creators map[JobKind]JobCreator |
3221
899914a18d7e
Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3219
diff
changeset
|
131 usedDeps map[string]int |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
132 } |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
133 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
134 var iqueue = importQueue{ |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
135 signalChan: make(chan struct{}), |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
136 creators: map[JobKind]JobCreator{}, |
3221
899914a18d7e
Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3219
diff
changeset
|
137 usedDeps: map[string]int{}, |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
138 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
139 |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
140 var ( |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
141 // ImportStateNames is a list of the states a job can be in. |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
142 ImportStateNames = []string{ |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
143 "queued", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
144 "running", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
145 "failed", |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
146 "unchanged", |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
147 "pending", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
148 "accepted", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
149 "declined", |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
150 "reviewed", |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
151 } |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
152 ) |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
153 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
154 const ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
155 queueUser = "sys_admin" |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
156 |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
157 reEnqueueRunningSQL = ` |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
158 UPDATE import.imports SET |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
159 state = 'queued'::import_state, |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
160 changed = CURRENT_TIMESTAMP |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
161 WHERE state = 'running'::import_state` |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
162 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
163 insertJobSQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
164 INSERT INTO import.imports ( |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
165 kind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
166 due, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
167 trys_left, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
168 retry_wait, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
169 username, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
170 send_email, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
171 data |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
172 ) VALUES ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
173 $1, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
174 COALESCE($2, CURRENT_TIMESTAMP), |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
175 $3, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
176 $4, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
177 $5, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
178 $6, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
179 $7 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
180 ) RETURNING id` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
181 |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
182 // Select oldest queued job but prioritize review jobs |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
183 selectJobSQL = ` |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
184 SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `') |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
185 id, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
186 kind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
187 trys_left, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
188 retry_wait, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
189 username, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
190 send_email, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
191 data |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
192 FROM import.imports |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
193 WHERE |
1718
8ddbedf296d7
Re-scheduled imports: be a bit more tolerant about start time.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1708
diff
changeset
|
194 due <= CURRENT_TIMESTAMP + interval '5 seconds' AND |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
195 state = 'queued'::import_state AND |
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
196 kind = ANY($1) |
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
197 ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
198 LIMIT 1` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
199 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
200 updateStateSQL = ` |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
201 UPDATE import.imports SET |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
202 state = $1::import_state, |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
203 changed = CURRENT_TIMESTAMP |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
204 WHERE id = $2` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
205 |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
206 updateStateSummarySQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
207 UPDATE import.imports SET |
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
208 state = $1::import_state, |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
209 changed = CURRENT_TIMESTAMP, |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
210 summary = $2 |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
211 WHERE id = $3` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
212 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
213 deleteJobSQL = ` |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
214 DELETE FROM import.imports WHERE id = $1` |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
215 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
216 logMessageSQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
217 INSERT INTO import.import_logs ( |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
218 import_id, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
219 kind, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
220 msg |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
221 ) VALUES ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
222 $1, |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
223 $2::log_type, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
224 $3 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
225 )` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
226 ) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
227 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
228 func init() { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
229 go iqueue.importLoop() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
230 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
231 |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
232 // Error makes UnchangedError an error. |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
233 func (ue UnchangedError) Error() string { |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
234 return string(ue) |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
235 } |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
236 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
237 type reviewedJobCreator struct { |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
238 jobCreator JobCreator |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
239 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
240 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
241 func (*reviewedJobCreator) AutoAccept() bool { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
242 return true |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
243 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
244 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
245 func (*reviewedJobCreator) RemoveJob() bool { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
246 return true |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
247 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
248 |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
249 func (rjc *reviewedJobCreator) Depends() [2][]string { |
5102
8cc5b08ffc2b
End endless recursion.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5101
diff
changeset
|
250 return rjc.jobCreator.Depends() |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
251 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
252 |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
253 func (rjc *reviewedJobCreator) Description() string { |
5120
22899babe85d
Catch another endless recursin call.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5119
diff
changeset
|
254 return rjc.jobCreator.Description() + ReviewJobSuffix |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
255 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
256 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
257 func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
258 return nil |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
259 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
260 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
261 type reviewedJob struct { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
262 ID int64 `json:"id"` |
5103
a11705203f3f
Nmae json field in reviewed job correctly.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5102
diff
changeset
|
263 Accepted bool `json:"accepted"` |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
264 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
265 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
266 func (*reviewedJobCreator) Create() Job { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
267 return new(reviewedJob) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
268 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
269 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
270 func (*reviewedJob) CleanUp() error { return nil } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
271 |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
272 func (r *reviewedJob) CreateFeedback(int64) Feedback { |
5119
5f62ac3db148
Don't write "import #42 started" into imports logs and remove the supress code to prevent this in review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5117
diff
changeset
|
273 return logFeedback(r.ID) |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
274 } |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
275 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
276 func (rj *reviewedJob) Do( |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
277 ctx context.Context, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
278 importID int64, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
279 conn *sql.Conn, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
280 feedback Feedback, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
281 ) (interface{}, error) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
282 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
283 tx, err := conn.BeginTx(ctx, nil) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
284 if err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
285 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
286 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
287 defer tx.Rollback() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
288 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
289 var signer string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
290 if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
291 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
292 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
293 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
294 var user, kind string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
295 if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
296 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
297 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
298 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
299 jc := FindJobCreator(JobKind(kind)) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
300 if jc == nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
301 return nil, fmt.Errorf("no job creator found for '%s'", kind) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
302 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
303 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
304 importFeedback := logFeedback(rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
305 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
306 if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
307 userTx, err := conn.BeginTx(ctx, nil) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
308 if err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
309 return err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
310 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
311 defer userTx.Rollback() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
312 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
313 if rj.Accepted { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
314 err = jc.StageDone(ctx, userTx, rj.ID, importFeedback) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
315 } else { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
316 _, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
317 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
318 if err == nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
319 err = userTx.Commit() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
320 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
321 return err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
322 }); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
323 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
324 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
325 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
326 // Remove the import track |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
327 if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
328 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
329 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
330 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
331 var state string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
332 if rj.Accepted { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
333 state = "accepted" |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
334 } else { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
335 state = "declined" |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
336 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
337 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
338 if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
339 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
340 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
341 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
342 importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
343 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
344 return nil, tx.Commit() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
345 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
346 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
347 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
348 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
349 defer q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
350 q.creators[kind] = jc |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
351 q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc} |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
352 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
353 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
354 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
355 // FindJobCreator looks up a JobCreator in the global import queue. |
1193
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
356 func FindJobCreator(kind JobKind) JobCreator { |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
357 return iqueue.jobCreator(kind) |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
358 } |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
359 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
360 // ImportKindNames is a list of the names of the imports the |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
361 // global import queue supports. |
1495
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
362 func ImportKindNames() []string { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
363 return iqueue.importKindNames() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
364 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
365 |
3246
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
366 // LogImportKindNames logs a list of importer types registered |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
367 // to the global import queue. |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
368 func LogImportKindNames() { |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
369 kinds := ImportKindNames() |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
370 sort.Strings(kinds) |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
371 log.Printf("info: registered import kinds: %s", |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
372 strings.Join(kinds, ", ")) |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
373 } |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
374 |
1694
4a2fad8f57de
Imports: Resolved golint issues unrelated to exported symbols commenting.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
1657
diff
changeset
|
375 // HasImportKindName checks if the import queue supports a given kind. |
1627
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
376 func HasImportKindName(kind string) bool { |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
377 return iqueue.hasImportKindName(kind) |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
378 } |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
379 |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
380 // |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
381 func (q *importQueue) hasImportKindName(kind string) bool { |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
382 q.creatorsMu.Lock() |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
383 defer q.creatorsMu.Unlock() |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
384 return q.creators[JobKind(kind)] != nil |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
385 } |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
386 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
387 // RegisterJobCreator adds a JobCreator to the global import queue. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
388 // This a good candidate to be called in a init function for |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
389 // a particular JobCreator. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
390 func RegisterJobCreator(kind JobKind, jc JobCreator) { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
391 iqueue.registerJobCreator(kind, jc) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
392 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
393 |
1495
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
394 func (q *importQueue) importKindNames() []string { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
395 q.creatorsMu.Lock() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
396 defer q.creatorsMu.Unlock() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
397 names := make([]string, len(q.creators)) |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
398 var i int |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
399 for kind := range q.creators { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
400 names[i] = string(kind) |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
401 i++ |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
402 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
403 // XXX: Consider using sort.Strings to make output deterministic. |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
404 return names |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
405 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
406 |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
407 func (idj *idJob) nextRetry(feedback Feedback) bool { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
408 switch { |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
409 case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid: |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
410 return false |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
411 case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid: |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
412 return true |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
413 case idj.triesLeft.Valid: |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
414 if idj.triesLeft.Int64 < 1 { |
5114
da26076ffafe
More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5113
diff
changeset
|
415 feedback.Warn("no retries left") |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
416 } else { |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
417 idj.triesLeft.Int64-- |
5114
da26076ffafe
More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5113
diff
changeset
|
418 feedback.Info("failed but will retry") |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
419 return true |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
420 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
421 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
422 return false |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
423 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
424 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
425 func (idj *idJob) nextDue() time.Time { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
426 now := time.Now() |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
427 if idj.waitRetry.Status == pgtype.Present { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
428 var d time.Duration |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
429 if err := idj.waitRetry.AssignTo(&d); err != nil { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
430 log.Printf("error: converting waitRetry failed: %v\n", err) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
431 } else { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
432 now = now.Add(d) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
433 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
434 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
435 return now |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
436 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
437 |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
438 func (idj *idJob) triesLeftPointer() *int { |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
439 if !idj.triesLeft.Valid { |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
440 return nil |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
441 } |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
442 t := int(idj.triesLeft.Int64) |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
443 return &t |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
444 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
445 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
446 func (idj *idJob) waitRetryPointer() *time.Duration { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
447 if idj.waitRetry.Status != pgtype.Present { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
448 return nil |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
449 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
450 d := new(time.Duration) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
451 if err := idj.waitRetry.AssignTo(d); err != nil { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
452 log.Printf("error: converting waitRetry failed: %v\n", err) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
453 return nil |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
454 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
455 return d |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
456 } |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
457 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
458 func (q *importQueue) lockDependencies(jc JobCreator) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
459 deps := jc.Depends() |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
460 q.creatorsMu.Lock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
461 defer q.creatorsMu.Unlock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
462 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
463 q.usedDeps[d] = runExclusive |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
464 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
465 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
466 q.usedDeps[d]++ |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
467 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
468 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
469 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
470 func (q *importQueue) unlockDependencies(jc JobCreator) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
471 deps := jc.Depends() |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
472 q.creatorsMu.Lock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
473 defer q.creatorsMu.Unlock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
474 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
475 q.usedDeps[d] = 0 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
476 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
477 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
478 q.usedDeps[d]-- |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
479 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
480 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
481 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
482 func (q *importQueue) jobCreator(kind JobKind) JobCreator { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
483 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
484 defer q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
485 return q.creators[kind] |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
486 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
487 |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
488 func (q *importQueue) addJob( |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
489 kind JobKind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
490 due time.Time, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
491 triesLeft *int, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
492 waitRetry *time.Duration, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
493 user string, |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
494 sendEmail bool, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
495 data string, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
496 ) (int64, error) { |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
497 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
498 var id int64 |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
499 if due.IsZero() { |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
500 due = time.Now() |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
501 } |
4084
350a24c92848
Deliver times from import queue in UTC.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3246
diff
changeset
|
502 due = due.UTC() |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
503 |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
504 var tl sql.NullInt64 |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
505 if triesLeft != nil { |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
506 tl = sql.NullInt64{Int64: int64(*triesLeft), Valid: true} |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
507 } |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
508 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
509 var wr pgtype.Interval |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
510 if waitRetry != nil { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
511 if err := wr.Set(*waitRetry); err != nil { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
512 return 0, err |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
513 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
514 } else { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
515 wr = pgtype.Interval{Status: pgtype.Null} |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
516 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
517 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
518 ctx := context.Background() |
1798
40cbfd268aa9
Row level security for import jobs
Tom Gottfried <tom@intevation.de>
parents:
1760
diff
changeset
|
519 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
520 return conn.QueryRowContext( |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
521 ctx, |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
522 insertJobSQL, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
523 string(kind), |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
524 due, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
525 tl, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
526 &wr, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
527 user, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
528 sendEmail, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
529 data).Scan(&id) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
530 }) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
531 if err == nil { |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
532 select { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
533 case q.signalChan <- struct{}{}: |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
534 default: |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
535 } |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
536 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
537 return id, err |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
538 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
539 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
540 // AddJob adds a job to the global import queue to be executed |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
541 // as soon as possible after due. |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
542 // This is gone in a separate Go routine |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
543 // so this will not block. |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
544 func AddJob( |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
545 kind JobKind, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
546 due time.Time, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
547 triesLeft *int, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
548 waitRetry *time.Duration, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
549 user string, |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
550 sendEmail bool, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
551 data string, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
552 ) (int64, error) { |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
553 return iqueue.addJob( |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
554 kind, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
555 due, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
556 triesLeft, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
557 waitRetry, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
558 user, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
559 sendEmail, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
560 data) |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
561 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
562 |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
563 const ( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
564 isPendingSQL = ` |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
565 SELECT |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
566 state = 'pending'::import_state, |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
567 kind |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
568 FROM import.imports |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
569 WHERE id = $1` |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
570 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
571 selectUserSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
572 SELECT username from import.imports WHERE ID = $1` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
573 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
574 selectUserKindSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
575 SELECT username, kind from import.imports WHERE ID = $1` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
576 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
577 reviewSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
578 UPDATE import.imports SET |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
579 state = $1::import_state, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
580 changed = CURRENT_TIMESTAMP, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
581 signer = $2 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
582 WHERE id = $3` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
583 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
584 deleteImportDataSQL = `SELECT import.del_import($1)` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
585 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
586 deleteImportTrackSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
587 DELETE FROM import.track_imports WHERE import_id = $1` |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
588 ) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
589 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
590 func (q *importQueue) decideImportTx( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
591 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
592 tx *sql.Tx, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
593 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
594 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
595 reviewer string, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
596 ) error { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
597 var ( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
598 pending bool |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
599 kind string |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
600 ) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
601 |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
602 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
603 case err == sql.ErrNoRows: |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
604 return fmt.Errorf("cannot find import #%d", id) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
605 case err != nil: |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
606 return err |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
607 case !pending: |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
608 return fmt.Errorf("#%d is not pending", id) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
609 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
610 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
611 jc := q.jobCreator(JobKind(kind)) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
612 if jc == nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
613 return fmt.Errorf("no job creator for kind '%s'", kind) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
614 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
615 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
616 r := &reviewedJob{ |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
617 ID: id, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
618 Accepted: accepted, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
619 } |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
620 serialized, err := common.ToJSONString(r) |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
621 if err != nil { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
622 return err |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
623 } |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
624 |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
625 // Try a little harder to persist the decision. |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
626 tries := reviewJobRetries |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
627 wait := reviewJobWait |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
628 |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
629 rID, err := q.addJob( |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
630 JobKind(kind+ReviewJobSuffix), |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
631 time.Now(), |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
632 &tries, |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
633 &wait, |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
634 reviewer, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
635 false, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
636 serialized) |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
637 log.Printf("info: add review job %d\n", rID) |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
638 if err != nil { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
639 return err |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
640 } |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
641 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
642 return err |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
643 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
644 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
645 func (q *importQueue) decideImport( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
646 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
647 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
648 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
649 reviewer string, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
650 ) error { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
651 if ctx == nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
652 ctx = context.Background() |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
653 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
654 |
5033
13d9820c1ea4
Run review decisions SQLs as reviewer instead of queue user to use the rls policies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5029
diff
changeset
|
655 return auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
656 tx, err := conn.BeginTx(ctx, nil) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
657 if err != nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
658 return err |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
659 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
660 defer tx.Rollback() |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
661 err = q.decideImportTx(ctx, tx, id, accepted, reviewer) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
662 if err == nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
663 err = tx.Commit() |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
664 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
665 return err |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
666 }) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
667 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
668 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
669 func DecideImport( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
670 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
671 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
672 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
673 reviewer string, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
674 ) error { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
675 return iqueue.decideImport(ctx, id, accepted, reviewer) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
676 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
677 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
678 type logFeedback int64 |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
679 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
680 func (lf logFeedback) log(kind, format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
681 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
682 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
683 _, err := conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
684 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
685 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
686 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
687 if err != nil { |
1760
22148eb0f986
More on harmonizing logging.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1758
diff
changeset
|
688 log.Printf("error: logging failed: %v\n", err) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
689 } |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
690 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
691 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
692 func (lf logFeedback) Info(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
693 lf.log("info", format, args...) |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
694 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
695 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
696 func (lf logFeedback) Warn(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
697 lf.log("warn", format, args...) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
698 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
699 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
700 func (lf logFeedback) Error(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
701 lf.log("error", format, args...) |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
702 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
703 |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
704 func survive(fn func() error) func() error { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
705 return func() (err error) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
706 defer func() { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
707 if p := recover(); p != nil { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
708 err = fmt.Errorf("%v: %s", p, string(debug.Stack())) |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
709 } |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
710 }() |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
711 return fn() |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
712 } |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
713 } |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
714 |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
715 func reEnqueueRunning() error { |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
716 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
717 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
718 _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
719 return err |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
720 }) |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
721 } |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
722 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
723 func (q *importQueue) fetchJob() (*idJob, error) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
724 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
725 var which []string |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
726 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
727 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
728 nextCreator: |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
729 for kind, jc := range q.creators { |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
730 deps := jc.Depends() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
731 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
732 if q.usedDeps[d] != 0 { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
733 continue nextCreator |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
734 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
735 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
736 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
737 if q.usedDeps[d] == runExclusive { |
3221
899914a18d7e
Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3219
diff
changeset
|
738 continue nextCreator |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
739 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
740 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
741 which = append(which, string(kind)) |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
742 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
743 q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
744 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
745 if len(which) == 0 { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
746 return nil, sql.ErrNoRows |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
747 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
748 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
749 var kinds pgtype.TextArray |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
750 if err := kinds.Set(which); err != nil { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
751 return nil, err |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
752 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
753 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
754 var ji idJob |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
755 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
756 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
757 tx, err := conn.BeginTx(ctx, nil) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
758 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
759 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
760 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
761 defer tx.Rollback() |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
762 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
763 &ji.id, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
764 &ji.kind, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
765 &ji.triesLeft, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
766 &ji.waitRetry, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
767 &ji.user, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
768 &ji.sendEmail, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
769 &ji.data, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
770 ); err != nil { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
771 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
772 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
773 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
774 if err == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
775 err = tx.Commit() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
776 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
777 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
778 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
779 switch { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
780 case err == sql.ErrNoRows: |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
781 return nil, nil |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
782 case err != nil: |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
783 return nil, err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
784 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
785 return &ji, nil |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
786 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
787 |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
788 func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
789 // As it is important to keep the persistent model |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
790 // in sync with the in-memory model try harder to store |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
791 // the state. |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
792 const maxTries = 10 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
793 var sleep = time.Second |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
794 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
795 for try := 1; ; try++ { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
796 var err error |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
797 if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
798 return err |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
799 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
800 log.Printf("warn: [try %d/%d] Storing state failed: %v (try again in %s).\n", |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
801 try, maxTries, err, sleep) |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
802 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
803 time.Sleep(sleep) |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
804 if sleep < time.Minute { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
805 if sleep *= 2; sleep > time.Minute { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
806 sleep = time.Minute |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
807 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
808 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
809 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
810 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
811 |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
812 func updateStateSummary( |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
813 ctx context.Context, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
814 id int64, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
815 state string, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
816 summary interface{}, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
817 ) error { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
818 var s sql.NullString |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
819 if summary != nil { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
820 var b strings.Builder |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
821 if err := json.NewEncoder(&b).Encode(summary); err != nil { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
822 return err |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
823 } |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
824 s = sql.NullString{String: b.String(), Valid: true} |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
825 } |
2823
b150e5b37afe
Import queue: As it is important to keep the in-memory and the persistent model in sync try harder to store the final state change if it fails.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2801
diff
changeset
|
826 |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
827 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
828 _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
829 return err |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
830 }) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
831 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
832 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
833 func deleteJob(ctx context.Context, id int64) error { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
834 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
835 _, err := conn.ExecContext(ctx, deleteJobSQL, id) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
836 return err |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
837 }) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
838 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
839 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
840 func errorAndFail(id int64, format string, args ...interface{}) error { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
841 ctx := context.Background() |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
842 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
843 tx, err := conn.BeginTx(ctx, nil) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
844 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
845 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
846 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
847 defer tx.Rollback() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
848 _, err = conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
849 ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
850 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
851 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
852 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
853 _, err = conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
854 ctx, updateStateSQL, "failed", id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
855 if err == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
856 err = tx.Commit() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
857 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
858 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
859 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
860 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
861 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
862 func (q *importQueue) importLoop() { |
1000
14425e35e3c2
Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
998
diff
changeset
|
863 config.WaitReady() |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
864 // re-enqueue the jobs that are in state running. |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
865 // They where in progess when the server went down. |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
866 if err := reEnqueueRunning(); err != nil { |
1751
c3a6aaf926c3
Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1740
diff
changeset
|
867 log.Printf("error: re-enqueuing failed: %v", err) |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
868 } |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
869 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
870 for { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
871 var idj *idJob |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
872 var err error |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
873 |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
874 for { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
875 if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { |
1751
c3a6aaf926c3
Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1740
diff
changeset
|
876 log.Printf("error: db: %v\n", err) |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
877 } |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
878 if idj != nil { |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
879 break |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
880 } |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
881 select { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
882 case <-q.signalChan: |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
883 case <-time.After(pollDuration): |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
884 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
885 } |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
886 |
1751
c3a6aaf926c3
Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1740
diff
changeset
|
887 log.Printf("info: starting import #%d\n", idj.id) |
987
3841509f6e9e
Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
979
diff
changeset
|
888 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
889 jc := q.jobCreator(idj.kind) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
890 if jc == nil { |
1010
8f23ec811afb
Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1005
diff
changeset
|
891 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
892 continue |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
893 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
894 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
895 // Lock dependencies. |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
896 q.lockDependencies(jc) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
897 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
898 go func(jc JobCreator, idj *idJob) { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
899 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
900 // Unlock the dependencies. |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
901 defer func() { |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
902 q.unlockDependencies(jc) |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
903 select { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
904 case q.signalChan <- struct{}{}: |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
905 default: |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
906 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
907 }() |
1010
8f23ec811afb
Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1005
diff
changeset
|
908 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
909 job := jc.Create() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
910 if err := common.FromJSONString(idj.data, job); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
911 errorAndFail(idj.id, "failed to create job for import #%d: %v", |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
912 idj.id, err) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
913 return |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
914 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
915 |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
916 var feedback Feedback |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
917 if fc, ok := job.(FeedbackJob); ok { |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
918 feedback = fc.CreateFeedback(idj.id) |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
919 } else { |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
920 feedback = logFeedback(idj.id) |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
921 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
922 |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
923 ctx := context.Background() |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
924 var summary interface{} |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
925 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
926 errDo := survive(func() error { |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
927 return auth.RunAs(ctx, idj.user, |
1168
930fdd8b474f
Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1138
diff
changeset
|
928 func(conn *sql.Conn) error { |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
929 var err error |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
930 summary, err = job.Do(ctx, idj.id, conn, feedback) |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
931 return err |
1168
930fdd8b474f
Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1138
diff
changeset
|
932 }) |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
933 })() |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
934 |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
935 var unchanged, retry bool |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
936 if v, ok := errDo.(UnchangedError); ok { |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
937 feedback.Info("unchanged: %s", v.Error()) |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
938 unchanged = true |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
939 } else if errDo != nil { |
4180
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
940 feedback.Error("error in import: %v", |
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
941 pgxutils.ReadableError{Err: errDo}) |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
942 retry = idj.nextRetry(feedback) |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
943 } |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
944 |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
945 var errCleanup error |
2801
054f5d61452d
Import queue: Cleanup up debris after an import in all cases if we are not going to retry it.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2187
diff
changeset
|
946 if !retry { // cleanup debris |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
947 if errCleanup = survive(job.CleanUp)(); errCleanup != nil { |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
948 feedback.Error("error cleanup: %v", errCleanup) |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
949 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
950 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
951 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
952 var remove bool |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
953 if remover, ok := jc.(JobRemover); ok { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
954 remove = remover.RemoveJob() |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
955 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
956 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
957 var state string |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
958 switch { |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
959 case unchanged: |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
960 state = "unchanged" |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
961 case errDo != nil || errCleanup != nil: |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
962 state = "failed" |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
963 case jc.AutoAccept(): |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
964 state = "accepted" |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
965 default: |
1190
e3de65179889
The imort queue has now six states:
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1189
diff
changeset
|
966 state = "pending" |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
967 } |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
968 if !remove { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
969 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
970 log.Printf("error: setting state of job %d failed: %v\n", idj.id, err) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
971 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
972 log.Printf("info: import #%d finished: %s\n", idj.id, state) |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
973 } |
1646
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
974 if idj.sendEmail { |
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
975 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) |
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
976 } |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
977 |
5115
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
978 if retry { |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
979 nid, err := q.addJob( |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
980 idj.kind, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
981 idj.nextDue(), |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
982 idj.triesLeftPointer(), |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
983 idj.waitRetryPointer(), |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
984 idj.user, idj.sendEmail, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
985 idj.data) |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
986 if err != nil { |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
987 log.Printf("error: retry enqueue failed: %v\n", err) |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
988 } else { |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
989 log.Printf("info: re-enqueued job with id %d\n", nid) |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
990 } |
5115
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
991 } |
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
992 if remove { |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
993 if err := deleteJob(ctx, idj.id); err != nil { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
994 log.Printf("error: deleting job %d failed: %v\n", idj.id, err) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
995 } |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
996 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
997 }(jc, idj) |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
998 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
999 } |