comparison pkg/imports/queue.go @ 5028:d727641911a5

Moved import desision logic to import queue (where it belongs). Major change: StageDone of the import job is executed by the original user who does the import.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 18 Mar 2020 17:52:00 +0100
parents 47922c1a088d
children 0fedd50dbf52
comparison
equal deleted inserted replaced
5027:fa662af56a3d 5028:d727641911a5
427 user, 427 user,
428 sendEmail, 428 sendEmail,
429 data) 429 data)
430 } 430 }
431 431
432 const (
433 isPendingSQL = `
434 SELECT
435 state = 'pending'::import_state,
436 kind,
437 user
438 FROM import.imports
439 WHERE id = $1`
440
441 reviewSQL = `
442 UPDATE import.imports SET
443 state = $1::import_state,
444 changed = CURRENT_TIMESTAMP,
445 signer = $2
446 WHERE id = $3`
447
448 deleteImportDataSQL = `SELECT import.del_import($1)`
449
450 deleteImportTrackSQL = `
451 DELETE FROM import.track_imports WHERE import_id = $1`
452
453 logDecisionSQL = `
454 INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
455 )
456
457 func (q *importQueue) decideImportTx(
458 ctx context.Context,
459 tx *sql.Tx,
460 id int64,
461 accepted bool,
462 reviewer string,
463 ) error {
464 var (
465 pending bool
466 kind string
467 user string
468 )
469
470 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); {
471 case err == sql.ErrNoRows:
472 return fmt.Errorf("cannot find import #%d", id)
473 case err != nil:
474 return err
475 case !pending:
476 return fmt.Errorf("#%d is not pending", id)
477 }
478
479 jc := q.jobCreator(JobKind(kind))
480 if jc == nil {
481 return fmt.Errorf("no job creator for kind '%s'", kind)
482 }
483
484 if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
485 txUser, err := conn.BeginTx(ctx, nil)
486 if err != nil {
487 return err
488 }
489 defer txUser.Rollback()
490
491 if accepted {
492 err = jc.StageDone(ctx, txUser, id)
493 } else {
494 _, err = txUser.ExecContext(ctx, deleteImportDataSQL, id)
495 }
496
497 if err == nil {
498 err = txUser.Commit()
499 }
500
501 return err
502 }); err != nil {
503 return err
504 }
505
506 // Remove the import track
507 if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
508 return err
509 }
510
511 var state string
512 if accepted {
513 state = "accepted"
514 } else {
515 state = "declined"
516 }
517
518 logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id)
519
520 if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil {
521 return err
522 }
523
524 _, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id)
525 return err
526 }
527
528 func (q *importQueue) decideImport(
529 ctx context.Context,
530 id int64,
531 accepted bool,
532 reviewer string,
533 ) error {
534 if ctx == nil {
535 ctx = context.Background()
536 }
537
538 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
539 tx, err := conn.BeginTx(ctx, nil)
540 if err != nil {
541 return err
542 }
543 defer tx.Rollback()
544 err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
545 if err == nil {
546 err = tx.Commit()
547 }
548 return err
549 })
550 }
551
552 func DecideImport(
553 ctx context.Context,
554 id int64,
555 accepted bool,
556 reviewer string,
557 ) error {
558 return iqueue.decideImport(ctx, id, accepted, reviewer)
559 }
560
432 type logFeedback int64 561 type logFeedback int64
433 562
434 func (lf logFeedback) log(kind, format string, args ...interface{}) { 563 func (lf logFeedback) log(kind, format string, args ...interface{}) {
435 ctx := context.Background() 564 ctx := context.Background()
436 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { 565 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {