comparison pkg/imports/agm.go @ 4027:b17453420eff

Avoid doing one SELECT per line in the import file. When the import is run by a waterway admin (which involves more row-level security processing than running it as sys_admin), this approximately halves the overall duration of the import.
author Tom Gottfried <tom@intevation.de>
date Mon, 22 Jul 2019 19:19:00 +0200
parents 82037bbd2c7c
children 040a5dc95eb9
comparison
equal deleted inserted replaced
4026:82037bbd2c7c 4027:b17453420eff
150 } 150 }
151 151
152 const ( 152 const (
153 agmSelectSQL = ` 153 agmSelectSQL = `
154 SELECT 154 SELECT
155 id,
156 country_code, 155 country_code,
157 sender, 156 sender,
158 language_code, 157 language_code,
159 date_issue, 158 date_issue,
160 reference_code, 159 reference_code,
160 measure_date,
161 water_level, 161 water_level,
162 date_info, 162 date_info,
163 source_organization 163 source_organization
164 FROM waterway.gauge_measurements 164 FROM waterway.gauge_measurements
165 WHERE 165 WHERE
166 location 166 location
167 = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) 167 = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
168 AND measure_date = $6 168 AND measure_date BETWEEN $6 AND $7
169 AND staging_done 169 AND staging_done
170 ` 170 `
171 171
172 agmInsertSQL = ` 172 agmInsertSQL = `
173 INSERT INTO waterway.gauge_measurements ( 173 INSERT INTO waterway.gauge_measurements (
312 warn := warnLimiter.Warn 312 warn := warnLimiter.Warn
313 defer warnLimiter.Close() 313 defer warnLimiter.Close()
314 314
315 agmLines := []*agmLine{} 315 agmLines := []*agmLine{}
316 ignored := 0 316 ignored := 0
317 mdMinMax := map[models.Isrs][2]time.Time{}
317 318
318 lines: 319 lines:
319 for line := 1; ; line++ { 320 for line := 1; ; line++ {
320 321
321 row, err := r.Read() 322 row, err := r.Read()
365 } 366 }
366 367
367 md, err := guessDate(row[measureDateIdx]) 368 md, err := guessDate(row[measureDateIdx])
368 if err != nil { 369 if err != nil {
369 return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err) 370 return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
371 }
372 if _, hasGid := mdMinMax[*gid]; hasGid {
373 if md.Before(mdMinMax[*gid][0]) {
374 mdMinMax[*gid] = [2]time.Time{md, mdMinMax[*gid][1]}
375 }
376 if md.After(mdMinMax[*gid][1]) {
377 mdMinMax[*gid] = [2]time.Time{mdMinMax[*gid][0], md}
378 }
379 } else {
380 mdMinMax[*gid] = [2]time.Time{md, md}
370 } 381 }
371 382
372 newSender := agm.Originator 383 newSender := agm.Originator
373 newCountryCode := gid.CountryCode 384 newCountryCode := gid.CountryCode
374 newLanguageCode := misc.CCtoLang[gid.CountryCode] 385 newLanguageCode := misc.CCtoLang[gid.CountryCode]
397 newDateInfo, 408 newDateInfo,
398 newSourceOrganization, 409 newSourceOrganization,
399 )) 410 ))
400 } 411 }
401 412
413 oldGMLines := map[models.Isrs]map[int64]*agmLine{}
414 for gid, minMax := range mdMinMax {
415 oldGMLines[gid], err = getOldGMLines(
416 ctx, selectStmt, gid, minMax[0], minMax[1])
417 if err != nil {
418 return nil, err
419 }
420 }
421
402 agmLines: 422 agmLines:
403 for _, line := range agmLines { 423 for _, line := range agmLines {
404 var (
405 oldID int64
406 oldCountryCode string
407 oldSender string
408 oldLanguageCode string
409 oldDateIssue time.Time
410 oldReferenceCode string
411 oldValue float64
412 oldDateInfo time.Time
413 oldSourceOrganization string
414 )
415
416 err = selectStmt.QueryRowContext(
417 ctx,
418 line.Location.CountryCode,
419 line.Location.LoCode,
420 line.Location.FairwaySection,
421 line.Location.Orc,
422 line.Location.Hectometre,
423 line.MeasureDate.Time,
424 ).Scan(
425 &oldID,
426 &oldCountryCode,
427 &oldSender,
428 &oldLanguageCode,
429 &oldDateIssue,
430 &oldReferenceCode,
431 &oldValue,
432 &oldDateInfo,
433 &oldSourceOrganization,
434 )
435
436 var newEntry bool
437 switch {
438 case err == sql.ErrNoRows:
439 // Complete new one
440 newEntry = true
441 case err != nil:
442 return nil, err
443 }
444 424
445 switch err := func() error { 425 switch err := func() error {
446 tx, err := conn.BeginTx(ctx, nil) 426 tx, err := conn.BeginTx(ctx, nil)
447 if err != nil { 427 if err != nil {
448 return err 428 return err
493 ase := &agmSummaryEntry{ 473 ase := &agmSummaryEntry{
494 FKGaugeID: line.Location, 474 FKGaugeID: line.Location,
495 MeasureDate: line.MeasureDate, 475 MeasureDate: line.MeasureDate,
496 } 476 }
497 477
498 if newEntry { 478 if o, hasOld := oldGMLines[line.Location][line.MeasureDate.Time.Unix()]; !hasOld {
499 ase.Versions = []*agmLine{line} 479 ase.Versions = []*agmLine{line}
500 } else { 480 } else {
501 o := newAGMLine(
502 line.Location,
503 oldCountryCode,
504 oldSender,
505 oldLanguageCode,
506 oldDateIssue,
507 oldReferenceCode,
508 line.MeasureDate.Time,
509 oldValue,
510 oldDateInfo,
511 oldSourceOrganization,
512 )
513 // Ignore if there is no diff. 481 // Ignore if there is no diff.
514 if !line.hasDiff(o) { 482 if !line.hasDiff(o) {
515 continue 483 continue
516 } 484 }
517 ase.Versions = []*agmLine{o, line} 485 ase.Versions = []*agmLine{o, line}
525 feedback.Info("Imported %d entries with changes", len(entries)) 493 feedback.Info("Imported %d entries with changes", len(entries))
526 feedback.Info("Importing approved gauge measurements took %s", 494 feedback.Info("Importing approved gauge measurements took %s",
527 time.Since(start)) 495 time.Since(start))
528 496
529 return entries, nil 497 return entries, nil
498 }
499
500 func getOldGMLines(
501 ctx context.Context,
502 stmt *sql.Stmt,
503 location models.Isrs,
504 from time.Time,
505 to time.Time,
506 ) (map[int64]*agmLine, error) {
507 var (
508 oldCountryCode string
509 oldSender string
510 oldLanguageCode string
511 oldDateIssue time.Time
512 oldReferenceCode string
513 oldMeasureDate time.Time
514 oldValue float64
515 oldDateInfo time.Time
516 oldSourceOrganization string
517 )
518 gmLines := map[int64]*agmLine{}
519
520 gms, err := stmt.QueryContext(
521 ctx,
522 location.CountryCode,
523 location.LoCode,
524 location.FairwaySection,
525 location.Orc,
526 location.Hectometre,
527 from,
528 to,
529 )
530 if err != nil {
531 return nil, err
532 }
533 defer gms.Close()
534 for gms.Next() {
535 if err = gms.Scan(
536 &oldCountryCode,
537 &oldSender,
538 &oldLanguageCode,
539 &oldDateIssue,
540 &oldReferenceCode,
541 &oldMeasureDate,
542 &oldValue,
543 &oldDateInfo,
544 &oldSourceOrganization,
545 ); err != nil {
546 return nil, err
547 }
548 gmLines[oldMeasureDate.Unix()] = newAGMLine(
549 location,
550 oldCountryCode,
551 oldSender,
552 oldLanguageCode,
553 oldDateIssue,
554 oldReferenceCode,
555 oldMeasureDate,
556 oldValue,
557 oldDateInfo,
558 oldSourceOrganization,
559 )
560 }
561 if err = gms.Err(); err != nil {
562 return nil, err
563 }
564 return gmLines, nil
530 } 565 }
531 566
532 func newAGMLine( 567 func newAGMLine(
533 location models.Isrs, 568 location models.Isrs,
534 countryCode string, 569 countryCode string,