Mercurial > gemma
comparison pkg/imports/agm.go @ 1780:48791416bea5
(Approved) gauge measurement import: Fixed row level security.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Fri, 11 Jan 2019 16:48:14 +0100 |
parents | ad1c12e999df |
children | 0a53c33bc7b2 |
comparison
equal
deleted
inserted
replaced
1779:ad1c12e999df | 1780:48791416bea5 |
---|---|
63 "gauge_measurements", | 63 "gauge_measurements", |
64 } | 64 } |
65 } | 65 } |
66 | 66 |
67 const ( | 67 const ( |
68 // TODO: re-add staging_done field in table and fix RLS policy | 68 // delete the old and keep the new measures. |
69 // issue for raw import. | 69 agmStageDoneDeleteSQL = ` |
70 WITH staged AS ( | |
71 SELECT key | |
72 FROM waterway.track_imports | |
73 WHERE import_id = $1 AND | |
74 relation = 'waterway.gauge_measurements'::regclass | |
75 ), | |
76 to_delete AS ( | |
77 SELECT o.id AS id | |
78 FROM waterway.gauge_measurements oJOIN waterway.gauge_measurements n | |
79 ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date | |
80 WHERE n.id IN (SELECT key FROM staged) | |
81 AND o.id NOT IN (SELECT key FROM staged) | |
82 ) | |
83 DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)` | |
84 | |
70 agmStageDoneSQL = ` | 85 agmStageDoneSQL = ` |
71 UPDATE waterway.gauge_measurements SET staging_done = true | 86 UPDATE waterway.gauge_measurements SET staging_done = true |
72 WHERE id = ( | 87 WHERE id IN ( |
73 SELECT key from waterway.track_imports | 88 SELECT key FROM waterway.track_imports |
74 WHERE import_id = $1 AND | 89 WHERE import_id = $1 AND |
75 relation = 'waterway.gauge_measurements'::regclass)` | 90 relation = 'waterway.gauge_measurements'::regclass)` |
76 ) | 91 ) |
77 | 92 |
78 func (agmJobCreator) StageDone( | 93 func (agmJobCreator) StageDone( |
79 ctx context.Context, | 94 ctx context.Context, |
80 tx *sql.Tx, | 95 tx *sql.Tx, |
81 id int64, | 96 id int64, |
82 ) error { | 97 ) error { |
83 _, err := tx.ExecContext(ctx, agmStageDoneSQL, id) | 98 _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id) |
99 if err == nil { | |
100 _, err = tx.ExecContext(ctx, agmStageDoneSQL, id) | |
101 } | |
84 return err | 102 return err |
85 } | 103 } |
86 | 104 |
87 // CleanUp removes the folder containing the CSV file with the | 105 // CleanUp removes the folder containing the CSV file with the |
88 // the approved gauge measurements. | 106 // the approved gauge measurements. |
195 } | 213 } |
196 defer trackStmt.Close() | 214 defer trackStmt.Close() |
197 | 215 |
198 ids := []int64{} | 216 ids := []int64{} |
199 | 217 |
200 args := make([]interface{}, 18) | 218 args := make([]interface{}, 19) |
219 | |
220 args[18] = false // staging_done | |
201 | 221 |
202 lines: | 222 lines: |
203 for line := 1; ; line++ { | 223 for line := 1; ; line++ { |
204 | 224 |
205 row, err := r.Read() | 225 row, err := r.Read() |
273 } | 293 } |
274 args[16] = din | 294 args[16] = din |
275 | 295 |
276 args[17] = row[headerIndices["originator"]] | 296 args[17] = row[headerIndices["originator"]] |
277 | 297 |
298 // args[18] (staging_done) is set to true outside the loop. | |
299 | |
278 var id int64 | 300 var id int64 |
279 if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil { | 301 if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil { |
280 return nil, fmt.Errorf("Failed to insert line %d: %v", line, err) | 302 return nil, fmt.Errorf("Failed to insert line %d: %v", line, err) |
281 } | 303 } |
282 ids = append(ids, id) | 304 ids = append(ids, id) |