annotate pkg/imports/queue.go @ 5122:0b6b62d247e8 queued-stage-done

Prioritize review jobs on selection This reverts rev. 37784b70eea3 and instead moves review jobs forward in the queue when fetching the next job to be run. Also optimized index setup for filtering by state but not enqueued.
author Tom Gottfried <tom@intevation.de>
date Thu, 26 Mar 2020 14:41:23 +0100
parents 8bfc71eecde1
children eeb45e3e0a5a
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1017
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
1 // This is Free Software under GNU Affero General Public License v >= 3.0
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
2 // without warranty, see README.md and license for details.
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
3 //
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
4 // SPDX-License-Identifier: AGPL-3.0-or-later
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
5 // License-Filename: LICENSES/AGPL-3.0.txt
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
6 //
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
7 // Copyright (C) 2018 by via donau
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
8 // – Österreichische Wasserstraßen-Gesellschaft mbH
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
9 // Software engineering by Intevation GmbH
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
10 //
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
11 // Author(s):
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de>
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
13
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
14 package imports
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
15
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
16 import (
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
17 "context"
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
18 "database/sql"
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
19 "encoding/json"
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
20 "fmt"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
21 "log"
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
22 "runtime/debug"
3246
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
23 "sort"
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
24 "strings"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
25 "sync"
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
26 "time"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
27
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
28 "github.com/jackc/pgx/pgtype"
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
29
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
30 "gemma.intevation.de/gemma/pkg/auth"
2187
7c83b5277c1c Import queue: Removed boilerplate code to deserialize jobs from JSON by making it part of the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2168
diff changeset
31 "gemma.intevation.de/gemma/pkg/common"
1000
14425e35e3c2 Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 998
diff changeset
32 "gemma.intevation.de/gemma/pkg/config"
4180
91cb4a7b1b13 Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents: 4084
diff changeset
33 "gemma.intevation.de/gemma/pkg/pgxutils"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
34 )
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
35
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
36 type (
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
37 // Feedback is passed to the Do method of a Job to log
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
38 // informations, warnings or errors.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
39 Feedback interface {
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
40 // Info logs informations.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
41 Info(fmt string, args ...interface{})
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
42 // Warn logs warnings.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
43 Warn(fmt string, args ...interface{})
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
44 // Error logs errors.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
45 Error(fmt string, args ...interface{})
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
46 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
47
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
48 // UnchangedError may be issued by Do of a Job to indicate
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
49 // That the database has not changed.
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
50 UnchangedError string
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
51
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
52 // Job is the central abstraction of an import job
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
53 // run by the import queue.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
54 Job interface {
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
55 // Do is called to do the actual import.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
56 // Bind transactions to ctx and conn, please-
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
57 // id is the number of the import job.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
58 // feedback can be used to log the import process.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
59 // If no error is return the import is assumed to
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
60 // be successfull. The non-error return value is
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
61 // serialized as a JSON string into the database as
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
62 // a summary to the import to be used by the review process.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
63 Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error)
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
64 // CleanUp is called to clean up ressources hold by the import.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
65 // It is called whether the import succeeded or not.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
66 CleanUp() error
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
67 }
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
68
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
69 FeedbackJob interface {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
70 Job
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
71 CreateFeedback(int64) Feedback
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
72 }
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
73
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
74 // JobKind is the type of an import.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
75 // Choose a unique name for every import.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
76 JobKind string
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
77
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
78 // JobCreator is used to bring a job to life as it is stored
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
79 // in pure meta-data form to the database.
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
80 JobCreator interface {
5104
cb736582e8fc Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5103
diff changeset
81 // Description is the long name of the import.
cb736582e8fc Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5103
diff changeset
82 Description() string
cb736582e8fc Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5103
diff changeset
83 // Create build the actual job.
cb736582e8fc Minimize diff to default.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5103
diff changeset
84 Create() Job
3219
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
85 // Depends returns two lists of ressources locked by this type of import.
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
86 // Imports are run concurrently if they have disjoint sets
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
87 // of dependencies.
3219
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
88 // The first list are locked exclusively.
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
89 // The second allows multiple read users but only one writing one.
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
90 Depends() [2][]string
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
91 // StageDone is called if an import is positively reviewed
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
92 // (state = accepted). This can be used to finalize the imported
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
93 // data to move it e.g from the staging area.
5034
59a99655f34d Added feedback support for StageDone.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5033
diff changeset
94 StageDone(context.Context, *sql.Tx, int64, Feedback) error
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
95 // AutoAccept indicates that imports of this kind
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
96 // don't need a review.
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
97 AutoAccept() bool
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
98 }
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
99
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
100 JobRemover interface {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
101 JobCreator
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
102 RemoveJob() bool
5110
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
103 }
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
104
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
105 idJob struct {
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
106 id int64
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
107 kind JobKind
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
108 user string
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
109 waitRetry pgtype.Interval
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
110 triesLeft sql.NullInt64
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
111 sendEmail bool
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
112 data string
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
113 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
114 )
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
115
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
116 const (
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
117 pollDuration = time.Second * 10
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
118 runExclusive = -66666
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
119 )
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
120
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
121 const (
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
122 ReviewJobSuffix = "#review"
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
123 reviewJobRetries = 10
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
124 reviewJobWait = time.Minute
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
125 )
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
126
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
127 type importQueue struct {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
128 signalChan chan struct{}
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
129 creatorsMu sync.Mutex
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
130 creators map[JobKind]JobCreator
3221
899914a18d7e Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3219
diff changeset
131 usedDeps map[string]int
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
132 }
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
133
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
134 var iqueue = importQueue{
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
135 signalChan: make(chan struct{}),
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
136 creators: map[JobKind]JobCreator{},
3221
899914a18d7e Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3219
diff changeset
137 usedDeps: map[string]int{},
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
138 }
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
139
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
140 var (
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
141 // ImportStateNames is a list of the states a job can be in.
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
142 ImportStateNames = []string{
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
143 "queued",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
144 "running",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
145 "failed",
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
146 "unchanged",
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
147 "pending",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
148 "accepted",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
149 "declined",
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
150 "reviewed",
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
151 }
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
152 )
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
153
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
154 const (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
155 queueUser = "sys_admin"
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
156
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
157 reEnqueueRunningSQL = `
4748
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
158 UPDATE import.imports SET
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
159 state = 'queued'::import_state,
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
160 changed = CURRENT_TIMESTAMP
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
161 WHERE state = 'running'::import_state`
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
162
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
163 insertJobSQL = `
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
164 INSERT INTO import.imports (
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
165 kind,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
166 due,
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
167 trys_left,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
168 retry_wait,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
169 username,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
170 send_email,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
171 data
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
172 ) VALUES (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
173 $1,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
174 COALESCE($2, CURRENT_TIMESTAMP),
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
175 $3,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
176 $4,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
177 $5,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
178 $6,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
179 $7
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
180 ) RETURNING id`
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
181
5122
0b6b62d247e8 Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents: 5121
diff changeset
182 // Select oldest queued job but prioritize review jobs
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
183 selectJobSQL = `
5122
0b6b62d247e8 Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents: 5121
diff changeset
184 SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `')
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
185 id,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
186 kind,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
187 trys_left,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
188 retry_wait,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
189 username,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
190 send_email,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
191 data
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
192 FROM import.imports
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
193 WHERE
1718
8ddbedf296d7 Re-scheduled imports: be a bit more tolerant about start time.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1708
diff changeset
194 due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
5122
0b6b62d247e8 Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents: 5121
diff changeset
195 state = 'queued'::import_state AND
0b6b62d247e8 Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents: 5121
diff changeset
196 kind = ANY($1)
0b6b62d247e8 Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents: 5121
diff changeset
197 ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
198 LIMIT 1`
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
199
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
200 updateStateSQL = `
4748
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
201 UPDATE import.imports SET
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
202 state = $1::import_state,
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
203 changed = CURRENT_TIMESTAMP
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
204 WHERE id = $2`
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
205
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
206 updateStateSummarySQL = `
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
207 UPDATE import.imports SET
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
208 state = $1::import_state,
4748
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
209 changed = CURRENT_TIMESTAMP,
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
210 summary = $2
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
211 WHERE id = $3`
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
212
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
213 deleteJobSQL = `
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
214 DELETE FROM import.imports WHERE id = $1`
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
215
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
216 logMessageSQL = `
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
217 INSERT INTO import.import_logs (
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
218 import_id,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
219 kind,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
220 msg
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
221 ) VALUES (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
222 $1,
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
223 $2::log_type,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
224 $3
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
225 )`
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
226 )
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
227
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
228 func init() {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
229 go iqueue.importLoop()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
230 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
231
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
232 // Error makes UnchangedError an error.
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
233 func (ue UnchangedError) Error() string {
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
234 return string(ue)
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
235 }
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
236
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
237 type reviewedJobCreator struct {
5101
1b0b13e70bc1 Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5100
diff changeset
238 jobCreator JobCreator
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
239 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
240
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
241 func (*reviewedJobCreator) AutoAccept() bool {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
242 return true
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
243 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
244
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
245 func (*reviewedJobCreator) RemoveJob() bool {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
246 return true
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
247 }
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
248
5101
1b0b13e70bc1 Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5100
diff changeset
249 func (rjc *reviewedJobCreator) Depends() [2][]string {
5102
8cc5b08ffc2b End endless recursion.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5101
diff changeset
250 return rjc.jobCreator.Depends()
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
251 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
252
5101
1b0b13e70bc1 Proxy the original job creator directly and not only the dependencies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5100
diff changeset
253 func (rjc *reviewedJobCreator) Description() string {
5120
22899babe85d Catch another endless recursin call.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5119
diff changeset
254 return rjc.jobCreator.Description() + ReviewJobSuffix
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
255 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
256
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
257 func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
258 return nil
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
259 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
260
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
261 type reviewedJob struct {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
262 ID int64 `json:"id"`
5103
a11705203f3f Nmae json field in reviewed job correctly.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5102
diff changeset
263 Accepted bool `json:"accepted"`
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
264 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
265
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
266 func (*reviewedJobCreator) Create() Job {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
267 return new(reviewedJob)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
268 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
269
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
270 func (*reviewedJob) CleanUp() error { return nil }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
271
5110
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
272 func (r *reviewedJob) CreateFeedback(int64) Feedback {
5119
5f62ac3db148 Don't write "import #42 started" into imports logs and remove the supress code to prevent this in review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5117
diff changeset
273 return logFeedback(r.ID)
5110
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
274 }
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
275
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
276 func (rj *reviewedJob) Do(
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
277 ctx context.Context,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
278 importID int64,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
279 conn *sql.Conn,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
280 feedback Feedback,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
281 ) (interface{}, error) {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
282
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
283 tx, err := conn.BeginTx(ctx, nil)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
284 if err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
285 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
286 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
287 defer tx.Rollback()
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
288
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
289 var signer string
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
290 if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
291 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
292 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
293
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
294 var user, kind string
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
295 if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
296 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
297 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
298
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
299 jc := FindJobCreator(JobKind(kind))
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
300 if jc == nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
301 return nil, fmt.Errorf("no job creator found for '%s'", kind)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
302 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
303
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
304 importFeedback := logFeedback(rj.ID)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
305
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
306 if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
307 userTx, err := conn.BeginTx(ctx, nil)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
308 if err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
309 return err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
310 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
311 defer userTx.Rollback()
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
312
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
313 if rj.Accepted {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
314 err = jc.StageDone(ctx, userTx, rj.ID, importFeedback)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
315 } else {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
316 _, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
317 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
318 if err == nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
319 err = userTx.Commit()
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
320 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
321 return err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
322 }); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
323 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
324 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
325
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
326 // Remove the import track
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
327 if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
328 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
329 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
330
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
331 var state string
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
332 if rj.Accepted {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
333 state = "accepted"
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
334 } else {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
335 state = "declined"
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
336 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
337
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
338 if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
339 return nil, err
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
340 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
341
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
342 importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
343
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
344 return nil, tx.Commit()
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
345 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
346
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
347 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
348 q.creatorsMu.Lock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
349 defer q.creatorsMu.Unlock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
350 q.creators[kind] = jc
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
351 q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc}
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
352
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
353 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
354
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
355 // FindJobCreator looks up a JobCreator in the global import queue.
1193
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
356 func FindJobCreator(kind JobKind) JobCreator {
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
357 return iqueue.jobCreator(kind)
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
358 }
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
359
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
360 // ImportKindNames is a list of the names of the imports the
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
361 // global import queue supports.
1495
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
362 func ImportKindNames() []string {
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
363 return iqueue.importKindNames()
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
364 }
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
365
3246
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
366 // LogImportKindNames logs a list of importer types registered
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
367 // to the global import queue.
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
368 func LogImportKindNames() {
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
369 kinds := ImportKindNames()
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
370 sort.Strings(kinds)
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
371 log.Printf("info: registered import kinds: %s",
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
372 strings.Join(kinds, ", "))
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
373 }
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
374
1694
4a2fad8f57de Imports: Resolved golint issues unrelated to exported symbols commenting.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 1657
diff changeset
375 // HasImportKindName checks if the import queue supports a given kind.
1627
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
376 func HasImportKindName(kind string) bool {
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
377 return iqueue.hasImportKindName(kind)
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
378 }
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
379
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
380 //
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
381 func (q *importQueue) hasImportKindName(kind string) bool {
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
382 q.creatorsMu.Lock()
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
383 defer q.creatorsMu.Unlock()
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
384 return q.creators[JobKind(kind)] != nil
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
385 }
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
386
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
387 // RegisterJobCreator adds a JobCreator to the global import queue.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
388 // This a good candidate to be called in a init function for
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
389 // a particular JobCreator.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
390 func RegisterJobCreator(kind JobKind, jc JobCreator) {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
391 iqueue.registerJobCreator(kind, jc)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
392 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
393
1495
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
394 func (q *importQueue) importKindNames() []string {
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
395 q.creatorsMu.Lock()
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
396 defer q.creatorsMu.Unlock()
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
397 names := make([]string, len(q.creators))
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
398 var i int
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
399 for kind := range q.creators {
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
400 names[i] = string(kind)
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
401 i++
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
402 }
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
403 // XXX: Consider using sort.Strings to make output deterministic.
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
404 return names
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
405 }
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
406
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
407 func (idj *idJob) nextRetry(feedback Feedback) bool {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
408 switch {
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
409 case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid:
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
410 return false
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
411 case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid:
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
412 return true
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
413 case idj.triesLeft.Valid:
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
414 if idj.triesLeft.Int64 < 1 {
5114
da26076ffafe More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5113
diff changeset
415 feedback.Warn("no retries left")
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
416 } else {
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
417 idj.triesLeft.Int64--
5114
da26076ffafe More neutral messages in retries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5113
diff changeset
418 feedback.Info("failed but will retry")
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
419 return true
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
420 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
421 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
422 return false
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
423 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
424
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
425 func (idj *idJob) nextDue() time.Time {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
426 now := time.Now()
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
427 if idj.waitRetry.Status == pgtype.Present {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
428 var d time.Duration
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
429 if err := idj.waitRetry.AssignTo(&d); err != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
430 log.Printf("error: converting waitRetry failed: %v\n", err)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
431 } else {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
432 now = now.Add(d)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
433 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
434 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
435 return now
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
436 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
437
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
438 func (idj *idJob) triesLeftPointer() *int {
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
439 if !idj.triesLeft.Valid {
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
440 return nil
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
441 }
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
442 t := int(idj.triesLeft.Int64)
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
443 return &t
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
444 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
445
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
446 func (idj *idJob) waitRetryPointer() *time.Duration {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
447 if idj.waitRetry.Status != pgtype.Present {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
448 return nil
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
449 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
450 d := new(time.Duration)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
451 if err := idj.waitRetry.AssignTo(d); err != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
452 log.Printf("error: converting waitRetry failed: %v\n", err)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
453 return nil
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
454 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
455 return d
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
456 }
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
457
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
458 func (q *importQueue) lockDependencies(jc JobCreator) {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
459 deps := jc.Depends()
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
460 q.creatorsMu.Lock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
461 defer q.creatorsMu.Unlock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
462 for _, d := range deps[0] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
463 q.usedDeps[d] = runExclusive
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
464 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
465 for _, d := range deps[1] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
466 q.usedDeps[d]++
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
467 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
468 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
469
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
470 func (q *importQueue) unlockDependencies(jc JobCreator) {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
471 deps := jc.Depends()
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
472 q.creatorsMu.Lock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
473 defer q.creatorsMu.Unlock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
474 for _, d := range deps[0] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
475 q.usedDeps[d] = 0
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
476 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
477 for _, d := range deps[1] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
478 q.usedDeps[d]--
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
479 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
480 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
481
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
482 func (q *importQueue) jobCreator(kind JobKind) JobCreator {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
483 q.creatorsMu.Lock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
484 defer q.creatorsMu.Unlock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
485 return q.creators[kind]
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
486 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
487
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
488 func (q *importQueue) addJob(
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
489 kind JobKind,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
490 due time.Time,
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
491 triesLeft *int,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
492 waitRetry *time.Duration,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
493 user string,
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
494 sendEmail bool,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
495 data string,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
496 ) (int64, error) {
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
497
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
498 var id int64
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
499 if due.IsZero() {
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
500 due = time.Now()
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
501 }
4084
350a24c92848 Deliver times from import queue in UTC.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3246
diff changeset
502 due = due.UTC()
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
503
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
504 var tl sql.NullInt64
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
505 if triesLeft != nil {
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
506 tl = sql.NullInt64{Int64: int64(*triesLeft), Valid: true}
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
507 }
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
508
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
509 var wr pgtype.Interval
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
510 if waitRetry != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
511 if err := wr.Set(*waitRetry); err != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
512 return 0, err
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
513 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
514 } else {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
515 wr = pgtype.Interval{Status: pgtype.Null}
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
516 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
517
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
518 ctx := context.Background()
1798
40cbfd268aa9 Row level security for import jobs
Tom Gottfried <tom@intevation.de>
parents: 1760
diff changeset
519 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
520 return conn.QueryRowContext(
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
521 ctx,
5122
0b6b62d247e8 Prioritize review jobs on selection
Tom Gottfried <tom@intevation.de>
parents: 5121
diff changeset
522 insertJobSQL,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
523 string(kind),
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
524 due,
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
525 tl,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
526 &wr,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
527 user,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
528 sendEmail,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
529 data).Scan(&id)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
530 })
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
531 if err == nil {
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
532 select {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
533 case q.signalChan <- struct{}{}:
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
534 default:
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
535 }
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
536 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
537 return id, err
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
538 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
539
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
540 // AddJob adds a job to the global import queue to be executed
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
541 // as soon as possible after due.
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
542 // This is gone in a separate Go routine
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
543 // so this will not block.
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
544 func AddJob(
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
545 kind JobKind,
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
546 due time.Time,
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
547 triesLeft *int,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
548 waitRetry *time.Duration,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
549 user string,
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
550 sendEmail bool,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
551 data string,
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
552 ) (int64, error) {
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
553 return iqueue.addJob(
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
554 kind,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
555 due,
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
556 triesLeft,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
557 waitRetry,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
558 user,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
559 sendEmail,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
560 data)
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
561 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
562
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
563 const (
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
564 isPendingSQL = `
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
565 SELECT
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
566 state = 'pending'::import_state,
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
567 kind
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
568 FROM import.imports
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
569 WHERE id = $1`
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
570
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
571 selectUserSQL = `
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
572 SELECT username from import.imports WHERE ID = $1`
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
573
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
574 selectUserKindSQL = `
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
575 SELECT username, kind from import.imports WHERE ID = $1`
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
576
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
577 reviewSQL = `
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
578 UPDATE import.imports SET
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
579 state = $1::import_state,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
580 changed = CURRENT_TIMESTAMP,
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
581 signer = $2
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
582 WHERE id = $3`
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
583
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
584 deleteImportDataSQL = `SELECT import.del_import($1)`
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
585
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
586 deleteImportTrackSQL = `
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
587 DELETE FROM import.track_imports WHERE import_id = $1`
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
588 )
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
589
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
590 func (q *importQueue) decideImportTx(
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
591 ctx context.Context,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
592 tx *sql.Tx,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
593 id int64,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
594 accepted bool,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
595 reviewer string,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
596 ) error {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
597 var (
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
598 pending bool
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
599 kind string
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
600 )
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
601
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
602 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); {
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
603 case err == sql.ErrNoRows:
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
604 return fmt.Errorf("cannot find import #%d", id)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
605 case err != nil:
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
606 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
607 case !pending:
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
608 return fmt.Errorf("#%d is not pending", id)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
609 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
610
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
611 jc := q.jobCreator(JobKind(kind))
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
612 if jc == nil {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
613 return fmt.Errorf("no job creator for kind '%s'", kind)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
614 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
615
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
616 r := &reviewedJob{
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
617 ID: id,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
618 Accepted: accepted,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
619 }
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
620 serialized, err := common.ToJSONString(r)
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
621 if err != nil {
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
622 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
623 }
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
624
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
625 // Try a little harder to persist the decision.
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
626 tries := reviewJobRetries
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
627 wait := reviewJobWait
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
628
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
629 rID, err := q.addJob(
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
630 JobKind(kind+ReviewJobSuffix),
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
631 time.Now(),
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
632 &tries,
5109
c0ceec7e6f85 Define constants for review jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5104
diff changeset
633 &wait,
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
634 reviewer,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
635 false,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
636 serialized)
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
637 log.Printf("info: add review job %d\n", rID)
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
638 if err != nil {
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
639 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
640 }
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
641 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
642 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
643 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
644
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
645 func (q *importQueue) decideImport(
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
646 ctx context.Context,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
647 id int64,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
648 accepted bool,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
649 reviewer string,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
650 ) error {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
651 if ctx == nil {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
652 ctx = context.Background()
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
653 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
654
5033
13d9820c1ea4 Run review decisions SQLs as reviewer instead of queue user to use the rls policies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5029
diff changeset
655 return auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
656 tx, err := conn.BeginTx(ctx, nil)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
657 if err != nil {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
658 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
659 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
660 defer tx.Rollback()
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
661 err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
662 if err == nil {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
663 err = tx.Commit()
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
664 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
665 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
666 })
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
667 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
668
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
669 func DecideImport(
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
670 ctx context.Context,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
671 id int64,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
672 accepted bool,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
673 reviewer string,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
674 ) error {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
675 return iqueue.decideImport(ctx, id, accepted, reviewer)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
676 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
677
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
678 type logFeedback int64
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
679
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
680 func (lf logFeedback) log(kind, format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
681 ctx := context.Background()
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
682 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
683 _, err := conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
684 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
685 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
686 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
687 if err != nil {
1760
22148eb0f986 More on harmonizing logging.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1758
diff changeset
688 log.Printf("error: logging failed: %v\n", err)
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
689 }
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
690 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
691
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
692 func (lf logFeedback) Info(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
693 lf.log("info", format, args...)
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
694 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
695
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
696 func (lf logFeedback) Warn(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
697 lf.log("warn", format, args...)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
698 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
699
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
700 func (lf logFeedback) Error(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
701 lf.log("error", format, args...)
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
702 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
703
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
704 func survive(fn func() error) func() error {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
705 return func() (err error) {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
706 defer func() {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
707 if p := recover(); p != nil {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
708 err = fmt.Errorf("%v: %s", p, string(debug.Stack()))
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
709 }
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
710 }()
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
711 return fn()
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
712 }
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
713 }
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
714
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
715 func reEnqueueRunning() error {
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
716 ctx := context.Background()
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
717 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
718 _, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
719 return err
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
720 })
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
721 }
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
722
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
723 func (q *importQueue) fetchJob() (*idJob, error) {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
724
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
725 var which []string
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
726
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
727 q.creatorsMu.Lock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
728 nextCreator:
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
729 for kind, jc := range q.creators {
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
730 deps := jc.Depends()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
731 for _, d := range deps[0] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
732 if q.usedDeps[d] != 0 {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
733 continue nextCreator
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
734 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
735 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
736 for _, d := range deps[1] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
737 if q.usedDeps[d] == runExclusive {
3221
899914a18d7e Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3219
diff changeset
738 continue nextCreator
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
739 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
740 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
741 which = append(which, string(kind))
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
742 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
743 q.creatorsMu.Unlock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
744
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
745 if len(which) == 0 {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
746 return nil, sql.ErrNoRows
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
747 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
748
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
749 var kinds pgtype.TextArray
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
750 if err := kinds.Set(which); err != nil {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
751 return nil, err
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
752 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
753
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
754 var ji idJob
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
755 ctx := context.Background()
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
756 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
757 tx, err := conn.BeginTx(ctx, nil)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
758 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
759 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
760 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
761 defer tx.Rollback()
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
762 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
763 &ji.id,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
764 &ji.kind,
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
765 &ji.triesLeft,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
766 &ji.waitRetry,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
767 &ji.user,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
768 &ji.sendEmail,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
769 &ji.data,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
770 ); err != nil {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
771 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
772 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
773 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
774 if err == nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
775 err = tx.Commit()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
776 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
777 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
778 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
779 switch {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
780 case err == sql.ErrNoRows:
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
781 return nil, nil
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
782 case err != nil:
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
783 return nil, err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
784 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
785 return &ji, nil
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
786 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
787
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
788 func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
789 // As it is important to keep the persistent model
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
790 // in sync with the in-memory model try harder to store
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
791 // the state.
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
792 const maxTries = 10
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
793 var sleep = time.Second
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
794
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
795 for try := 1; ; try++ {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
796 var err error
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
797 if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
798 return err
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
799 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
800 log.Printf("warn: [try %d/%d] Storing state failed: %v (try again in %s).\n",
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
801 try, maxTries, err, sleep)
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
802
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
803 time.Sleep(sleep)
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
804 if sleep < time.Minute {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
805 if sleep *= 2; sleep > time.Minute {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
806 sleep = time.Minute
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
807 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
808 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
809 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
810 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
811
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
812 func updateStateSummary(
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
813 ctx context.Context,
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
814 id int64,
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
815 state string,
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
816 summary interface{},
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
817 ) error {
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
818 var s sql.NullString
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
819 if summary != nil {
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
820 var b strings.Builder
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
821 if err := json.NewEncoder(&b).Encode(summary); err != nil {
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
822 return err
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
823 }
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
824 s = sql.NullString{String: b.String(), Valid: true}
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
825 }
2823
b150e5b37afe Import queue: As it is important to keep the in-memory and the persistent model in sync try harder to store the final state change if it fails.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2801
diff changeset
826
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
827 return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
828 _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
829 return err
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
830 })
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
831 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
832
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
833 func deleteJob(ctx context.Context, id int64) error {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
834 return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
835 _, err := conn.ExecContext(ctx, deleteJobSQL, id)
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
836 return err
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
837 })
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
838 }
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
839
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
840 func errorAndFail(id int64, format string, args ...interface{}) error {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
841 ctx := context.Background()
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
842 return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
843 tx, err := conn.BeginTx(ctx, nil)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
844 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
845 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
846 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
847 defer tx.Rollback()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
848 _, err = conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
849 ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
850 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
851 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
852 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
853 _, err = conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
854 ctx, updateStateSQL, "failed", id)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
855 if err == nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
856 err = tx.Commit()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
857 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
858 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
859 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
860 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
861
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
862 func (q *importQueue) importLoop() {
1000
14425e35e3c2 Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 998
diff changeset
863 config.WaitReady()
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
864 // re-enqueue the jobs that are in state running.
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
865 // They where in progess when the server went down.
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
866 if err := reEnqueueRunning(); err != nil {
1751
c3a6aaf926c3 Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1740
diff changeset
867 log.Printf("error: re-enqueuing failed: %v", err)
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
868 }
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
869
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
870 for {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
871 var idj *idJob
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
872 var err error
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
873
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
874 for {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
875 if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
1751
c3a6aaf926c3 Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1740
diff changeset
876 log.Printf("error: db: %v\n", err)
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
877 }
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
878 if idj != nil {
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
879 break
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
880 }
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
881 select {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
882 case <-q.signalChan:
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
883 case <-time.After(pollDuration):
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
884 }
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
885 }
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
886
1751
c3a6aaf926c3 Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1740
diff changeset
887 log.Printf("info: starting import #%d\n", idj.id)
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
888
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
889 jc := q.jobCreator(idj.kind)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
890 if jc == nil {
1010
8f23ec811afb Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1005
diff changeset
891 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
892 continue
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
893 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
894
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
895 // Lock dependencies.
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
896 q.lockDependencies(jc)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
897
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
898 go func(jc JobCreator, idj *idJob) {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
899
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
900 // Unlock the dependencies.
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
901 defer func() {
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
902 q.unlockDependencies(jc)
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
903 select {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
904 case q.signalChan <- struct{}{}:
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
905 default:
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
906 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
907 }()
1010
8f23ec811afb Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1005
diff changeset
908
5100
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
909 job := jc.Create()
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
910 if err := common.FromJSONString(idj.data, job); err != nil {
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
911 errorAndFail(idj.id, "failed to create job for import #%d: %v",
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
912 idj.id, err)
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
913 return
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
914 }
d3a24152b0be Register a kind#rev job factory for any job factory.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5099
diff changeset
915
5110
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
916 var feedback Feedback
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
917 if fc, ok := job.(FeedbackJob); ok {
5110
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
918 feedback = fc.CreateFeedback(idj.id)
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
919 } else {
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
920 feedback = logFeedback(idj.id)
4dc2e6dc6c7d Redirect logging of review job to original import log.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5109
diff changeset
921 }
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
922
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
923 ctx := context.Background()
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
924 var summary interface{}
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
925
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
926 errDo := survive(func() error {
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
927 return auth.RunAs(ctx, idj.user,
1168
930fdd8b474f Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1138
diff changeset
928 func(conn *sql.Conn) error {
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
929 var err error
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
930 summary, err = job.Do(ctx, idj.id, conn, feedback)
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
931 return err
1168
930fdd8b474f Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1138
diff changeset
932 })
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
933 })()
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
934
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
935 var unchanged, retry bool
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
936 if v, ok := errDo.(UnchangedError); ok {
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
937 feedback.Info("unchanged: %s", v.Error())
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
938 unchanged = true
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
939 } else if errDo != nil {
4180
91cb4a7b1b13 Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents: 4084
diff changeset
940 feedback.Error("error in import: %v",
91cb4a7b1b13 Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents: 4084
diff changeset
941 pgxutils.ReadableError{Err: errDo})
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
942 retry = idj.nextRetry(feedback)
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
943 }
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
944
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
945 var errCleanup error
2801
054f5d61452d Import queue: Cleanup up debris after an import in all cases if we are not going to retry it.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2187
diff changeset
946 if !retry { // cleanup debris
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
947 if errCleanup = survive(job.CleanUp)(); errCleanup != nil {
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
948 feedback.Error("error cleanup: %v", errCleanup)
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
949 }
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
950 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
951
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
952 var remove bool
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
953 if remover, ok := jc.(JobRemover); ok {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
954 remove = remover.RemoveJob()
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
955 }
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
956
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
957 var state string
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
958 switch {
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
959 case unchanged:
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
960 state = "unchanged"
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
961 case errDo != nil || errCleanup != nil:
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
962 state = "failed"
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
963 case jc.AutoAccept():
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
964 state = "accepted"
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
965 default:
1190
e3de65179889 The imort queue has now six states:
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1189
diff changeset
966 state = "pending"
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
967 }
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
968 if !remove {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
969 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
970 log.Printf("error: setting state of job %d failed: %v\n", idj.id, err)
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
971 }
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
972 log.Printf("info: import #%d finished: %s\n", idj.id, state)
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
973 }
1646
a0982c38eac0 Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1642
diff changeset
974 if idj.sendEmail {
a0982c38eac0 Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1642
diff changeset
975 go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
a0982c38eac0 Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1642
diff changeset
976 }
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
977
5115
bb5459faadb7 Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5114
diff changeset
978 if retry {
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
979 nid, err := q.addJob(
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
980 idj.kind,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
981 idj.nextDue(),
5113
d036ad682013 Spell trys as tries.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5112
diff changeset
982 idj.triesLeftPointer(),
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
983 idj.waitRetryPointer(),
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
984 idj.user, idj.sendEmail,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
985 idj.data)
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
986 if err != nil {
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
987 log.Printf("error: retry enqueue failed: %v\n", err)
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
988 } else {
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
989 log.Printf("info: re-enqueued job with id %d\n", nid)
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
990 }
5115
bb5459faadb7 Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5114
diff changeset
991 }
bb5459faadb7 Dont leave old jobs behind if retrying remove jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5114
diff changeset
992 if remove {
5111
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
993 if err := deleteJob(ctx, idj.id); err != nil {
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
994 log.Printf("error: deleting job %d failed: %v\n", idj.id, err)
90b0a14dd58b Enable jobs to be removed by the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5110
diff changeset
995 }
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
996 }
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
997 }(jc, idj)
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
998 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
999 }