changeset 5135:53618d18e387

Merge queued-stage-done into default
author Tom Gottfried <tom@intevation.de>
date Fri, 27 Mar 2020 15:42:11 +0100
parents 8d5e3ce27d20 (current diff) 590ccab9ab70 (diff)
children 03f28c5d2a88 3c748b2b4de6
files schema/updates/1430/01.bottlenecks_geoserver_add_time.sql
diffstat 8 files changed, 373 insertions(+), 148 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go	Fri Mar 27 15:30:15 2020 +0100
+++ b/pkg/controllers/importqueue.go	Fri Mar 27 15:42:11 2020 +0100
@@ -23,6 +23,7 @@
 	"net/http"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/gorilla/mux"
@@ -120,6 +121,9 @@
 		b = append(b, term)
 	}
 
+	// Always filter review jobs. They are only for internal use.
+	cond(` NOT kind LIKE '%%` + imports.ReviewJobSuffix + `'`)
+
 	if query := req.FormValue("query"); query != "" {
 		query = "%" + query + "%"
 		cond(` (kind ILIKE $%d OR username ILIKE $%d OR signer ILIKE $%d OR `+
@@ -595,21 +599,46 @@
 
 	results := make([]reviewResult, len(rs))
 
-	for i := range rs {
-		rev := &rs[i]
-		msg, err := decideImport(req, rev.ID, string(rev.State))
-		var errString string
-		if err != nil {
-			errString = err.Error()
-		}
-		results[i] = reviewResult{
-			ID:      rev.ID,
-			Message: msg,
-			Error:   errString,
-		}
+	for i := range results {
+		results[i].ID = rs[i].ID
+		results[i].Message = fmt.Sprintf("Finalizing import #%d in progress.", rs[i].ID)
 	}
 
-	return mw.JSONResult{Result: results}, nil
+	var wg sync.WaitGroup
+	var mu sync.Mutex
+
+	for i := range rs {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+			rev := &rs[idx]
+			msg, err := decideImport(req, rev.ID, string(rev.State))
+			mu.Lock()
+			if err != nil {
+				results[idx].Error = err.Error()
+			}
+			results[idx].Message = msg
+			mu.Unlock()
+		}(i)
+	}
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		wg.Wait()
+	}()
+
+	select {
+	case <-time.After(5 * time.Second):
+	case <-done:
+	}
+
+	out := make([]reviewResult, len(rs))
+	mu.Lock()
+	copy(out, results)
+	mu.Unlock()
+
+	return mw.JSONResult{Result: out}, nil
 }
 
 func reviewImport(req *http.Request) (jr mw.JSONResult, err error) {
@@ -638,23 +667,16 @@
 	id int64,
 	state string,
 ) (message string, err error) {
-	ctx := req.Context()
-
-	accepted := state == "accepted"
 
 	session, _ := auth.GetSession(req)
 	reviewer := session.User
 
+	ctx := req.Context()
+	accepted := state == "accepted"
+
 	if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil {
-		err = mw.JSONError{
-			Code:    http.StatusBadRequest,
-			Message: err.Error(),
-		}
-		return
+		return "", err
 	}
 
-	message = fmt.Sprintf(
-		"Requested import #%d to be %s.", id, state)
-
-	return
+	return fmt.Sprintf("Import #%d is %s.", id, state), nil
 }
--- a/pkg/imports/queue.go	Fri Mar 27 15:30:15 2020 +0100
+++ b/pkg/imports/queue.go	Fri Mar 27 15:42:11 2020 +0100
@@ -66,6 +66,11 @@
 		CleanUp() error
 	}
 
+	FeedbackJob interface {
+		Job
+		CreateFeedback(int64) Feedback
+	}
+
 	// JobKind is the type of an import.
 	// Choose a unique name for every import.
 	JobKind string
@@ -92,12 +97,17 @@
 		AutoAccept() bool
 	}
 
+	JobRemover interface {
+		JobCreator
+		RemoveJob() bool
+	}
+
 	idJob struct {
 		id        int64
 		kind      JobKind
 		user      string
 		waitRetry pgtype.Interval
-		trysLeft  sql.NullInt64
+		triesLeft sql.NullInt64
 		sendEmail bool
 		data      string
 	}
