changeset 1995:59055c8301df

Move import queue to its own database namespace Authorisation of the import queue has to be handled differently from the waterway-related data in the waterway schema. This is easier to handle, if both are in their own schema/namespace.
author Tom Gottfried <tom@intevation.de>
date Thu, 24 Jan 2019 12:56:31 +0100
parents a7c4005b723f
children 352493221fa5 1a4b218f5e19
files pkg/controllers/importconfig.go pkg/controllers/importqueue.go pkg/imports/agm.go pkg/imports/bn.go pkg/imports/config.go pkg/imports/fd.go pkg/imports/queue.go pkg/imports/sr.go pkg/imports/st.go pkg/imports/track.go pkg/scheduler/boot.go schema/auth.sql schema/auth_tests.sql schema/gemma.sql schema/tap_tests_data.sql
diffstat 15 files changed, 168 insertions(+), 151 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/importconfig.go	Thu Jan 24 12:50:17 2019 +0100
+++ b/pkg/controllers/importconfig.go	Thu Jan 24 12:56:31 2019 +0100
@@ -39,7 +39,7 @@
   send_email,
   cron,
   url
-FROM waterway.import_configuration`
+FROM import.import_configuration`
 
 	selectImportConfigurationSQL = selectImportConfigurationPrefix + `
 ORDER by id`
@@ -48,30 +48,30 @@
 WHERE id = $1`
 
 	insertImportConfigurationSQL = `
-INSERT INTO waterway.import_configuration
+INSERT INTO import.import_configuration
 (username, kind, cron, send_email, url)
 VALUES ($1, $2, $3, $4, $5)
 RETURNING id`
 
 	insertImportConfigurationAttributeSQL = `
-INSERT INTO waterway.import_configuration_attributes
+INSERT INTO import.import_configuration_attributes
 (import_configuration_id, k, v)
 VALUES ($1, $2, $3)`
 
 	hasImportConfigurationSQL = `
-SELECT true FROM waterway.import_configuration
+SELECT true FROM import.import_configuration
 WHERE id = $1`
 
 	deleteImportConfiguationAttributesSQL = `
-DELETE FROM waterway.import_configuration_attributes
+DELETE FROM import.import_configuration_attributes
 WHERE import_configuration_id = $1`
 
 	deleteImportConfiguationSQL = `
-DELETE FROM waterway.import_configuration
+DELETE FROM import.import_configuration
 WHERE id = $1`
 
 	updateImportConfigurationSQL = `
-UPDATE waterway.import_configuration SET
+UPDATE import.import_configuration SET
   username = $2,
   kind = $3,
   cron = $4,
--- a/pkg/controllers/importqueue.go	Thu Jan 24 12:50:17 2019 +0100
+++ b/pkg/controllers/importqueue.go	Thu Jan 24 12:56:31 2019 +0100
@@ -40,30 +40,30 @@
   username,
   signer,
   summary
-FROM waterway.imports
+FROM import.imports
 `
 
 	selectHasImportSQL = `
-SELECT true FROM Waterway.imports WHERE id = $1`
+SELECT true FROM import.imports WHERE id = $1`
 
 	selectHasNoRunningImportSQL = `
-SELECT true FROM waterway.imports
-WHERE id = $1 AND state <> 'running'::waterway.import_state`
+SELECT true FROM import.imports
+WHERE id = $1 AND state <> 'running'::import_state`
 
 	selectImportLogsSQL = `
 SELECT
   time,
   kind::varchar,
   msg
-FROM waterway.import_logs
+FROM import.import_logs
 WHERE import_id = $1
 ORDER BY time`
 
 	deleteLogsSQL = `
-DELETE FROM waterway.import_logs WHERE import_id = $1`
+DELETE FROM import.import_logs WHERE import_id = $1`
 
 	deleteImportSQL = `
-DELETE FROM waterway.imports WHERE id = $1`
+DELETE FROM import.imports WHERE id = $1`
 )
 
 func toInt8Array(txt string) *pgtype.Int8Array {
@@ -347,23 +347,23 @@
 
 const (
 	isPendingSQL = `
