annotate pkg/imports/queue.go @ 5099:3cd736acbad3 queued-stage-done

First version of a reviewed job. I bet it does not work.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 24 Mar 2020 15:46:37 +0100
parents 59a99655f34d
children d3a24152b0be
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1017
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
1 // This is Free Software under GNU Affero General Public License v >= 3.0
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
2 // without warranty, see README.md and license for details.
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
3 //
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
4 // SPDX-License-Identifier: AGPL-3.0-or-later
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
5 // License-Filename: LICENSES/AGPL-3.0.txt
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
6 //
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
7 // Copyright (C) 2018 by via donau
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
8 // – Österreichische Wasserstraßen-Gesellschaft mbH
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
9 // Software engineering by Intevation GmbH
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
10 //
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
11 // Author(s):
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de>
a244b18cb916 Added GNU Affero General Public License.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1010
diff changeset
13
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
14 package imports
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
15
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
16 import (
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
17 "context"
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
18 "database/sql"
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
19 "encoding/json"
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
20 "fmt"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
21 "log"
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
22 "runtime/debug"
3246
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
23 "sort"
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
24 "strings"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
25 "sync"
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
26 "time"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
27
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
28 "github.com/jackc/pgx/pgtype"
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
29
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
30 "gemma.intevation.de/gemma/pkg/auth"
2187
7c83b5277c1c Import queue: Removed boilerplate code to deserialize jobs from JSON by making it part of the import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2168
diff changeset
31 "gemma.intevation.de/gemma/pkg/common"
1000
14425e35e3c2 Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 998
diff changeset
32 "gemma.intevation.de/gemma/pkg/config"
4180
91cb4a7b1b13 Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents: 4084
diff changeset
33 "gemma.intevation.de/gemma/pkg/pgxutils"
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
34 )
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
35
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
36 type (
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
37 // Feedback is passed to the Do method of a Job to log
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
38 // informations, warnings or errors.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
39 Feedback interface {
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
40 // Info logs informations.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
41 Info(fmt string, args ...interface{})
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
42 // Warn logs warnings.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
43 Warn(fmt string, args ...interface{})
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
44 // Error logs errors.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
45 Error(fmt string, args ...interface{})
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
46 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
47
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
48 // UnchangedError may be issued by Do of a Job to indicate
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
49 // That the database has not changed.
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
50 UnchangedError string
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
51
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
52 // Job is the central abstraction of an import job
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
53 // run by the import queue.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
54 Job interface {
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
55 // Do is called to do the actual import.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
56 // Bind transactions to ctx and conn, please-
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
57 // id is the number of the import job.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
58 // feedback can be used to log the import process.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
59 // If no error is return the import is assumed to
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
60 // be successfull. The non-error return value is
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
61 // serialized as a JSON string into the database as
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
62 // a summary to the import to be used by the review process.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
63 Do(ctx context.Context, id int64, conn *sql.Conn, feedback Feedback) (interface{}, error)
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
64 // CleanUp is called to clean up ressources hold by the import.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
65 // It is called whether the import succeeded or not.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
66 CleanUp() error
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
67 }
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
68
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
69 // JobKind is the type of an import.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
70 // Choose a unique name for every import.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
71 JobKind string
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
72
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
73 Dependencies interface {
3219
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
74 // Depends returns two lists of ressources locked by this type of import.
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
75 // Imports are run concurrently if they have disjoint sets
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
76 // of dependencies.
3219
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
77 // The first list are locked exclusively.
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
78 // The second allows multiple read users but only one writing one.
4acbee65275d Import queue: Split locked dependencies in exclusively and multiple uses.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2872
diff changeset
79 Depends() [2][]string
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
80 }
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
81
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
82 // JobCreator is used to bring a job to life as it is stored
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
83 // in pure meta-data form to the database.
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
84 JobCreator interface {
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
85 Dependencies
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
86 // Description is the long name of the import.
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
87 Description() string
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
88 // Create build the actual job.
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
89 Create() Job
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
90 // StageDone is called if an import is positively reviewed
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
91 // (state = accepted). This can be used to finalize the imported
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
92 // data to move it e.g from the staging area.
5034
59a99655f34d Added feedback support for StageDone.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5033
diff changeset
93 StageDone(context.Context, *sql.Tx, int64, Feedback) error
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
94 // AutoAccept indicates that imports of this kind
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
95 // don't need a review.
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
96 AutoAccept() bool
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
97 }
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
98
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
99 idJob struct {
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
100 id int64
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
101 kind JobKind
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
102 user string
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
103 waitRetry pgtype.Interval
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
104 trysLeft sql.NullInt64
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
105 sendEmail bool
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
106 data string
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
107 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
108 )
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
109
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
110 const (
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
111 pollDuration = time.Second * 10
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
112 runExclusive = -66666
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
113 )
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
114
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
115 type importQueue struct {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
116 signalChan chan struct{}
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
117 creatorsMu sync.Mutex
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
118 creators map[JobKind]JobCreator
3221
899914a18d7e Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3219
diff changeset
119 usedDeps map[string]int
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
120 }
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
121
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
122 var iqueue = importQueue{
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
123 signalChan: make(chan struct{}),
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
124 creators: map[JobKind]JobCreator{},
3221
899914a18d7e Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3219
diff changeset
125 usedDeps: map[string]int{},
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
126 }
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
127
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
128 var (
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
129 // ImportStateNames is a list of the states a job can be in.
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
130 ImportStateNames = []string{
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
131 "queued",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
132 "running",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
133 "failed",
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
134 "unchanged",
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
135 "pending",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
136 "accepted",
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
137 "declined",
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
138 "reviewed",
1189
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
139 }
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
140 )
3d50f558870c REST GET call to /imports now has the ability to be filtered by kinds or states.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1168
diff changeset
141
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
142 const (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
143 queueUser = "sys_admin"
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
144
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
145 reEnqueueRunningSQL = `
4748
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
146 UPDATE import.imports SET
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
147 state = 'queued'::import_state,
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
148 changed = CURRENT_TIMESTAMP
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
149 WHERE state = 'running'::import_state`
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
150
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
151 insertJobSQL = `
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
152 INSERT INTO import.imports (
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
153 kind,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
154 due,
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
155 trys_left,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
156 retry_wait,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
157 username,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
158 send_email,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
159 data
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
160 ) VALUES (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
161 $1,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
162 COALESCE($2, CURRENT_TIMESTAMP),
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
163 $3,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
164 $4,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
165 $5,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
166 $6,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
167 $7
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
168 ) RETURNING id`
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
169
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
170 selectJobSQL = `
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
171 SELECT
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
172 id,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
173 kind,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
174 trys_left,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
175 retry_wait,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
176 username,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
177 send_email,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
178 data
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
179 FROM import.imports
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
180 WHERE
1718
8ddbedf296d7 Re-scheduled imports: be a bit more tolerant about start time.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1708
diff changeset
181 due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
182 state = 'queued'::import_state AND enqueued IN (
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
183 SELECT min(enqueued)
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
184 FROM import.imports
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
185 WHERE state = 'queued'::import_state AND
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
186 kind = ANY($1))
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
187 LIMIT 1`
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
188
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
189 updateStateSQL = `
4748
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
190 UPDATE import.imports SET
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
191 state = $1::import_state,
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
192 changed = CURRENT_TIMESTAMP
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
193 WHERE id = $2`
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
194
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
195 updateStateSummarySQL = `
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
196 UPDATE import.imports SET
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
197 state = $1::import_state,
4748
47922c1a088d Added a 'changed' column to the import.imports table.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4180
diff changeset
198 changed = CURRENT_TIMESTAMP,
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
199 summary = $2
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
200 WHERE id = $3`
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
201
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
202 logMessageSQL = `
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
203 INSERT INTO import.import_logs (
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
204 import_id,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
205 kind,
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
206 msg
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
207 ) VALUES (
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
208 $1,
1995
59055c8301df Move import queue to its own database namespace
Tom Gottfried <tom@intevation.de>
parents: 1985
diff changeset
209 $2::log_type,
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
210 $3
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
211 )`
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
212 )
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
213
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
214 func init() {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
215 go iqueue.importLoop()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
216 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
217
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
218 // Error makes UnchangedError an error.
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
219 func (ue UnchangedError) Error() string {
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
220 return string(ue)
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
221 }
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
222
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
223 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
224 q.creatorsMu.Lock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
225 defer q.creatorsMu.Unlock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
226 q.creators[kind] = jc
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
227 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
228
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
229 // FindJobCreator looks up a JobCreator in the global import queue.
1193
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
230 func FindJobCreator(kind JobKind) JobCreator {
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
231 return iqueue.jobCreator(kind)
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
232 }
58acc343b1b6 Implemented the db stuff of the review process. Needs testing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1190
diff changeset
233
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
234 // ImportKindNames is a list of the names of the imports the
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
235 // global import queue supports.
1495
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
236 func ImportKindNames() []string {
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
237 return iqueue.importKindNames()
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
238 }
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
239
3246
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
240 // LogImportKindNames logs a list of importer types registered
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
241 // to the global import queue.
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
242 func LogImportKindNames() {
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
243 kinds := ImportKindNames()
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
244 sort.Strings(kinds)
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
245 log.Printf("info: registered import kinds: %s",
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
246 strings.Join(kinds, ", "))
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
247 }
64324aaeb1fb Made logging of waht is registered to the scheduler and the import queue more deterministic.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3225
diff changeset
248
1694
4a2fad8f57de Imports: Resolved golint issues unrelated to exported symbols commenting.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 1657
diff changeset
249 // HasImportKindName checks if the import queue supports a given kind.
1627
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
250 func HasImportKindName(kind string) bool {
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
251 return iqueue.hasImportKindName(kind)
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
252 }
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
253
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
254 //
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
255 func (q *importQueue) hasImportKindName(kind string) bool {
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
256 q.creatorsMu.Lock()
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
257 defer q.creatorsMu.Unlock()
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
258 return q.creators[JobKind(kind)] != nil
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
259 }
b10aa02d7819 Refactored: Moved REST /api/imports/scheduler to /api/imports/config
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1497
diff changeset
260
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
261 // RegisterJobCreator adds a JobCreator to the global import queue.
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
262 // This a good candidate to be called in a init function for
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
263 // a particular JobCreator.
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
264 func RegisterJobCreator(kind JobKind, jc JobCreator) {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
265 iqueue.registerJobCreator(kind, jc)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
266 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
267
1495
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
268 func (q *importQueue) importKindNames() []string {
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
269 q.creatorsMu.Lock()
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
270 defer q.creatorsMu.Unlock()
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
271 names := make([]string, len(q.creators))
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
272 var i int
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
273 for kind := range q.creators {
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
274 names[i] = string(kind)
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
275 i++
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
276 }
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
277 // XXX: Consider using sort.Strings to make output deterministic.
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
278 return names
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
279 }
d26e3e1fcff1 The global import queue already knows which kinds of imports it supports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1409
diff changeset
280
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
281 func (idj *idJob) nextRetry(feedback Feedback) bool {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
282 switch {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
283 case idj.waitRetry.Status != pgtype.Present && !idj.trysLeft.Valid:
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
284 return false
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
285 case idj.waitRetry.Status == pgtype.Present && !idj.trysLeft.Valid:
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
286 return true
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
287 case idj.trysLeft.Valid:
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
288 if idj.trysLeft.Int64 < 1 {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
289 feedback.Warn("import should be retried, but no retrys left")
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
290 } else {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
291 idj.trysLeft.Int64--
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
292 feedback.Info("import failed but will be retried")
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
293 return true
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
294 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
295 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
296 return false
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
297 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
298
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
299 func (idj *idJob) nextDue() time.Time {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
300 now := time.Now()
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
301 if idj.waitRetry.Status == pgtype.Present {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
302 var d time.Duration
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
303 if err := idj.waitRetry.AssignTo(&d); err != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
304 log.Printf("error: converting waitRetry failed: %v\n", err)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
305 } else {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
306 now = now.Add(d)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
307 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
308 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
309 return now
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
310 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
311
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
312 func (idj *idJob) trysLeftPointer() *int {
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
313 if !idj.trysLeft.Valid {
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
314 return nil
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
315 }
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
316 t := int(idj.trysLeft.Int64)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
317 return &t
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
318 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
319
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
320 func (idj *idJob) waitRetryPointer() *time.Duration {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
321 if idj.waitRetry.Status != pgtype.Present {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
322 return nil
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
323 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
324 d := new(time.Duration)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
325 if err := idj.waitRetry.AssignTo(d); err != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
326 log.Printf("error: converting waitRetry failed: %v\n", err)
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
327 return nil
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
328 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
329 return d
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
330 }
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
331
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
332 func (q *importQueue) lockDependencies(d Dependencies) {
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
333 deps := d.Depends()
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
334 q.creatorsMu.Lock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
335 defer q.creatorsMu.Unlock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
336 for _, d := range deps[0] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
337 q.usedDeps[d] = runExclusive
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
338 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
339 for _, d := range deps[1] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
340 q.usedDeps[d]++
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
341 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
342 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
343
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
344 func (q *importQueue) unlockDependencies(d Dependencies) {
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
345 deps := d.Depends()
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
346 q.creatorsMu.Lock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
347 defer q.creatorsMu.Unlock()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
348 for _, d := range deps[0] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
349 q.usedDeps[d] = 0
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
350 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
351 for _, d := range deps[1] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
352 q.usedDeps[d]--
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
353 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
354 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
355
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
356 func (q *importQueue) jobCreator(kind JobKind) JobCreator {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
357 q.creatorsMu.Lock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
358 defer q.creatorsMu.Unlock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
359 return q.creators[kind]
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
360 }
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
361
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
362 func (q *importQueue) addJob(
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
363 kind JobKind,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
364 due time.Time,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
365 trysLeft *int,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
366 waitRetry *time.Duration,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
367 user string,
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
368 sendEmail bool,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
369 data string,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
370 ) (int64, error) {
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
371
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
372 var id int64
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
373 if due.IsZero() {
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
374 due = time.Now()
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
375 }
4084
350a24c92848 Deliver times from import queue in UTC.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3246
diff changeset
376 due = due.UTC()
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
377
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
378 var tl sql.NullInt64
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
379 if trysLeft != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
380 tl = sql.NullInt64{Int64: int64(*trysLeft), Valid: true}
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
381 }
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
382
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
383 var wr pgtype.Interval
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
384 if waitRetry != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
385 if err := wr.Set(*waitRetry); err != nil {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
386 return 0, err
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
387 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
388 } else {
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
389 wr = pgtype.Interval{Status: pgtype.Null}
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
390 }
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
391
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
392 ctx := context.Background()
1798
40cbfd268aa9 Row level security for import jobs
Tom Gottfried <tom@intevation.de>
parents: 1760
diff changeset
393 err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
394 return conn.QueryRowContext(
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
395 ctx,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
396 insertJobSQL,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
397 string(kind),
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
398 due,
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
399 tl,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
400 &wr,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
401 user,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
402 sendEmail,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
403 data).Scan(&id)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
404 })
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
405 if err == nil {
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
406 select {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
407 case q.signalChan <- struct{}{}:
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
408 default:
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
409 }
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
410 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
411 return id, err
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
412 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
413
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
414 // AddJob adds a job to the global import queue to be executed
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
415 // as soon as possible after due.
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
416 // This is gone in a separate Go routine
1497
b41ad15cc55f Backend: Documented the internal API of the global import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1495
diff changeset
417 // so this will not block.
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
418 func AddJob(
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
419 kind JobKind,
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
420 due time.Time,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
421 trysLeft *int,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
422 waitRetry *time.Duration,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
423 user string,
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
424 sendEmail bool,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
425 data string,
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
426 ) (int64, error) {
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
427 return iqueue.addJob(
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
428 kind,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
429 due,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
430 trysLeft,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
431 waitRetry,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
432 user,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
433 sendEmail,
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
434 data)
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
435 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
436
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
437 const (
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
438 isPendingSQL = `
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
439 SELECT
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
440 state = 'pending'::import_state,
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
441 kind
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
442 FROM import.imports
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
443 WHERE id = $1`
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
444 )
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
445
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
446 func (q *importQueue) decideImportTx(
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
447 ctx context.Context,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
448 tx *sql.Tx,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
449 id int64,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
450 accepted bool,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
451 reviewer string,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
452 ) error {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
453 var (
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
454 pending bool
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
455 kind string
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
456 )
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
457
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
458 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); {
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
459 case err == sql.ErrNoRows:
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
460 return fmt.Errorf("cannot find import #%d", id)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
461 case err != nil:
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
462 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
463 case !pending:
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
464 return fmt.Errorf("#%d is not pending", id)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
465 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
466
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
467 jc := q.jobCreator(JobKind(kind))
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
468 if jc == nil {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
469 return fmt.Errorf("no job creator for kind '%s'", kind)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
470 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
471
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
472 r := &reviewed{
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
473 ID: id,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
474 Accepted: accepted,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
475 }
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
476 serialized, err := common.ToJSONString(r)
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
477 if err != nil {
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
478 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
479 }
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
480 rID, err := q.addJob(
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
481 reviewedJobKind,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
482 time.Now(),
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
483 nil,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
484 nil,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
485 reviewer,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
486 false,
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
487 serialized)
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
488 log.Printf("info: add review job %d\n", rID)
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
489 if err != nil {
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
490 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
491 }
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
492 _, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
493 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
494 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
495
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
496 func (q *importQueue) decideImport(
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
497 ctx context.Context,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
498 id int64,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
499 accepted bool,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
500 reviewer string,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
501 ) error {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
502 if ctx == nil {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
503 ctx = context.Background()
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
504 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
505
5033
13d9820c1ea4 Run review decisions SQLs as reviewer instead of queue user to use the rls policies.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5029
diff changeset
506 return auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
5028
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
507 tx, err := conn.BeginTx(ctx, nil)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
508 if err != nil {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
509 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
510 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
511 defer tx.Rollback()
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
512 err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
513 if err == nil {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
514 err = tx.Commit()
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
515 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
516 return err
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
517 })
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
518 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
519
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
520 func DecideImport(
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
521 ctx context.Context,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
522 id int64,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
523 accepted bool,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
524 reviewer string,
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
525 ) error {
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
526 return iqueue.decideImport(ctx, id, accepted, reviewer)
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
527 }
d727641911a5 Moved import desision logic to import queue (where it belongs).
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 4748
diff changeset
528
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
529 type logFeedback int64
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
530
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
531 func (lf logFeedback) log(kind, format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
532 ctx := context.Background()
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
533 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
534 _, err := conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
535 ctx, logMessageSQL, int64(lf), kind, fmt.Sprintf(format, args...))
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
536 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
537 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
538 if err != nil {
1760
22148eb0f986 More on harmonizing logging.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1758
diff changeset
539 log.Printf("error: logging failed: %v\n", err)
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
540 }
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
541 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
542
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
543 func (lf logFeedback) Info(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
544 lf.log("info", format, args...)
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
545 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
546
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
547 func (lf logFeedback) Warn(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
548 lf.log("warn", format, args...)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
549 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
550
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
551 func (lf logFeedback) Error(format string, args ...interface{}) {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
552 lf.log("error", format, args...)
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
553 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
554
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
555 func survive(fn func() error) func() error {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
556 return func() (err error) {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
557 defer func() {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
558 if p := recover(); p != nil {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
559 err = fmt.Errorf("%v: %s", p, string(debug.Stack()))
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
560 }
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
561 }()
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
562 return fn()
992
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
563 }
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
564 }
a978b2b26a88 Run do and cleanup of import jobs in own go routines with crash handler.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 991
diff changeset
565
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
566 func reEnqueueRunning() error {
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
567 ctx := context.Background()
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
568 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
569 _, err := conn.ExecContext(ctx, reEnqueueRunningSQL)
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
570 return err
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
571 })
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
572 }
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
573
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
574 func (q *importQueue) fetchJob() (*idJob, error) {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
575
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
576 var which []string
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
577
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
578 q.creatorsMu.Lock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
579 nextCreator:
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
580 for kind, jc := range q.creators {
3225
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
581 deps := jc.Depends()
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
582 for _, d := range deps[0] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
583 if q.usedDeps[d] != 0 {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
584 continue nextCreator
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
585 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
586 }
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
587 for _, d := range deps[1] {
1a0985083c06 Import queue: Fixed exclusive writers.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3221
diff changeset
588 if q.usedDeps[d] == runExclusive {
3221
899914a18d7e Import queue: Implemented multiple reader / one writer strategy when locking ressources.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 3219
diff changeset
589 continue nextCreator
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
590 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
591 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
592 which = append(which, string(kind))
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
593 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
594 q.creatorsMu.Unlock()
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
595
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
596 if len(which) == 0 {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
597 return nil, sql.ErrNoRows
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
598 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
599
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
600 var kinds pgtype.TextArray
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
601 if err := kinds.Set(which); err != nil {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
602 return nil, err
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
603 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
604
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
605 var ji idJob
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
606 ctx := context.Background()
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
607 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
608 tx, err := conn.BeginTx(ctx, nil)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
609 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
610 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
611 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
612 defer tx.Rollback()
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
613 if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
614 &ji.id,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
615 &ji.kind,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
616 &ji.trysLeft,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
617 &ji.waitRetry,
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
618 &ji.user,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
619 &ji.sendEmail,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
620 &ji.data,
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
621 ); err != nil {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
622 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
623 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
624 _, err = tx.ExecContext(ctx, updateStateSQL, "running", ji.id)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
625 if err == nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
626 err = tx.Commit()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
627 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
628 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
629 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
630 switch {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
631 case err == sql.ErrNoRows:
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
632 return nil, nil
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
633 case err != nil:
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
634 return nil, err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
635 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
636 return &ji, nil
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
637 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
638
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
639 func tryHardToStoreState(ctx context.Context, fn func(*sql.Conn) error) error {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
640 // As it is important to keep the persistent model
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
641 // in sync with the in-memory model try harder to store
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
642 // the state.
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
643 const maxTries = 10
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
644 var sleep = time.Second
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
645
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
646 for try := 1; ; try++ {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
647 var err error
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
648 if err = auth.RunAs(ctx, queueUser, fn); err == nil || try == maxTries {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
649 return err
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
650 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
651 log.Printf("warn: [try %d/%d] Storing state failed: %v (try again in %s).\n",
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
652 try, maxTries, err, sleep)
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
653
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
654 time.Sleep(sleep)
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
655 if sleep < time.Minute {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
656 if sleep *= 2; sleep > time.Minute {
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
657 sleep = time.Minute
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
658 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
659 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
660 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
661 }
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
662
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
663 func updateStateSummary(
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
664 ctx context.Context,
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
665 id int64,
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
666 state string,
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
667 summary interface{},
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
668 ) error {
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
669 var s sql.NullString
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
670 if summary != nil {
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
671 var b strings.Builder
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
672 if err := json.NewEncoder(&b).Encode(summary); err != nil {
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
673 return err
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
674 }
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
675 s = sql.NullString{String: b.String(), Valid: true}
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
676 }
2823
b150e5b37afe Import queue: As it is important to keep the in-memory and the persistent model in sync try harder to store the final state change if it fails.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2801
diff changeset
677
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
678 return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
679 _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
680 return err
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
681 })
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
682 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
683
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
684 func errorAndFail(id int64, format string, args ...interface{}) error {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
685 ctx := context.Background()
2872
bfea3f80ca9a Import queue: Extend the idea of changeset 2886:b150e5b37afe to some error cases that might happen before the storing of the after import state.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2836
diff changeset
686 return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
687 tx, err := conn.BeginTx(ctx, nil)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
688 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
689 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
690 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
691 defer tx.Rollback()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
692 _, err = conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
693 ctx, logMessageSQL, id, "error", fmt.Sprintf(format, args...))
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
694 if err != nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
695 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
696 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
697 _, err = conn.ExecContext(
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
698 ctx, updateStateSQL, "failed", id)
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
699 if err == nil {
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
700 err = tx.Commit()
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
701 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
702 return err
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
703 })
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
704 }
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
705
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
706 func (q *importQueue) importLoop() {
1000
14425e35e3c2 Wait with start of import queue until configuration is fully loaded.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 998
diff changeset
707 config.WaitReady()
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
708 // re-enqueue the jobs that are in state running.
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
709 // They where in progess when the server went down.
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
710 if err := reEnqueueRunning(); err != nil {
1751
c3a6aaf926c3 Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1740
diff changeset
711 log.Printf("error: re-enqueuing failed: %v", err)
1005
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
712 }
fcf016ebdef4 Re-enqueue import jobs in state running if the the gemma server starts.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1003
diff changeset
713
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
714 for {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
715 var idj *idJob
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
716 var err error
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
717
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
718 for {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
719 if idj, err = q.fetchJob(); err != nil && err != sql.ErrNoRows {
1751
c3a6aaf926c3 Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1740
diff changeset
720 log.Printf("error: db: %v\n", err)
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
721 }
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
722 if idj != nil {
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
723 break
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
724 }
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
725 select {
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
726 case <-q.signalChan:
1003
d789f19877f4 Do not rely on internal gemma state only. Instead poll every 10 seconds for new import jobs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1000
diff changeset
727 case <-time.After(pollDuration):
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
728 }
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
729 }
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
730
1751
c3a6aaf926c3 Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1740
diff changeset
731 log.Printf("info: starting import #%d\n", idj.id)
987
3841509f6e9e Store job id alongside to job in job queue.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 979
diff changeset
732
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
733 jc := q.jobCreator(idj.kind)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
734 if jc == nil {
1010
8f23ec811afb Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1005
diff changeset
735 errorAndFail(idj.id, "no creator for kind '%s' found", idj.kind)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
736 continue
988
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
737 }
7dfd3db94e6d In preparation of persisting import jobs logging is done through an interface.
Sascha L. Teichmann <teichmann@intevation.de>
parents: 987
diff changeset
738
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
739 job := jc.Create()
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
740 if err := common.FromJSONString(idj.data, job); err != nil {
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
741 errorAndFail(idj.id, "failed to create job for import #%d: %v",
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
742 idj.id, err)
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
743 continue
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
744 }
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
745
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
746 var dependencies Dependencies
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
747 if deps, ok := job.(Dependencies); ok {
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
748 dependencies = deps
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
749 } else {
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
750 dependencies = jc
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
751 }
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
752
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
753 // Lock dependencies.
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
754 q.lockDependencies(dependencies)
991
a301d240905f Decoupled import job creation and job execution with a factory function.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 989
diff changeset
755
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
756 go func(jc JobCreator, idj *idJob) {
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
757
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
758 // Unlock the dependencies.
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
759 defer func() {
5099
3cd736acbad3 First version of a reviewed job. I bet it does not work.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5034
diff changeset
760 q.unlockDependencies(dependencies)
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
761 select {
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
762 case q.signalChan <- struct{}{}:
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
763 default:
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
764 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
765 }()
1010
8f23ec811afb Fixed and harmonized wording in importer queue a bit.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1005
diff changeset
766
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
767 feedback := logFeedback(idj.id)
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
768
1138
443fc80a315f Don't issue new lines at end of log messages when importing.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1136
diff changeset
769 feedback.Info("import #%d started", idj.id)
998
75e65599ea52 Persist job queue in database. WIP.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 992
diff changeset
770
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
771 ctx := context.Background()
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
772 var summary interface{}
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
773
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
774 errDo := survive(func() error {
1327
cabf4789e02b To make golint happier made context.Context to be the first argument of auth.RunAs.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1193
diff changeset
775 return auth.RunAs(ctx, idj.user,
1168
930fdd8b474f Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1138
diff changeset
776 func(conn *sql.Conn) error {
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
777 var err error
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
778 summary, err = job.Do(ctx, idj.id, conn, feedback)
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
779 return err
1168
930fdd8b474f Track successfull imports in a separate table to be able to remove them later.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1138
diff changeset
780 })
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
781 })()
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
782
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
783 var unchanged, retry bool
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
784 if v, ok := errDo.(UnchangedError); ok {
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
785 feedback.Info("unchanged: %s", v.Error())
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
786 unchanged = true
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
787 } else if errDo != nil {
4180
91cb4a7b1b13 Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents: 4084
diff changeset
788 feedback.Error("error in import: %v",
91cb4a7b1b13 Always try to translate to readable error if import failed
Tom Gottfried <tom@intevation.de>
parents: 4084
diff changeset
789 pgxutils.ReadableError{Err: errDo})
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
790 retry = idj.nextRetry(feedback)
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
791 }
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
792
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
793 var errCleanup error
2801
054f5d61452d Import queue: Cleanup up debris after an import in all cases if we are not going to retry it.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 2187
diff changeset
794 if !retry { // cleanup debris
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
795 if errCleanup = survive(job.CleanUp)(); errCleanup != nil {
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
796 feedback.Error("error cleanup: %v", errCleanup)
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
797 }
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
798 }
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
799
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
800 var state string
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
801 switch {
1975
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
802 case unchanged:
d966f03ea819 Imports: Added the new state 'unchanged' which can be issued by the imports to indicate that the database is not modified by the particular imports.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1798
diff changeset
803 state = "unchanged"
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
804 case errDo != nil || errCleanup != nil:
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
805 state = "failed"
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
806 case jc.AutoAccept():
1642
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
807 state = "accepted"
49c04bb64e0a Import queue: Implemented auto-accept and email sending.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1627
diff changeset
808 default:
1190
e3de65179889 The imort queue has now six states:
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1189
diff changeset
809 state = "pending"
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
810 }
1392
0e1d89241cda Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1328
diff changeset
811 if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
1751
c3a6aaf926c3 Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1740
diff changeset
812 log.Printf("error: setting state of job %d failed: %v\n", idj.id, err)
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
813 }
1751
c3a6aaf926c3 Import queue: Harmonized logging output.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1740
diff changeset
814 log.Printf("info: import #%d finished: %s\n", idj.id, state)
1646
a0982c38eac0 Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1642
diff changeset
815 if idj.sendEmail {
a0982c38eac0 Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1642
diff changeset
816 go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
a0982c38eac0 Import queue: Implemented email notifications.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1642
diff changeset
817 }
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
818
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
819 if retry {
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
820 nid, err := q.addJob(
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
821 idj.kind,
1985
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
822 idj.nextDue(),
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
823 idj.trysLeftPointer(),
8eeb0b5eb340 Imports: Made retries and the waiting between the attempts configurable.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1975
diff changeset
824 idj.waitRetryPointer(),
1754
807569b08513 Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1751
diff changeset
825 idj.user, idj.sendEmail,
1708
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
826 idj.data)
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
827 if err != nil {
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
828 log.Printf("error: retry enqueue failed: %v\n", err)
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
829 } else {
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
830 log.Printf("info: re-enqueued job with id %d\n", nid)
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
831 }
49e047c2106e Imports: Made imports re-runnable if they fail.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1694
diff changeset
832 }
1136
a5069da2f0b7 Independent imports in terms of affected tables/dependencies are now run concurrently.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 1017
diff changeset
833 }(jc, idj)
958
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
834 }
2818ad6c7d32 Started with import queue.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
835 }