@@ -108,17 +118,28 @@
 	runExclusive = -66666
 )
 
+const (
+	ReviewJobSuffix  = "#review"
+	reviewJobRetries = 10
+	reviewJobWait    = time.Minute
+)
+
 type importQueue struct {
-	signalChan chan struct{}
+	cmdCh chan func(*importQueue)
+
 	creatorsMu sync.Mutex
 	creators   map[JobKind]JobCreator
 	usedDeps   map[string]int
+
+	waiting map[int64]chan struct{}
 }
 
 var iqueue = importQueue{
-	signalChan: make(chan struct{}),
-	creators:   map[JobKind]JobCreator{},
-	usedDeps:   map[string]int{},
+	cmdCh: make(chan func(*importQueue)),
+
+	creators: map[JobKind]JobCreator{},
+	usedDeps: map[string]int{},
+	waiting:  make(map[int64]chan struct{}),
 }
 
 var (
@@ -131,6 +152,7 @@
 		"pending",
 		"accepted",
 		"declined",
+		"reviewed",
 	}
 )
 
@@ -162,8 +184,9 @@
   $7
 ) RETURNING id`
 
+	// Select oldest queued job but prioritize review jobs
 	selectJobSQL = `
-SELECT
+SELECT DISTINCT ON (kind LIKE '%` + ReviewJobSuffix + `')
   id,
   kind,
   trys_left,
@@ -174,11 +197,9 @@
 FROM import.imports
 WHERE
   due <= CURRENT_TIMESTAMP + interval '5 seconds' AND
-  state = 'queued'::import_state AND enqueued IN (
-    SELECT min(enqueued)
-    FROM import.imports
-    WHERE state = 'queued'::import_state AND
-    kind = ANY($1))
+  state = 'queued'::import_state AND
+  kind = ANY($1)
+ORDER BY kind LIKE '%` + ReviewJobSuffix + `' DESC, enqueued
 LIMIT 1`
 
 	updateStateSQL = `
@@ -194,6 +215,9 @@
    summary = $2
 WHERE id = $3`
 
+	deleteJobSQL = `
+DELETE FROM import.imports WHERE id = $1`
+
 	logMessageSQL = `
 INSERT INTO import.import_logs (
   import_id,
@@ -215,10 +239,122 @@
 	return string(ue)
 }
 
+type reviewedJobCreator struct {
+	jobCreator JobCreator
+}
+
+func (*reviewedJobCreator) AutoAccept() bool {
+	return true
+}
+
+func (*reviewedJobCreator) RemoveJob() bool {
+	return true
+}
+
+func (rjc *reviewedJobCreator) Depends() [2][]string {
+	return rjc.jobCreator.Depends()
+}
+
+func (rjc *reviewedJobCreator) Description() string {
+	return rjc.jobCreator.Description() + ReviewJobSuffix
+}
+
+func (*reviewedJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
+	return nil
+}
+
+type reviewedJob struct {
+	ID       int64 `json:"id"`
+	Accepted bool  `json:"accepted"`
+}
+
+func (*reviewedJobCreator) Create() Job {
+	return new(reviewedJob)
+}
+
+func (*reviewedJob) CleanUp() error { return nil }
+
+func (r *reviewedJob) CreateFeedback(int64) Feedback {
+	return logFeedback(r.ID)
+}
+
+func (rj *reviewedJob) Do(
+	ctx context.Context,
+	importID int64,
+	conn *sql.Conn,
+	feedback Feedback,
+) (interface{}, error) {
+
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	var signer string
+	if err := tx.QueryRowContext(ctx, selectUserSQL, importID).Scan(&signer); err != nil {
+		return nil, err
+	}
+
+	var user, kind string
+	if err := tx.QueryRowContext(ctx, selectUserKindSQL, rj.ID).Scan(&user, &kind); err != nil {
+		return nil, err
+	}
+
+	jc := FindJobCreator(JobKind(kind))
+	if jc == nil {
+		return nil, fmt.Errorf("no job creator found for '%s'", kind)
+	}
+
+	importFeedback := logFeedback(rj.ID)
+
+	if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
+		userTx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer userTx.Rollback()
+
+		if rj.Accepted {
+			err = jc.StageDone(ctx, userTx, rj.ID, importFeedback)
+		} else {
+			_, err = userTx.ExecContext(ctx, deleteImportDataSQL, rj.ID)
+		}
+		if err == nil {
+			err = userTx.Commit()
+		}
+		return err
+	}); err != nil {
+		return nil, err
+	}
+
+	// Remove the import track
+	if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, rj.ID); err != nil {
+		return nil, err
+	}
+
+	var state string
+	if rj.Accepted {
+		state = "accepted"
+	} else {
+		state = "declined"
+	}
+
+	if _, err := tx.ExecContext(ctx, reviewSQL, state, signer, rj.ID); err != nil {
+		return nil, err
+	}
+
+	importFeedback.Info("User '%s' %s import %d.", signer, state, rj.ID)
+
+	return nil, tx.Commit()
+}
+
 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
 	q.creatorsMu.Lock()
 	defer q.creatorsMu.Unlock()
 	q.creators[kind] = jc
