Skip to content

Commit

Permalink
code refactoring (code moved around, comment fixes, error handling improvement)
Browse files Browse the repository at this point in the history
  • Loading branch information
consolethinks committed Oct 16, 2024
1 parent f497d0f commit 6faa78b
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 36 deletions.
29 changes: 3 additions & 26 deletions internal/core/ingestdataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,34 +133,13 @@ func IngestDataset(
return "", err
}

// extract dataset folder path and metadata map
datasetFolder := ingestionTask.DatasetFolder.FolderPath
var metaDataMap map[string]interface{}
var datasetFolder string
if len(ingestionTask.DatasetMetadata) > 0 {
metaDataMap = ingestionTask.DatasetMetadata

var ok bool
_, ok = metaDataMap["sourceFolder"]
if !ok {
return "", errors.New("no sourceFolder specified in metadata")
}
switch v := metaDataMap["sourceFolder"].(type) {
case string:
datasetFolder = v
default:
return "", errors.New("sourceFolder in metadata isn't a string")
}

fileInfo, err := os.Stat(datasetFolder)
if err != nil {
return "", err
}
if !fileInfo.IsDir() {
return "", errors.New("'sourceFolder' is not a directory")
}
} else {
// check if dataset already exists (identified by source folder)
var err error
datasetFolder = ingestionTask.DatasetFolder.FolderPath
metadatafile := filepath.Join(datasetFolder, "metadata.json")
if _, err = os.Stat(metadatafile); errors.Is(err, os.ErrNotExist) {
return "", err
Expand All @@ -172,9 +151,7 @@ func IngestDataset(
}
}

// HACK: use ownerGroup as the accessGroup
// TODO: replace with SciCat backend userInfo check from scicat-cli!

// check if dataset already exists (identified by source folder)
_, _, err = scicat.CheckMetadata(http_client, SCICAT_API_URL, metaDataMap, user, accessGroups)
if err != nil {
return "", err
Expand Down
31 changes: 29 additions & 2 deletions internal/core/taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -54,12 +55,38 @@ func (w *TaskQueue) CreateTaskFromDatasetFolder(folder task.DatasetFolder) error
return nil
}

func (w *TaskQueue) CreateTaskFromMetadata(id uuid.UUID, metadata map[string]interface{}) {
func (w *TaskQueue) CreateTaskFromMetadata(id uuid.UUID, metadataMap map[string]interface{}) error {
transferMethod := w.getTransferMethod()
task := task.CreateIngestionTask(task.DatasetFolder{Id: id}, metadata, transferMethod, nil)
task := task.CreateIngestionTask(task.DatasetFolder{Id: id}, metadataMap, transferMethod, nil)

// extract dataset folder path (sourceFolder)
var ok bool
_, ok = metadataMap["sourceFolder"]
if !ok {
return errors.New("no sourceFolder specified in metadata")
}
switch v := metadataMap["sourceFolder"].(type) {
case string:
task.DatasetFolder.FolderPath = v
default:
return errors.New("sourceFolder in metadata isn't a string")
}

// check if the folder exists
fileInfo, err := os.Stat(task.DatasetFolder.FolderPath)
if err != nil {
return err
}
if !fileInfo.IsDir() {
return errors.New("'sourceFolder' is not a directory")
}

// add to task list
w.taskListLock.Lock()
defer w.taskListLock.Unlock()
w.datasetUploadTasks.Set(id, task)

return nil
}

// Go routine that listens on the channel continuously for upload requests and executes uploads.
Expand Down
24 changes: 16 additions & 8 deletions internal/webserver/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"math"
"net/http"
"os"

"github.com/SwissOpenEM/Ingestor/internal/core"
"github.com/gin-gonic/gin"
Expand Down Expand Up @@ -49,12 +50,12 @@ func (i *IngestorWebServerImplemenation) DatasetControllerIngestDataset(c *gin.C
// convert body to struct
reqBody, err := io.ReadAll(c.Request.Body)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Failed to read request body: %v", err)})
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Failed to read request body: %s", err.Error())})
return
}
err = json.Unmarshal(reqBody, &request)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Invalid JSON data: %v", err)})
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Invalid JSON data: %s", err.Error())})
return
}
if request.MetaData == nil {
Expand All @@ -67,13 +68,20 @@ func (i *IngestorWebServerImplemenation) DatasetControllerIngestDataset(c *gin.C
var metadata map[string]interface{}
err = json.Unmarshal([]byte(metadataString), &metadata)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Metadata is not a valid JSON document: %v", err)})
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Metadata is not a valid JSON document: %s", err.Error())})
return
}

// create the task; if the metadata is rejected, report the error and stop
// here so that a task which was never added to the queue is not scheduled
id := uuid.New()
err = i.taskQueue.CreateTaskFromMetadata(id, metadata)
if err != nil {
	// errors coming from the filesystem check are reported as server-side
	// path errors, everything else as invalid client-supplied metadata
	if _, ok := err.(*os.PathError); ok {
		c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("Could not create the task due to a path error: %s", err.Error())})
	} else {
		c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Invalid metadata: %s", err.Error())})
	}
	return
}

// start the task
i.taskQueue.ScheduleTask(id)

// NOTE: because of the way the tasks are created, right now it'll search for a metadata.json
Expand Down Expand Up @@ -116,12 +124,12 @@ func (i *IngestorWebServerImplemenation) TransferControllerDeleteTransfer(c *gin

reqBody, err := io.ReadAll(c.Request.Body)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Failed to read request body: %v", err)})
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Failed to read request body: %s", err.Error())})
return
}
err = json.Unmarshal(reqBody, &request)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Invalid JSON data: %v", err)})
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Invalid JSON data: %s", err.Error())})
return
}
if request.IngestId == nil {
Expand All @@ -132,7 +140,7 @@ func (i *IngestorWebServerImplemenation) TransferControllerDeleteTransfer(c *gin
id := *request.IngestId
uuid, err := uuid.Parse(id)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Ingest ID '%s' could not be parsed as uuid: %v", id, err)})
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Ingest ID '%s' could not be parsed as uuid: %s", id, err.Error())})
return
}

Expand Down Expand Up @@ -164,7 +172,7 @@ func (i *IngestorWebServerImplemenation) TransferControllerGetTransfer(c *gin.Co
id := *params.TransferId
uid, err := uuid.Parse(id)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Can't parse UUID: %v", err)})
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("Can't parse UUID: %s", err.Error())})
return
}

Expand Down

0 comments on commit 6faa78b

Please sign in to comment.