This commit is contained in:
2024-04-21 14:42:52 +02:00
parent 4b69674ede
commit 8a25f53c99
10700 changed files with 55767 additions and 14201 deletions

View File

@ -0,0 +1,73 @@
package pipeline
import (
"errors"
"voltaserve/client"
"voltaserve/core"
"voltaserve/identifier"
)
type Dispatcher struct {
pipelineIdentifier *identifier.PipelineIdentifier
pdfPipeline core.Pipeline
imagePipeline core.Pipeline
officePipeline core.Pipeline
videoPipeline core.Pipeline
apiClient *client.APIClient
}
func NewDispatcher() *Dispatcher {
return &Dispatcher{
pipelineIdentifier: identifier.NewPipelineIdentifier(),
pdfPipeline: NewPDFPipeline(),
imagePipeline: NewImagePipeline(),
officePipeline: NewOfficePipeline(),
videoPipeline: NewVideoPipeline(),
apiClient: client.NewAPIClient(),
}
}
func (d *Dispatcher) Dispatch(opts core.PipelineRunOptions) error {
if err := d.apiClient.UpdateSnapshot(core.SnapshotUpdateOptions{
Options: opts,
Status: core.SnapshotStatusProcessing,
}); err != nil {
return err
}
p := d.pipelineIdentifier.Identify(opts)
var err error
if p == core.PipelinePDF {
err = d.pdfPipeline.Run(opts)
} else if p == core.PipelineOffice {
err = d.officePipeline.Run(opts)
} else if p == core.PipelineImage {
err = d.imagePipeline.Run(opts)
} else if p == core.PipelineVideo {
err = d.videoPipeline.Run(opts)
} else {
if err := d.apiClient.UpdateSnapshot(core.SnapshotUpdateOptions{
Options: opts,
Status: core.SnapshotStatusError,
}); err != nil {
return err
}
return errors.New("no matching pipeline found")
}
if err != nil {
if err := d.apiClient.UpdateSnapshot(core.SnapshotUpdateOptions{
Options: opts,
Status: core.SnapshotStatusError,
}); err != nil {
return err
}
return nil
} else {
if err := d.apiClient.UpdateSnapshot(core.SnapshotUpdateOptions{
Options: opts,
Status: core.SnapshotStatusReady,
}); err != nil {
return err
}
return nil
}
}

View File