+	q.creators[kind+ReviewJobSuffix] = &reviewedJobCreator{jobCreator: jc}
+
 }
 
 // FindJobCreator looks up a JobCreator in the global import queue.
@@ -275,16 +411,16 @@
 
 func (idj *idJob) nextRetry(feedback Feedback) bool {
 	switch {
-	case idj.waitRetry.Status != pgtype.Present && !idj.trysLeft.Valid:
+	case idj.waitRetry.Status != pgtype.Present && !idj.triesLeft.Valid:
 		return false
-	case idj.waitRetry.Status == pgtype.Present && !idj.trysLeft.Valid:
+	case idj.waitRetry.Status == pgtype.Present && !idj.triesLeft.Valid:
 		return true
-	case idj.trysLeft.Valid:
-		if idj.trysLeft.Int64 < 1 {
-			feedback.Warn("import should be retried, but no retrys left")
+	case idj.triesLeft.Valid:
+		if idj.triesLeft.Int64 < 1 {
+			feedback.Warn("no retries left")
 		} else {
-			idj.trysLeft.Int64--
-			feedback.Info("import failed but will be retried")
+			idj.triesLeft.Int64--
+			feedback.Info("failed but will retry")
 			return true
 		}
 	}
@@ -304,11 +440,11 @@
 	return now
 }
 
-func (idj *idJob) trysLeftPointer() *int {
-	if !idj.trysLeft.Valid {
+func (idj *idJob) triesLeftPointer() *int {
+	if !idj.triesLeft.Valid {
 		return nil
 	}
-	t := int(idj.trysLeft.Int64)
+	t := int(idj.triesLeft.Int64)
 	return &t
 }
 
@@ -357,12 +493,13 @@
 func (q *importQueue) addJob(
 	kind JobKind,
 	due time.Time,
-	trysLeft *int,
+	triesLeft *int,
 	waitRetry *time.Duration,
 	user string,
 	sendEmail bool,
 	data string,
-) (int64, error) {
+	sync bool,
+) (int64, chan struct{}, error) {
 
 	var id int64
 	if due.IsZero() {
@@ -371,39 +508,47 @@
 	due = due.UTC()
 
 	var tl sql.NullInt64
-	if trysLeft != nil {
-		tl = sql.NullInt64{Int64: int64(*trysLeft), Valid: true}
+	if triesLeft != nil {
+		tl = sql.NullInt64{Int64: int64(*triesLeft), Valid: true}
 	}
 
 	var wr pgtype.Interval
 	if waitRetry != nil {
 		if err := wr.Set(*waitRetry); err != nil {
-			return 0, err
+			return 0, nil, err
 		}
 	} else {
 		wr = pgtype.Interval{Status: pgtype.Null}
 	}
 
-	ctx := context.Background()
-	err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
-		return conn.QueryRowContext(
-			ctx,
-			insertJobSQL,
-			string(kind),
-			due,
-			tl,
-			&wr,
-			user,
-			sendEmail,
-			data).Scan(&id)
-	})
-	if err == nil {
-		select {
-		case q.signalChan <- struct{}{}:
-		default:
-		}
+	errCh := make(chan error)
+	var done chan struct{}
+
+	q.cmdCh <- func(q *importQueue) {
+		ctx := context.Background()
+		errCh <- auth.RunAs(ctx, user, func(conn *sql.Conn) error {
+			err := conn.QueryRowContext(
+				ctx,
+				insertJobSQL,
+				string(kind),
+				due,
+				tl,
+				&wr,
+				user,
+				sendEmail,
+				data).Scan(&id)
+
+			if err == nil && sync {
+				log.Printf("info: register wait for %d\n", id)
+				done = make(chan struct{})
+				q.waiting[id] = done
+			}
+
+			return err
+		})
 	}
-	return id, err
+
+	return id, done, <-errCh
 }
 
 // AddJob adds a job to the global import queue to be executed
@@ -413,31 +558,38 @@
 func AddJob(
 	kind JobKind,
 	due time.Time,
-	trysLeft *int,
+	triesLeft *int,
 	waitRetry *time.Duration,
 	user string,
 	sendEmail bool,
 	data string,
 ) (int64, error) {
-	return iqueue.addJob(
+	id, _, err := iqueue.addJob(
 		kind,
 		due,
-		trysLeft,
+		triesLeft,
 		waitRetry,
 		user,
 		sendEmail,
-		data)
+		data,
+		false)
+	return id, err
 }
 
 const (
 	isPendingSQL = `
 SELECT
 	state = 'pending'::import_state,
-	kind,
-	username
+	kind
 FROM import.imports
 WHERE id = $1`
 
+	selectUserSQL = `
+SELECT username from import.imports WHERE ID = $1`
+
+	selectUserKindSQL = `
+SELECT username, kind from import.imports WHERE ID = $1`
+
 	reviewSQL = `
 UPDATE import.imports SET
   state = $1::import_state,
@@ -449,9 +601,6 @@
 
 	deleteImportTrackSQL = `
 DELETE FROM import.track_imports WHERE import_id = $1`
-
-	logDecisionSQL = `
-INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
 )
 
 func (q *importQueue) decideImportTx(
@@ -460,70 +609,62 @@
 	id int64,
 	accepted bool,
 	reviewer string,
-) error {
+) (chan struct{}, error) {
 	var (
 		pending bool
 		kind    string
-		user    string
 	)
 
-	switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); {
+	switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind); {
 	case err == sql.ErrNoRows:
-		return fmt.Errorf("cannot find import #%d", id)
+		return nil, fmt.Errorf("cannot find import #%d", id)
 	case err != nil:
-		return err
+		return nil, err
 	case !pending:
-		return fmt.Errorf("#%d is not pending", id)
+		return nil, fmt.Errorf("#%d is not pending", id)
 	}
 
 	jc := q.jobCreator(JobKind(kind))
 	if jc == nil {
-		return fmt.Errorf("no job creator for kind '%s'", kind)
+		return nil, fmt.Errorf("no job creator for kind '%s'", kind)
+	}
+
+	r := &reviewedJob{
+		ID:       id,
+		Accepted: accepted,
+	}
+	serialized, err := common.ToJSONString(r)
+	if err != nil {
+		return nil, err
 	}
 
-	if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
-		txUser, err := conn.BeginTx(ctx, nil)
-		if err != nil {
-			return err
-		}
-		defer txUser.Rollback()
+	// Try a little harder to persist the decision.
+	tries := reviewJobRetries
+	wait := reviewJobWait
 
-		if accepted {
-			feedback := logFeedback(id)
-			err = jc.StageDone(ctx, txUser, id, feedback)
-		} else {
-			_, err = txUser.ExecContext(ctx, deleteImportDataSQL, id)
-		}
-
-		if err == nil {
-			err = txUser.Commit()
-		}
-
-		return err
-	}); err != nil {
-		return err
+	rID, done, err := q.addJob(
+		JobKind(kind+ReviewJobSuffix),
+		time.Now(),
+		&tries,
+		&wait,
+		reviewer,
+		false,
+		serialized,
+		true)
+	if err != nil {
+		return nil, err
 	}
-
-	// Remove the import track
-	if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
-		return err
+	log.Printf("info: add review job %d\n", rID)
+	_, err = tx.ExecContext(ctx, updateStateSQL, "reviewed", id)
+	if err != nil && done != nil {
+		go func() {
+			q.cmdCh <- func(q *importQueue) {
+				delete(q.waiting, rID)
+			}
+		}()
+		done = nil
 	}
-
-	var state string
-	if accepted {
-		state = "accepted"
-	} else {
-		state = "declined"
-	}
-
-	logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id)
-
-	if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil {
-		return err
-	}
-
-	_, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id)
-	return err
+	return done, err
 }
 
 func (q *importQueue) decideImport(
@@ -536,18 +677,25 @@
 		ctx = context.Background()
 	}
 
