comparison pkg/imports/wg.go @ 1829:b4b9089c2d79

Waterway gauges: Started with deleting old gauges to be overwritten.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 16 Jan 2019 16:34:47 +0100
parents 4910bcfab319
children f7b926440449
comparison
equal deleted inserted replaced
1828:1ecfcf46e4da 1829:b4b9089c2d79
15 15
16 import ( 16 import (
17 "context" 17 "context"
18 "database/sql" 18 "database/sql"
19 "errors" 19 "errors"
20 "fmt"
20 "log" 21 "log"
21 "strings" 22 "strings"
22 23
23 "gemma.intevation.de/gemma/pkg/common" 24 "gemma.intevation.de/gemma/pkg/common"
25 "gemma.intevation.de/gemma/pkg/models"
24 "gemma.intevation.de/gemma/pkg/soap" 26 "gemma.intevation.de/gemma/pkg/soap"
25 "gemma.intevation.de/gemma/pkg/soap/erdms" 27 "gemma.intevation.de/gemma/pkg/soap/erdms"
26 ) 28 )
27 29
28 type WaterwayGauge struct { 30 type WaterwayGauge struct {
45 RegisterJobCreator(WGJobKind, wgJobCreator{}) 47 RegisterJobCreator(WGJobKind, wgJobCreator{})
46 } 48 }
47 49
48 func (wgJobCreator) Description() string { return "waterway gauges" } 50 func (wgJobCreator) Description() string { return "waterway gauges" }
49 51
50 func (wgJobCreator) AutoAccept() bool { return false } 52 func (wgJobCreator) AutoAccept() bool { return true }
51 53
52 func (wgJobCreator) Create(_ JobKind, data string) (Job, error) { 54 func (wgJobCreator) Create(_ JobKind, data string) (Job, error) {
53 wg := new(WaterwayGauge) 55 wg := new(WaterwayGauge)
54 if err := common.FromJSONString(data, wg); err != nil { 56 if err := common.FromJSONString(data, wg); err != nil {
55 return nil, err 57 return nil, err
58 } 60 }
59 61
60 func (wgJobCreator) Depends() []string { 62 func (wgJobCreator) Depends() []string {
61 return []string{ 63 return []string{
62 "gauges", 64 "gauges",
63 } 65 "gauges_reference_water_levels",
64 } 66 }
65 67 }
66 func (wgJobCreator) StageDone( 68
67 ctx context.Context, 69 // StageDone does nothing as there is no staging for gauges.
68 tx *sql.Tx, 70 func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil }
69 id int64, 71
70 ) error { 72 // CleanUp does nothing as there is nothing to cleanup with gauges.
71 // TODO: Implement me!
72 return nil
73 }
74
75 func (*WaterwayGauge) CleanUp() error { return nil } 73 func (*WaterwayGauge) CleanUp() error { return nil }
76 74
77 const ( 75 const (
78 selectCurrentUserCountrySQL = `SELECT users.current_user_country()` 76 selectCurrentUserCountrySQL = `SELECT users.current_user_country()`
77
78 hasGaugeSQL = `
79 SELECT true
80 FROM waterway.gauges
81 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
82
83 deleteReferenceWaterLevelsSQL = `
84 DELETE FROM waterway.gauges_reference_water_levels
85 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
86
87 deleteGaugeSQL = `
88 DELETE FROM waterway.gauges
89 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
79 ) 90 )
80 91
81 func (wg *WaterwayGauge) Do( 92 func (wg *WaterwayGauge) Do(
82 ctx context.Context, 93 ctx context.Context,
83 importID int64, 94 importID int64,
122 133
123 data, err := client.GetRisDataXML(request) 134 data, err := client.GetRisDataXML(request)
124 135
125 if err != nil { 136 if err != nil {
126 log.Printf("error: %v\n", err) 137 log.Printf("error: %v\n", err)
127 return nil, err 138 return nil, fmt.Errorf("Error requesting ERDMS service: %v", err)
128 } 139 }
129 140
130 for _, dr := range data.RisdataReturn { 141 hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL)
142 if err != nil {
143 return nil, err
144 }
145 defer hasGaugeStmt.Close()
146
147 var ignored int
148
149 type idxCode struct {
150 idx int
151 code *models.Isrs
152 }
153
154 var news, olds []idxCode
155
156 for i, dr := range data.RisdataReturn {
131 if dr.RisidxCode == nil { 157 if dr.RisidxCode == nil {
132 log.Printf("warn: RisidxCode == nil") 158 ignored++
133 continue 159 continue
134 } 160 }
161 code, err := models.IsrsFromString(string(*dr.RisidxCode))
162 if err != nil {
163 feedback.Warn("invalid ISRS code %v", err)
164 ignored++
165 continue
166 }
167
135 if dr.Objname.Loc == nil { 168 if dr.Objname.Loc == nil {
136 log.Printf("warn: Objname == nil") 169 feedback.Warn("missing objname: %s", code)
137 continue 170 ignored++
138 } 171 continue
139 log.Printf("RisidxCode: %s\n", *dr.RisidxCode) 172 }
140 log.Printf("\tObjname: %s\n", *dr.Objname.Loc) 173
174 if dr.Lat == nil || dr.Lon == nil {
175 feedback.Warn("missing lat/lon: %s", code)
176 ignored++
177 continue
178 }
179
180 if dr.Zeropoint == nil {
181 feedback.Warn("missing zeropoint: %s", code)
182 ignored++
183 continue
184 }
185
186 var dummy bool
187 err = hasGaugeStmt.QueryRowContext(ctx,
188 code.CountryCode,
189 code.LoCode,
190 code.FairwaySection,
191 code.Orc,
192 code.Hectometre,
193 ).Scan(&dummy)
194 switch {
195 case err == sql.ErrNoRows:
196 olds = append(olds, idxCode{idx: i, code: code})
197 case err != nil:
198 return nil, err
199 case !dummy:
200 return nil, errors.New("Unexpected result")
201 default:
202 news = append(news, idxCode{idx: i, code: code})
203 }
204 }
205 feedback.Info("ignored gauges: %d", ignored)
206 feedback.Info("new gauges: %d", len(news))
207 feedback.Info("update gauges: %d", len(olds))
208
209 if len(news) == 0 && len(olds) == 0 {
210 return nil, errors.New("nothing to do")
211 }
212
213 // delete the old
214 if len(olds) > 0 {
215 deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(ctx, deleteReferenceWaterLevelsSQL)
216 if err != nil {
217 return nil, err
218 }
219 defer deleteReferenceWaterLevelsStmt.Close()
220 deleteGaugeStmt, err := tx.PrepareContext(ctx, deleteGaugeSQL)
221 if err != nil {
222 return nil, err
223 }
224 for i := range olds {
225 ic := &olds[i]
226
227 if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx,
228 ic.code.CountryCode,
229 ic.code.LoCode,
230 ic.code.FairwaySection,
231 ic.code.Orc,
232 ic.code.Hectometre,
233 ); err != nil {
234 return nil, err
235 }
236 if _, err := deleteGaugeStmt.ExecContext(ctx,
237 ic.code.CountryCode,
238 ic.code.LoCode,
239 ic.code.FairwaySection,
240 ic.code.Orc,
241 ic.code.Hectometre,
242 ); err != nil {
243 return nil, err
244 }
245 }
246 // treat them as new
247 news = append(news, olds...)
141 } 248 }
142 249
143 // TODO: Implement me! 250 // TODO: Implement me!
144 return nil, errors.New("Not implemented, yet!") 251 return nil, errors.New("Not implemented, yet!")
145 } 252 }