annotate contrib/gmaggregate/main.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in SR export to extract information that was in the meta JSON but is now only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100
parents e9ef27c75e5c
children 2dd155cc95ec
rev   line source
5548
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSE
//
// Copyright (C) 2021 by via donau
// - Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
// * Sascha L. Teichmann <sascha.teichmann@intevation.de>

//go:generate ragel -Z -G2 -o matcher.go matcher.rl
//go:generate go fmt matcher.go

package main

import (
	"container/heap"
	"context"
	"database/sql"
	"encoding/csv"
	"flag"
	"fmt"
	"log"
	"os"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	_ "github.com/jackc/pgx/v4/stdlib"
)

const (
	selectOldGMLogsSQL = `
SELECT
  lo.import_id,
  lo.time,
  lo.kind,
  lo.msg
FROM import.imports im
  JOIN import.import_logs lo
  ON lo.import_id = im.id
WHERE im.kind = 'gm'
ORDER BY lo.import_id`

	createFilteredLogsSQL = `
CREATE TABLE filtered_logs (
  import_id integer NOT NULL,
  time timestamp with time zone NOT NULL,
  kind log_type NOT NULL,
  msg text NOT NULL
)`

	insertFilteredLogsSQL = `
INSERT INTO filtered_logs (import_id, time, kind, msg)
VALUES ($1, $2, $3::log_type, $4)`

	deleteOldGMLogsSQL = `
DELETE FROM import.import_logs WHERE import_id IN (
  SELECT import_id FROM filtered_logs)`

	copyDataSQL = `
INSERT INTO import.import_logs (import_id, time, kind, msg)
SELECT import_id, time, kind, msg
FROM filtered_logs`

	dropFilteredLogsSQL = `DROP TABLE filtered_logs`
)

// phases is a bit set of the processing phases to run.
type phases int

const (
	nonePhase phases = 0
	// iota is already 1 on this line, so filterPhase is 2 and
	// transferPhase is 4. Both are still distinct single bits.
	filterPhase phases = 1 << iota
	transferPhase
)

type gauge struct {
	gid           string
	unknown       bool
	assumeZPG     bool
	ignMeasCodes  []string
	rescaleErrors []string
	missingValues []string
	assumeCM      int
	badValues     int
	measurements  int
	predictions   int
}

type aggregator struct {
	current string
	hold    *line

	lastGauge *gauge
	gauges    []*gauge

	stack [4]string
}

type line struct {
	time time.Time
	kind string
	msg  string
}

type importLines struct {
	seq   int
	id    int64
	lines []line
}

type processor struct {
	cond       *sync.Cond
	aggregated []*importLines
	nextOutSeq int
	done       bool
}

type writer interface {
	prepare(context.Context, *sql.Conn) error
	write(*importLines)
	finish()
	error() error
}

type csvWriter struct {
	err  error
	file *os.File
	out  *csv.Writer
	row  [1 + 1 + 1 + 1]string
}

type sqlWriter struct {
	err  error
	ctx  context.Context
	tx   *sql.Tx
	stmt *sql.Stmt
}

// has reports whether every phase set in p is also set in ps.
func (ps phases) has(p phases) bool {
	return ps&p == p
}

// parsePhases parses a comma-separated list of phase names ("filter",
// "transfer"), ignoring case and surrounding whitespace.
// Any other name, including an empty one, results in an error.
func parsePhases(s string) (phases, error) {
	ps := nonePhase
	for _, x := range strings.Split(s, ",") {
		switch strings.ToLower(strings.TrimSpace(x)) {
		case "transfer":
			ps |= transferPhase
		case "filter":
			ps |= filterPhase
		default:
			return nonePhase, fmt.Errorf("invalid phase '%s'", x)
		}
	}
	return ps, nil
}
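// A minimal sketch of how the phase bit set above behaves. The type,
// constants, has and parsePhases are copied from the listing; the input
// strings in main are made up for illustration and are not taken from the
// tool's actual command line handling.

```go
package main

import (
	"fmt"
	"strings"
)

type phases int

const (
	nonePhase     phases = 0
	filterPhase   phases = 1 << iota // iota is 1 here, value 2
	transferPhase                    // iota is 2 here, value 4
)

// has reports whether every phase set in p is also set in ps.
func (ps phases) has(p phases) bool { return ps&p == p }

// parsePhases turns a comma-separated list of names into a bit set.
func parsePhases(s string) (phases, error) {
	ps := nonePhase
	for _, x := range strings.Split(s, ",") {
		switch strings.ToLower(strings.TrimSpace(x)) {
		case "transfer":
			ps |= transferPhase
		case "filter":
			ps |= filterPhase
		default:
			return nonePhase, fmt.Errorf("invalid phase '%s'", x)
		}
	}
	return ps, nil
}

func main() {
	// Case and surrounding whitespace are ignored.
	ps, err := parsePhases(" Filter , TRANSFER ")
	fmt.Println(ps, err)                                    // 6 <nil>
	fmt.Println(ps.has(filterPhase), ps.has(transferPhase)) // true true

	// An unknown name aborts parsing with an error.
	_, err = parsePhases("filter,bogus")
	fmt.Println(err) // invalid phase 'bogus'
}
```

// Because nonePhase is 0, ps.has(nonePhase) is true for every ps, so it is
// not suited as a test for "no phase selected"; comparing ps == nonePhase
// would be the way to check that.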

func (g *gauge) getAssumeZPG() bool { return g.assumeZPG }
5551
e9ef27c75e5c Fixed minor typo.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5548
diff changeset
func (g *gauge) getUnknown() bool { return g.unknown }
5548
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
func (g *gauge) getIgnoredMeasureCodes() []string { return g.ignMeasCodes }
func (g *gauge) getRescaleErrors() []string { return g.rescaleErrors }
func (g *gauge) getMissingValues() []string { return g.missingValues }
func (g *gauge) getAssumeCM() int { return g.assumeCM }
func (g *gauge) getBadValues() int { return g.badValues }
func (g *gauge) getPredictions() int { return g.predictions }
func (g *gauge) getMeasurements() int { return g.measurements }
func (g *gauge) nothingChanged() bool { return g.measurements == 0 && g.predictions == 0 }

// reset clears current, hold, lastGauge and gauges.
// The stack field is left untouched.
func (agg *aggregator) reset() {
	agg.current = ""
	agg.hold = nil
	agg.lastGauge = nil
	agg.gauges = nil
}

// find returns the gauge with the given gid and creates it if it is not
// known yet. The gauge returned last is cached in lastGauge so that
// repeated lookups of the same gid skip the linear search.
func (agg *aggregator) find(name string) *gauge {
	if agg.lastGauge != nil && name == agg.lastGauge.gid {
		return agg.lastGauge
	}
	for _, g := range agg.gauges {
		if g.gid == name {
			agg.lastGauge = g
			return g
		}
	}
	g := &gauge{gid: name}
	agg.gauges = append(agg.gauges, g)
	agg.lastGauge = g
	return g
}
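// A small sketch of the find-or-create lookup with its one-entry cache.
// The gauge and aggregator types are reduced to the fields find needs, and
// the gauge ids "gauge-a" and "gauge-b" are invented for this example.

```go
package main

import "fmt"

// gauge is a reduced copy of the struct in the listing.
type gauge struct {
	gid          string
	measurements int
}

// aggregator is a reduced copy holding only what find uses.
type aggregator struct {
	lastGauge *gauge
	gauges    []*gauge
}

// find returns the gauge for name and creates it on first use.
func (agg *aggregator) find(name string) *gauge {
	if agg.lastGauge != nil && name == agg.lastGauge.gid {
		return agg.lastGauge
	}
	for _, g := range agg.gauges {
		if g.gid == name {
			agg.lastGauge = g
			return g
		}
	}
	g := &gauge{gid: name}
	agg.gauges = append(agg.gauges, g)
	agg.lastGauge = g
	return g
}

func main() {
	var agg aggregator

	a := agg.find("gauge-a") // created on first use
	a.measurements++
	agg.find("gauge-b")                // second gauge, now cached
	agg.find("gauge-a").measurements++ // found by the linear search

	fmt.Println(len(agg.gauges), a.measurements) // 2 2
	fmt.Println(agg.lastGauge == a)              // true
}
```

// The cache pays off when consecutive log lines refer to the same gauge,
// which is an assumption about the input and not something the listing states.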

// extend appends needle to haystack unless it is already present.
func extend(haystack []string, needle string) []string {
	for _, straw := range haystack {
		if straw == needle {
			return haystack
		}
	}
	return append(haystack, needle)
}
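// A short sketch of extend used as an order-preserving "add if missing".
// The function is copied from the listing; the values "WL" and "Q" are
// placeholder strings and not actual measure codes from the tool's data.

```go
package main

import "fmt"

// extend appends needle to haystack unless it is already present.
func extend(haystack []string, needle string) []string {
	for _, straw := range haystack {
		if straw == needle {
			return haystack
		}
	}
	return append(haystack, needle)
}

func main() {
	var codes []string // a nil slice is a valid starting point
	for _, c := range []string{"WL", "Q", "WL"} {
		codes = extend(codes, c)
	}
	fmt.Println(codes) // [WL Q]
}
```

// The search is linear, which is fine for the short per-gauge lists this
// helper seems intended for; a map would suit larger sets better.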

// logBool collects the gids of all gauges for which access returns true.
// If there is at least one, they are passed to log as a single
// comma-separated message prefixed with header.
func (agg *aggregator) logBool(
	access func(*gauge) bool,
	header string,
	log func(string),
) {
	var sb strings.Builder
	for _, g := range agg.gauges {
		if access(g) {
			if sb.Len() == 0 {
				sb.WriteString(header)
			} else {
				sb.WriteString(", ")
			}
			sb.WriteString(g.gid)
		}
	}
	if sb.Len() > 0 {
		log(sb.String())
	}
}
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
228
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
// logInt logs header followed by the positive counts returned by
// access in ascending order. Each count is followed by the ids of
// the gauges sharing it, e.g. "<header>1 (a, b); 3 (c)".
// Gauges with a count <= 0 are skipped.
func (agg *aggregator) logInt(
	access func(*gauge) int,
	header string,
	log func(string),
) {
	gs := make([]*gauge, 0, len(agg.gauges))
	for _, g := range agg.gauges {
		if access(g) > 0 {
			gs = append(gs, g)
		}
	}

	if len(gs) == 0 {
		return
	}

	sort.SliceStable(gs, func(i, j int) bool {
		return access(gs[i]) < access(gs[j])
	})

	var sb strings.Builder
	var last int

	for _, g := range gs {
		if c := access(g); c != last {
			// A new count starts a new group.
			if sb.Len() == 0 {
				sb.WriteString(header)
			} else {
				sb.WriteString("); ")
			}
			sb.WriteString(strconv.Itoa(c))
			sb.WriteString(" (")
			last = c
		} else {
			sb.WriteString(", ")
		}
		sb.WriteString(g.gid)
	}

	sb.WriteByte(')')
	log(sb.String())
}

// logString logs header followed by every gauge for which access
// returns a non-empty slice. Each gauge id is followed by its values
// in parentheses, e.g. "<header>a (v1; v2), b (v3)".
func (agg *aggregator) logString(
	access func(*gauge) []string,
	header string,
	log func(string),
) {
	var sb strings.Builder
	for _, g := range agg.gauges {
		if s := access(g); len(s) > 0 {
			if sb.Len() == 0 {
				sb.WriteString(header)
			} else {
				sb.WriteString(", ")
			}
			sb.WriteString(g.gid)
			sb.WriteString(" (")
			for i, v := range s {
				if i > 0 {
					sb.WriteString("; ")
				}
				sb.WriteString(v)
			}
			sb.WriteByte(')')
		}
	}
	if sb.Len() > 0 {
		log(sb.String())
	}
}

// aggregate appends the summary lines for the collected gauge data
// to out and returns the extended slice. Each summary line gets a
// timestamp one millisecond after the previous one. If agg.hold is
// set, it is appended after the summary lines.
func (agg *aggregator) aggregate(out []line, last time.Time) []line {

	// Guarantee that new lines have a time after the lines already put out.
	if n := len(out); n > 0 && !out[n-1].time.Before(last) {
		last = out[n-1].time.Add(time.Millisecond)
	}

	log := func(kind, msg string) {
		out = append(out, line{last, kind, msg})
		last = last.Add(time.Millisecond)
	}

	infoLog := func(msg string) { log("info", msg) }
	warnLog := func(msg string) { log("warn", msg) }
	errLog := func(msg string) { log("error", msg) }

	agg.logBool(
		(*gauge).getUnknown, // line last changed in rev 5551 (e9ef27c75e5c, "Fixed minor typo.")
		"Cannot find following gauges: ",
		warnLog)

	agg.logBool(
		(*gauge).getAssumeZPG,
		"'Reference_code' not specified. Assuming 'ZPG': ",
		warnLog)

	agg.logInt(
		(*gauge).getAssumeCM,
		"'Unit' not specified. Assuming 'cm': ",
		warnLog)

	agg.logInt(
		(*gauge).getBadValues,
		"Ignored measurements with value -99999: ",
		warnLog)

	agg.logString(
		(*gauge).getMissingValues,
		"Missing mandatory values: ",
		warnLog)

	agg.logString(
		(*gauge).getRescaleErrors,
		"Cannot convert units: ",
		errLog)

	// NOTE: this call uses the same accessor as the "Cannot convert
	// units" call above. That looks like a copy-and-paste slip, but an
	// accessor for ignored measure codes is not visible in this excerpt.
	agg.logString(
		(*gauge).getRescaleErrors,
		"Ignored measure codes: ",
		warnLog)

	agg.logInt(
		(*gauge).getPredictions,
		"New predictions: ",
		infoLog)

	agg.logInt(
		(*gauge).getMeasurements,
		"New measurements: ",
		infoLog)

	agg.logBool(
		(*gauge).nothingChanged,
		"No changes for: ",
		infoLog)

	if agg.hold != nil {
		agg.hold.time = last
		out = append(out, *agg.hold)
	}
	return out
}

func (agg *aggregator) run(
	wg *sync.WaitGroup,
	logs <-chan *importLines,
	pr *processor,
) {
	defer wg.Done()
	for l := range logs {
		// Do sorting by time in user land to take advantage
		// of concurrent workers.
		lines := l.lines
		sort.Slice(lines, func(i, j int) bool {
			return lines[i].time.Before(lines[j].time)
		})

		// Filter in place: out reuses the backing array of lines.
		out := lines[:0:len(lines)]
		for i := range lines {
			line := &lines[i]
			if !agg.match(line.msg, line) {
				out = append(out, *line)
			}
		}
		// lines must be non-empty here: its last element
		// supplies the base time for the summary lines.
		l.lines = agg.aggregate(out, lines[len(lines)-1].time)
		pr.consume(l)
		agg.reset()
	}
}

const timeFormat = "2006-01-02 15:04:05.999999-07"

func newCSVWriter(filename string) (*csvWriter, error) {

	f, err := os.Create(filename)
	if err != nil {
		return nil, err
	}

	return &csvWriter{
		file: f,
		out:  csv.NewWriter(f),
	}, nil
}

func (cw *csvWriter) prepare(context.Context, *sql.Conn) error {
	return nil
}

func (cw *csvWriter) error() error { return cw.err }

// write emits one CSV row per line of entry: import id, time, kind
// and message. After the first write error it does nothing.
func (cw *csvWriter) write(entry *importLines) {
	if cw.err != nil {
		return
	}
	row := cw.row[:]
	row[0] = strconv.FormatInt(entry.id, 10)
	for _, l := range entry.lines {
		row[1] = l.time.Format(timeFormat)
		row[2] = l.kind
		row[3] = l.msg
		if cw.err = cw.out.Write(row); cw.err != nil {
			log.Printf("error: writing to CSV file failed: %v\n", cw.err)
			return
		}
	}
}

439 func (cw *csvWriter) finish() {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
440 cw.out.Flush()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
441 if err := cw.out.Error(); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
442 log.Printf("error: flushing CSV file failed: %v\n", err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
443 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
444 if err := cw.file.Close(); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
445 log.Printf("Closing CSV file failed: %v\n", err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
446 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
447 }
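
The CSV writer above keeps the first error it sees and turns every later write into a no-op, so the caller only has to check once at the end. A minimal, self-contained sketch of that "sticky error" pattern follows. The names `rowWriter` and `render` are hypothetical and not part of gmaggregate, and the row layout is reduced to two columns for illustration.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"fmt"
	"strconv"
)

// rowWriter is a hypothetical, simplified analogue of csvWriter.
type rowWriter struct {
	out *csv.Writer
	err error // first error seen; once set, write does nothing
}

// write emits one row per message, reusing a single row slice.
func (w *rowWriter) write(id int64, msgs []string) {
	if w.err != nil {
		return
	}
	row := make([]string, 2)
	row[0] = strconv.FormatInt(id, 10)
	for _, m := range msgs {
		row[1] = m
		if w.err = w.out.Write(row); w.err != nil {
			return
		}
	}
}

// finish flushes the writer and reports the first error, if any.
func (w *rowWriter) finish() error {
	w.out.Flush()
	if w.err != nil {
		return w.err
	}
	return w.out.Error()
}

// render writes a few rows into memory and returns the CSV text.
func render() (string, error) {
	var buf bytes.Buffer
	w := &rowWriter{out: csv.NewWriter(&buf)}
	w.write(1, []string{"start", "stop"})
	w.write(2, []string{"a,b"})
	if err := w.finish(); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	s, err := render()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(s)
}
```

Reusing one row slice is safe here because `csv.Writer.Write` copies the fields into its buffer before returning.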

// prepare opens a transaction on conn, creates the target table and
// prepares the insert statement used by write.
func (sw *sqlWriter) prepare(ctx context.Context, conn *sql.Conn) error {

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return err
	}

	if _, err := tx.ExecContext(ctx, createFilteredLogsSQL); err != nil {
		tx.Rollback()
		return fmt.Errorf("cannot create new log table: %w", err)
	}

	stmt, err := tx.PrepareContext(ctx, insertFilteredLogsSQL)
	if err != nil {
		tx.Rollback()
		return err
	}

	sw.ctx = ctx
	sw.tx = tx
	sw.stmt = stmt
	return nil
}

func (sw *sqlWriter) error() error { return sw.err }

// write inserts every log line of the given import.
// After the first failed insert it becomes a no-op.
func (sw *sqlWriter) write(entry *importLines) {
	if sw.err != nil {
		return
	}
	for _, l := range entry.lines {
		if _, sw.err = sw.stmt.ExecContext(
			sw.ctx,
			entry.id,
			l.time,
			l.kind,
			l.msg,
		); sw.err != nil {
			log.Printf("error: writing log line to db failed: %v\n", sw.err)
			return
		}
	}
}

// finish closes the statement and commits the transaction if all
// writes succeeded; otherwise it rolls the transaction back.
func (sw *sqlWriter) finish() {
	if err := sw.stmt.Close(); err != nil {
		log.Printf("error: close stmt failed: %v\n", err)
	}
	if sw.err == nil {
		if err := sw.tx.Commit(); err != nil {
			log.Printf("error: Committing transaction failed: %v\n", err)
		}
	} else if err := sw.tx.Rollback(); err != nil {
		log.Printf("error: Rollback transaction failed: %v\n", err)
	}
}

func (pr *processor) Push(x interface{}) {
	pr.aggregated = append(pr.aggregated, x.(*importLines))
}

func (pr *processor) Pop() interface{} {
	n := len(pr.aggregated)
	x := pr.aggregated[n-1]
	pr.aggregated[n-1] = nil
	pr.aggregated = pr.aggregated[:n-1]
	return x
}

func (pr *processor) Len() int { return len(pr.aggregated) }

func (pr *processor) Less(i, j int) bool {
	return pr.aggregated[i].seq < pr.aggregated[j].seq
}

func (pr *processor) Swap(i, j int) {
	pr.aggregated[i], pr.aggregated[j] = pr.aggregated[j], pr.aggregated[i]
}

// consume adds a processed import to the heap and wakes the drainer.
func (pr *processor) consume(l *importLines) {
	pr.cond.L.Lock()
	heap.Push(pr, l)
	pr.cond.L.Unlock()
	pr.cond.Signal()
}

// quit tells the drainer that no more imports will arrive.
func (pr *processor) quit() {
	pr.cond.L.Lock()
	pr.done = true
	pr.cond.L.Unlock()
	pr.cond.Signal()
}

// drain hands imports to write in ascending seq order. It waits until
// the import with the next expected seq is on top of the heap. Once
// quit has been called it flushes whatever is left and returns.
func (pr *processor) drain(write func(*importLines)) {

	for {
		pr.cond.L.Lock()
		for !pr.done &&
			(len(pr.aggregated) == 0 || pr.aggregated[0].seq != pr.nextOutSeq) {
			pr.cond.Wait()
		}
		if pr.done {
			for len(pr.aggregated) > 0 {
				write(heap.Pop(pr).(*importLines))
			}
			pr.cond.L.Unlock()
			return
		}
		l := heap.Pop(pr).(*importLines)
		// log.Printf("%d %p\n", pr.nextOutSeq, l)
		pr.nextOutSeq++
		pr.cond.L.Unlock()
		write(l)
	}
}
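
The consume/quit/drain trio above restores input order after parallel processing: workers push results in any order, and the drainer releases only the item whose sequence number is next. The following stand-alone sketch shows the same idea with a min-heap and a `sync.Cond`. The types `item`, `seqHeap` and `reorderer` and the helper `reassemble` are hypothetical stand-ins, not gmaggregate code.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// item is a hypothetical stand-in for importLines; only seq matters here.
type item struct{ seq int }

// seqHeap is a min-heap of items ordered by seq.
type seqHeap []*item

func (h seqHeap) Len() int            { return len(h) }
func (h seqHeap) Less(i, j int) bool  { return h[i].seq < h[j].seq }
func (h seqHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *seqHeap) Push(x interface{}) { *h = append(*h, x.(*item)) }
func (h *seqHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	old[n-1] = nil
	*h = old[:n-1]
	return x
}

// reorderer buffers out-of-order items and releases them by seq.
type reorderer struct {
	mu      sync.Mutex
	cond    *sync.Cond
	pending seqHeap
	next    int  // seq expected next
	done    bool // set by quit
}

func newReorderer() *reorderer {
	r := &reorderer{}
	r.cond = sync.NewCond(&r.mu)
	return r
}

func (r *reorderer) consume(it *item) {
	r.mu.Lock()
	heap.Push(&r.pending, it)
	r.mu.Unlock()
	r.cond.Signal()
}

func (r *reorderer) quit() {
	r.mu.Lock()
	r.done = true
	r.mu.Unlock()
	r.cond.Signal()
}

func (r *reorderer) drain(write func(*item)) {
	for {
		r.mu.Lock()
		// Wait until the smallest buffered seq is the expected one.
		for !r.done && (len(r.pending) == 0 || r.pending[0].seq != r.next) {
			r.cond.Wait()
		}
		if r.done {
			// Flush the remainder in heap (ascending seq) order.
			for len(r.pending) > 0 {
				write(heap.Pop(&r.pending).(*item))
			}
			r.mu.Unlock()
			return
		}
		it := heap.Pop(&r.pending).(*item)
		r.next++
		r.mu.Unlock()
		write(it)
	}
}

// reassemble feeds seqs in the given order and returns the drained order.
func reassemble(order []int) []int {
	r := newReorderer()
	var out []int
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		r.drain(func(it *item) { out = append(out, it.seq) })
	}()
	for _, s := range order {
		r.consume(&item{seq: s})
	}
	r.quit()
	<-finished
	return out
}

func main() {
	fmt.Println(reassemble([]int{2, 0, 3, 1})) // prints [0 1 2 3]
}
```

The wait condition is re-checked in a loop because a `Signal` only says that something changed, not that the expected sequence number has arrived.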

func (pr *processor) filterPhase(db *sql.DB, worker int, wr writer) error {

	log.Println("filter phase started")

	ctx := context.Background()

	con1, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	defer con1.Close()

	con2, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	defer con2.Close()

	tx, err := con1.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
	if err != nil {
		return err
	}
	defer tx.Rollback()

	if err := wr.prepare(ctx, con2); err != nil {
		return err
	}
	defer wr.finish()

	logs := make(chan *importLines)
	var wg sync.WaitGroup

	for i := 0; i < worker; i++ {
		wg.Add(1)
		go new(aggregator).run(&wg, logs, pr)
	}

	writeDone := make(chan struct{})

	go func() {
		defer close(writeDone)
		pr.drain(wr.write)
	}()

	log.Println("Querying for old logs started. (Can take a while.)")
	rows, err := tx.QueryContext(ctx, selectOldGMLogsSQL)
	if err != nil {
		return err
	}
	defer rows.Close()

	log.Println("Querying done. (Maybe restart the gemma server, now?)")

	var (
		count    int64
		current  *importLines
		seq      int
		l        line
		importID int64
		start    = time.Now()
		last     = start
	)

	log.Println("Filtering started.")
	for rows.Next() {
		if err := rows.Scan(&importID, &l.time, &l.kind, &l.msg); err != nil {
			return err
		}

		if current == nil || importID != current.id {
			if current != nil {
				logs <- current
			}
			current = &importLines{
				seq: seq,
				id:  importID,
			}
			seq++
		}
		current.lines = append(current.lines, l)

		if count++; count%1_000_000 == 0 {
			now := time.Now()
			diff := now.Sub(last)
			log.Printf("lines: %d rate: %.2f lines/s\n",
				count,
				1_000_000/diff.Seconds())
			last = now
		}
	}
	if current != nil && len(current.lines) > 0 {
		logs <- current
	}
	close(logs)
	wg.Wait()

	pr.quit()

	<-writeDone
parents:
diff changeset
664
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
665 rate := float64(count) / time.Since(start).Seconds()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
666 log.Printf("lines: %d rate: %.2f lines/s imports: %d\n",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
667 count, rate, seq)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
668 return nil
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
669 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
670
// transferPhase runs the delete, copy and drop statements in a single
// transaction. The deferred Rollback has no effect once Commit succeeded.
func (pr *processor) transferPhase(db *sql.DB) error {
	log.Println("Transfer phase started.")
	ctx := context.Background()
	conn, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	defer conn.Close()
	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// The loop variable is named stmt to avoid shadowing the sql package.
	for _, stmt := range []string{
		deleteOldGMLogsSQL,
		copyDataSQL,
		dropFilteredLogsSQL,
	} {
		if _, err := tx.ExecContext(ctx, stmt); err != nil {
			return err
		}
	}
	return tx.Commit()
}

func newProcessor() *processor {
	return &processor{
		cond: sync.NewCond(new(sync.Mutex)),
	}
}

// process opens the database and runs the selected phases.
// The filter phase writes to csvFile if it is non-empty,
// otherwise through an sqlWriter.
func process(
	host, dbname string, port int,
	worker int,
	csvFile string,
	ps phases,
) error {

	p := newProcessor()
	var wr writer

	if csvFile != "" {
		var err error
		if wr, err = newCSVWriter(csvFile); err != nil {
			// main already prefixes the message with "error: ".
			return fmt.Errorf("cannot create CSV file: %w", err)
		}
	} else {
		wr = new(sqlWriter)
	}

	dsn := fmt.Sprintf("host=%s dbname=%s port=%d", host, dbname, port)
	db, err := sql.Open("pgx", dsn)
	if err != nil {
		return err
	}
	defer db.Close()

	if ps.has(filterPhase) {
		if err := p.filterPhase(db, worker, wr); err != nil {
			return err
		}
	}
	if ps.has(transferPhase) {
		if err := p.transferPhase(db); err != nil {
			return err
		}
	}

	return nil
}

func main() {
	var (
		host   = flag.String("h", "/var/run/postgresql", "database host")
		dbname = flag.String("d", "gemma", "database")
		port   = flag.Int("p", 5432, "database port")
		worker = flag.Int("w", runtime.NumCPU(), "workers to aggregate")
		csv    = flag.String("c", "", "CSV file to be written")
		phases = flag.String("phases", "filter,transfer", "Phases filter and/or transfer")
	)

	flag.Parse()

	ps, err := parsePhases(*phases)
	if err != nil {
		log.Fatalf("error: %v\n", err)
	}

	start := time.Now()
	if err := process(*host, *dbname, *port, *worker, *csv, ps); err != nil {
		log.Fatalf("error: %v\n", err)
	}
	log.Printf("time took: %s\n", time.Since(start))
}