-	return auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
+	var done chan struct{}
+
+	if err := auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
 		tx, err := conn.BeginTx(ctx, nil)
 		if err != nil {
 			return err
 		}
 		defer tx.Rollback()
-		err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
+		done, err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
 		if err == nil {
 			err = tx.Commit()
 		}
 		return err
-	})
+	}); err != nil {
+		return err
+	}
+
+	<-done
+	return nil
 }
 
 func DecideImport(
@@ -646,7 +794,7 @@
 		if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
 			&ji.id,
 			&ji.kind,
-			&ji.trysLeft,
+			&ji.triesLeft,
 			&ji.waitRetry,
 			&ji.user,
 			&ji.sendEmail,
@@ -714,6 +862,13 @@
 	})
 }
 
+func deleteJob(ctx context.Context, id int64) error {
+	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
+		_, err := conn.ExecContext(ctx, deleteJobSQL, id)
+		return err
+	})
+}
+
 func errorAndFail(id int64, format string, args ...interface{}) error {
 	ctx := context.Background()
 	return tryHardToStoreState(ctx, func(conn *sql.Conn) error {
@@ -756,7 +911,9 @@
 				break
 			}
 			select {
-			case <-q.signalChan:
+			case cmd := <-q.cmdCh:
+				cmd(q)
+
 			case <-time.After(pollDuration):
 			}
 		}
@@ -774,12 +931,16 @@
 
 		go func(jc JobCreator, idj *idJob) {
 
-			// Unlock the dependencies.
 			defer func() {
+				// Unlock the dependencies.
 				q.unlockDependencies(jc)
-				select {
-				case q.signalChan <- struct{}{}:
-				default:
+				// Unlock waiting.
+				q.cmdCh <- func(q *importQueue) {
+					if w := q.waiting[idj.id]; w != nil {
+						log.Printf("info: unlock waiting %d\n", idj.id)
+						close(w)
+						delete(q.waiting, idj.id)
+					}
 				}
 			}()
 
@@ -790,9 +951,12 @@
 				return
 			}
 
-			feedback := logFeedback(idj.id)
-
-			feedback.Info("import #%d started", idj.id)
+			var feedback Feedback
+			if fc, ok := job.(FeedbackJob); ok {
+				feedback = fc.CreateFeedback(idj.id)
+			} else {
+				feedback = logFeedback(idj.id)
+			}
 
 			ctx := context.Background()
 			var summary interface{}
@@ -823,6 +987,11 @@
 				}
 			}
 
