comparison pkg/imports/gm.go @ 2242:786c3fb7efe1

Gauge measurements import: Re-factored to be re-usable for upcoming uploaded gauge measurements.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 13 Feb 2019 15:38:24 +0100
parents 4ca41516115b
children 52093f82a786
comparison
equal deleted inserted replaced
2241:5529e1f08dba 2242:786c3fb7efe1
15 15
16 import ( 16 import (
17 "context" 17 "context"
18 "database/sql" 18 "database/sql"
19 "fmt" 19 "fmt"
20 "sort"
20 "strings" 21 "strings"
21 "time" 22 "time"
22 23
23 "gemma.intevation.de/gemma/pkg/models" 24 "gemma.intevation.de/gemma/pkg/models"
24 "gemma.intevation.de/gemma/pkg/soap/nts" 25 "gemma.intevation.de/gemma/pkg/soap/nts"
135 } 136 }
136 137
137 // CleanUp of a gauge measurement import is a NOP. 138 // CleanUp of a gauge measurement import is a NOP.
138 func (*GaugeMeasurement) CleanUp() error { return nil } 139 func (*GaugeMeasurement) CleanUp() error { return nil }
139 140
140 func loadGauges(ctx context.Context, tx *sql.Tx) ([]*gauge, error) {
141
142 rows, err := tx.QueryContext(ctx, listGaugesSQL)
143 if err != nil {
144 return nil, err
145 }
146 defer rows.Close()
147
148 var gauges []*gauge
149
150 for rows.Next() {
151 var g gauge
152 if err = rows.Scan(
153 &g.location.CountryCode,
154 &g.location.LoCode,
155 &g.location.FairwaySection,
156 &g.location.Orc,
157 &g.location.Hectometre,
158 ); err != nil {
159 return nil, err
160 }
161 gauges = append(gauges, &g)
162 }
163
164 if err = rows.Err(); err != nil {
165 return nil, err
166 }
167
168 return gauges, nil
169 }
170
171 // Do executes the actual gauge measurement import. 141 // Do executes the actual gauge measurement import.
172 func (gm *GaugeMeasurement) Do( 142 func (gm *GaugeMeasurement) Do(
173 ctx context.Context, 143 ctx context.Context,
174 importID int64, 144 importID int64,
175 conn *sql.Conn, 145 conn *sql.Conn,
176 feedback Feedback, 146 feedback Feedback,
177 ) (interface{}, error) { 147 ) (interface{}, error) {
178 148
149 fetch := func() ([]*nts.RIS_Message_Type, error) {
150 client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)
151
152 mt := nts.Message_type_typeWRM
153
154 req := &nts.Get_messages_query{
155 Message_type: &mt,
156 }
157
158 resp, err := client.Get_messages(req)
159 if err != nil {
160 return nil, err
161 }
162
163 result := resp.Result_message
164 if result == nil {
165 for _, e := range resp.Result_error {
166 if e != nil {
167 feedback.Error("Error code: %s", *e)
168 } else {
169 feedback.Error("Unknown error")
170 }
171 }
172 }
173 return result, nil
174 }
175
176 return storeGaugeMeasurements(
177 ctx,
178 importID,
179 fetch,
180 conn,
181 feedback,
182 )
183 }
184
185 func loadGauges(ctx context.Context, tx *sql.Tx) ([]string, error) {
186
187 rows, err := tx.QueryContext(ctx, listGaugesSQL)
188 if err != nil {
189 return nil, err
190 }
191 defer rows.Close()
192
193 var gauges []string
194
195 for rows.Next() {
196 var g models.Isrs
197 if err = rows.Scan(
198 &g.CountryCode,
199 &g.LoCode,
200 &g.FairwaySection,
201 &g.Orc,
202 &g.Hectometre,
203 ); err != nil {
204 return nil, err
205 }
206 gauges = append(gauges, g.String())
207 }
208
209 if err = rows.Err(); err != nil {
210 return nil, err
211 }
212
213 sort.Strings(gauges)
214
215 return gauges, nil
216 }
217
218 func storeGaugeMeasurements(
219 ctx context.Context,
220 importID int64,
221 fetch func() ([]*nts.RIS_Message_Type, error),
222 conn *sql.Conn,
223 feedback Feedback,
224 ) (interface{}, error) {
225
179 start := time.Now() 226 start := time.Now()
180 227
181 tx, err := conn.BeginTx(ctx, nil) 228 tx, err := conn.BeginTx(ctx, nil)
182 if err != nil { 229 if err != nil {
183 return nil, err 230 return nil, err
189 if err != nil { 236 if err != nil {
190 return nil, err 237 return nil, err
191 } 238 }
192 239
193 // TODO get date_issue for selected gauges 240 // TODO get date_issue for selected gauges
194 gids, err := gm.doForGM(ctx, gauges, tx, feedback) 241 gids, err := doForGM(ctx, gauges, fetch, tx, feedback)
195 if err != nil { 242 if err != nil {
196 feedback.Error("Error processing %d gauges: %v", len(gauges), err) 243 feedback.Error("Error processing %d gauges: %v", len(gauges), err)
197 return nil, err 244 return nil, err
198 } 245 }
199 246
243 290
244 fn := func(x float32) float32 { return scale * x } 291 fn := func(x float32) float32 { return scale * x }
245 return fn, nil 292 return fn, nil
246 } 293 }
247 294
248 func (gm *GaugeMeasurement) doForGM( 295 func doForGM(
249 ctx context.Context, 296 ctx context.Context,
250 gauges []*gauge, 297 gauges []string,
298 fetch func() ([]*nts.RIS_Message_Type, error),
251 tx *sql.Tx, 299 tx *sql.Tx,
252 feedback Feedback, 300 feedback Feedback,
253 ) ([]string, error) { 301 ) ([]string, error) {
254 302
255 client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)
256
257 mt := nts.Message_type_typeWRM
258
259 insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) 303 insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
260 if err != nil { 304 if err != nil {
261 return nil, err 305 return nil, err
262 } 306 }
263 defer insertStmt.Close() 307 defer insertStmt.Close()
264 308
309 // lookup to see if we have gauges in the database.
310 isKnown := func(s string) bool {
311 idx := sort.SearchStrings(gauges, s)
312 return idx < len(gauges) && gauges[idx] == s
313 }
314
315 result, err := fetch()
316 if err != nil {
317 return nil, err
318 }
319
265 var gids []string 320 var gids []string
266 for _, g := range gauges { 321 for _, msg := range result {
267 322 var gid int64
268 isrsID := nts.Isrs_code_type(g.location.String()) 323 for _, wrm := range msg.Wrm {
269 idPair := []*nts.Id_pair{{Id: &isrsID}} 324 curr := string(*wrm.Geo_object.Id)
270 325 currIsrs, err := models.IsrsFromString(curr)
271 req := &nts.Get_messages_query{ 326 if err != nil {
272 Message_type: &mt, 327 feedback.Warn("Invalid ISRS code %v", err)
273 Ids: idPair, 328 continue
274 } 329 }
275 330 feedback.Info("Found measurements for %s", curr)
276 resp, err := client.Get_messages(req) 331 if !isKnown(curr) {
277 if err != nil { 332 feedback.Warn("Gauge '%s' is not in database.", curr)
278 return nil, err 333 continue
279 } 334 }
280 335
281 if resp.Result_message == nil { 336 var referenceCode string
282 for _, e := range resp.Result_error { 337 if wrm.Reference_code == nil {
283 if e != nil { 338 feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
284 feedback.Error("No gauge measurements found for %s", g.location) 339 referenceCode = "ZPG"
340 } else {
341 referenceCode = string(*wrm.Reference_code)
342 }
343 for _, measure := range wrm.Measure {
344 var unit string
345 if measure.Unit == nil {
346 feedback.Info("'Unit' not specified. Assuming 'cm'")
347 unit = "cm"
285 } else { 348 } else {
286 feedback.Error("unknown") 349 unit = string(*measure.Unit)
287 } 350 }
288 } 351 convert, err := rescale(unit)
289 continue
290 }
291 result := resp.Result_message
292
293 for _, msg := range result {
294 var gid int64
295 feedback.Info("Found measurements for %s", g.location)
296 for _, wrm := range msg.Wrm {
297 currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id))
298 if err != nil { 352 if err != nil {
299 feedback.Warn("Invalid ISRS code %v", err) 353 return nil, err
300 continue
301 } 354 }
302 var referenceCode string 355 isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
303 if wrm.Reference_code == nil { 356 err = insertStmt.QueryRowContext(
304 feedback.Info("'Reference_code' not specified. Assuming 'ZPG'") 357 ctx,
305 referenceCode = "ZPG" 358 currIsrs.CountryCode,
306 } else { 359 currIsrs.LoCode,
307 referenceCode = string(*wrm.Reference_code) 360 currIsrs.FairwaySection,
361 currIsrs.Orc,
362 currIsrs.Hectometre,
363 measure.Measuredate,
364 msg.Identification.From,
365 msg.Identification.Language_code,
366 msg.Identification.Country_code,
367 msg.Identification.Date_issue,
368 referenceCode,
369 convert(measure.Value),
370 measure.Predicted,
371 isWaterlevel,
372 convert(measure.Value_min),
373 convert(measure.Value_max),
374 msg.Identification.Date_issue,
375 msg.Identification.Originator,
376 true, // staging_done
377 ).Scan(&gid)
378 if err != nil {
379 return nil, err
308 } 380 }
309 for _, measure := range wrm.Measure { 381 }
310 var unit string 382 feedback.Info("Inserted %d measurements for %s",
311 if measure.Unit == nil { 383 len(wrm.Measure), curr)
312 feedback.Info("'Unit' not specified. Assuming 'cm'") 384 gids = append(gids, curr)
313 unit = "cm"
314 } else {
315 unit = string(*measure.Unit)
316 }
317 convert, err := rescale(unit)
318 if err != nil {
319 return nil, err
320 }
321 isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
322 err = insertStmt.QueryRowContext(
323 ctx,
324 currIsrs.CountryCode,
325 currIsrs.LoCode,
326 currIsrs.FairwaySection,
327 currIsrs.Orc,
328 currIsrs.Hectometre,
329 measure.Measuredate,
330 msg.Identification.From,
331 msg.Identification.Language_code,
332 msg.Identification.Country_code,
333 msg.Identification.Date_issue,
334 referenceCode,
335 convert(measure.Value),
336 measure.Predicted,
337 isWaterlevel,
338 convert(measure.Value_min),
339 convert(measure.Value_max),
340 msg.Identification.Date_issue,
341 msg.Identification.Originator,
342 true, // staging_done
343 ).Scan(&gid)
344 if err != nil {
345 return nil, err
346 }
347 }
348 feedback.Info("Inserted %d measurements for %s",
349 len(wrm.Measure), currIsrs)
350 gids = append(gids, currIsrs.String())
351 }
352 } 385 }
353 } 386 }
354 return gids, nil 387 return gids, nil
355 } 388 }