comparison pkg/imports/gm.go @ 1637:dd31be75ce6d

Implemented gauge measurement import.
author Raimund Renkert <raimund.renkert@intevation.de>
date Thu, 20 Dec 2018 12:06:37 +0100
parents
children 2a35bbbb4d93
comparison
equal deleted inserted replaced
1636:37ee25bc2bbe 1637:dd31be75ce6d
1 // This is Free Software under GNU Affero General Public License v >= 3.0
2 // without warranty, see README.md and license for details.
3 //
4 // SPDX-License-Identifier: AGPL-3.0-or-later
5 // License-Filename: LICENSES/AGPL-3.0.txt
6 //
7 // Copyright (C) 2018 by via donau
8 // – Österreichische Wasserstraßen-Gesellschaft mbH
9 // Software engineering by Intevation GmbH
10 //
11 // Author(s):
12 // * Raimund Renkert <raimund.renkert@intevation.de>
13 package imports
14
15 import (
16 "context"
17 "database/sql"
18 "errors"
19 "time"
20
21 "gemma.intevation.de/gemma/pkg/common"
22 "gemma.intevation.de/gemma/pkg/models"
23 "gemma.intevation.de/gemma/pkg/soap/nts"
24 )
25
// GaugeMeasurement is the configuration of a gauge measurement import job.
// It is decoded from the JSON job description in gmJobCreator.Create and
// carries the methods that run the import.
type GaugeMeasurement struct {
	// URL is the address handed to the NtS message service client.
	URL string `json:"url"`
	// Insecure is passed through to the NtS message service client.
	// NOTE(review): presumably disables TLS certificate verification —
	// confirm against the nts package.
	Insecure bool `json:"insecure"`
}
30
31 const GMJobKind JobKind = "gm"
32
const (
	// listGaugesSQL selects the five components of the location of every
	// gauge in waterway.gauges whose country code equals
	// users.current_user_country().
	listGaugesSQL = `
SELECT
(location).country_code,
(location).locode,
(location).fairway_section,
(location).orc,
(location).hectometre
FROM waterway.gauges
WHERE (location).country_code = users.current_user_country()`

	// hasGaugeMeasurementSQL yields true for each measurement stored for
	// the gauge given as $1.
	// NOTE(review): not referenced by the code visible in this file.
	hasGaugeMeasurementSQL = `
SELECT true FROM waterway.gauge_measurements WHERE fk_gauge_id = $1`

	// insertGMSQL inserts one row into waterway.gauge_measurements and
	// returns the id of the new row. $1 to $5 together form the composite
	// fk_gauge_id; $6 to $16 fill the remaining columns in the listed order.
	insertGMSQL = `
INSERT INTO waterway.gauge_measurements (
fk_gauge_id,
measure_date,
sender,
language_code,
date_issue,
water_level,
predicted,
is_waterlevel,
value_min,
value_max,
date_info,
source_organization
) VALUES(
($1, $2, $3, $4, $5),
$6,
$7,
$8,
$9,
$10,
$11,
$12,
$13,
$14,
$15,
$16
)
RETURNING id`
)
77
// gmJobCreator is the stateless creator of gauge measurement import jobs.
// It is registered for GMJobKind in init.
type gmJobCreator struct{}
79
80 func init() {
81 RegisterJobCreator(GMJobKind, gmJobCreator{})
82 }
83
84 func (gmJobCreator) Create(_ JobKind, data string) (Job, error) {
85 gm := new(GaugeMeasurement)
86 if err := common.FromJSONString(data, gm); err != nil {
87 return nil, err
88 }
89 return gm, nil
90 }
91
92 func (gmJobCreator) Depends() []string {
93 return []string{
94 "waterway.gauges",
95 "waterway.gauge_measurements",
96 }
97 }
98
99 // StageDone moves the imported gauge measurement out of the staging area.
100 // Currently doing nothing.
101 func (gmJobCreator) StageDone(
102 ctx context.Context,
103 tx *sql.Tx,
104 id int64,
105 ) error {
106 return nil
107 }
108
109 // CleanUp of a gauge measurement import is a NOP.
110 func (gm *GaugeMeasurement) CleanUp() error { return nil }
111
112 // Do executes the actual bottleneck import.
113 func (gm *GaugeMeasurement) Do(
114 ctx context.Context,
115 importID int64,
116 conn *sql.Conn,
117 feedback Feedback,
118 ) (interface{}, error) {
119
120 // Get available gauges from database for use as filter in SOAP request
121 var rows *sql.Rows
122
123 rows, err := conn.QueryContext(ctx, listGaugesSQL)
124 if err != nil {
125 return nil, err
126 }
127 defer rows.Close()
128
129 gauges := []models.GaugeMeasurement{}
130
131 for rows.Next() {
132 var g models.GaugeMeasurement
133 if err = rows.Scan(
134 &g.Gauge.CountryCode,
135 &g.Gauge.LoCode,
136 &g.Gauge.FairwaySection,
137 &g.Gauge.Orc,
138 &g.Gauge.Hectometre,
139 ); err != nil {
140 return nil, err
141 }
142 gauges = append(gauges, g)
143 }
144
145 if err = rows.Err(); err != nil {
146 return nil, err
147 }
148
149 // TODO get date_issue for selected gauges
150 gids, err := gm.doForGM(ctx, gauges, conn, feedback)
151 if err != nil {
152 feedback.Error("Error processing %d gauges: %s", len(gauges), err)
153 }
154 if len(gids) == 0 {
155 feedback.Error("No new gauge measurements found")
156 return nil, errors.New("No new gauge measurements found")
157 }
158 // TODO: needs to be filled more useful.
159 summary := struct {
160 GaugeMeasuremets []string `json:"gaugeMeasurements"`
161 }{
162 GaugeMeasuremets: gids,
163 }
164 return &summary, err
165 }
166
167 func (gm *GaugeMeasurement) doForGM(
168 ctx context.Context,
169 gauges []models.GaugeMeasurement,
170 conn *sql.Conn,
171 feedback Feedback,
172 ) ([]string, error) {
173 start := time.Now()
174
175 client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)
176
177 var idPairs []*nts.Id_pair
178 for _, g := range gauges {
179 isrs := g.Gauge.String()
180 isrsID := nts.Isrs_code_type(isrs)
181 idPairs = append(idPairs, &nts.Id_pair{
182 Id: &isrsID,
183 })
184 }
185 mt := nts.Message_type_typeWRM
186 req := &nts.Get_messages_query{
187 Message_type: &mt,
188 Ids: idPairs,
189 }
190 resp, err := client.Get_messages(req)
191 if err != nil {
192 feedback.Error("%v", err)
193 return nil, err
194 }
195
196 if resp.Result_message == nil {
197 err := errors.New("no Gauge Measurements found")
198 for i, e := range resp.Result_error {
199 feedback.Error("%d: %v", i, string(*e))
200 }
201 return nil, err
202 }
203
204 result := resp.Result_message
205
206 tx, err := conn.BeginTx(ctx, nil)
207 if err != nil {
208 return nil, err
209 }
210 defer tx.Rollback()
211
212 insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
213 if err != nil {
214 return nil, err
215 }
216 defer insertStmt.Close()
217
218 var gid int64
219 var gids []string
220 for _, msg := range result {
221 feedback.Info("Found %d gauges with measurements", len(msg.Wrm))
222 for _, wrm := range msg.Wrm {
223 currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id))
224 for _, measure := range wrm.Measure {
225 isWaterlevel := false
226 if *measure.Measure_code == nts.Measure_code_enumWAL {
227 isWaterlevel = true
228 }
229 err = insertStmt.QueryRowContext(
230 ctx,
231 currIsrs.CountryCode,
232 currIsrs.LoCode,
233 currIsrs.FairwaySection,
234 currIsrs.Orc,
235 currIsrs.Hectometre,
236 measure.Measuredate,
237 msg.Identification.From,
238 msg.Identification.Language_code,
239 msg.Identification.Date_issue,
240 measure.Value,
241 measure.Predicted,
242 isWaterlevel,
243 measure.Value_min,
244 measure.Value_max,
245 msg.Identification.Date_issue,
246 msg.Identification.Originator,
247 ).Scan(&gid)
248 if err != nil {
249 return nil, err
250 }
251 feedback.Info("Inserted '%s'", currIsrs)
252 }
253 gids = append(gids, currIsrs.String())
254 }
255 }
256 feedback.Info("Storing gauge measurements took %s", time.Since(start))
257 if err = tx.Commit(); err == nil {
258 feedback.Info("Import of gauge measurements was successful")
259 }
260
261 return gids, nil
262 }