+			var remove bool
+			if remover, ok := jc.(JobRemover); ok {
+				remove = remover.RemoveJob()
+			}
+
 			var state string
 			switch {
 			case unchanged:
@@ -834,28 +1003,36 @@
 			default:
 				state = "pending"
 			}
-			if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
-				log.Printf("error: setting state of job %d failed: %v\n", idj.id, err)
+			if !remove {
+				if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
+					log.Printf("error: setting state of job %d failed: %v\n", idj.id, err)
+				}
+				log.Printf("info: import #%d finished: %s\n", idj.id, state)
 			}
-			log.Printf("info: import #%d finished: %s\n", idj.id, state)
 			if idj.sendEmail {
 				go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
 			}
 
 			if retry {
-				nid, err := q.addJob(
+				nid, _, err := q.addJob(
 					idj.kind,
 					idj.nextDue(),
-					idj.trysLeftPointer(),
+					idj.triesLeftPointer(),
 					idj.waitRetryPointer(),
 					idj.user, idj.sendEmail,
-					idj.data)
+					idj.data,
+					false)
 				if err != nil {
 					log.Printf("error: retry enqueue failed: %v\n", err)
 				} else {
 					log.Printf("info: re-enqueued job with id %d\n", nid)
 				}
 			}
+			if remove {
+				if err := deleteJob(ctx, idj.id); err != nil {
+					log.Printf("error: deleting job %d failed: %v\n", idj.id, err)
+				}
+			}
 		}(jc, idj)
 	}
 }
