Mercurial > gemma
annotate pkg/imports/queue.go @ 5591:0011f50cf216 surveysperbottleneckid
Removed no longer used alternative api for surveys/ endpoint.
As bottlenecks in the summary for SR imports are now identified by
their id and no longer by the (not guarantied to be unique!) name,
there is no longer the need to request survey data by the name+date
tuple (which isn't reliable anyway). So the workaround was now
reversed.
author | Sascha Wilde <wilde@sha-bang.de> |
---|---|
date | Wed, 06 Apr 2022 13:30:29 +0200 |
parents | aaa9e658cabd |
children | 1222b777f51f |
rev | line source |
---|---|
1017
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
1 // This is Free Software under GNU Affero General Public License v >= 3.0 |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
2 // without warranty, see README.md and license for details. |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
3 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
4 // SPDX-License-Identifier: AGPL-3.0-or-later |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
5 // License-Filename: LICENSES/AGPL-3.0.txt |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
6 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
7 // Copyright (C) 2018 by via donau |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
8 // – Österreichische Wasserstraßen-Gesellschaft mbH |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
9 // Software engineering by Intevation GmbH |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
10 // |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
11 // Author(s): |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> |
a244b18cb916
Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1010
diff
changeset
|
13 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
14 package imports |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
15 |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
16 import ( |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
17 "context" |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
18 "database/sql" |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
19 "encoding/json" |
5153
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
20 "errors" |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
21 "fmt" |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
22 "runtime/debug" |
3246
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
23 "sort" |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
24 "strings" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
25 "sync" |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
26 "time" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
27 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
28 "github.com/jackc/pgx/pgtype" |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
29 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
30 "gemma.intevation.de/gemma/pkg/auth" |
2187
7c83b5277c1c
Import queue: Removed boilerplate code to deserialize jobs from JSON by making it part of the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2168
diff
changeset
|
31 "gemma.intevation.de/gemma/pkg/common" |
1000
14425e35e3c2
Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
998
diff
changeset
|
32 "gemma.intevation.de/gemma/pkg/config" |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
33 "gemma.intevation.de/gemma/pkg/log" |
4180
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
34 "gemma.intevation.de/gemma/pkg/pgxutils" |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
35 ) |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
36 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
37 type ( |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
38 // Feedback is passed to the Do method of a Job to log |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
39 // informations, warnings or errors. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
40 Feedback interface { |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
41 // Info logs informations. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
42 Info(fmt string, args ...interface{}) |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
43 // Warn logs warnings. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
44 Warn(fmt string, args ...interface{}) |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
45 // Error logs errors. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
46 Error(fmt string, args ...interface{}) |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
47 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
48 |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
49 // UnchangedError may be issued by Do of a Job to indicate |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
50 // That the database has not changed. |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
51 UnchangedError string |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
52 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
53 // Job is the central abstraction of an import job |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
54 // run by the import queue. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
55 Job interface { |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
56 // Do is called to do the actual import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
57 // Bind transactions to ctx and conn, please- |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
58 // id is the number of the import job. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
59 // feedback can be used to log the import process. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
60 // If no error is return the import is assumed to |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
61 // be successfull. The non-error return value is |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
62 // serialized as a JSON string into the database as |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
63 // a summary to the import to be used by the review process. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
64 Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error) |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
65 // CleanUp is called to clean up ressources hold by the import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
66 // It is called whether the import succeeded or not. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
67 CleanUp() error |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
68 } |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
69 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
70 FeedbackJob interface { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
71 Job |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
72 CreateFeedback(int64) Feedback |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
73 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
74 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
75 // JobKind is the type of an import. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
76 // Choose a unique name for every import. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
77 JobKind string |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
78 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
79 // JobCreator is used to bring a job to life as it is stored |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
80 // in pure meta-data form to the database. |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
81 JobCreator interface { |
5104
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
82 // Description is the long name of the import. |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
83 Description() string |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
84 // Create build the actual job. |
cb736582e8fc
Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5103
diff
changeset
|
85 Create() Job |
3219
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
86 // Depends returns two lists of ressources locked by this type of import. |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
87 // Imports are run concurrently if they have disjoint sets |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
88 // of dependencies. |
3219
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
89 // The first list are locked exclusively. |
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
90 // The second allows multiple read users but only one writing one. |
4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2872
diff
changeset
|
91 Depends() [2][]string |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
92 // StageDone is called if an import is positively reviewed |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
93 // (state = accepted). This can be used to finalize the imported |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
94 // data to move it e.g from the staging area. |
5034
59a99655f34d
Added feedback support for StageDone.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5033
diff
changeset
|
95 StageDone(context.Context, *sql.Tx, int64, Feedback) error |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
96 // AutoAccept indicates that imports of this kind |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
97 // don't need a review. |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
98 AutoAccept() bool |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
99 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
100 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
101 JobRemover interface { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
102 JobCreator |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
103 RemoveJob() bool |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
104 } |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
105 |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
106 idJob struct { |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
107 id int64 |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
108 kind JobKind |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
109 user string |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
110 waitRetry pgtype.Interval |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
111 triesLeft sql.NullInt64 |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
112 sendEmail bool |
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
113 data string |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
114 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
115 ) |
987
3841509f6e9e
Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
979
diff
changeset
|
116 |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
117 const ( |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
118 pollDuration = time.Second * 10 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
119 runExclusive = -66666 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
120 ) |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
121 |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
122 const ( |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
123 ReviewJobSuffix = "#review" |
5524
35966741e45e
Try harder to retry executing review finalization jobs if they fail: 500 times each 10 min.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5490
diff
changeset
|
124 reviewJobRetries = 200 |
35966741e45e
Try harder to retry executing review finalization jobs if they fail: 500 times each 10 min.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5490
diff
changeset
|
125 reviewJobWait = 10 * time.Minute |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
126 ) |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
127 |
5169
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
128 const ( |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
129 hardMaxTries = 200 |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
130 minWaitRetry = 5 * time.Second |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
131 ) |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
132 |
5153
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
133 var ErrRetrying = errors.New("retrying") |
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
134 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
135 type importQueue struct { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
136 cmdCh chan func(*importQueue) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
137 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
138 creatorsMu sync.Mutex |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
139 creators map[JobKind]JobCreator |
3221
899914a18d7e
Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3219
diff
changeset
|
140 usedDeps map[string]int |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
141 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
142 waiting map[int64]chan struct{} |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
143 } |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
144 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
145 var iqueue = importQueue{ |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
146 cmdCh: make(chan func(*importQueue)), |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
147 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
148 creators: map[JobKind]JobCreator{}, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
149 usedDeps: map[string]int{}, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
150 waiting: make(map[int64]chan struct{}), |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
151 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
152 |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
153 var ( |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
154 // ImportStateNames is a list of the states a job can be in. |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
155 ImportStateNames = []string{ |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
156 "queued", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
157 "running", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
158 "failed", |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
159 "unchanged", |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
160 "pending", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
161 "accepted", |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
162 "declined", |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
163 "reviewed", |
1189
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
164 } |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
165 ) |
3d50f558870c
REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1168
diff
changeset
|
166 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
167 const ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
168 queueUser = "sys_admin" |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
169 |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
170 reEnqueueRunningSQL = ` |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
171 UPDATE import.imports SET |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
172 state = 'queued'::import_state, |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
173 changed = CURRENT_TIMESTAMP |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
174 WHERE state = 'running'::import_state` |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
175 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
176 insertJobSQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
177 INSERT INTO import.imports ( |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
178 kind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
179 due, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
180 trys_left, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
181 retry_wait, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
182 username, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
183 send_email, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
184 data |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
185 ) VALUES ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
186 $1, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
187 COALESCE($2, CURRENT_TIMESTAMP), |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
188 $3, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
189 $4, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
190 $5, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
191 $6, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
192 $7 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
193 ) RETURNING id` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
194 |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
195 // Select oldest queued job but prioritize review jobs |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
196 selectJobSQL = ` |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
197 SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `') |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
198 id, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
199 kind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
200 trys_left, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
201 retry_wait, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
202 username, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
203 send_email, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
204 data |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
205 FROM import.imports |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
206 WHERE |
1718
8ddbedf296d7
Re-scheduled imports: be a bit more tolerant about start time.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1708
diff
changeset
|
207 due <= CURRENT_TIMESTAMP + interval '5 seconds' AND |
5122
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
208 state = 'queued'::import_state AND |
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
209 kind = ANY($1) |
0b6b62d247e8
Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents:
5121
diff
changeset
|
210 ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
211 LIMIT 1` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
212 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
213 updateStateSQL = ` |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
214 UPDATE import.imports SET |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
215 state = $1::import_state, |
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
216 changed = CURRENT_TIMESTAMP |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
217 WHERE id = $2` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
218 |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
219 updateStateSummarySQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
220 UPDATE import.imports SET |
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
221 state = $1::import_state, |
4748
47922c1a088d
Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4180
diff
changeset
|
222 changed = CURRENT_TIMESTAMP, |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
223 summary = $2 |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
224 WHERE id = $3` |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
225 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
226 deleteJobSQL = ` |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
227 DELETE FROM import.imports WHERE id = $1` |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
228 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
229 logMessageSQL = ` |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
230 INSERT INTO import.import_logs ( |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
231 import_id, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
232 kind, |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
233 msg |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
234 ) VALUES ( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
235 $1, |
1995
59055c8301df
Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents:
1985
diff
changeset
|
236 $2::log_type, |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
237 $3 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
238 )` |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
239 ) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
240 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
241 func init() { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
242 go iqueue.importLoop() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
243 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
244 |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
245 // Error makes UnchangedError an error. |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
246 func (ue UnchangedError) Error() string { |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
247 return string(ue) |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
248 } |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
249 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
250 type reviewedJobCreator struct { |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
251 jobCreator JobCreator |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
252 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
253 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
254 func (*reviewedJobCreator) AutoAccept() bool { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
255 return true |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
256 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
257 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
258 func (*reviewedJobCreator) RemoveJob() bool { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
259 return true |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
260 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
261 |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
262 func (rjc *reviewedJobCreator) Depends() [2][]string { |
5102
8cc5b08ffc2b
End endless recursion.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5101
diff
changeset
|
263 return rjc.jobCreator.Depends() |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
264 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
265 |
5101
1b0b13e70bc1
Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5100
diff
changeset
|
266 func (rjc *reviewedJobCreator) Description() string { |
5120
22899babe85d
Catch another endless recursin call.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5119
diff
changeset
|
267 return rjc.jobCreator.Description() + ReviewJobSuffix |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
268 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
269 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
270 func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
271 return nil |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
272 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
273 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
274 type reviewedJob struct { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
275 ID int64 `json:"id"` |
5103
a11705203f3f
Nmae json field in reviewed job correctly.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5102
diff
changeset
|
276 Accepted bool `json:"accepted"` |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
277 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
278 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
279 func (*reviewedJobCreator) Create() Job { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
280 return new(reviewedJob) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
281 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
282 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
283 func (*reviewedJob) CleanUp() error { return nil } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
284 |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
285 func (r *reviewedJob) CreateFeedback(int64) Feedback { |
5119
5f62ac3db148
Don't write "import #42 started" into imports logs and remove the supress code to prevent this in review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5117
diff
changeset
|
286 return logFeedback(r.ID) |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
287 } |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
288 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
289 func (rj *reviewedJob) Do( |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
290 ctx context.Context, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
291 importID int64, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
292 conn *sql.Conn, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
293 feedback Feedback, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
294 ) (interface{}, error) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
295 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
296 tx, err := conn.BeginTx(ctx, nil) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
297 if err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
298 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
299 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
300 defer tx.Rollback() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
301 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
302 var signer string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
303 if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
304 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
305 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
306 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
307 var user, kind string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
308 if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
309 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
310 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
311 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
312 jc := FindJobCreator(JobKind(kind)) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
313 if jc == nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
314 return nil, fmt.Errorf("no job creator found for '%s'", kind) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
315 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
316 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
317 importFeedback := logFeedback(rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
318 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
319 if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
320 userTx, err := conn.BeginTx(ctx, nil) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
321 if err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
322 return err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
323 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
324 defer userTx.Rollback() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
325 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
326 if rj.Accepted { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
327 err = jc.StageDone(ctx, userTx, rj.ID, importFeedback) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
328 } else { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
329 _, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
330 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
331 if err == nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
332 err = userTx.Commit() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
333 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
334 return err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
335 }); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
336 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
337 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
338 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
339 // Remove the import track |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
340 if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
341 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
342 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
343 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
344 var state string |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
345 if rj.Accepted { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
346 state = "accepted" |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
347 } else { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
348 state = "declined" |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
349 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
350 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
351 if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
352 return nil, err |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
353 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
354 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
355 importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
356 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
357 return nil, tx.Commit() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
358 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
359 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
360 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
361 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
362 defer q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
363 q.creators[kind] = jc |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
364 q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc} |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
365 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
366 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
367 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
368 // FindJobCreator looks up a JobCreator in the global import queue. |
1193
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
369 func FindJobCreator(kind JobKind) JobCreator { |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
370 return iqueue.jobCreator(kind) |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
371 } |
58acc343b1b6
Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1190
diff
changeset
|
372 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
373 // ImportKindNames is a list of the names of the imports the |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
374 // global import queue supports. |
1495
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
375 func ImportKindNames() []string { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
376 return iqueue.importKindNames() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
377 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
378 |
3246
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
379 // LogImportKindNames logs a list of importer types registered |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
380 // to the global import queue. |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
381 func LogImportKindNames() { |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
382 kinds := ImportKindNames() |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
383 sort.Strings(kinds) |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
384 log.Infof("registered import kinds: %s", |
3246
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
385 strings.Join(kinds, ", ")) |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
386 } |
64324aaeb1fb
Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3225
diff
changeset
|
387 |
1694
4a2fad8f57de
Imports: Resolved golint issues unrelated to exported symbols commenting.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
1657
diff
changeset
|
388 // HasImportKindName checks if the import queue supports a given kind. |
1627
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
389 func HasImportKindName(kind string) bool { |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
390 return iqueue.hasImportKindName(kind) |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
391 } |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
392 |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
393 // |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
394 func (q *importQueue) hasImportKindName(kind string) bool { |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
395 q.creatorsMu.Lock() |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
396 defer q.creatorsMu.Unlock() |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
397 return q.creators[JobKind(kind)] != nil |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
398 } |
b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1497
diff
changeset
|
399 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
400 // RegisterJobCreator adds a JobCreator to the global import queue. |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
401 // This a good candidate to be called in a init function for |
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
402 // a particular JobCreator. |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
403 func RegisterJobCreator(kind JobKind, jc JobCreator) { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
404 iqueue.registerJobCreator(kind, jc) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
405 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
406 |
1495
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
407 func (q *importQueue) importKindNames() []string { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
408 q.creatorsMu.Lock() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
409 defer q.creatorsMu.Unlock() |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
410 names := make([]string, len(q.creators)) |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
411 var i int |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
412 for kind := range q.creators { |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
413 names[i] = string(kind) |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
414 i++ |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
415 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
416 // XXX: Consider using sort.Strings to make output deterministic. |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
417 return names |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
418 } |
d26e3e1fcff1
The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1409
diff
changeset
|
419 |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
420 func (idj *idJob) nextRetry(feedback Feedback) bool { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
421 switch { |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
422 case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid: |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
423 return false |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
424 case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid: |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
425 return true |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
426 case idj.triesLeft.Valid: |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
427 if idj.triesLeft.Int64 < 1 { |
5114
da26076ffafe
More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5113
diff
changeset
|
428 feedback.Warn("no retries left") |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
429 } else { |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
430 idj.triesLeft.Int64-- |
5114
da26076ffafe
More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5113
diff
changeset
|
431 feedback.Info("failed but will retry") |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
432 return true |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
433 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
434 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
435 return false |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
436 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
437 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
438 func (idj *idJob) nextDue() time.Time { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
439 now := time.Now() |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
440 if idj.waitRetry.Status == pgtype.Present { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
441 var d time.Duration |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
442 if err := idj.waitRetry.AssignTo(&d); err != nil { |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
443 log.Errorf("converting waitRetry failed: %v\n", err) |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
444 } else { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
445 now = now.Add(d) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
446 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
447 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
448 return now |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
449 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
450 |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
451 func (idj *idJob) triesLeftPointer() *int { |
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
452 if !idj.triesLeft.Valid { |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
453 return nil |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
454 } |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
455 t := int(idj.triesLeft.Int64) |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
456 return &t |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
457 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
458 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
459 func (idj *idJob) waitRetryPointer() *time.Duration { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
460 if idj.waitRetry.Status != pgtype.Present { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
461 return nil |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
462 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
463 d := new(time.Duration) |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
464 if err := idj.waitRetry.AssignTo(d); err != nil { |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
465 log.Errorf("converting waitRetry failed: %v\n", err) |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
466 return nil |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
467 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
468 return d |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
469 } |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
470 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
471 func (q *importQueue) lockDependencies(jc JobCreator) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
472 deps := jc.Depends() |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
473 q.creatorsMu.Lock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
474 defer q.creatorsMu.Unlock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
475 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
476 q.usedDeps[d] = runExclusive |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
477 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
478 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
479 q.usedDeps[d]++ |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
480 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
481 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
482 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
483 func (q *importQueue) unlockDependencies(jc JobCreator) { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
484 deps := jc.Depends() |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
485 q.creatorsMu.Lock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
486 defer q.creatorsMu.Unlock() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
487 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
488 q.usedDeps[d] = 0 |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
489 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
490 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
491 q.usedDeps[d]-- |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
492 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
493 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
494 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
495 func (q *importQueue) jobCreator(kind JobKind) JobCreator { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
496 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
497 defer q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
498 return q.creators[kind] |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
499 } |
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
500 |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
501 func (q *importQueue) addJob( |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
502 kind JobKind, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
503 due time.Time, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
504 triesLeft *int, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
505 waitRetry *time.Duration, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
506 user string, |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
507 sendEmail bool, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
508 data string, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
509 sync bool, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
510 ) (int64, chan struct{}, error) { |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
511 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
512 var id int64 |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
513 if due.IsZero() { |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
514 due = time.Now() |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
515 } |
4084
350a24c92848
Deliver times from import queue in UTC.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3246
diff
changeset
|
516 due = due.UTC() |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
517 |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
518 var tl sql.NullInt64 |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
519 if triesLeft != nil { |
5169
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
520 var many int64 |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
521 if *triesLeft > hardMaxTries || *triesLeft < 0 { |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
522 many = hardMaxTries |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
523 } else { |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
524 many = int64(*triesLeft) |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
525 } |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
526 tl = sql.NullInt64{Int64: many, Valid: true} |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
527 } |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
528 |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
529 var wr pgtype.Interval |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
530 if waitRetry != nil { |
5169
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
531 var howLong time.Duration |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
532 if minWaitRetry > *waitRetry { |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
533 howLong = minWaitRetry |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
534 } else { |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
535 howLong = *waitRetry |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
536 } |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
537 |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
538 if err := wr.Set(howLong); err != nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
539 return 0, nil, err |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
540 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
541 } else { |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
542 wr = pgtype.Interval{Status: pgtype.Null} |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
543 } |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
544 |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
545 errCh := make(chan error) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
546 var done chan struct{} |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
547 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
548 q.cmdCh <- func(q *importQueue) { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
549 ctx := context.Background() |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
550 errCh <- auth.RunAs(ctx, user, func(conn *sql.Conn) error { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
551 err := conn.QueryRowContext( |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
552 ctx, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
553 insertJobSQL, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
554 string(kind), |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
555 due, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
556 tl, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
557 &wr, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
558 user, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
559 sendEmail, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
560 data).Scan(&id) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
561 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
562 if err == nil && sync { |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
563 log.Infof("register wait for %d\n", id) |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
564 done = make(chan struct{}) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
565 q.waiting[id] = done |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
566 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
567 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
568 return err |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
569 }) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
570 } |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
571 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
572 return id, done, <-errCh |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
573 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
574 |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
575 // AddJob adds a job to the global import queue to be executed |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
576 // as soon as possible after due. |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
577 // This is gone in a separate Go routine |
1497
b41ad15cc55f
Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1495
diff
changeset
|
578 // so this will not block. |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
579 func AddJob( |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
580 kind JobKind, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
581 due time.Time, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
582 triesLeft *int, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
583 waitRetry *time.Duration, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
584 user string, |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
585 sendEmail bool, |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
586 data string, |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
587 ) (int64, error) { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
588 id, _, err := iqueue.addJob( |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
589 kind, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
590 due, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
591 triesLeft, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
592 waitRetry, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
593 user, |
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
594 sendEmail, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
595 data, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
596 false) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
597 return id, err |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
598 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
599 |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
600 const ( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
601 isPendingSQL = ` |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
602 SELECT |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
603 state = 'pending'::import_state, |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
604 kind |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
605 FROM import.imports |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
606 WHERE id = $1` |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
607 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
608 selectUserSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
609 SELECT username from import.imports WHERE ID = $1` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
610 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
611 selectUserKindSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
612 SELECT username, kind from import.imports WHERE ID = $1` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
613 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
614 reviewSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
615 UPDATE import.imports SET |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
616 state = $1::import_state, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
617 changed = CURRENT_TIMESTAMP, |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
618 signer = $2 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
619 WHERE id = $3` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
620 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
621 deleteImportDataSQL = `SELECT import.del_import($1)` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
622 |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
623 deleteImportTrackSQL = ` |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
624 DELETE FROM import.track_imports WHERE import_id = $1` |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
625 ) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
626 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
627 func (q *importQueue) decideImportTx( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
628 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
629 tx *sql.Tx, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
630 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
631 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
632 reviewer string, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
633 ) (chan struct{}, error) { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
634 var ( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
635 pending bool |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
636 kind string |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
637 ) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
638 |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
639 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
640 case err == sql.ErrNoRows: |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
641 return nil, fmt.Errorf("cannot find import #%d", id) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
642 case err != nil: |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
643 return nil, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
644 case !pending: |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
645 return nil, fmt.Errorf("#%d is not pending", id) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
646 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
647 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
648 jc := q.jobCreator(JobKind(kind)) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
649 if jc == nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
650 return nil, fmt.Errorf("no job creator for kind '%s'", kind) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
651 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
652 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
653 r := &reviewedJob{ |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
654 ID: id, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
655 Accepted: accepted, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
656 } |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
657 serialized, err := common.ToJSONString(r) |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
658 if err != nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
659 return nil, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
660 } |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
661 |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
662 // Try a little harder to persist the decision. |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
663 tries := reviewJobRetries |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
664 wait := reviewJobWait |
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
665 |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
666 rID, done, err := q.addJob( |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
667 JobKind(kind+ReviewJobSuffix), |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
668 time.Now(), |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
669 &tries, |
5109
c0ceec7e6f85
Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5104
diff
changeset
|
670 &wait, |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
671 reviewer, |
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
672 false, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
673 serialized, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
674 true) |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
675 if err != nil { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
676 return nil, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
677 } |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
678 log.Infof("add review job %d\n", rID) |
5099
3cd736acbad3
First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5034
diff
changeset
|
679 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id) |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
680 if err != nil && done != nil { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
681 go func() { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
682 q.cmdCh <- func(q *importQueue) { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
683 delete(q.waiting, rID) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
684 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
685 }() |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
686 done = nil |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
687 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
688 return done, err |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
689 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
690 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
691 func (q *importQueue) decideImport( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
692 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
693 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
694 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
695 reviewer string, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
696 ) error { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
697 if ctx == nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
698 ctx = context.Background() |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
699 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
700 |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
701 var done chan struct{} |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
702 |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
703 if err := auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error { |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
704 tx, err := conn.BeginTx(ctx, nil) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
705 if err != nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
706 return err |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
707 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
708 defer tx.Rollback() |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
709 done, err = q.decideImportTx(ctx, tx, id, accepted, reviewer) |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
710 if err == nil { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
711 err = tx.Commit() |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
712 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
713 return err |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
714 }); err != nil { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
715 return err |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
716 } |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
717 |
5153
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
718 _, retry := <-done |
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
719 if retry { |
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
720 return ErrRetrying |
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
721 } |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
722 return nil |
5028
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
723 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
724 |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
725 func DecideImport( |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
726 ctx context.Context, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
727 id int64, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
728 accepted bool, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
729 reviewer string, |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
730 ) error { |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
731 return iqueue.decideImport(ctx, id, accepted, reviewer) |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
732 } |
d727641911a5
Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
4748
diff
changeset
|
733 |
5564
aaa9e658cabd
Log export: Added marker interface to JobCreators that log messages should be loaded at export.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5524
diff
changeset
|
734 func (q *importQueue) All(fn func(JobKind, JobCreator)) { |
aaa9e658cabd
Log export: Added marker interface to JobCreators that log messages should be loaded at export.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5524
diff
changeset
|
735 q.creatorsMu.Lock() |
aaa9e658cabd
Log export: Added marker interface to JobCreators that log messages should be loaded at export.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5524
diff
changeset
|
736 defer q.creatorsMu.Unlock() |
aaa9e658cabd
Log export: Added marker interface to JobCreators that log messages should be loaded at export.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5524
diff
changeset
|
737 for k, v := range q.creators { |
aaa9e658cabd
Log export: Added marker interface to JobCreators that log messages should be loaded at export.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5524
diff
changeset
|
738 fn(k, v) |
aaa9e658cabd
Log export: Added marker interface to JobCreators that log messages should be loaded at export.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5524
diff
changeset
|
739 } |
aaa9e658cabd
Log export: Added marker interface to JobCreators that log messages should be loaded at export.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5524
diff
changeset
|
740 } |
aaa9e658cabd
Log export: Added marker interface to JobCreators that log messages should be loaded at export.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5524
diff
changeset
|
741 |
aaa9e658cabd
Log export: Added marker interface to JobCreators that log messages should be loaded at export.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5524
diff
changeset
|
742 func All(fn func(JobKind, JobCreator)) { iqueue.All(fn) } |
aaa9e658cabd
Log export: Added marker interface to JobCreators that log messages should be loaded at export.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5524
diff
changeset
|
743 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
744 type logFeedback int64 |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
745 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
746 func (lf logFeedback) log(kind, format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
747 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
748 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
749 _, err := conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
750 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...)) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
751 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
752 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
753 if err != nil { |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
754 log.Errorf("logging failed: %v\n", err) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
755 } |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
756 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
757 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
758 func (lf logFeedback) Info(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
759 lf.log("info", format, args...) |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
760 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
761 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
762 func (lf logFeedback) Warn(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
763 lf.log("warn", format, args...) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
764 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
765 |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
766 func (lf logFeedback) Error(format string, args ...interface{}) { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
767 lf.log("error", format, args...) |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
768 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
769 |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
770 func survive(fn func() error) func() error { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
771 return func() (err error) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
772 defer func() { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
773 if p := recover(); p != nil { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
774 err = fmt.Errorf("%v: %s", p, string(debug.Stack())) |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
775 } |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
776 }() |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
777 return fn() |
992
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
778 } |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
779 } |
a978b2b26a88
Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
991
diff
changeset
|
780 |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
781 func reEnqueueRunning() error { |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
782 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
783 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
784 _, err := conn.ExecContext(ctx, reEnqueueRunningSQL) |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
785 return err |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
786 }) |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
787 } |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
788 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
789 func (q *importQueue) fetchJob() (*idJob, error) { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
790 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
791 var which []string |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
792 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
793 q.creatorsMu.Lock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
794 nextCreator: |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
795 for kind, jc := range q.creators { |
3225
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
796 deps := jc.Depends() |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
797 for _, d := range deps[0] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
798 if q.usedDeps[d] != 0 { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
799 continue nextCreator |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
800 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
801 } |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
802 for _, d := range deps[1] { |
1a0985083c06
Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3221
diff
changeset
|
803 if q.usedDeps[d] == runExclusive { |
3221
899914a18d7e
Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
3219
diff
changeset
|
804 continue nextCreator |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
805 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
806 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
807 which = append(which, string(kind)) |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
808 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
809 q.creatorsMu.Unlock() |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
810 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
811 if len(which) == 0 { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
812 return nil, sql.ErrNoRows |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
813 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
814 |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
815 var kinds pgtype.TextArray |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
816 if err := kinds.Set(which); err != nil { |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
817 return nil, err |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
818 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
819 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
820 var ji idJob |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
821 ctx := context.Background() |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
822 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
823 tx, err := conn.BeginTx(ctx, nil) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
824 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
825 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
826 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
827 defer tx.Rollback() |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
828 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
829 &ji.id, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
830 &ji.kind, |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
831 &ji.triesLeft, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
832 &ji.waitRetry, |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
833 &ji.user, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
834 &ji.sendEmail, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
835 &ji.data, |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
836 ); err != nil { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
837 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
838 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
839 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
840 if err == nil { |
5169
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
841 if err = tx.Commit(); err != nil { |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
842 return err |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
843 } |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
844 } |
5169
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
845 // Clip repetition back to allowd values. |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
846 if ji.waitRetry.Status == pgtype.Present { |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
847 var d time.Duration |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
848 ji.waitRetry.AssignTo(&d) |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
849 if d < minWaitRetry { |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
850 ji.waitRetry.Set(minWaitRetry) |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
851 } |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
852 } |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
853 if ji.triesLeft.Valid { |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
854 if ji.triesLeft.Int64 < 0 || ji.triesLeft.Int64 > hardMaxTries { |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
855 ji.triesLeft.Int64 = hardMaxTries |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
856 } |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
857 } |
4f0869b85038
Import queue: Limit number of repetions of failed jobs to 200 and wait at least 5 seconds between the attempts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5153
diff
changeset
|
858 return nil |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
859 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
860 switch { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
861 case err == sql.ErrNoRows: |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
862 return nil, nil |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
863 case err != nil: |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
864 return nil, err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
865 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
866 return &ji, nil |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
867 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
868 |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
869 func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
870 // As it is important to keep the persistent model |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
871 // in sync with the in-memory model try harder to store |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
872 // the state. |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
873 const maxTries = 10 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
874 var sleep = time.Second |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
875 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
876 for try := 1; ; try++ { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
877 var err error |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
878 if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
879 return err |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
880 } |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
881 log.Warnf("[try %d/%d] Storing state failed: %v (try again in %s).\n", |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
882 try, maxTries, err, sleep) |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
883 |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
884 time.Sleep(sleep) |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
885 if sleep < time.Minute { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
886 if sleep *= 2; sleep > time.Minute { |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
887 sleep = time.Minute |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
888 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
889 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
890 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
891 } |
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
892 |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
893 func updateStateSummary( |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
894 ctx context.Context, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
895 id int64, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
896 state string, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
897 summary interface{}, |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
898 ) error { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
899 var s sql.NullString |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
900 if summary != nil { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
901 var b strings.Builder |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
902 if err := json.NewEncoder(&b).Encode(summary); err != nil { |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
903 return err |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
904 } |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
905 s = sql.NullString{String: b.String(), Valid: true} |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
906 } |
2823
b150e5b37afe
Import queue: As it is important to keep the in-memory and the persistent model in sync try harder to store the final state change if it fails.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2801
diff
changeset
|
907 |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
908 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
909 _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
910 return err |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
911 }) |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
912 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
913 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
914 func deleteJob(ctx context.Context, id int64) error { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
915 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
916 _, err := conn.ExecContext(ctx, deleteJobSQL, id) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
917 return err |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
918 }) |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
919 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
920 |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
921 func errorAndFail(id int64, format string, args ...interface{}) error { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
922 ctx := context.Background() |
2872
bfea3f80ca9a
Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2836
diff
changeset
|
923 return tryHardToStoreState(ctx, func(conn *sql.Conn) error { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
924 tx, err := conn.BeginTx(ctx, nil) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
925 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
926 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
927 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
928 defer tx.Rollback() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
929 _, err = conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
930 ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...)) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
931 if err != nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
932 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
933 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
934 _, err = conn.ExecContext( |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
935 ctx, updateStateSQL, "failed", id) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
936 if err == nil { |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
937 err = tx.Commit() |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
938 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
939 return err |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
940 }) |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
941 } |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
942 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
943 func (q *importQueue) importLoop() { |
1000
14425e35e3c2
Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
998
diff
changeset
|
944 config.WaitReady() |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
945 // re-enqueue the jobs that are in state running. |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
946 // They where in progess when the server went down. |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
947 if err := reEnqueueRunning(); err != nil { |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
948 log.Errorf("re-enqueuing failed: %v", err) |
1005
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
949 } |
fcf016ebdef4
Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1003
diff
changeset
|
950 |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
951 for { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
952 var idj *idJob |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
953 var err error |
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
954 |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
955 for { |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
956 if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows { |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
957 log.Errorf("error: db: %v\n", err) |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
958 } |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
959 if idj != nil { |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
960 break |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
961 } |
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
962 select { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
963 case cmd := <-q.cmdCh: |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
964 cmd(q) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
965 |
1003
d789f19877f4
Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1000
diff
changeset
|
966 case <-time.After(pollDuration): |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
967 } |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
968 } |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
969 |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
970 log.Infof("starting import #%d\n", idj.id) |
987
3841509f6e9e
Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
979
diff
changeset
|
971 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
972 jc := q.jobCreator(idj.kind) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
973 if jc == nil { |
1010
8f23ec811afb
Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1005
diff
changeset
|
974 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
975 continue |
988
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
976 } |
7dfd3db94e6d
In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents:
987
diff
changeset
|
977 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
978 // Lock dependencies. |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
979 q.lockDependencies(jc) |
991
a301d240905f
Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
989
diff
changeset
|
980 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
981 go func(jc JobCreator, idj *idJob) { |
998
75e65599ea52
Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
992
diff
changeset
|
982 |
5153
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
983 var retry bool |
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
984 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
985 defer func() { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
986 // Unlock the dependencies. |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
987 q.unlockDependencies(jc) |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
988 // Unlock waiting. |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
989 q.cmdCh <- func(q *importQueue) { |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
990 if w := q.waiting[idj.id]; w != nil { |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
991 log.Infof("unlock waiting %d\n", idj.id) |
5153
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
992 if retry { |
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
993 w <- struct{}{} |
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
994 } else { |
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
995 close(w) |
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
996 } |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
997 delete(q.waiting, idj.id) |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
998 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
999 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1000 }() |
1010
8f23ec811afb
Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1005
diff
changeset
|
1001 |
5100
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
1002 job := jc.Create() |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
1003 if err := common.FromJSONString(idj.data, job); err != nil { |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
1004 errorAndFail(idj.id, "failed to create job for import #%d: %v", |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
1005 idj.id, err) |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
1006 return |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
1007 } |
d3a24152b0be
Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5099
diff
changeset
|
1008 |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
1009 var feedback Feedback |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1010 if fc, ok := job.(FeedbackJob); ok { |
5110
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
1011 feedback = fc.CreateFeedback(idj.id) |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
1012 } else { |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
1013 feedback = logFeedback(idj.id) |
4dc2e6dc6c7d
Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5109
diff
changeset
|
1014 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1015 |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
1016 ctx := context.Background() |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
1017 var summary interface{} |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
1018 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1019 errDo := survive(func() error { |
1327
cabf4789e02b
To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1193
diff
changeset
|
1020 return auth.RunAs(ctx, idj.user, |
1168
930fdd8b474f
Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1138
diff
changeset
|
1021 func(conn *sql.Conn) error { |
1392
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
1022 var err error |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
1023 summary, err = job.Do(ctx, idj.id, conn, feedback) |
0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1328
diff
changeset
|
1024 return err |
1168
930fdd8b474f
Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1138
diff
changeset
|
1025 }) |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1026 })() |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
1027 |
5153
adf7b9f1273b
Send the info if an import job was re-scheduled to sync callers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5124
diff
changeset
|
1028 var unchanged bool |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
1029 if v, ok := errDo.(UnchangedError); ok { |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
1030 feedback.Info("unchanged: %s", v.Error()) |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
1031 unchanged = true |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
1032 } else if errDo != nil { |
4180
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
1033 feedback.Error("error in import: %v", |
91cb4a7b1b13
Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents:
4084
diff
changeset
|
1034 pgxutils.ReadableError{Err: errDo}) |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
1035 retry = idj.nextRetry(feedback) |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1036 } |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1037 |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1038 var errCleanup error |
2801
054f5d61452d
Import queue: Cleanup up debris after an import in all cases if we are not going to retry it.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
2187
diff
changeset
|
1039 if !retry { // cleanup debris |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
1040 if errCleanup = survive(job.CleanUp)(); errCleanup != nil { |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1041 feedback.Error("error cleanup: %v", errCleanup) |
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1042 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1043 } |
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1044 |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1045 var remove bool |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1046 if remover, ok := jc.(JobRemover); ok { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1047 remove = remover.RemoveJob() |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1048 } |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1049 |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1050 var state string |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
1051 switch { |
1975
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
1052 case unchanged: |
d966f03ea819
Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1798
diff
changeset
|
1053 state = "unchanged" |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
1054 case errDo != nil || errCleanup != nil: |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1055 state = "failed" |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
1056 case jc.AutoAccept(): |
1642
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
1057 state = "accepted" |
49c04bb64e0a
Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1627
diff
changeset
|
1058 default: |
1190
e3de65179889
The imort queue has now six states:
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1189
diff
changeset
|
1059 state = "pending" |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1060 } |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1061 if !remove { |
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1062 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
1063 log.Errorf("setting state of job %d failed: %v\n", idj.id, err) |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1064 } |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
1065 log.Infof("info: import #%d finished: %s\n", idj.id, state) |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1066 } |
1646
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
1067 if idj.sendEmail { |
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
1068 go sendNotificationMail(idj.user, jc.Description(), state, idj.id) |
a0982c38eac0
Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1642
diff
changeset
|
1069 } |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1070 |
5115
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
1071 if retry { |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
1072 nid, _, err := q.addJob( |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1073 idj.kind, |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
1074 idj.nextDue(), |
5113
d036ad682013
Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5112
diff
changeset
|
1075 idj.triesLeftPointer(), |
1985
8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1975
diff
changeset
|
1076 idj.waitRetryPointer(), |
1754
807569b08513
Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1751
diff
changeset
|
1077 idj.user, idj.sendEmail, |
5123
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
1078 idj.data, |
eeb45e3e0a5a
Added mechanism to have sync import jobs on import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5122
diff
changeset
|
1079 false) |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1080 if err != nil { |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
1081 log.Errorf("retry enqueue failed: %v\n", err) |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1082 } else { |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
1083 log.Infof("re-enqueued job with id %d\n", nid) |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1084 } |
5115
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
1085 } |
bb5459faadb7
Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5114
diff
changeset
|
1086 if remove { |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1087 if err := deleteJob(ctx, idj.id); err != nil { |
5490
5f47eeea988d
Use own logging package.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5169
diff
changeset
|
1088 log.Errorf("deleting job %d failed: %v\n", idj.id, err) |
5111
90b0a14dd58b
Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
5110
diff
changeset
|
1089 } |
1708
49e047c2106e
Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1694
diff
changeset
|
1090 } |
1136
a5069da2f0b7
Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
1017
diff
changeset
|
1091 }(jc, idj) |
958
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
1092 } |
2818ad6c7d32
Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff
changeset
|
1093 } |