-SELECT state = 'pending'::waterway.import_state, kind
-FROM waterway.imports
+SELECT state = 'pending'::import_state, kind
+FROM import.imports
 WHERE id = $1`
 
 	reviewSQL = `
-UPDATE waterway.imports SET
-  state = $1::waterway.import_state,
+UPDATE import.imports SET
+  state = $1::import_state,
   signer = $2
 WHERE id = $3`
 
-	deleteImportDataSQL = `SELECT waterway.del_import($1)`
+	deleteImportDataSQL = `SELECT import.del_import($1)`
 
 	deleteImportTrackSQL = `
-DELETE FROM waterway.track_imports WHERE import_id = $1`
+DELETE FROM import.track_imports WHERE import_id = $1`
 
 	logDecisionSQL = `
-INSERT INTO waterway.import_logs (import_id, msg) VALUES ($1, $2)`
+INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
 )
 
 func reviewImports(
--- a/pkg/imports/agm.go	Thu Jan 24 12:50:17 2019 +0100
+++ b/pkg/imports/agm.go	Thu Jan 24 12:56:31 2019 +0100
@@ -70,7 +70,7 @@
 	agmStageDoneDeleteSQL = `
 WITH staged AS (
   SELECT key
-  FROM waterway.track_imports
+  FROM import.track_imports
   WHERE import_id = $1 AND
         relation = 'waterway.gauge_measurements'::regclass
 ),
@@ -87,7 +87,7 @@
 	agmStageDoneSQL = `
 UPDATE waterway.gauge_measurements SET staging_done = true
 WHERE id IN (
-  SELECT key FROM waterway.track_imports
+  SELECT key FROM import.track_imports
   WHERE import_id = $1 AND
     relation = 'waterway.gauge_measurements'::regclass)`
 )
--- a/pkg/imports/bn.go	Thu Jan 24 12:50:17 2019 +0100
+++ b/pkg/imports/bn.go	Thu Jan 24 12:56:31 2019 +0100
@@ -107,7 +107,7 @@
 	bnStageDoneSQL = `
 UPDATE waterway.bottlenecks SET staging_done = true
 WHERE id IN (
-  SELECT key from waterway.track_imports
+  SELECT key from import.track_imports
   WHERE import_id = $1 AND
         relation = 'waterway.bottlenecks'::regclass)`
 )
--- a/pkg/imports/config.go	Thu Jan 24 12:50:17 2019 +0100
+++ b/pkg/imports/config.go	Thu Jan 24 12:56:31 2019 +0100
@@ -106,12 +106,12 @@
   send_email,
   cron,
   url
-FROM waterway.import_configuration
+FROM import.import_configuration
 WHERE id = $1`
 
 	loadConfigAttributesSQL = `
 SELECT k, v
-FROM waterway.import_configuration_attributes
+FROM import.import_configuration_attributes
 WHERE import_configuration_id = $1`
 )
 
--- a/pkg/imports/fd.go	Thu Jan 24 12:50:17 2019 +0100
+++ b/pkg/imports/fd.go	Thu Jan 24 12:56:31 2019 +0100
@@ -121,7 +121,7 @@
 	fdStageDoneSQL = `
 UPDATE waterway.fairway_dimensions SET staging_done = true
 WHERE id IN (
-  SELECT key from waterway.track_imports
+  SELECT key from import.track_imports
   WHERE import_id = $1 AND
 		relation = 'waterway.fairway_dimensions'::regclass)`
 
--- a/pkg/imports/queue.go	Thu Jan 24 12:50:17 2019 +0100
+++ b/pkg/imports/queue.go	Thu Jan 24 12:56:31 2019 +0100
@@ -134,11 +134,11 @@
 	queueUser = "sys_admin"
 
 	reEnqueueRunningSQL = `
-UPDATE waterway.imports SET state = 'queued'::waterway.import_state
-WHERE state = 'running'::waterway.import_state`
+UPDATE import.imports SET state = 'queued'::import_state
+WHERE state = 'running'::import_state`
 
 	insertJobSQL = `
