Mercurial > gemma
comparison pkg/imports/queue.go @ 5028:d727641911a5
Moved import decision logic to import queue (where it belongs).
Major change: StageDone of the import job is executed by the original user
who does the import.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 18 Mar 2020 17:52:00 +0100 |
parents | 47922c1a088d |
children | 0fedd50dbf52 |
comparison
equal
deleted
inserted
replaced
5027:fa662af56a3d | 5028:d727641911a5 |
---|---|
427 user, | 427 user, |
428 sendEmail, | 428 sendEmail, |
429 data) | 429 data) |
430 } | 430 } |
431 | 431 |
432 const ( | |
433 isPendingSQL = ` | |
434 SELECT | |
435 state = 'pending'::import_state, | |
436 kind, | |
437 user | |
438 FROM import.imports | |
439 WHERE id = $1` | |
440 | |
441 reviewSQL = ` | |
442 UPDATE import.imports SET | |
443 state = $1::import_state, | |
444 changed = CURRENT_TIMESTAMP, | |
445 signer = $2 | |
446 WHERE id = $3` | |
447 | |
448 deleteImportDataSQL = `SELECT import.del_import($1)` | |
449 | |
450 deleteImportTrackSQL = ` | |
451 DELETE FROM import.track_imports WHERE import_id = $1` | |
452 | |
453 logDecisionSQL = ` | |
454 INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)` | |
455 ) | |
456 | |
457 func (q *importQueue) decideImportTx( | |
458 ctx context.Context, | |
459 tx *sql.Tx, | |
460 id int64, | |
461 accepted bool, | |
462 reviewer string, | |
463 ) error { | |
464 var ( | |
465 pending bool | |
466 kind string | |
467 user string | |
468 ) | |
469 | |
470 switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); { | |
471 case err == sql.ErrNoRows: | |
472 return fmt.Errorf("cannot find import #%d", id) | |
473 case err != nil: | |
474 return err | |
475 case !pending: | |
476 return fmt.Errorf("#%d is not pending", id) | |
477 } | |
478 | |
479 jc := q.jobCreator(JobKind(kind)) | |
480 if jc == nil { | |
481 return fmt.Errorf("no job creator for kind '%s'", kind) | |
482 } | |
483 | |
484 if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { | |
485 txUser, err := conn.BeginTx(ctx, nil) | |
486 if err != nil { | |
487 return err | |
488 } | |
489 defer txUser.Rollback() | |
490 | |
491 if accepted { | |
492 err = jc.StageDone(ctx, txUser, id) | |
493 } else { | |
494 _, err = txUser.ExecContext(ctx, deleteImportDataSQL, id) | |
495 } | |
496 | |
497 if err == nil { | |
498 err = txUser.Commit() | |
499 } | |
500 | |
501 return err | |
502 }); err != nil { | |
503 return err | |
504 } | |
505 | |
506 // Remove the import track | |
507 if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { | |
508 return err | |
509 } | |
510 | |
511 var state string | |
512 if accepted { | |
513 state = "accepted" | |
514 } else { | |
515 state = "declined" | |
516 } | |
517 | |
518 logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id) | |
519 | |
520 if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil { | |
521 return err | |
522 } | |
523 | |
524 _, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id) | |
525 return err | |
526 } | |
527 | |
528 func (q *importQueue) decideImport( | |
529 ctx context.Context, | |
530 id int64, | |
531 accepted bool, | |
532 reviewer string, | |
533 ) error { | |
534 if ctx == nil { | |
535 ctx = context.Background() | |
536 } | |
537 | |
538 return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { | |
539 tx, err := conn.BeginTx(ctx, nil) | |
540 if err != nil { | |
541 return err | |
542 } | |
543 defer tx.Rollback() | |
544 err = q.decideImportTx(ctx, tx, id, accepted, reviewer) | |
545 if err == nil { | |
546 err = tx.Commit() | |
547 } | |
548 return err | |
549 }) | |
550 } | |
551 | |
552 func DecideImport( | |
553 ctx context.Context, | |
554 id int64, | |
555 accepted bool, | |
556 reviewer string, | |
557 ) error { | |
558 return iqueue.decideImport(ctx, id, accepted, reviewer) | |
559 } | |
560 | |
432 type logFeedback int64 | 561 type logFeedback int64 |
433 | 562 |
434 func (lf logFeedback) log(kind, format string, args ...interface{}) { | 563 func (lf logFeedback) log(kind, format string, args ...interface{}) { |
435 ctx := context.Background() | 564 ctx := context.Background() |
436 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { | 565 err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { |