comparison pkg/imports/agm.go @ 1780:48791416bea5

(Approved) gauge measurement import: Fixed row level security.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 11 Jan 2019 16:48:14 +0100
parents ad1c12e999df
children 0a53c33bc7b2
comparison
equal deleted inserted replaced
1779:ad1c12e999df 1780:48791416bea5
63 "gauge_measurements", 63 "gauge_measurements",
64 } 64 }
65 } 65 }
66 66
67 const ( 67 const (
68 // TODO: re-add staging_done field in table and fix RLS policy 68 // delete the old and keep the new measures.
69 // issue for raw import. 69 agmStageDoneDeleteSQL = `
70 WITH staged AS (
71 SELECT key
72 FROM waterway.track_imports
73 WHERE import_id = $1 AND
74 relation = 'waterway.gauge_measurements'::regclass
75 ),
76 to_delete AS (
77 SELECT o.id AS id
78 FROM waterway.gauge_measurements o JOIN waterway.gauge_measurements n
79 ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date
80 WHERE n.id IN (SELECT key FROM staged)
81 AND o.id NOT IN (SELECT key FROM staged)
82 )
83 DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`
84
70 agmStageDoneSQL = ` 85 agmStageDoneSQL = `
71 UPDATE waterway.gauge_measurements SET staging_done = true 86 UPDATE waterway.gauge_measurements SET staging_done = true
72 WHERE id = ( 87 WHERE id IN (
73 SELECT key from waterway.track_imports 88 SELECT key FROM waterway.track_imports
74 WHERE import_id = $1 AND 89 WHERE import_id = $1 AND
75 relation = 'waterway.gauge_measurements'::regclass)` 90 relation = 'waterway.gauge_measurements'::regclass)`
76 ) 91 )
77 92
78 func (agmJobCreator) StageDone( 93 func (agmJobCreator) StageDone(
79 ctx context.Context, 94 ctx context.Context,
80 tx *sql.Tx, 95 tx *sql.Tx,
81 id int64, 96 id int64,
82 ) error { 97 ) error {
83 _, err := tx.ExecContext(ctx, agmStageDoneSQL, id) 98 _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id)
99 if err == nil {
100 _, err = tx.ExecContext(ctx, agmStageDoneSQL, id)
101 }
84 return err 102 return err
85 } 103 }
86 104
87 // CleanUp removes the folder containing the CSV file with the 105 // CleanUp removes the folder containing the CSV file with the
88 // approved gauge measurements. 106 // approved gauge measurements.
195 } 213 }
196 defer trackStmt.Close() 214 defer trackStmt.Close()
197 215
198 ids := []int64{} 216 ids := []int64{}
199 217
200 args := make([]interface{}, 18) 218 args := make([]interface{}, 19)
219
220 args[18] = false // staging_done
201 221
202 lines: 222 lines:
203 for line := 1; ; line++ { 223 for line := 1; ; line++ {
204 224
205 row, err := r.Read() 225 row, err := r.Read()
273 } 293 }
274 args[16] = din 294 args[16] = din
275 295
276 args[17] = row[headerIndices["originator"]] 296 args[17] = row[headerIndices["originator"]]
277 297
298 // args[18] (staging_done) is set to false outside the loop.
299
278 var id int64 300 var id int64
279 if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil { 301 if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil {
280 return nil, fmt.Errorf("Failed to insert line %d: %v", line, err) 302 return nil, fmt.Errorf("Failed to insert line %d: %v", line, err)
281 } 303 }
282 ids = append(ids, id) 304 ids = append(ids, id)