--- a/schema/auth.sql	Fri Mar 27 15:30:15 2020 +0100
+++ b/schema/auth.sql	Fri Mar 27 15:42:11 2020 +0100
@@ -61,6 +61,7 @@
 GRANT INSERT, UPDATE ON sys_admin.system_config TO sys_admin;
 GRANT UPDATE ON sys_admin.published_services TO sys_admin;
 GRANT INSERT, DELETE, UPDATE ON sys_admin.password_reset_requests TO sys_admin;
+GRANT DELETE ON import.imports, import.import_logs TO sys_admin;
 
 --
 -- Privileges assigned directly to metamorph
--- a/schema/gemma.sql	Fri Mar 27 15:30:15 2020 +0100
+++ b/schema/gemma.sql	Fri Mar 27 15:42:11 2020 +0100
@@ -1220,7 +1220,7 @@
     'queued',
     'running',
     'failed', 'unchanged', 'pending',
-    'accepted', 'declined'
+    'accepted', 'declined', 'reviewed'
 );
 
 CREATE TYPE log_type AS ENUM ('info', 'warn', 'error');
@@ -1270,8 +1270,10 @@
         data       TEXT,
         summary    TEXT
     )
-
-    CREATE INDEX enqueued_idx ON imports(enqueued, state)
+    -- Mainly for listing imports in clients:
+    CREATE INDEX enqueued_idx ON imports(enqueued)
+    -- For fast retrieval of queued imports by the import queue in backend:
+    CREATE INDEX state_idx ON imports(state)
 
     CREATE TABLE import_logs (
         import_id int NOT NULL REFERENCES imports(id)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1433/01.add_state.sql	Fri Mar 27 15:42:11 2020 +0100
@@ -0,0 +1,19 @@
+-- DROP and re-CREATE type because adding a value isn't possible in transaction
+-- https://www.postgresql.org/docs/11/sql-altertype.html#id-1.9.3.42.7
+
+ALTER TABLE import.imports
+    ALTER COLUMN state DROP DEFAULT,
+    ALTER COLUMN state TYPE varchar;
+
+DROP TYPE import_state;
+
+CREATE TYPE import_state AS ENUM (
+    'queued',
+    'running',
+    'failed', 'unchanged', 'pending',
+    'accepted', 'declined', 'reviewed'
+);
+
+ALTER TABLE import.imports
+    ALTER COLUMN state TYPE import_state USING CAST(state AS import_state),
+    ALTER COLUMN state SET DEFAULT 'queued';
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1434/01.allow_job_delete_sys_admin.sql	Fri Mar 27 15:42:11 2020 +0100
@@ -0,0 +1,1 @@
+GRANT DELETE ON import.imports, import.import_logs TO sys_admin;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1435/01.add_import_state_idx.sql	Fri Mar 27 15:42:11 2020 +0100
@@ -0,0 +1,3 @@
+DROP INDEX import.enqueued_idx;
+CREATE INDEX enqueued_idx ON import.imports(enqueued);
+CREATE INDEX state_idx ON import.imports(state);
--- a/schema/version.sql	Fri Mar 27 15:30:15 2020 +0100
+++ b/schema/version.sql	Fri Mar 27 15:42:11 2020 +0100
@@ -1,1 +1,1 @@
-INSERT INTO gemma_schema_version(version) VALUES (1432);
+INSERT INTO gemma_schema_version(version) VALUES (1435);