Mercurial > gemma
changeset 2233:137addc77b1b
Gauge measurement imports: Do all database ops in one transaction.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 13 Feb 2019 12:39:56 +0100 |
parents | 7936b46a88d4 |
children | 9b2f8e94671e |
files | pkg/imports/fa.go pkg/imports/gm.go |
diffstat | 2 files changed, 61 insertions(+), 48 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/fa.go Wed Feb 13 11:44:27 2019 +0100 +++ b/pkg/imports/fa.go Wed Feb 13 12:39:56 2019 +0100 @@ -300,14 +300,13 @@ } feedback.Info("Processed %d fairway availabilities", len(faids)) - if err = tx.Commit(); err == nil { - feedback.Info( - "Importing fairway availabilities successfully took %s", time.Since(start)) - } else { + if err = tx.Commit(); err != nil { feedback.Info( "Importing fairway availabilities failed after %s", time.Since(start)) return nil, err } + feedback.Info( + "Importing fairway availabilities successfully took %s", time.Since(start)) // TODO: needs to be filled more useful. summary := struct {
--- a/pkg/imports/gm.go Wed Feb 13 11:44:27 2019 +0100 +++ b/pkg/imports/gm.go Wed Feb 13 12:39:56 2019 +0100 @@ -34,10 +34,10 @@ Insecure bool `json:"insecure"` } -// gaugeMeasurement holds information about a gauge and the latest measurement -type gaugeMeasurement struct { - Gauge models.Isrs - LatestDateIssue time.Time +// gauge holds information about a gauge and the latest measurement +type gauge struct { + location models.Isrs + latest time.Time } // GMJobKind is the import queue type identifier. @@ -137,6 +137,37 @@ // CleanUp of a gauge measurement import is a NOP. func (*GaugeMeasurement) CleanUp() error { return nil } +func loadGauges(ctx context.Context, tx *sql.Tx) ([]*gauge, error) { + + rows, err := tx.QueryContext(ctx, listGaugesSQL) + if err != nil { + return nil, err + } + defer rows.Close() + + var gauges []*gauge + + for rows.Next() { + var g gauge + if err = rows.Scan( + &g.location.CountryCode, + &g.location.LoCode, + &g.location.FairwaySection, + &g.location.Orc, + &g.location.Hectometre, + ); err != nil { + return nil, err + } + gauges = append(gauges, &g) + } + + if err = rows.Err(); err != nil { + return nil, err + } + + return gauges, nil +} + // Do executes the actual bottleneck import. func (gm *GaugeMeasurement) Do( ctx context.Context, @@ -145,45 +176,40 @@ feedback Feedback, ) (interface{}, error) { - // Get available gauges from database for use as filter in SOAP request - var rows *sql.Rows + start := time.Now() - rows, err := conn.QueryContext(ctx, listGaugesSQL) + tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } - defer rows.Close() - - gauges := []gaugeMeasurement{} + defer tx.Rollback() - for rows.Next() { - var g gaugeMeasurement - if err = rows.Scan( - &g.Gauge.CountryCode, - &g.Gauge.LoCode, - &g.Gauge.FairwaySection, - &g.Gauge.Orc, - &g.Gauge.Hectometre, - ); err != nil { - return nil, err - } - gauges = append(gauges, g) - } - - if err = rows.Err(); err != nil { + // Get available gauges from database for use as filter in SOAP request + gauges, err := loadGauges(ctx, tx) + if err != nil { return nil, err } // TODO get date_issue for selected gauges - gids, err := gm.doForGM(ctx, gauges, conn, feedback) + gids, err := gm.doForGM(ctx, gauges, tx, feedback) if err != nil { feedback.Error("Error processing %d gauges: %v", len(gauges), err) return nil, err } + if len(gids) == 0 { feedback.Info("No new gauge measurements found") - return nil, nil + return nil, UnchangedError("No new gauge measurements found") } + + if err = tx.Commit(); err != nil { + feedback.Info( + "Importing gauge measurements failed after %s", time.Since(start)) + return nil, err + } + feedback.Info( + "Importing gauge measurements successfully took %s", time.Since(start)) + // TODO: needs to be filled more useful. summary := struct { GaugeMeasuremets []string `json:"gaugeMeasurements"` @@ -221,27 +247,20 @@ func (gm *GaugeMeasurement) doForGM( ctx context.Context, - gauges []gaugeMeasurement, - conn *sql.Conn, + gauges []*gauge, + tx *sql.Tx, feedback Feedback, ) ([]string, error) { - start := time.Now() client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil) - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return nil, err - } - defer tx.Rollback() - mt := nts.Message_type_typeWRM var gids []string for _, g := range gauges { var idPair []*nts.Id_pair - isrs := g.Gauge.String() + isrs := g.location.String() isrsID := nts.Isrs_code_type(isrs) idPair = append(idPair, &nts.Id_pair{ Id: &isrsID, @@ -261,7 +280,7 @@ if resp.Result_message == nil { for _, e := range resp.Result_error { if e != nil { - feedback.Error("No gauge measurements found for %s", g.Gauge.String()) + feedback.Error("No gauge measurements found for %s", g.location.String()) } else { feedback.Error("unknown") } @@ -278,7 +297,7 @@ for _, msg := range result { var gid int64 - feedback.Info("Found measurements for %s", g.Gauge.String()) + feedback.Info("Found measurements for %s", g.location.String()) for _, wrm := range msg.Wrm { currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id)) if err != nil { @@ -337,10 +356,5 @@ } } } - feedback.Info("Storing gauge measurements took %s", time.Since(start)) - if err = tx.Commit(); err == nil { - feedback.Info("Import of gauge measurements was successful") - } - return gids, nil }