-INSERT INTO waterway.imports (
+INSERT INTO import.imports (
   kind,
   due,
   trys_left,
@@ -165,34 +165,34 @@
   username,
   send_email,
   data
-FROM waterway.imports
+FROM import.imports
 WHERE
   due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
-  state = 'queued'::waterway.import_state AND enqueued IN (
+  state = 'queued'::import_state AND enqueued IN (
     SELECT min(enqueued)
-    FROM waterway.imports
-    WHERE state = 'queued'::waterway.import_state AND
+    FROM import.imports
+    WHERE state = 'queued'::import_state AND
     kind = ANY($1))
 LIMIT 1`
 
 	updateStateSQL = `
-UPDATE waterway.imports SET state = $1::waterway.import_state
+UPDATE import.imports SET state = $1::import_state
 WHERE id = $2`
 
 	updateStateSummarySQL = `
-UPDATE waterway.imports SET
-   state = $1::waterway.import_state,
+UPDATE import.imports SET
+   state = $1::import_state,
    summary = $2
 WHERE id = $3`
 
 	logMessageSQL = `
-INSERT INTO waterway.import_logs (
+INSERT INTO import.import_logs (
   import_id,
   kind,
   msg
 ) VALUES (
   $1,
-  $2::waterway.log_type,
+  $2::log_type,
   $3
 )`
 )
--- a/pkg/imports/sr.go	Thu Jan 24 12:50:17 2019 +0100
+++ b/pkg/imports/sr.go	Thu Jan 24 12:56:31 2019 +0100
@@ -106,7 +106,7 @@
 	srStageDoneSQL = `
 UPDATE waterway.sounding_results SET staging_done = true
 WHERE id = (
-  SELECT key from waterway.track_imports
+  SELECT key from import.track_imports
   WHERE import_id = $1 AND
         relation = 'waterway.sounding_results'::regclass)`
 
--- a/pkg/imports/st.go	Thu Jan 24 12:50:17 2019 +0100
+++ b/pkg/imports/st.go	Thu Jan 24 12:56:31 2019 +0100
@@ -67,7 +67,7 @@
   SELECT name
   FROM waterway.stretches WHERE
   id = (
-    SELECT key from waterway.track_imports
+    SELECT key from import.track_imports
     WHERE import_id = $1 AND
       relation = 'waterway.stretches'::regclass)
   AND NOT staging_done
@@ -76,7 +76,7 @@
 	stStageDoneSQL = `
 UPDATE waterway.stretches SET staging_done = true
 WHERE id IN (
-  SELECT key from waterway.track_imports
+  SELECT key from import.track_imports
   WHERE import_id = $1 AND
         relation = 'waterway.stretches'::regclass)`
 
--- a/pkg/imports/track.go	Thu Jan 24 12:50:17 2019 +0100
+++ b/pkg/imports/track.go	Thu Jan 24 12:56:31 2019 +0100
@@ -20,7 +20,7 @@
 
 const (
 	trackImportSQL = `
-    INSERT INTO waterway.track_imports (import_id, relation, key)
+    INSERT INTO import.track_imports (import_id, relation, key)
 	VALUES ($1, $2::regclass, $3)`
 )
 
--- a/pkg/scheduler/boot.go	Thu Jan 24 12:50:17 2019 +0100
+++ b/pkg/scheduler/boot.go	Thu Jan 24 12:56:31 2019 +0100
@@ -27,11 +27,11 @@
 
 	selectImportConfSQL = `
 SELECT id, username, cron
-FROM waterway.import_configuration
+FROM import.import_configuration
 WHERE cron IS NOT NULL`
 
 	scheduledIDsSQL = `
-SELECT id from waterway.import_configuration
+SELECT id from import.import_configuration
 WHERE username = $1 AND cron IS NOT NULL`
 )
 
--- a/schema/auth.sql	Thu Jan 24 12:50:17 2019 +0100
+++ b/schema/auth.sql	Thu Jan 24 12:56:31 2019 +0100
@@ -42,6 +42,17 @@
 GRANT INSERT, UPDATE, DELETE ON
     users.templates, users.user_templates TO waterway_admin;
 
+GRANT USAGE ON SCHEMA import TO waterway_admin;
+GRANT SELECT, INSERT ON ALL TABLES IN SCHEMA import TO waterway_admin;
+GRANT UPDATE ON
+    import.imports, import.import_configuration,
+        import.import_configuration_attributes
+    TO waterway_admin;
+GRANT DELETE ON
+    import.track_imports, import.import_configuration,
+        import.import_configuration_attributes
+    TO waterway_admin;
+
 --
 -- Extended privileges for sys_admin
 --
@@ -133,22 +144,22 @@
 -- RLS policies for imports and import config
 --
 
-CREATE POLICY same_country ON waterway.imports
+CREATE POLICY same_country ON import.imports
     FOR ALL TO waterway_admin
     USING (users.current_user_country() = (
         SELECT country FROM users.list_users lu
             WHERE lu.username = imports.username));
-ALTER table waterway.imports ENABLE ROW LEVEL SECURITY;
+ALTER table import.imports ENABLE ROW LEVEL SECURITY;
 
 -- The job running the import queue is running as sys_admin and login users
 -- with that role should be able to run imports without restrictions anyhow
-CREATE POLICY import_all ON waterway.imports
+CREATE POLICY import_all ON import.imports
     FOR ALL TO sys_admin
     USING (true);
 
 -- For the given table, check whether the given value is used as primary key,
 -- bypassing row level security.
-CREATE OR REPLACE FUNCTION waterway.is_new_key(
+CREATE OR REPLACE FUNCTION import.is_new_key(
         tablename varchar,
         kv anyelement)
     RETURNS boolean
@@ -160,7 +171,7 @@
         FROM information_schema.key_column_usage k
         JOIN information_schema.table_constraints USING (constraint_name)
         WHERE k.table_name = tablename and constraint_type = 'PRIMARY KEY');
-    EXECUTE format('SELECT NOT $1 = ANY(SELECT %I FROM waterway.%I)',
+    EXECUTE format('SELECT NOT $1 = ANY(SELECT %I FROM import.%I)',
         columnname, tablename)
         INTO ret
         USING kv;
@@ -171,41 +182,41 @@
     SECURITY DEFINER
     STABLE PARALLEL SAFE;
 
-CREATE POLICY parent_allowed ON waterway.import_logs
+CREATE POLICY parent_allowed ON import.import_logs
     FOR ALL TO waterway_admin
-    USING (import_id IN (SELECT id FROM waterway.imports))
-    WITH CHECK (waterway.is_new_key('imports', import_id)
-        OR import_id IN (SELECT id FROM waterway.imports));
-ALTER table waterway.import_logs ENABLE ROW LEVEL SECURITY;
+    USING (import_id IN (SELECT id FROM import.imports))
+    WITH CHECK (import.is_new_key('imports', import_id)
+        OR import_id IN (SELECT id FROM import.imports));
+ALTER table import.import_logs ENABLE ROW LEVEL SECURITY;
 
-CREATE POLICY parent_allowed ON waterway.track_imports
+CREATE POLICY parent_allowed ON import.track_imports
     FOR ALL TO waterway_admin
-    USING (import_id IN (SELECT id FROM waterway.imports))
-    WITH CHECK (waterway.is_new_key('imports', import_id)
-        OR import_id IN (SELECT id FROM waterway.imports));
-ALTER table waterway.track_imports ENABLE ROW LEVEL SECURITY;
+    USING (import_id IN (SELECT id FROM import.imports))
+    WITH CHECK (import.is_new_key('imports', import_id)
+        OR import_id IN (SELECT id FROM import.imports));
+ALTER table import.track_imports ENABLE ROW LEVEL SECURITY;
 
-CREATE POLICY import_configuration_policy ON waterway.import_configuration
+CREATE POLICY import_configuration_policy ON import.import_configuration
     FOR ALL TO waterway_admin
     USING (
         users.current_user_country() = (
             SELECT country FROM users.list_users lu
-            WHERE lu.username = waterway.import_configuration.username));
+            WHERE lu.username = import.import_configuration.username));
 
-CREATE POLICY import_configuration_policy_sys_admin ON waterway.import_configuration
+CREATE POLICY import_configuration_policy_sys_admin ON import.import_configuration
     FOR ALL TO sys_admin
     USING (true);
 
-ALTER table waterway.import_configuration ENABLE ROW LEVEL SECURITY;
+ALTER table import.import_configuration ENABLE ROW LEVEL SECURITY;
 
-CREATE POLICY parent_allowed ON waterway.import_configuration_attributes
+CREATE POLICY parent_allowed ON import.import_configuration_attributes
     FOR ALL TO waterway_admin
     USING (import_configuration_id IN (
-        SELECT id FROM waterway.import_configuration))
+        SELECT id FROM import.import_configuration))
     WITH CHECK (
-        waterway.is_new_key('import_configuration', import_configuration_id)
+        import.is_new_key('import_configuration', import_configuration_id)
         OR import_configuration_id IN (
-            SELECT id FROM waterway.import_configuration));
-ALTER table waterway.import_configuration_attributes ENABLE ROW LEVEL SECURITY;
+            SELECT id FROM import.import_configuration));
+ALTER table import.import_configuration_attributes ENABLE ROW LEVEL SECURITY;
 
 COMMIT;
--- a/schema/auth_tests.sql	Thu Jan 24 12:50:17 2019 +0100
+++ b/schema/auth_tests.sql	Thu Jan 24 12:56:31 2019 +0100
@@ -126,12 +126,12 @@
 SELECT lives_ok($$
     WITH
     job AS (
-        INSERT INTO waterway.imports (kind, username, data) VALUES (
+        INSERT INTO import.imports (kind, username, data) VALUES (
             'test', current_user, 'test') RETURNING id),
     log AS (
-        INSERT INTO waterway.import_logs (import_id, msg)
+        INSERT INTO import.import_logs (import_id, msg)
             SELECT id, 'test' FROM job)
-    INSERT INTO waterway.track_imports
+    INSERT INTO import.track_imports
         SELECT id, 'waterway.bottlenecks', 0 FROM job
     $$,
     'Waterway admin can add import job and related data');
@@ -142,10 +142,10 @@
     $$,
     $$
     WITH job AS (
-        UPDATE waterway.imports SET state = 'accepted'
+        UPDATE import.imports SET state = 'accepted'
             RETURNING id, username),
     log AS (
-        INSERT INTO waterway.import_logs (import_id, msg)
+        INSERT INTO import.import_logs (import_id, msg)
             SELECT id, 'test continued' FROM job)
     SELECT username FROM job
     $$,
@@ -154,9 +154,9 @@
 SELECT lives_ok($$
     WITH
     config AS (
-        INSERT INTO waterway.import_configuration (kind, username) VALUES (
+        INSERT INTO import.import_configuration (kind, username) VALUES (
             'test', current_user) RETURNING id)
-    INSERT INTO waterway.import_configuration_attributes
+    INSERT INTO import.import_configuration_attributes
         SELECT id, 'test key', 'test value' FROM config
     $$,
     'Waterway admin can add import config and related data');
@@ -167,13 +167,13 @@
     $$,
     $$
     WITH config AS (
-        UPDATE waterway.import_configuration SET send_email = true
+        UPDATE import.import_configuration SET send_email = true
             RETURNING id, username),
     attrib AS (
-        INSERT INTO waterway.import_configuration_attributes
+        INSERT INTO import.import_configuration_attributes
             SELECT id, 'test continued', 'test value' FROM config),
     attrib_upd AS (
-        UPDATE waterway.import_configuration_attributes SET v = 'test v'
+        UPDATE import.import_configuration_attributes SET v = 'test v'
             WHERE import_configuration_id = (SELECT id FROM config))
     SELECT username FROM config
     $$,
@@ -181,34 +181,34 @@
 
 SET SESSION AUTHORIZATION test_admin_ro;
 SELECT throws_ok($$
-    INSERT INTO waterway.import_logs (import_id, msg)
-        VALUES (currval(pg_get_serial_sequence('waterway.imports', 'id')),
+    INSERT INTO import.import_logs (import_id, msg)
+        VALUES (currval(pg_get_serial_sequence('import.imports', 'id')),
             'test')
     $$,
     42501, NULL,
     'Waterway admin cannot add log messages to other countries imports');
 
 SELECT throws_ok($$
-    DELETE FROM waterway.track_imports
+    DELETE FROM import.track_imports
         WHERE import_id = currval(
-            pg_get_serial_sequence('waterway.imports', 'id'))
+            pg_get_serial_sequence('import.imports', 'id'))
     $$,
     42501, NULL,
     'Waterway admin cannot delete tracking data of other countries imports');
 
 SELECT throws_ok($$
-    INSERT INTO waterway.import_configuration_attributes
+    INSERT INTO import.import_configuration_attributes
         VALUES (currval(pg_get_serial_sequence(
-                'waterway.import_configuration', 'id')),
+                'import.import_configuration', 'id')),
             'test', 'test value')
     $$,
     42501, NULL,
     'Waterway admin cannot add attributes to other countries import config');
 
 SELECT throws_ok($$
-    UPDATE waterway.import_configuration_attributes SET v = 'evil'
+    UPDATE import.import_configuration_attributes SET v = 'evil'
         WHERE import_configuration_id = currval(
-            pg_get_serial_sequence('waterway.import_configuration', 'id'))
+            pg_get_serial_sequence('import.import_configuration', 'id'))
     $$,
     42501, NULL,
     'Waterway admin cannot overwrite attributes of other countries config');
--- a/schema/gemma.sql	Thu Jan 24 12:50:17 2019 +0100
+++ b/schema/gemma.sql	Thu Jan 24 12:56:31 2019 +0100
@@ -602,6 +602,27 @@
       SELECT bottleneck_id, max(date_info) AS current FROM sounding_results
       GROUP BY bottleneck_id) sr ON sr.bottleneck_id = bn.id
     ORDER BY objnam
+;
+
+-- Configure primary keys for geoserver views
+INSERT INTO waterway.gt_pk_metadata VALUES ('waterway',
+                                            'distance_marks_geoserver',
+                                            'location_code');
+
+--
+-- Import queue and respective logging
+--
+CREATE TYPE import_state AS ENUM (
+    'queued',
+    'running',
+    'failed', 'unchanged', 'pending',
+    'accepted', 'declined'
+);
+
+CREATE TYPE log_type AS ENUM ('info', 'warn', 'error');
+
+-- Namespace for import queue and respective logging
+CREATE SCHEMA import
 
     CREATE TABLE import_configuration (
         id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
@@ -624,71 +645,56 @@
         v TEXT NOT NULL,
         UNIQUE (import_configuration_id, k)
     )
+
+    CREATE TABLE imports (
+        id         int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+        state      import_state NOT NULL DEFAULT 'queued',
+        kind       varchar   NOT NULL,
+        enqueued   timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+        due        timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+        retry_wait interval
+            CHECK(retry_wait IS NULL
+                OR retry_wait >= interval '0 microseconds'),
+        trys_left  int, -- if NULL and retry_wait NOT NULL, endless
+        username   varchar   NOT NULL
+            REFERENCES internal.user_profiles(username)
+                ON DELETE CASCADE
+                ON UPDATE CASCADE,
+        signer varchar
+            REFERENCES internal.user_profiles(username)
+                ON DELETE SET NULL
+                ON UPDATE CASCADE,
+        send_email boolean NOT NULL DEFAULT false,
+        data       TEXT,
+        summary    TEXT
+    )
+
+    CREATE INDEX enqueued_idx ON imports(enqueued, state)
+
+    CREATE TABLE import_logs (
+        import_id int NOT NULL REFERENCES imports(id)
+            ON DELETE CASCADE,
+        time timestamp NOT NULL DEFAULT now(),
+        kind log_type NOT NULL DEFAULT 'info',
+        msg TEXT NOT NULL
+    )
+
+    CREATE TABLE track_imports (
+        import_id int      NOT NULL REFERENCES imports(id)
+            ON DELETE CASCADE,
+        relation  regclass NOT NULL,
+        key       int      NOT NULL,
+        UNIQUE (relation, key)
+    )
 ;
 
--- Configure primary keys for geoserver views
-INSERT INTO waterway.gt_pk_metadata VALUES ('waterway',
-                                            'distance_marks_geoserver',
-                                            'location_code');
-
-
---
--- Import queue and respective logging
---
-CREATE TYPE waterway.import_state AS ENUM (
-    'queued',
-    'running',
-    'failed', 'unchanged', 'pending',
-    'accepted', 'declined'
-);
-
-CREATE TABLE waterway.imports (
-    id         int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
-    state      waterway.import_state NOT NULL DEFAULT 'queued',
-    kind       varchar   NOT NULL,
-    enqueued   timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    due        timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    retry_wait interval
-        CHECK(retry_wait IS NULL OR retry_wait >= interval '0 microseconds'),
-    trys_left  int, -- if NULL and retry_wait NOT NUL, endless
-    username  varchar   NOT NULL
-        REFERENCES internal.user_profiles(username)
-            ON DELETE CASCADE
-            ON UPDATE CASCADE,
-    signer varchar
-        REFERENCES internal.user_profiles(username)
-            ON DELETE SET NULL
-            ON UPDATE CASCADE,
-    send_email boolean NOT NULL DEFAULT false,
-    data       TEXT,
-    summary    TEXT
-);
-
-CREATE INDEX enqueued_idx ON waterway.imports(enqueued, state);
-
-CREATE TYPE waterway.log_type AS ENUM ('info', 'warn', 'error');
-
-CREATE TABLE waterway.import_logs (
-    import_id int NOT NULL REFERENCES waterway.imports(id) ON DELETE CASCADE,
-    time timestamp NOT NULL DEFAULT now(),
-    kind waterway.log_type NOT NULL DEFAULT 'info',
-    msg TEXT NOT NULL
-);
-
-CREATE TABLE waterway.track_imports (
-    import_id int      NOT NULL REFERENCES waterway.imports(id) ON DELETE CASCADE,
-    relation  regclass NOT NULL,
-    key       int      NOT NULL,
-    UNIQUE (relation, key)
-);
-
-CREATE FUNCTION waterway.del_import(imp_id int) RETURNS void AS
+CREATE FUNCTION import.del_import(imp_id int) RETURNS void AS
 $$
 DECLARE
     tmp RECORD;
 BEGIN
     FOR tmp IN
-        SELECT * FROM waterway.track_imports WHERE import_id = imp_id
+        SELECT * FROM import.track_imports WHERE import_id = imp_id
     LOOP
         EXECUTE format('DELETE FROM %s WHERE id = $1', tmp.relation) USING tmp.key;
     END LOOP;
@@ -696,7 +702,7 @@
 $$
 LANGUAGE plpgsql;
 
-CREATE FUNCTION waterway.del_import() RETURNS trigger AS
+CREATE FUNCTION import.del_import() RETURNS trigger AS
 $$
 BEGIN
     EXECUTE format('DELETE FROM %s WHERE id = $1', OLD.relation) USING OLD.key;
--- a/schema/tap_tests_data.sql	Thu Jan 24 12:50:17 2019 +0100
+++ b/schema/tap_tests_data.sql	Thu Jan 24 12:56:31 2019 +0100
@@ -95,17 +95,17 @@
 
 WITH
 job AS (
-    INSERT INTO waterway.imports (kind, username, data) VALUES (
+    INSERT INTO import.imports (kind, username, data) VALUES (
         'test', 'test_admin_ro', 'test') RETURNING id),
 log AS (
-    INSERT INTO waterway.import_logs (import_id, msg)
+    INSERT INTO import.import_logs (import_id, msg)
         SELECT id, 'test' FROM job)
-INSERT INTO waterway.track_imports
+INSERT INTO import.track_imports
     SELECT id, 'waterway.bottlenecks', 1 FROM job;
 
 WITH
 config AS (
-    INSERT INTO waterway.import_configuration (kind, username) VALUES (
+    INSERT INTO import.import_configuration (kind, username) VALUES (
         'test', 'test_admin_ro') RETURNING id)
-INSERT INTO waterway.import_configuration_attributes
+INSERT INTO import.import_configuration_attributes
     SELECT id, 'test key', 'test value' FROM config;