annotate contrib/gmaggregate/main.go @ 5584:7ed9e32706d0 surveysperbottleneckid

Merged delault
author Sascha Wilde <wilde@sha-bang.de>
date Fri, 01 Apr 2022 16:47:53 +0200
parents e9ef27c75e5c
children 2dd155cc95ec
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
5548
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
1 // This is Free Software under GNU Affero General Public License v >= 3.0
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
2 // without warranty, see README.md and license for details.
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
3 //
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
4 // SPDX-License-Identifier: AGPL-3.0-or-later
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
5 // License-Filename: LICENSE
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
6 //
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
7 // Copyright (C) 2021 by via donau
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
8 // - Österreichische Wasserstraßen-Gesellschaft mbH
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
9 // Software engineering by Intevation GmbH
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
10 //
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
11 // Author(s):
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de>
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
13
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
14 //go:generate ragel -Z -G2 -o matcher.go matcher.rl
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
15 //go:generate go fmt matcher.go
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
16
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
17 package main
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
18
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
19 import (
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
20 "container/heap"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
21 "context"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
22 "database/sql"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
23 "encoding/csv"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
24 "flag"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
25 "fmt"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
26 "log"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
27 "os"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
28 "runtime"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
29 "sort"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
30 "strconv"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
31 "strings"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
32 "sync"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
33 "time"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
34
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
35 _ "github.com/jackc/pgx/v4/stdlib"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
36 )
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
37
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
38 const (
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
39 selectOldGMLogsSQL = `
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
40 SELECT
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
41 lo.import_id,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
42 lo.time,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
43 lo.kind,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
44 lo.msg
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
45 FROM import.imports im
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
46 JOIN import.import_logs lo
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
47 ON lo.import_id = im.id
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
48 WHERE im.kind = 'gm'
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
49 ORDER BY lo.import_id`
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
50
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
51 createFilteredLogsSQL = `
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
52 CREATE TABLE filtered_logs (
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
53 import_id integer NOT NULL,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
54 time timestamp with time zone NOT NULL,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
55 kind log_type NOT NULL,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
56 msg text NOT NULL
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
57 )`
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
58
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
59 insertFilteredLogsSQL = `
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
60 INSERT INTO filtered_logs (import_id, time, kind, msg)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
61 VALUES ($1, $2, $3::log_type, $4)`
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
62
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
63 deleteOldGMLogsSQL = `
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
64 DELETE FROM import.import_logs WHERE import_id IN (
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
65 SELECT import_id FROM filtered_logs)`
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
66
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
67 copyDataSQL = `
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
68 INSERT INTO import.import_logs (import_id, time, kind, msg)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
69 SELECT import_id, time, kind, msg
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
70 FROM filtered_logs`
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
71
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
72 dropFilteredLogsSQL = `DROP TABLE filtered_logs`
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
73 )
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
74
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
75 type phases int
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
76
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
77 const (
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
78 nonePhase phases = 0
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
79 filterPhase phases = 1 << iota
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
80 transferPhase
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
81 )
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
82
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
83 type gauge struct {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
84 gid string
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
85 unknown bool
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
86 assumeZPG bool
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
87 ignMeasCodes []string
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
88 rescaleErrors []string
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
89 missingValues []string
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
90 assumeCM int
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
91 badValues int
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
92 measurements int
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
93 predictions int
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
94 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
95
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
96 type aggregator struct {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
97 current string
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
98 hold *line
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
99
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
100 lastGauge *gauge
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
101 gauges []*gauge
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
102
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
103 stack [4]string
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
104 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
105
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
106 type line struct {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
107 time time.Time
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
108 kind string
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
109 msg string
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
110 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
111
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
112 type importLines struct {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
113 seq int
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
114 id int64
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
115 lines []line
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
116 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
117
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
118 type processor struct {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
119 cond *sync.Cond
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
120 aggregated []*importLines
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
121 nextOutSeq int
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
122 done bool
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
123 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
124
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
125 type writer interface {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
126 prepare(context.Context, *sql.Conn) error
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
127 write(*importLines)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
128 finish()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
129 error() error
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
130 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
131
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
132 type csvWriter struct {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
133 err error
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
134 file *os.File
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
135 out *csv.Writer
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
136 row [1 + 1 + 1 + 1]string
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
137 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
138
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
139 type sqlWriter struct {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
140 err error
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
141 ctx context.Context
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
142 tx *sql.Tx
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
143 stmt *sql.Stmt
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
144 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
145
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
146 func (ps phases) has(p phases) bool {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
147 return ps&p == p
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
148 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
149
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
150 func parsePhases(s string) (phases, error) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
151 ps := nonePhase
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
152 for _, x := range strings.Split(s, ",") {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
153 switch strings.ToLower(strings.TrimSpace(x)) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
154 case "transfer":
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
155 ps |= transferPhase
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
156 case "filter":
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
157 ps |= filterPhase
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
158 default:
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
159 return nonePhase, fmt.Errorf("invalid phase '%s'", x)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
160 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
161 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
162 return ps, nil
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
163 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
164
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
165 func (g *gauge) getAssumeZPG() bool { return g.assumeZPG }
5551
e9ef27c75e5c Fixed minor typo.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5548
diff changeset
166 func (g *gauge) getUnknown() bool { return g.unknown }
5548
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
167 func (g *gauge) getIgnoredMeasureCodes() []string { return g.ignMeasCodes }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
168 func (g *gauge) getRescaleErrors() []string { return g.rescaleErrors }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
169 func (g *gauge) getMissingValues() []string { return g.missingValues }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
170 func (g *gauge) getAssumeCM() int { return g.assumeCM }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
171 func (g *gauge) getBadValues() int { return g.badValues }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
172 func (g *gauge) getPredictions() int { return g.predictions }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
173 func (g *gauge) getMeasurements() int { return g.measurements }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
174 func (g *gauge) nothingChanged() bool { return g.measurements == 0 && g.predictions == 0 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
175
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
176 func (agg *aggregator) reset() {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
177 agg.current = ""
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
178 agg.hold = nil
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
179 agg.lastGauge = nil
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
180 agg.gauges = nil
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
181 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
182
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
183 func (agg *aggregator) find(name string) *gauge {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
184 if agg.lastGauge != nil && name == agg.lastGauge.gid {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
185 return agg.lastGauge
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
186 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
187 for _, g := range agg.gauges {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
188 if g.gid == name {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
189 agg.lastGauge = g
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
190 return g
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
191 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
192 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
193 g := &gauge{gid: name}
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
194 agg.gauges = append(agg.gauges, g)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
195 agg.lastGauge = g
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
196 return g
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
197 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
198
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
199 func extend(haystack []string, needle string) []string {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
200 for _, straw := range haystack {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
201 if straw == needle {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
202 return haystack
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
203 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
204 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
205 return append(haystack, needle)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
206 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
207
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
208 func (agg *aggregator) logBool(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
209 access func(*gauge) bool,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
210 header string,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
211 log func(string),
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
212 ) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
213 var sb strings.Builder
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
214 for _, g := range agg.gauges {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
215 if access(g) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
216 if sb.Len() == 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
217 sb.WriteString(header)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
218 } else {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
219 sb.WriteString(", ")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
220 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
221 sb.WriteString(g.gid)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
222 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
223 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
224 if sb.Len() > 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
225 log(sb.String())
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
226 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
227 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
228
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
229 func (agg *aggregator) logInt(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
230 access func(*gauge) int,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
231 header string,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
232 log func(string),
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
233 ) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
234 gs := make([]*gauge, 0, len(agg.gauges))
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
235 for _, g := range agg.gauges {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
236 if access(g) > 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
237 gs = append(gs, g)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
238 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
239 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
240
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
241 if len(gs) == 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
242 return
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
243 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
244
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
245 sort.SliceStable(gs, func(i, j int) bool {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
246 return access(gs[i]) < access(gs[j])
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
247 })
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
248
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
249 var sb strings.Builder
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
250 var last int
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
251
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
252 for _, g := range gs {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
253 if c := access(g); c != last {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
254 if sb.Len() == 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
255 sb.WriteString(header)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
256 } else {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
257 sb.WriteString("); ")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
258 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
259 sb.WriteString(strconv.Itoa(c))
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
260 sb.WriteString(" (")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
261 last = c
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
262 } else {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
263 sb.WriteString(", ")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
264 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
265 sb.WriteString(g.gid)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
266 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
267
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
268 sb.WriteByte(')')
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
269 log(sb.String())
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
270 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
271
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
272 func (agg *aggregator) logString(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
273 access func(*gauge) []string,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
274 header string,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
275 log func(string),
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
276 ) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
277 var sb strings.Builder
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
278 for _, g := range agg.gauges {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
279 if s := access(g); len(s) > 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
280 if sb.Len() == 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
281 sb.WriteString(header)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
282 } else {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
283 sb.WriteString(", ")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
284 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
285 sb.WriteString(g.gid)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
286 sb.WriteString(" (")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
287 for i, v := range s {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
288 if i > 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
289 sb.WriteString("; ")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
290 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
291 sb.WriteString(v)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
292 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
293 sb.WriteByte(')')
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
294 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
295 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
296 if sb.Len() > 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
297 log(sb.String())
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
298 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
299 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
300
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
301 func (agg *aggregator) aggregate(out []line, last time.Time) []line {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
302
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
303 // Guarantee that new lines has a time after already put out lines.
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
304 if n := len(out); n > 0 && !out[n-1].time.Before(last) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
305 last = out[n-1].time.Add(time.Millisecond)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
306 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
307
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
308 log := func(kind, msg string) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
309 out = append(out, line{last, kind, msg})
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
310 last = last.Add(time.Millisecond)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
311 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
312
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
313 infoLog := func(msg string) { log("info", msg) }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
314 warnLog := func(msg string) { log("warn", msg) }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
315 errLog := func(msg string) { log("error", msg) }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
316
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
317 agg.logBool(
5551
e9ef27c75e5c Fixed minor typo.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents: 5548
diff changeset
318 (*gauge).getUnknown,
5548
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
319 "Cannot find following gauges: ",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
320 warnLog)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
321
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
322 agg.logBool(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
323 (*gauge).getAssumeZPG,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
324 "'Reference_code' not specified. Assuming 'ZPG': ",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
325 warnLog)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
326
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
327 agg.logInt(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
328 (*gauge).getAssumeCM,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
329 "'Unit' not specified. Assuming 'cm': ",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
330 warnLog)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
331
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
332 agg.logInt(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
333 (*gauge).getBadValues,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
334 "Ignored measurements with value -99999: ",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
335 warnLog)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
336
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
337 agg.logString(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
338 (*gauge).getMissingValues,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
339 "Missing mandatory values: ",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
340 warnLog)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
341
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
342 agg.logString(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
343 (*gauge).getRescaleErrors,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
344 "Cannot convert units: ",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
345 errLog)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
346
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
347 agg.logString(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
348 (*gauge).getRescaleErrors,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
349 "Ignored measure codes: ",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
350 warnLog)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
351
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
352 agg.logInt(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
353 (*gauge).getPredictions,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
354 "New predictions: ",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
355 infoLog)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
356
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
357 agg.logInt(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
358 (*gauge).getMeasurements,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
359 "New measurements: ",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
360 infoLog)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
361
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
362 agg.logBool(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
363 (*gauge).nothingChanged,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
364 "No changes for: ",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
365 infoLog)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
366
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
367 if agg.hold != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
368 agg.hold.time = last
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
369 out = append(out, *agg.hold)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
370 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
371 return out
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
372 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
373
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
374 func (agg *aggregator) run(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
375 wg *sync.WaitGroup,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
376 logs <-chan *importLines,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
377 pr *processor,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
378 ) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
379 defer wg.Done()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
380 for l := range logs {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
381 // Do sorting by time in user land to take advantage
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
382 // of concurrent workers.
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
383 lines := l.lines
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
384 sort.Slice(lines, func(i, j int) bool {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
385 return lines[i].time.Before(lines[j].time)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
386 })
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
387
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
388 out := lines[:0:len(lines)]
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
389 for i := range lines {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
390 line := &lines[i]
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
391 if !agg.match(line.msg, line) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
392 out = append(out, *line)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
393 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
394 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
395 l.lines = agg.aggregate(out, lines[len(lines)-1].time)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
396 pr.consume(l)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
397 agg.reset()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
398 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
399 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
400
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
401 const timeFormat = "2006-01-02 15:04:05.999999-07"
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
402
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
403 func newCSVWriter(filename string) (*csvWriter, error) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
404
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
405 f, err := os.Create(filename)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
406 if err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
407 return nil, err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
408 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
409
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
410 return &csvWriter{
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
411 file: f,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
412 out: csv.NewWriter(f),
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
413 }, nil
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
414 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
415
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
416 func (cw *csvWriter) prepare(context.Context, *sql.Conn) error {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
417 return nil
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
418 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
419
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
420 func (cw *csvWriter) error() error { return cw.err }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
421
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
422 func (cw *csvWriter) write(entry *importLines) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
423 if cw.err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
424 return
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
425 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
426 row := cw.row[:]
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
427 row[0] = strconv.FormatInt(entry.id, 10)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
428 for _, l := range entry.lines {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
429 row[1] = l.time.Format(timeFormat)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
430 row[2] = l.kind
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
431 row[3] = l.msg
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
432 if cw.err = cw.out.Write(row); cw.err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
433 log.Printf("error: Writing to CSV file failed: %v\n", cw.err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
434 return
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
435 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
436 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
437 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
438
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
439 func (cw *csvWriter) finish() {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
440 cw.out.Flush()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
441 if err := cw.out.Error(); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
442 log.Printf("error: flushing CSV file failed: %v\n", err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
443 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
444 if err := cw.file.Close(); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
445 log.Printf("Closing CSV file failed: %v\n", err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
446 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
447 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
448
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
449 func (sw *sqlWriter) prepare(ctx context.Context, conn *sql.Conn) error {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
450
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
451 tx, err := conn.BeginTx(ctx, nil)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
452 if err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
453 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
454 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
455
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
456 if _, err := tx.ExecContext(ctx, createFilteredLogsSQL); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
457 tx.Rollback()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
458 return fmt.Errorf("cannot create new log table: %v\n", err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
459 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
460
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
461 stmt, err := tx.PrepareContext(ctx, insertFilteredLogsSQL)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
462 if err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
463 tx.Rollback()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
464 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
465 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
466
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
467 sw.ctx = ctx
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
468 sw.tx = tx
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
469 sw.stmt = stmt
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
470 return nil
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
471 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
472
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
473 func (sw *sqlWriter) error() error { return sw.err }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
474
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
475 func (sw *sqlWriter) write(entry *importLines) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
476 if sw.err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
477 return
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
478 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
479 for _, l := range entry.lines {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
480 if _, sw.err = sw.stmt.ExecContext(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
481 sw.ctx,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
482 entry.id,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
483 l.time,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
484 l.kind,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
485 l.msg,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
486 ); sw.err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
487 log.Printf("error: writing log line to db failed: %v\n", sw.err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
488 return
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
489 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
490 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
491 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
492
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
493 func (sw *sqlWriter) finish() {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
494 if err := sw.stmt.Close(); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
495 log.Printf("error: close stmt failed: %v\n", err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
496 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
497 if sw.err == nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
498 if err := sw.tx.Commit(); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
499 log.Printf("error: Commiting transaction failed: %v\n", err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
500 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
501 } else if err := sw.tx.Rollback(); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
502 log.Printf("error: Rollback transaction failed: %v\n", err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
503 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
504 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
505
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
506 func (pr *processor) Push(x interface{}) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
507 pr.aggregated = append(pr.aggregated, x.(*importLines))
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
508 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
509
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
510 func (pr *processor) Pop() interface{} {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
511 n := len(pr.aggregated)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
512 x := pr.aggregated[n-1]
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
513 pr.aggregated[n-1] = nil
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
514 pr.aggregated = pr.aggregated[:n-1]
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
515 return x
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
516 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
517
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
518 func (pr *processor) Len() int { return len(pr.aggregated) }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
519
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
520 func (pr *processor) Less(i, j int) bool {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
521 return pr.aggregated[i].seq < pr.aggregated[j].seq
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
522 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
523
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
524 func (pr *processor) Swap(i, j int) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
525 pr.aggregated[i], pr.aggregated[j] = pr.aggregated[j], pr.aggregated[i]
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
526 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
527
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
528 func (pr *processor) consume(l *importLines) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
529 pr.cond.L.Lock()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
530 heap.Push(pr, l)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
531 pr.cond.L.Unlock()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
532 pr.cond.Signal()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
533 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
534
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
535 func (pr *processor) quit() {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
536 pr.cond.L.Lock()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
537 pr.done = true
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
538 pr.cond.L.Unlock()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
539 pr.cond.Signal()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
540 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
541
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
542 func (pr *processor) drain(write func(*importLines)) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
543
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
544 for {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
545 pr.cond.L.Lock()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
546 for !pr.done &&
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
547 (len(pr.aggregated) == 0 || pr.aggregated[0].seq != pr.nextOutSeq) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
548 pr.cond.Wait()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
549 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
550 if pr.done {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
551 for len(pr.aggregated) > 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
552 write(heap.Pop(pr).(*importLines))
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
553 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
554 pr.cond.L.Unlock()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
555 return
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
556 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
557 l := heap.Pop(pr).(*importLines)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
558 //log.Printf("%d %p\n", c.nextOutSeq, l)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
559 pr.nextOutSeq++
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
560 pr.cond.L.Unlock()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
561 write(l)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
562 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
563 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
564
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
565 func (pr *processor) filterPhase(db *sql.DB, worker int, wr writer) error {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
566
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
567 log.Println("filter phase started")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
568
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
569 ctx := context.Background()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
570
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
571 con1, err := db.Conn(ctx)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
572 if err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
573 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
574 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
575 defer con1.Close()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
576
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
577 con2, err := db.Conn(ctx)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
578 if err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
579 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
580 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
581 defer con2.Close()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
582
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
583 tx, err := con1.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
584 if err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
585 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
586 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
587 defer tx.Rollback()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
588
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
589 if err := wr.prepare(ctx, con2); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
590 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
591 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
592 defer wr.finish()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
593
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
594 logs := make(chan *importLines)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
595 var wg sync.WaitGroup
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
596
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
597 for i := 0; i < worker; i++ {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
598 wg.Add(1)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
599 go new(aggregator).run(&wg, logs, pr)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
600 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
601
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
602 writeDone := make(chan struct{})
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
603
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
604 go func() {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
605 defer close(writeDone)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
606 pr.drain(wr.write)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
607 }()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
608
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
609 log.Println("Querying for old logs started. (Can take a while.)")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
610 rows, err := tx.QueryContext(ctx, selectOldGMLogsSQL)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
611 if err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
612 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
613 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
614 defer rows.Close()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
615
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
616 log.Println("Querying done. (Maybe restart the gemma server, now?)")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
617
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
618 var (
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
619 count int64
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
620 current *importLines
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
621 seq int
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
622 l line
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
623 importID int64
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
624 start = time.Now()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
625 last = start
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
626 )
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
627
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
628 log.Println("Filtering started.")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
629 for rows.Next() {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
630 if err := rows.Scan(&importID, &l.time, &l.kind, &l.msg); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
631 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
632 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
633
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
634 if current == nil || importID != current.id {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
635 if current != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
636 logs <- current
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
637 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
638 current = &importLines{
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
639 seq: seq,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
640 id: importID,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
641 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
642 seq++
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
643 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
644 current.lines = append(current.lines, l)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
645
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
646 if count++; count%1_000_000 == 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
647 now := time.Now()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
648 diff := now.Sub(last)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
649 log.Printf("lines: %d rate: %.2f lines/s\n",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
650 count,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
651 1_000_000/diff.Seconds())
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
652 last = now
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
653 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
654 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
655 if current != nil && len(current.lines) > 0 {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
656 logs <- current
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
657 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
658 close(logs)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
659 wg.Wait()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
660
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
661 pr.quit()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
662
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
663 <-writeDone
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
664
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
665 rate := float64(count) / time.Since(start).Seconds()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
666 log.Printf("lines: %d rate: %.2f lines/s imports: %d\n",
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
667 count, rate, seq)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
668 return nil
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
669 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
670
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
671 func (pr *processor) transferPhase(db *sql.DB) error {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
672 log.Println("Transfer phase started.")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
673 ctx := context.Background()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
674 conn, err := db.Conn(ctx)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
675 if err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
676 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
677 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
678 defer conn.Close()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
679 tx, err := conn.BeginTx(ctx, nil)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
680 if err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
681 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
682 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
683 defer tx.Rollback()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
684
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
685 for _, sql := range []string{
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
686 deleteOldGMLogsSQL,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
687 copyDataSQL,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
688 dropFilteredLogsSQL,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
689 } {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
690 if _, err := tx.ExecContext(ctx, sql); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
691 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
692 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
693 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
694 return tx.Commit()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
695 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
696
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
697 func newProcessor() *processor {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
698 return &processor{
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
699 cond: sync.NewCond(new(sync.Mutex)),
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
700 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
701 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
702
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
703 func process(
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
704 host, dbname string, port int,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
705 worker int,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
706 csvFile string,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
707 ps phases,
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
708 ) error {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
709
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
710 p := newProcessor()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
711 var wr writer
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
712
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
713 if csvFile != "" {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
714 var err error
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
715 if wr, err = newCSVWriter(csvFile); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
716 return fmt.Errorf("error: Cannot create CSV file: %v", err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
717 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
718 } else {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
719 wr = new(sqlWriter)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
720 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
721
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
722 dsn := fmt.Sprintf("host=%s dbname=%s port=%d", host, dbname, port)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
723 db, err := sql.Open("pgx", dsn)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
724 if err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
725 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
726 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
727 defer db.Close()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
728
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
729 if ps.has(filterPhase) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
730 if err := p.filterPhase(db, worker, wr); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
731 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
732 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
733 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
734 if ps.has(transferPhase) {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
735 if err := p.transferPhase(db); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
736 return err
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
737 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
738 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
739
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
740 return nil
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
741 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
742
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
743 func main() {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
744 var (
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
745 host = flag.String("h", "/var/run/postgresql", "database host")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
746 dbname = flag.String("d", "gemma", "database")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
747 port = flag.Int("p", 5432, "database port")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
748 worker = flag.Int("w", runtime.NumCPU(), "workers to aggregate")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
749 csv = flag.String("c", "", "CSV file to be written")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
750 phases = flag.String("phases", "filter,transfer", "Phases filter and/or transfer")
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
751 )
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
752
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
753 flag.Parse()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
754
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
755 ps, err := parsePhases(*phases)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
756 if err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
757 log.Fatalf("error: %v\n", err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
758 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
759
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
760 start := time.Now()
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
761 if err := process(*host, *dbname, *port, *worker, *csv, ps); err != nil {
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
762 log.Fatalf("error: %v\n", err)
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
763 }
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
764 log.Printf("time took: %s\n", time.Since(start))
02c2d0edeb2a Added gmaggregate tool as contrib.
Sascha L. Teichmann <sascha.teichmann@intevation.de>
parents:
diff changeset
765 }