comparison pkg/imports/agm.go @ 1778:164b46ebd60d

Approved gauge measurement import: Implemented. TODO: Fix staging.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 11 Jan 2019 15:28:10 +0100
parents fcb0106ec510
children ad1c12e999df
comparison
equal deleted inserted replaced
1777:e70b7b8e7b74 1778:164b46ebd60d
16 import ( 16 import (
17 "bufio" 17 "bufio"
18 "context" 18 "context"
19 "database/sql" 19 "database/sql"
20 "encoding/csv" 20 "encoding/csv"
21 "errors" 21 "fmt"
22 "io"
22 "log" 23 "log"
23 "os" 24 "os"
24 "path/filepath" 25 "path/filepath"
26 "strconv"
25 "strings" 27 "strings"
28 "time"
26 29
27 "gemma.intevation.de/gemma/pkg/common" 30 "gemma.intevation.de/gemma/pkg/common"
31 "gemma.intevation.de/gemma/pkg/models"
28 ) 32 )
29 33
// ApprovedGaugeMeasurements holds the parameters of an import of
// approved gauge measurements.
type ApprovedGaugeMeasurements struct {
	// Dir is the working directory of the import. The measurements are
	// read from the file "agm.csv" inside it, and CleanUp removes the
	// whole directory afterwards.
	Dir string `json:"dir"`
}
85 // the approved gauge measurements. 89 // the approved gauge measurements.
86 func (agm *ApprovedGaugeMeasurements) CleanUp() error { 90 func (agm *ApprovedGaugeMeasurements) CleanUp() error {
87 return os.RemoveAll(agm.Dir) 91 return os.RemoveAll(agm.Dir)
88 } 92 }
89 93
// guessDate parses s as a timestamp. It tries each supported layout in
// turn and returns the first successful result:
//
//   - "02.01.2006 15:04", e.g. "11.01.2019 15:28". The input has no zone
//     information, so time.Parse yields a UTC time.
//   - RFC 3339, e.g. "2019-01-11T15:28:10+01:00" or "2019-01-11T15:28:10Z".
//
// Leading and trailing white space in s is ignored. If no layout matches,
// the zero time and an error naming the offending input are returned.
func guessDate(s string) (time.Time, error) {
	// CSV cells may carry padding; the header names are trimmed as well.
	s = strings.TrimSpace(s)
	for _, layout := range [...]string{
		"02.01.2006 15:04",
		// time.RFC3339 ("2006-01-02T15:04:05Z07:00") accepts numeric
		// offsets like "+01:00" as well as the "Z" shorthand for UTC.
		// This is a strict superset of the former "-07:00"-only layout.
		time.RFC3339,
	} {
		if t, err := time.Parse(layout, s); err == nil {
			return t, nil
		}
	}
	// Report the input itself rather than the last layout's parse error,
	// which would describe only the final attempt and mislead the caller.
	return time.Time{}, fmt.Errorf(
		"unrecognized date %q (want DD.MM.YYYY hh:mm or RFC 3339)", s)
}
107
90 // Do executes the actual approved gauge measurements import. 108 // Do executes the actual approved gauge measurements import.
91 func (agm *ApprovedGaugeMeasurements) Do( 109 func (agm *ApprovedGaugeMeasurements) Do(
92 ctx context.Context, 110 ctx context.Context,
93 importID int64, 111 importID int64,
94 conn *sql.Conn, 112 conn *sql.Conn,
95 feedback Feedback, 113 feedback Feedback,
96 ) (interface{}, error) { 114 ) (interface{}, error) {
97 115
116 start := time.Now()
117
98 f, err := os.Open(filepath.Join(agm.Dir, "agm.csv")) 118 f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
99 if err != nil { 119 if err != nil {
100 return nil, err 120 return nil, err
101 } 121 }
102 defer f.Close() 122 defer f.Close()
112 132
113 headerIndices := map[string]int{} 133 headerIndices := map[string]int{}
114 134
115 for i, f := range headers { 135 for i, f := range headers {
116 log.Printf("%d: %s\n", i, f) 136 log.Printf("%d: %s\n", i, f)
117 headerIndices[strings.ToLower(strings.TrimSpace(f))] = i 137 headerIndices[strings.Replace(
118 } 138 strings.ToLower(
139 strings.TrimSpace(f)), " ", "_", -1)] = i
140 }
141
142 var missing []string
119 143
120 for _, m := range [...]string{ 144 for _, m := range [...]string{
121 "fk_gauge_id", 145 "fk_gauge_id",
122 "measure_date", 146 "measure_date",
123 "from", // "sender", 147 "from", // "sender",
124 "language_code", 148 "language_code",
125 "country_code", 149 "country_code",
126 "date_issue", 150 "date_issue",
127 "reference_code", 151 "reference_code",
128 "water_level", 152 "value", // "water_level",
129 "predicted", 153 "predicted",
130 "is_waterlevel", 154 // "is_waterlevel",
131 "value_min", 155 "value_min",
132 "value_max", 156 "value_max",
133 "date_info", 157 "date_info",
134 "originator", // "source_organization", 158 "originator", // "source_organization",
135 } { 159 } {
136 idx, found := headerIndices[m] 160 if _, found := headerIndices[m]; !found {
161 missing = append(missing, m)
162 }
163 }
164
165 if len(missing) > 0 {
166 return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
167 }
168
169 inCm, _ := rescale("cm")
170 scaler := func(row []string) (func(float32) float32, error) {
171 idx, found := headerIndices["unit"]
137 if !found { 172 if !found {
138 log.Printf("missing column '%s'\n", m) 173 return inCm, nil
139 } 174 }
140 _ = idx 175 log.Printf("scaler index: %d %d\n", idx, len(row))
141 } 176 unit := row[idx]
142 177 if unit == "cm" {
143 return nil, errors.New("Not implemented, yet!") 178 return inCm, nil
144 } 179 }
180 s, err := rescale(unit)
181 return s, err
182 }
183
184 tx, err := conn.BeginTx(ctx, nil)
185 if err != nil {
186 return nil, err
187 }
188 defer tx.Rollback()
189
190 insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
191 if err != nil {
192 return nil, err
193 }
194 defer insertStmt.Close()
195 trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
196 if err != nil {
197 return nil, err
198 }
199 defer trackStmt.Close()
200
201 ids := []int64{}
202
203 args := make([]interface{}, 18)
204
205 lines:
206 for line := 1; ; line++ {
207
208 row, err := r.Read()
209 switch {
210 case err == io.EOF || len(row) == 0:
211 break lines
212 case err != nil:
213 return nil, fmt.Errorf("CSV parsing failed: %v", err)
214 }
215 convert, err := scaler(row)
216 if err != nil {
217 return nil, fmt.Errorf("line %d: %v", line, err)
218 }
219
220 gids := row[headerIndices["fk_gauge_id"]]
221 gid, err := models.IsrsFromString(gids)
222 if err != nil {
223 return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err)
224 }
225
226 args[0] = gid.CountryCode
227 args[1] = gid.LoCode
228 args[2] = gid.FairwaySection
229 args[3] = gid.Orc
230 args[4] = gid.Hectometre
231
232 md, err := guessDate(row[headerIndices["measure_date"]])
233 if err != nil {
234 return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
235 }
236 args[5] = md
237
238 args[6] = row[headerIndices["from"]]
239 args[7] = row[headerIndices["language_code"]]
240 args[8] = row[headerIndices["country_code"]]
241
242 dis, err := guessDate(row[headerIndices["date_issue"]])
243 if err != nil {
244 return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err)
245 }
246 args[9] = dis
247
248 args[10] = row[headerIndices["reference_code"]]
249
250 value, err := strconv.ParseFloat(row[headerIndices["value"]], 32)
251 if err != nil {
252 return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
253 }
254 args[11] = convert(float32(value))
255
256 predicted := strings.ToLower(row[headerIndices["predicted"]]) == "true"
257 args[12] = predicted
258
259 args[13] = true // is_waterlevel
260
261 valueMin, err := strconv.ParseFloat(row[headerIndices["value_min"]], 32)
262 if err != nil {
263 return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err)
264 }
265 args[14] = convert(float32(valueMin))
266
267 valueMax, err := strconv.ParseFloat(row[headerIndices["value_max"]], 32)
268 if err != nil {
269 return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err)
270 }
271 args[15] = convert(float32(valueMax))
272
273 din, err := guessDate(row[headerIndices["date_info"]])
274 if err != nil {
275 return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err)
276 }
277 args[16] = din
278
279 args[17] = row[headerIndices["originator"]]
280
281 var id int64
282 if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil {
283 return nil, fmt.Errorf("Failed to insert line %d: %v", line, err)
284 }
285 ids = append(ids, id)
286
287 if _, err := trackStmt.ExecContext(
288 ctx, importID, "waterway.gauge_measurements", id,
289 ); err != nil {
290 return nil, err
291 }
292 }
293
294 if err := tx.Commit(); err != nil {
295 return nil, fmt.Errorf("Commit failed: %v", err)
296 }
297
298 feedback.Info("Importing approved gauge measurements took %s",
299 time.Since(start))
300
301 summary := struct {
302 IDs []int64 `json:"ids"`
303 }{
304 IDs: ids,
305 }
306 return &summary, nil
307 }