Mercurial > gemma
comparison pkg/imports/wg.go @ 1829:b4b9089c2d79
Waterway gauges: Started with deleting old gauges to be overwritten.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 16 Jan 2019 16:34:47 +0100 |
parents | 4910bcfab319 |
children | f7b926440449 |
comparison
equal
deleted
inserted
replaced
1828:1ecfcf46e4da | 1829:b4b9089c2d79 |
---|---|
15 | 15 |
16 import ( | 16 import ( |
17 "context" | 17 "context" |
18 "database/sql" | 18 "database/sql" |
19 "errors" | 19 "errors" |
20 "fmt" | |
20 "log" | 21 "log" |
21 "strings" | 22 "strings" |
22 | 23 |
23 "gemma.intevation.de/gemma/pkg/common" | 24 "gemma.intevation.de/gemma/pkg/common" |
25 "gemma.intevation.de/gemma/pkg/models" | |
24 "gemma.intevation.de/gemma/pkg/soap" | 26 "gemma.intevation.de/gemma/pkg/soap" |
25 "gemma.intevation.de/gemma/pkg/soap/erdms" | 27 "gemma.intevation.de/gemma/pkg/soap/erdms" |
26 ) | 28 ) |
27 | 29 |
28 type WaterwayGauge struct { | 30 type WaterwayGauge struct { |
45 RegisterJobCreator(WGJobKind, wgJobCreator{}) | 47 RegisterJobCreator(WGJobKind, wgJobCreator{}) |
46 } | 48 } |
47 | 49 |
48 func (wgJobCreator) Description() string { return "waterway gauges" } | 50 func (wgJobCreator) Description() string { return "waterway gauges" } |
49 | 51 |
50 func (wgJobCreator) AutoAccept() bool { return false } | 52 func (wgJobCreator) AutoAccept() bool { return true } |
51 | 53 |
52 func (wgJobCreator) Create(_ JobKind, data string) (Job, error) { | 54 func (wgJobCreator) Create(_ JobKind, data string) (Job, error) { |
53 wg := new(WaterwayGauge) | 55 wg := new(WaterwayGauge) |
54 if err := common.FromJSONString(data, wg); err != nil { | 56 if err := common.FromJSONString(data, wg); err != nil { |
55 return nil, err | 57 return nil, err |
58 } | 60 } |
59 | 61 |
60 func (wgJobCreator) Depends() []string { | 62 func (wgJobCreator) Depends() []string { |
61 return []string{ | 63 return []string{ |
62 "gauges", | 64 "gauges", |
63 } | 65 "gauges_reference_water_levels", |
64 } | 66 } |
65 | 67 } |
66 func (wgJobCreator) StageDone( | 68 |
67 ctx context.Context, | 69 // StageDone does nothing as there is no staging for gauges. |
68 tx *sql.Tx, | 70 func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } |
69 id int64, | 71 |
70 ) error { | 72 // CleanUp does nothing as there is nothing to cleanup with gauges. |
71 // TODO: Implement me! | |
72 return nil | |
73 } | |
74 | |
75 func (*WaterwayGauge) CleanUp() error { return nil } | 73 func (*WaterwayGauge) CleanUp() error { return nil } |
76 | 74 |
77 const ( | 75 const ( |
78 selectCurrentUserCountrySQL = `SELECT users.current_user_country()` | 76 selectCurrentUserCountrySQL = `SELECT users.current_user_country()` |
77 | |
78 hasGaugeSQL = ` | |
79 SELECT true | |
80 FROM waterway.gauges | |
81 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` | |
82 | |
83 deleteReferenceWaterLevelsSQL = ` | |
84 DELETE FROM waterway.gauges_reference_water_levels | |
85 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` | |
86 | |
87 deleteGaugeSQL = ` | |
88 DELETE FROM waterway.gauges | |
89 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` | |
79 ) | 90 ) |
80 | 91 |
81 func (wg *WaterwayGauge) Do( | 92 func (wg *WaterwayGauge) Do( |
82 ctx context.Context, | 93 ctx context.Context, |
83 importID int64, | 94 importID int64, |
122 | 133 |
123 data, err := client.GetRisDataXML(request) | 134 data, err := client.GetRisDataXML(request) |
124 | 135 |
125 if err != nil { | 136 if err != nil { |
126 log.Printf("error: %v\n", err) | 137 log.Printf("error: %v\n", err) |
127 return nil, err | 138 return nil, fmt.Errorf("Error requesting ERDMS service: %v", err) |
128 } | 139 } |
129 | 140 |
130 for _, dr := range data.RisdataReturn { | 141 hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL) |
142 if err != nil { | |
143 return nil, err | |
144 } | |
145 defer hasGaugeStmt.Close() | |
146 | |
147 var ignored int | |
148 | |
149 type idxCode struct { | |
150 idx int | |
151 code *models.Isrs | |
152 } | |
153 | |
154 var news, olds []idxCode | |
155 | |
156 for i, dr := range data.RisdataReturn { | |
131 if dr.RisidxCode == nil { | 157 if dr.RisidxCode == nil { |
132 log.Printf("warn: RisidxCode == nil") | 158 ignored++ |
133 continue | 159 continue |
134 } | 160 } |
161 code, err := models.IsrsFromString(string(*dr.RisidxCode)) | |
162 if err != nil { | |
163 feedback.Warn("invalid ISRS code %v", err) | |
164 ignored++ | |
165 continue | |
166 } | |
167 | |
135 if dr.Objname.Loc == nil { | 168 if dr.Objname.Loc == nil { |
136 log.Printf("warn: Objname == nil") | 169 feedback.Warn("missing objname: %s", code) |
137 continue | 170 ignored++ |
138 } | 171 continue |
139 log.Printf("RisidxCode: %s\n", *dr.RisidxCode) | 172 } |
140 log.Printf("\tObjname: %s\n", *dr.Objname.Loc) | 173 |
174 if dr.Lat == nil || dr.Lon == nil { | |
175 feedback.Warn("missing lat/lon: %s", code) | |
176 ignored++ | |
177 continue | |
178 } | |
179 | |
180 if dr.Zeropoint == nil { | |
181 feedback.Warn("missing zeropoint: %s", code) | |
182 ignored++ | |
183 continue | |
184 } | |
185 | |
186 var dummy bool | |
187 err = hasGaugeStmt.QueryRowContext(ctx, | |
188 code.CountryCode, | |
189 code.LoCode, | |
190 code.FairwaySection, | |
191 code.Orc, | |
192 code.Hectometre, | |
193 ).Scan(&dummy) | |
194 switch { | |
195 case err == sql.ErrNoRows: | |
196 olds = append(olds, idxCode{idx: i, code: code}) | |
197 case err != nil: | |
198 return nil, err | |
199 case !dummy: | |
200 return nil, errors.New("Unexpected result") | |
201 default: | |
202 news = append(news, idxCode{idx: i, code: code}) | |
203 } | |
204 } | |
205 feedback.Info("ignored gauges: %d", ignored) | |
206 feedback.Info("new gauges: %d", len(news)) | |
207 feedback.Info("update gauges: %d", len(olds)) | |
208 | |
209 if len(news) == 0 && len(olds) == 0 { | |
210 return nil, errors.New("nothing to do") | |
211 } | |
212 | |
213 // delete the old | |
214 if len(olds) > 0 { | |
215 deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(ctx, deleteReferenceWaterLevelsSQL) | |
216 if err != nil { | |
217 return nil, err | |
218 } | |
219 defer deleteReferenceWaterLevelsStmt.Close() | |
220 deleteGaugeStmt, err := tx.PrepareContext(ctx, deleteGaugeSQL) | |
221 if err != nil { | |
222 return nil, err | |
223 } | |
224 for i := range olds { | |
225 ic := &olds[i] | |
226 | |
227 if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx, | |
228 ic.code.CountryCode, | |
229 ic.code.LoCode, | |
230 ic.code.FairwaySection, | |
231 ic.code.Orc, | |
232 ic.code.Hectometre, | |
233 ); err != nil { | |
234 return nil, err | |
235 } | |
236 if _, err := deleteGaugeStmt.ExecContext(ctx, | |
237 ic.code.CountryCode, | |
238 ic.code.LoCode, | |
239 ic.code.FairwaySection, | |
240 ic.code.Orc, | |
241 ic.code.Hectometre, | |
242 ); err != nil { | |
243 return nil, err | |
244 } | |
245 } | |
246 // treat them as new | |
247 news = append(news, olds...) | |
141 } | 248 } |
142 | 249 |
143 // TODO: Implement me! | 250 // TODO: Implement me! |
144 return nil, errors.New("Not implemented, yet!") | 251 return nil, errors.New("Not implemented, yet!") |
145 } | 252 } |