comparison pkg/imports/gm.go @ 1879:9a2fbeaabd52 dev-pdf-generation

merging in from branch default
author Bernhard Reiter <bernhard@intevation.de>
date Tue, 15 Jan 2019 10:07:10 +0100
parents 48791416bea5
children 15af36e41f27
comparison
equal deleted inserted replaced
1878:f030182f82f1 1879:9a2fbeaabd52
8 // – Österreichische Wasserstraßen-Gesellschaft mbH 8 // – Österreichische Wasserstraßen-Gesellschaft mbH
9 // Software engineering by Intevation GmbH 9 // Software engineering by Intevation GmbH
10 // 10 //
11 // Author(s): 11 // Author(s):
12 // * Raimund Renkert <raimund.renkert@intevation.de> 12 // * Raimund Renkert <raimund.renkert@intevation.de>
13
13 package imports 14 package imports
14 15
15 import ( 16 import (
16 "context" 17 "context"
17 "database/sql" 18 "database/sql"
18 "errors" 19 "errors"
20 "fmt"
21 "strings"
19 "time" 22 "time"
20 23
21 "gemma.intevation.de/gemma/pkg/common" 24 "gemma.intevation.de/gemma/pkg/common"
22 "gemma.intevation.de/gemma/pkg/models" 25 "gemma.intevation.de/gemma/pkg/models"
23 "gemma.intevation.de/gemma/pkg/soap/nts" 26 "gemma.intevation.de/gemma/pkg/soap/nts"
24 ) 27 )
25 28
// GaugeMeasurement describes an import job which fetches
// gauge measurement data from a NtS SOAP service.
type GaugeMeasurement struct {
	// URL is the address of the SOAP service to query.
	URL string `json:"url"`
	// Insecure controls whether certificates are validated
	// for HTTPS traffic.
	// NOTE(review): presumably true disables the validation;
	// confirm against the code that builds the SOAP client.
	Insecure bool `json:"insecure"`
}
38
39 // GMJobKind is the import queue type identifier.
31 const GMJobKind JobKind = "gm" 40 const GMJobKind JobKind = "gm"
32 41
33 const ( 42 const (
34 listGaugesSQL = ` 43 listGaugesSQL = `
35 SELECT 44 SELECT
39 (location).orc, 48 (location).orc,
40 (location).hectometre 49 (location).hectometre
41 FROM waterway.gauges 50 FROM waterway.gauges
42 WHERE (location).country_code = users.current_user_country()` 51 WHERE (location).country_code = users.current_user_country()`
43 52
44 hasGaugeMeasurementSQL = `
45 SELECT true FROM waterway.gauge_measurements WHERE fk_gauge_id = $1`
46
47 insertGMSQL = ` 53 insertGMSQL = `
48 INSERT INTO waterway.gauge_measurements ( 54 INSERT INTO waterway.gauge_measurements (
49 fk_gauge_id, 55 fk_gauge_id,
50 measure_date, 56 measure_date,
51 sender, 57 sender,
52 language_code, 58 language_code,
59 country_code,
53 date_issue, 60 date_issue,
61 reference_code,
54 water_level, 62 water_level,
55 predicted, 63 predicted,
56 is_waterlevel, 64 is_waterlevel,
57 value_min, 65 value_min,
58 value_max, 66 value_max,
59 date_info, 67 date_info,
60 source_organization 68 source_organization,
69 staging_done
61 ) VALUES( 70 ) VALUES(
62 ($1, $2, $3, $4, $5), 71 ($1, $2, $3, $4, $5),
63 $6, 72 $6,
64 $7, 73 $7,
65 $8, 74 $8,
68 $11, 77 $11,
69 $12, 78 $12,
70 $13, 79 $13,
71 $14, 80 $14,
72 $15, 81 $15,
73 $16 82 $16,
83 $17,
84 $18,
85 $19
74 ) 86 )
75 RETURNING id` 87 RETURNING id`
76 ) 88 )
77 89
// gmJobCreator is the stateless job creator for
// gauge measurement imports.
type gmJobCreator struct{}
79 91
80 func init() { 92 func init() {
81 RegisterJobCreator(GMJobKind, gmJobCreator{}) 93 RegisterJobCreator(GMJobKind, gmJobCreator{})
82 } 94 }
83 95
96 func (gmJobCreator) Description() string {
97 return "gauge measurements"
98 }
99
84 func (gmJobCreator) Create(_ JobKind, data string) (Job, error) { 100 func (gmJobCreator) Create(_ JobKind, data string) (Job, error) {
85 gm := new(GaugeMeasurement) 101 gm := new(GaugeMeasurement)
86 if err := common.FromJSONString(data, gm); err != nil { 102 if err := common.FromJSONString(data, gm); err != nil {
87 return nil, err 103 return nil, err
88 } 104 }
89 return gm, nil 105 return gm, nil
90 } 106 }
91 107
92 func (gmJobCreator) Depends() []string { 108 func (gmJobCreator) Depends() []string {
93 return []string{ 109 return []string{
94 "waterway.gauges", 110 "gauges",
95 "waterway.gauge_measurements", 111 "gauge_measurements",
96 } 112 }
97 } 113 }
114
115 func (gmJobCreator) AutoAccept() bool { return true }
98 116
99 // StageDone moves the imported gauge measurement out of the staging area. 117 // StageDone moves the imported gauge measurement out of the staging area.
100 // Currently doing nothing. 118 // Currently doing nothing.
101 func (gmJobCreator) StageDone( 119 func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
102 ctx context.Context,
103 tx *sql.Tx,
104 id int64,
105 ) error {
106 return nil 120 return nil
107 } 121 }
108 122
109 // CleanUp of a gauge measurement import is a NOP. 123 // CleanUp of a gauge measurement import is a NOP.
110 func (gm *GaugeMeasurement) CleanUp() error { return nil } 124 func (*GaugeMeasurement) CleanUp() error { return nil }
111 125
112 // Do executes the actual gauge measurement import. 126 // Do executes the actual gauge measurement import.
113 func (gm *GaugeMeasurement) Do( 127 func (gm *GaugeMeasurement) Do(
114 ctx context.Context, 128 ctx context.Context,
115 importID int64, 129 importID int64,
147 } 161 }
148 162
149 // TODO get date_issue for selected gauges 163 // TODO get date_issue for selected gauges
150 gids, err := gm.doForGM(ctx, gauges, conn, feedback) 164 gids, err := gm.doForGM(ctx, gauges, conn, feedback)
151 if err != nil { 165 if err != nil {
152 feedback.Error("Error processing %d gauges: %s", len(gauges), err) 166 feedback.Error("Error processing %d gauges: %v", len(gauges), err)
167 return nil, err
153 } 168 }
154 if len(gids) == 0 { 169 if len(gids) == 0 {
155 feedback.Info("No new gauge measurements found") 170 feedback.Info("No new gauge measurements found")
156 return nil, nil 171 return nil, nil
157 } 172 }
160 GaugeMeasuremets []string `json:"gaugeMeasurements"` 175 GaugeMeasuremets []string `json:"gaugeMeasurements"`
161 }{ 176 }{
162 GaugeMeasuremets: gids, 177 GaugeMeasuremets: gids,
163 } 178 }
164 return &summary, err 179 return &summary, err
180 }
181
// rescale looks up the given length unit (case-insensitively)
// and returns a function converting values of that unit to
// centimetres. Supported units are mm, cm, dm, m, hm and km.
// For any other unit a nil function and an error are returned.
func rescale(unit string) (func(float32) float32, error) {
	// Number of centimetres per one of the given unit.
	cmPerUnit := map[string]float32{
		"mm": 0.1,
		"cm": 1.0,
		"dm": 10.0,
		"m":  100.0,
		"hm": 10000.0,
		"km": 100000.0,
	}

	factor, known := cmPerUnit[strings.ToLower(unit)]
	if !known {
		return nil, fmt.Errorf("unknown unit '%s'", unit)
	}

	return func(value float32) float32 { return factor * value }, nil
}
166 207
167 func (gm *GaugeMeasurement) doForGM( 208 func (gm *GaugeMeasurement) doForGM(
168 ctx context.Context, 209 ctx context.Context,
169 gauges []models.GaugeMeasurement, 210 gauges []models.GaugeMeasurement,
223 currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id)) 264 currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id))
224 if err != nil { 265 if err != nil {
225 feedback.Warn("Invalid ISRS code %v", err) 266 feedback.Warn("Invalid ISRS code %v", err)
226 continue 267 continue
227 } 268 }
269 var referenceCode string
270 if wrm.Reference_code == nil {
271 feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
272 referenceCode = "ZPG"
273 } else {
274 referenceCode = string(*wrm.Reference_code)
275 }
228 for _, measure := range wrm.Measure { 276 for _, measure := range wrm.Measure {
277 var unit string
278 if measure.Unit == nil {
279 feedback.Info("'Unit' not specified. Assuming 'cm'")
280 unit = "cm"
281 } else {
282 unit = string(*measure.Unit)
283 }
284 convert, err := rescale(unit)
285 if err != nil {
286 return nil, err
287 }
229 isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL 288 isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
230 err = insertStmt.QueryRowContext( 289 err = insertStmt.QueryRowContext(
231 ctx, 290 ctx,
232 currIsrs.CountryCode, 291 currIsrs.CountryCode,
233 currIsrs.LoCode, 292 currIsrs.LoCode,
235 currIsrs.Orc, 294 currIsrs.Orc,
236 currIsrs.Hectometre, 295 currIsrs.Hectometre,
237 measure.Measuredate, 296 measure.Measuredate,
238 msg.Identification.From, 297 msg.Identification.From,
239 msg.Identification.Language_code, 298 msg.Identification.Language_code,
299 msg.Identification.Country_code,
240 msg.Identification.Date_issue, 300 msg.Identification.Date_issue,
241 measure.Value, 301 referenceCode,
302 convert(measure.Value),
242 measure.Predicted, 303 measure.Predicted,
243 isWaterlevel, 304 isWaterlevel,
244 measure.Value_min, 305 convert(measure.Value_min),
245 measure.Value_max, 306 convert(measure.Value_max),
246 msg.Identification.Date_issue, 307 msg.Identification.Date_issue,
247 msg.Identification.Originator, 308 msg.Identification.Originator,
309 true, // staging_done
248 ).Scan(&gid) 310 ).Scan(&gid)
249 if err != nil { 311 if err != nil {
250 return nil, err 312 return nil, err
251 } 313 }
252 } 314 }