Mercurial > gemma
comparison pkg/imports/agm.go @ 1778:164b46ebd60d
Approved gauge measurement import: Implemented. TODO: Fix staging.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Fri, 11 Jan 2019 15:28:10 +0100 |
parents | fcb0106ec510 |
children | ad1c12e999df |
comparison
equal
deleted
inserted
replaced
1777:e70b7b8e7b74 | 1778:164b46ebd60d |
---|---|
16 import ( | 16 import ( |
17 "bufio" | 17 "bufio" |
18 "context" | 18 "context" |
19 "database/sql" | 19 "database/sql" |
20 "encoding/csv" | 20 "encoding/csv" |
21 "errors" | 21 "fmt" |
22 "io" | |
22 "log" | 23 "log" |
23 "os" | 24 "os" |
24 "path/filepath" | 25 "path/filepath" |
26 "strconv" | |
25 "strings" | 27 "strings" |
28 "time" | |
26 | 29 |
27 "gemma.intevation.de/gemma/pkg/common" | 30 "gemma.intevation.de/gemma/pkg/common" |
31 "gemma.intevation.de/gemma/pkg/models" | |
28 ) | 32 ) |
29 | 33 |
30 type ApprovedGaugeMeasurements struct { | 34 type ApprovedGaugeMeasurements struct { |
31 Dir string `json:"dir"` | 35 Dir string `json:"dir"` |
32 } | 36 } |
85 // the approved gauge measurements. | 89 // the approved gauge measurements. |
86 func (agm *ApprovedGaugeMeasurements) CleanUp() error { | 90 func (agm *ApprovedGaugeMeasurements) CleanUp() error { |
87 return os.RemoveAll(agm.Dir) | 91 return os.RemoveAll(agm.Dir) |
88 } | 92 } |
89 | 93 |
94 func guessDate(s string) (time.Time, error) { | |
95 var err error | |
96 var t time.Time | |
97 for _, layout := range [...]string{ | |
98 "02.01.2006 15:04", | |
99 "2006-01-02T15:04:05-07:00", | |
100 } { | |
101 if t, err = time.Parse(layout, s); err == nil { | |
102 break | |
103 } | |
104 } | |
105 return t, err | |
106 } | |
107 | |
90 // Do executes the actual approved gauge measurements import. | 108 // Do executes the actual approved gauge measurements import. |
91 func (agm *ApprovedGaugeMeasurements) Do( | 109 func (agm *ApprovedGaugeMeasurements) Do( |
92 ctx context.Context, | 110 ctx context.Context, |
93 importID int64, | 111 importID int64, |
94 conn *sql.Conn, | 112 conn *sql.Conn, |
95 feedback Feedback, | 113 feedback Feedback, |
96 ) (interface{}, error) { | 114 ) (interface{}, error) { |
97 | 115 |
116 start := time.Now() | |
117 | |
98 f, err := os.Open(filepath.Join(agm.Dir, "agm.csv")) | 118 f, err := os.Open(filepath.Join(agm.Dir, "agm.csv")) |
99 if err != nil { | 119 if err != nil { |
100 return nil, err | 120 return nil, err |
101 } | 121 } |
102 defer f.Close() | 122 defer f.Close() |
112 | 132 |
113 headerIndices := map[string]int{} | 133 headerIndices := map[string]int{} |
114 | 134 |
115 for i, f := range headers { | 135 for i, f := range headers { |
116 log.Printf("%d: %s\n", i, f) | 136 log.Printf("%d: %s\n", i, f) |
117 headerIndices[strings.ToLower(strings.TrimSpace(f))] = i | 137 headerIndices[strings.Replace( |
118 } | 138 strings.ToLower( |
139 strings.TrimSpace(f)), " ", "_", -1)] = i | |
140 } | |
141 | |
142 var missing []string | |
119 | 143 |
120 for _, m := range [...]string{ | 144 for _, m := range [...]string{ |
121 "fk_gauge_id", | 145 "fk_gauge_id", |
122 "measure_date", | 146 "measure_date", |
123 "from", // "sender", | 147 "from", // "sender", |
124 "language_code", | 148 "language_code", |
125 "country_code", | 149 "country_code", |
126 "date_issue", | 150 "date_issue", |
127 "reference_code", | 151 "reference_code", |
128 "water_level", | 152 "value", // "water_level", |
129 "predicted", | 153 "predicted", |
130 "is_waterlevel", | 154 // "is_waterlevel", |
131 "value_min", | 155 "value_min", |
132 "value_max", | 156 "value_max", |
133 "date_info", | 157 "date_info", |
134 "originator", // "source_organization", | 158 "originator", // "source_organization", |
135 } { | 159 } { |
136 idx, found := headerIndices[m] | 160 if _, found := headerIndices[m]; !found { |
161 missing = append(missing, m) | |
162 } | |
163 } | |
164 | |
165 if len(missing) > 0 { | |
166 return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) | |
167 } | |
168 | |
169 inCm, _ := rescale("cm") | |
170 scaler := func(row []string) (func(float32) float32, error) { | |
171 idx, found := headerIndices["unit"] | |
137 if !found { | 172 if !found { |
138 log.Printf("missing column '%s'\n", m) | 173 return inCm, nil |
139 } | 174 } |
140 _ = idx | 175 log.Printf("scaler index: %d %d\n", idx, len(row)) |
141 } | 176 unit := row[idx] |
142 | 177 if unit == "cm" { |
143 return nil, errors.New("Not implemented, yet!") | 178 return inCm, nil |
144 } | 179 } |
180 s, err := rescale(unit) | |
181 return s, err | |
182 } | |
183 | |
184 tx, err := conn.BeginTx(ctx, nil) | |
185 if err != nil { | |
186 return nil, err | |
187 } | |
188 defer tx.Rollback() | |
189 | |
190 insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) | |
191 if err != nil { | |
192 return nil, err | |
193 } | |
194 defer insertStmt.Close() | |
195 trackStmt, err := tx.PrepareContext(ctx, trackImportSQL) | |
196 if err != nil { | |
197 return nil, err | |
198 } | |
199 defer trackStmt.Close() | |
200 | |
201 ids := []int64{} | |
202 | |
203 args := make([]interface{}, 18) | |
204 | |
205 lines: | |
206 for line := 1; ; line++ { | |
207 | |
208 row, err := r.Read() | |
209 switch { | |
210 case err == io.EOF || len(row) == 0: | |
211 break lines | |
212 case err != nil: | |
213 return nil, fmt.Errorf("CSV parsing failed: %v", err) | |
214 } | |
215 convert, err := scaler(row) | |
216 if err != nil { | |
217 return nil, fmt.Errorf("line %d: %v", line, err) | |
218 } | |
219 | |
220 gids := row[headerIndices["fk_gauge_id"]] | |
221 gid, err := models.IsrsFromString(gids) | |
222 if err != nil { | |
223 return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err) | |
224 } | |
225 | |
226 args[0] = gid.CountryCode | |
227 args[1] = gid.LoCode | |
228 args[2] = gid.FairwaySection | |
229 args[3] = gid.Orc | |
230 args[4] = gid.Hectometre | |
231 | |
232 md, err := guessDate(row[headerIndices["measure_date"]]) | |
233 if err != nil { | |
234 return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err) | |
235 } | |
236 args[5] = md | |
237 | |
238 args[6] = row[headerIndices["from"]] | |
239 args[7] = row[headerIndices["language_code"]] | |
240 args[8] = row[headerIndices["country_code"]] | |
241 | |
242 dis, err := guessDate(row[headerIndices["date_issue"]]) | |
243 if err != nil { | |
244 return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err) | |
245 } | |
246 args[9] = dis | |
247 | |
248 args[10] = row[headerIndices["reference_code"]] | |
249 | |
250 value, err := strconv.ParseFloat(row[headerIndices["value"]], 32) | |
251 if err != nil { | |
252 return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err) | |
253 } | |
254 args[11] = convert(float32(value)) | |
255 | |
256 predicted := strings.ToLower(row[headerIndices["predicted"]]) == "true" | |
257 args[12] = predicted | |
258 | |
259 args[13] = true // is_waterlevel | |
260 | |
261 valueMin, err := strconv.ParseFloat(row[headerIndices["value_min"]], 32) | |
262 if err != nil { | |
263 return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err) | |
264 } | |
265 args[14] = convert(float32(valueMin)) | |
266 | |
267 valueMax, err := strconv.ParseFloat(row[headerIndices["value_max"]], 32) | |
268 if err != nil { | |
269 return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err) | |
270 } | |
271 args[15] = convert(float32(valueMax)) | |
272 | |
273 din, err := guessDate(row[headerIndices["date_info"]]) | |
274 if err != nil { | |
275 return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err) | |
276 } | |
277 args[16] = din | |
278 | |
279 args[17] = row[headerIndices["originator"]] | |
280 | |
281 var id int64 | |
282 if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil { | |
283 return nil, fmt.Errorf("Failed to insert line %d: %v", line, err) | |
284 } | |
285 ids = append(ids, id) | |
286 | |
287 if _, err := trackStmt.ExecContext( | |
288 ctx, importID, "waterway.gauge_measurements", id, | |
289 ); err != nil { | |
290 return nil, err | |
291 } | |
292 } | |
293 | |
294 if err := tx.Commit(); err != nil { | |
295 return nil, fmt.Errorf("Commit failed: %v", err) | |
296 } | |
297 | |
298 feedback.Info("Importing approved gauge measurements took %s", | |
299 time.Since(start)) | |
300 | |
301 summary := struct { | |
302 IDs []int64 `json:"ids"` | |
303 }{ | |
304 IDs: ids, | |
305 } | |
306 return &summary, nil | |
307 } |