view pkg/imports/agm.go @ 1743:85d0f017fbee

Approved gauges measurements: Open CSV and read headers.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Jan 2019 21:58:35 +0100
parents 44398a8bdf94
children 807569b08513
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"log"
	"os"
	"path/filepath"

	"gemma.intevation.de/gemma/pkg/common"
)

type ApprovedGaugeMeasurements struct {
	Dir string `json:"dir"`
}

// GMAPJobKind is the unique name of an approved gauge measurements import job.
const AGMJobKind JobKind = "agm"

type agmJobCreator struct{}

func init() {
	RegisterJobCreator(AGMJobKind, agmJobCreator{})
}

func (agmJobCreator) Description() string {
	return "approved gauge measurements"
}

func (agmJobCreator) Create(_ JobKind, data string) (Job, error) {
	agm := new(ApprovedGaugeMeasurements)
	if err := common.FromJSONString(data, agm); err != nil {
		return nil, err
	}
	return agm, nil
}

func (agmJobCreator) Depends() []string {
	return []string{
		"gauges",
		"gauge_measurements",
	}
}

const (
	// TODO: re-add staging_done field in table and fix RLS policy
	// issue for raw import.
	agmStageDoneSQL = `
UPDATE waterway.gauge_measurements SET staging_done = true
WHERE id = (
  SELECT key from waterway.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.gauge_measurements'::regclass)`
)

func (agmJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	_, err := tx.ExecContext(ctx, agmStageDoneSQL, id)
	return err
}

// CleanUp removes the folder containing the CSV file with the
// the approved gauge measurements.
func (agm *ApprovedGaugeMeasurements) CleanUp() error {
	return os.RemoveAll(agm.Dir)
}

// Do executes the actual approved gauge measurements import.
func (agm *ApprovedGaugeMeasurements) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	for i, f := range headers {
		log.Printf("%d: %s\n", i, f)
	}

	return nil, nil
}