comparison pkg/imports/wp.go @ 2084:ddbac0f22ffb

Waterway profiles import: Restructured the code a bit in preparation for downloading the profile geometries from WFS first (the download itself still needs to be implemented).
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 31 Jan 2019 16:32:18 +0100
parents 9318973487a1
children 6096ec4951f8
comparison
equal deleted inserted replaced
2083:6deafd6f7f86 2084:ddbac0f22ffb
125 ctx context.Context, 125 ctx context.Context,
126 importID int64, 126 importID int64,
127 conn *sql.Conn, 127 conn *sql.Conn,
128 feedback Feedback, 128 feedback Feedback,
129 ) (interface{}, error) { 129 ) (interface{}, error) {
130
131 start := time.Now() 130 start := time.Now()
131
132 tx, err := conn.BeginTx(ctx, nil)
133 if err != nil {
134 return nil, err
135 }
136 defer tx.Rollback()
137
138 // TODO: Download profile geometries from WFS.
139
140 summary, err := wp.processCSV(ctx, importID, tx, start, feedback)
141 if err != nil {
142 return nil, fmt.Errorf("error processing CVS: %v", err)
143 }
144
145 if err := tx.Commit(); err != nil {
146 return nil, fmt.Errorf(
147 "Importing waterway profiles failed after %s: %v",
148 time.Since(start), err)
149 }
150
151 feedback.Info("Importing waterway profiles took %s",
152 time.Since(start))
153 return summary, nil
154 }
155
156 func (wp *WaterwayProfiles) processCSV(
157 ctx context.Context,
158 importID int64,
159 tx *sql.Tx,
160 start time.Time,
161 feedback Feedback,
162 ) (interface{}, error) {
132 163
133 f, err := os.Open(filepath.Join(wp.Dir, "wp.csv")) 164 f, err := os.Open(filepath.Join(wp.Dir, "wp.csv"))
134 if err != nil { 165 if err != nil {
135 return nil, err 166 return nil, err
136 } 167 }
156 fe100Idx = -1 187 fe100Idx = -1
157 dateInfoIdx = -1 188 dateInfoIdx = -1
158 sourceIdx = -1 189 sourceIdx = -1
159 ) 190 )
160 191
161 type headerField struct { 192 fields := []struct {
162 idx *int 193 idx *int
163 substr string 194 substr string
164 } 195 }{
165
166 fields := []headerField{
167 {&locationIdx, "location"}, 196 {&locationIdx, "location"},
168 {&validFromIdx, "valid_from"}, 197 {&validFromIdx, "valid_from"},
169 {&validToIdx, "valid_to"}, 198 {&validToIdx, "valid_to"},
170 {&lnwlIdx, "lnwl"}, 199 {&lnwlIdx, "lnwl"},
171 {&mwlIdx, "mwl"}, 200 {&mwlIdx, "mwl"},
203 "CSV is missing columns: %s", 232 "CSV is missing columns: %s",
204 strings.Join(missing, ", ")) 233 strings.Join(missing, ", "))
205 } 234 }
206 235
207 parseDate := misc.TimeGuesser([]string{"02.01.2006"}).Guess 236 parseDate := misc.TimeGuesser([]string{"02.01.2006"}).Guess
208
209 tx, err := conn.BeginTx(ctx, nil)
210 if err != nil {
211 return nil, err
212 }
213 defer tx.Rollback()
214 237
215 insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL) 238 insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL)
216 if err != nil { 239 if err != nil {
217 return nil, err 240 return nil, err
218 } 241 }
346 } 369 }
347 if len(ids) == 0 { 370 if len(ids) == 0 {
348 return nil, UnchangedError("No new entries in waterway profiles.") 371 return nil, UnchangedError("No new entries in waterway profiles.")
349 } 372 }
350 373
351 if err := tx.Commit(); err != nil {
352 return nil, fmt.Errorf(
353 "Importing waterway profiles failed after %s: %v",
354 time.Since(start), err)
355 }
356
357 feedback.Info("%d new entries in waterway profiles.", len(ids)) 374 feedback.Info("%d new entries in waterway profiles.", len(ids))
358 feedback.Info("Importing waterway profiles took %s",
359 time.Since(start))
360 375
361 summary := struct { 376 summary := struct {
362 IDs []int64 `json:"ids"` 377 IDs []int64 `json:"ids"`
363 }{ 378 }{
364 IDs: ids, 379 IDs: ids,