@ -0,0 +1,99 @@
package pipeline
import (
"os"
"path/filepath"
"voltaserve/client"
"voltaserve/config"
"voltaserve/core"
"voltaserve/helper"
"voltaserve/identifier"
"voltaserve/infra"
"voltaserve/processor"
"go.uber.org/zap"
)
type imagePipeline struct {
imageProc *processor.ImageProcessor
s3 *infra.S3Manager
apiClient *client.APIClient
fileIdent *identifier.FileIdentifier
logger *zap.SugaredLogger
config config.Config
}
func NewImagePipeline() core.Pipeline {
logger, err := infra.GetLogger()
if err != nil {
panic(err)
}
return &imagePipeline{
imageProc: processor.NewImageProcessor(),
s3: infra.NewS3Manager(),
apiClient: client.NewAPIClient(),
fileIdent: identifier.NewFileIdentifier(),
logger: logger,
config: config.GetConfig(),
}
}
func (p *imagePipeline) Run(opts core.PipelineRunOptions) error {
inputPath := filepath.FromSlash(os.TempDir() + "/" + helper.NewID() + filepath.Ext(opts.Key))
if err := p.s3.GetFile(opts.Key, inputPath, opts.Bucket); err != nil {
return err
}
stat, err := os.Stat(inputPath)
if err != nil {
return err
}
imageProps, err := p.imageProc.MeasureImage(inputPath)
if err != nil {
return err
}
updateOpts := core.SnapshotUpdateOptions{
Options: opts,
Original: &core.S3Object{
Bucket: opts.Bucket,
Key: opts.Key,
Image: &imageProps,
Size: stat.Size(),
},
}
if filepath.Ext(inputPath) == ".tiff" {
jpegPath := filepath.FromSlash(os.TempDir() + "/" + helper.NewID() + ".jpg")
if err := p.imageProc.ConvertImage(inputPath, jpegPath); err != nil {
return err
}
thumbnail, err := p.imageProc.Base64Thumbnail(jpegPath)
if err != nil {
return err
}
updateOpts.Thumbnail = &thumbnail
updateOpts.Preview = &core.S3Object{
Bucket: opts.Bucket,
Key: opts.FileID + "/" + opts.SnapshotID + "/preview.jpg",
Size: stat.Size(),
}
if err := p.s3.PutFile(updateOpts.Preview.Key, jpegPath, helper.DetectMimeFromFile(jpegPath), updateOpts.Preview.Bucket); err != nil {
return err
}
if err := os.Remove(inputPath); err != nil {
return err
}
inputPath = jpegPath
} else {
thumbnail, err := p.imageProc.Base64Thumbnail(inputPath)
if err != nil {
return err
}
updateOpts.Thumbnail = &thumbnail
}
if err := p.apiClient.UpdateSnapshot(updateOpts); err != nil {
return err
}
if err := os.Remove(inputPath); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,86 @@
package pipeline
import (
"os"
"path/filepath"
"voltaserve/client"
"voltaserve/config"
"voltaserve/core"
"voltaserve/helper"
"voltaserve/infra"
"voltaserve/processor"
)
type officePipeline struct {
pdfPipeline core.Pipeline
officeProc *processor.OfficeProcessor
pdfProc *processor.PDFProcessor
s3 *infra.S3Manager
config config.Config
apiClient *client.APIClient
}
func NewOfficePipeline() core.Pipeline {
return &officePipeline{
pdfPipeline: NewPDFPipeline(),
officeProc: processor.NewOfficeProcessor(),
pdfProc: processor.NewPDFProcessor(),
s3: infra.NewS3Manager(),
config: config.GetConfig(),
apiClient: client.NewAPIClient(),
}
}
func (p *officePipeline) Run(opts core.PipelineRunOptions) error {
inputPath := filepath.FromSlash(os.TempDir() + "/" + helper.NewID() + filepath.Ext(opts.Key))
if err := p.s3.GetFile(opts.Key, inputPath, opts.Bucket); err != nil {
return err
}
outputPath, err := p.officeProc.PDF(inputPath)
if err != nil {
return err
}
stat, err := os.Stat(outputPath)
if err != nil {
return err
}
thumbnail, err := p.pdfProc.Base64Thumbnail(outputPath)
if err != nil {
return err
}
if err := p.apiClient.UpdateSnapshot(core.SnapshotUpdateOptions{
Options: opts,
Thumbnail: &thumbnail,
}); err != nil {
return err
}
previewKey := opts.FileID + "/" + opts.SnapshotID + "/preview.pdf"
if err := p.s3.PutFile(previewKey, outputPath, helper.DetectMimeFromFile(outputPath), opts.Bucket); err != nil {
return err
}
if err := p.apiClient.UpdateSnapshot(core.SnapshotUpdateOptions{
Options: opts,
Preview: &core.S3Object{
Bucket: opts.Bucket,
Key: previewKey,
Size: stat.Size(),
},
}); err != nil {
return err
}
if err := p.pdfPipeline.Run(core.PipelineRunOptions{
Bucket: opts.Bucket,
Key: previewKey,
FileID: opts.FileID,
SnapshotID: opts.SnapshotID,
}); err != nil {
return err
}
if err := os.Remove(inputPath); err != nil {
return err
}
if err := os.Remove(outputPath); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,82 @@
package pipeline
import (
"os"
"path/filepath"
"voltaserve/client"
"voltaserve/config"
"voltaserve/core"
"voltaserve/helper"
"voltaserve/identifier"
"voltaserve/infra"
"voltaserve/processor"
"go.uber.org/zap"
)
type pdfPipeline struct {
pdfProc *processor.PDFProcessor
imageProc *processor.ImageProcessor
s3 *infra.S3Manager
apiClient *client.APIClient
fileIdent *identifier.FileIdentifier
logger *zap.SugaredLogger
config config.Config
}
func NewPDFPipeline() core.Pipeline {
logger, err := infra.GetLogger()
if err != nil {
panic(err)
}
return &pdfPipeline{
pdfProc: processor.NewPDFProcessor(),
imageProc: processor.NewImageProcessor(),
s3: infra.NewS3Manager(),
apiClient: client.NewAPIClient(),
fileIdent: identifier.NewFileIdentifier(),
logger: logger,
config: config.GetConfig(),
}
}
func (p *pdfPipeline) Run(opts core.PipelineRunOptions) error {
inputPath := filepath.FromSlash(os.TempDir() + "/" + helper.NewID() + filepath.Ext(opts.Key))
if err := p.s3.GetFile(opts.Key, inputPath, opts.Bucket); err != nil {
return err
}
thumbnail, err := p.pdfProc.Base64Thumbnail(inputPath)
if err != nil {
return err
}
if err := p.apiClient.UpdateSnapshot(core.SnapshotUpdateOptions{
Options: opts,
Thumbnail: &thumbnail,
}); err != nil {
return err
}
text, err := p.pdfProc.TextFromPDF(inputPath)
if err != nil {
p.logger.Named(infra.StrPipeline).Errorw(err.Error())
}
textKey := opts.FileID + "/" + opts.SnapshotID + "/text.txt"
if text != "" && err == nil {
if err := p.s3.PutText(textKey, text, "text/plain", opts.Bucket); err != nil {
return err
}
}
if err := p.apiClient.UpdateSnapshot(core.SnapshotUpdateOptions{
Options: opts,
Text: &core.S3Object{
Bucket: opts.Bucket,
Key: textKey,
Size: int64(len(text)),
},
}); err != nil {
return err
}
if err := os.Remove(inputPath); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,49 @@
package pipeline
import (
"os"
"path/filepath"
"voltaserve/client"
"voltaserve/core"
"voltaserve/helper"
"voltaserve/identifier"
"voltaserve/infra"
"voltaserve/processor"
)
type videoPipeline struct {
pipelineIdentifier *identifier.PipelineIdentifier
videoProc *processor.VideoProcessor
s3 *infra.S3Manager
apiClient *client.APIClient
}
func NewVideoPipeline() core.Pipeline {
return &videoPipeline{
pipelineIdentifier: identifier.NewPipelineIdentifier(),
videoProc: processor.NewVideoProcessor(),
s3: infra.NewS3Manager(),
apiClient: client.NewAPIClient(),
}
}
func (p *videoPipeline) Run(opts core.PipelineRunOptions) error {
inputPath := filepath.FromSlash(os.TempDir() + "/" + helper.NewID() + filepath.Ext(opts.Key))
if err := p.s3.GetFile(opts.Key, inputPath, opts.Bucket); err != nil {
return err
}
thumbnail, err := p.videoProc.Base64Thumbnail(inputPath)
if err != nil {
return err
}
if err := p.apiClient.UpdateSnapshot(core.SnapshotUpdateOptions{
Options: opts,
Thumbnail: &thumbnail,
}); err != nil {
return err
}
if err := os.Remove(inputPath); err != nil {
return err
}
return nil
}