-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
133 lines (120 loc) · 3.36 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"context"
"fmt"
apiGrpc "github.com/awakari/source-websocket/api/grpc"
"github.com/awakari/source-websocket/api/http/pub"
"github.com/awakari/source-websocket/config"
"github.com/awakari/source-websocket/model"
"github.com/awakari/source-websocket/service"
"github.com/awakari/source-websocket/service/converter"
"github.com/awakari/source-websocket/service/handler"
"github.com/awakari/source-websocket/storage/mongo"
"log/slog"
"net/http"
"os"
"strconv"
"strings"
"sync"
)
// main wires the process together and then serves the gRPC API.
//
// In order, it: loads the configuration from the environment, creates a text
// logger writing to stdout, derives the replica index from the numeric suffix
// of the configured replica name, builds the publish API client, opens the
// storage, builds the converter, the handler factory and the service, resumes
// the handlers of the streams that belong to this replica and finally calls
// apiGrpc.Serve on the configured port. Every failure on that path panics.
func main() {
	cfg, err := config.NewConfigFromEnv()
	if err != nil {
		panic(fmt.Sprintf("failed to load the config from env: %s", err))
	}

	log := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.Level(cfg.Log.Level),
	}))
	log.Info("starting the update for the feeds")

	// The replica index is the integer that follows the last "-" of the
	// replica name; a name without any "-" cannot be parsed.
	replicaName := cfg.Replica.Name
	sepPos := strings.LastIndex(replicaName, "-")
	if sepPos < 0 {
		panic("unable to parse the replica name: " + cfg.Replica.Name)
	}
	replicaIndex, err := strconv.Atoi(replicaName[sepPos+1:])
	if err != nil {
		panic(err)
	}
	if replicaIndex < 0 {
		panic(fmt.Sprintf("Negative replica index: %d", replicaIndex))
	}
	log.Info(fmt.Sprintf("Replica: %d", replicaIndex))
	// Converted once; both the service and the resume step take the index as uint32.
	replica := uint32(replicaIndex)

	// Publish API client, wrapped with a logging decorator.
	svcPub := pub.NewService(http.DefaultClient, cfg.Api.Writer.Uri, cfg.Api.Token.Internal, cfg.Api.Writer.Timeout)
	svcPub = pub.NewLogging(svcPub, log)
	log.Info("initialized the Awakari publish API client")

	ctx := context.Background()
	stor, err := mongo.NewStorage(ctx, cfg.Db)
	if err != nil {
		panic(err)
	}
	defer stor.Close()

	// Converter, wrapped with a logging decorator.
	conv := converter.NewService(cfg.Api.Events.Type)
	conv = converter.NewLogging(conv, log)

	// The lock and the handlers map are shared between the service and the
	// resume step below.
	handlersLock := new(sync.Mutex)
	handlerByUrl := map[string]handler.Handler{}
	handlerFactory := handler.NewFactory(cfg.Api, conv, svcPub, log)

	svc := service.NewService(stor, replica, handlersLock, handlerByUrl, handlerFactory)
	svc = service.NewServiceLogging(svc, log)

	if err = resumeHandlers(ctx, log, svc, replica, handlersLock, handlerByUrl, handlerFactory); err != nil {
		panic(err)
	}

	log.Info(fmt.Sprintf("starting to listen the gRPC API @ port #%d...", cfg.Api.Port))
	if err = apiGrpc.Serve(cfg.Api.Port, svc); err != nil {
		panic(err)
	}
}
// resumeHandlers walks all stream URLs known to svc and resumes a handler for
// every stream whose Replica field equals replicaIndex.
//
// URLs are listed in ascending order, up to 100 per call, with an empty
// filter; the last URL of a page is passed as the cursor of the next call.
// Each listed URL is read via svc.Read to obtain its stream. The walk ends
// when a page comes back empty.
//
// It returns the first error reported by svc.List or svc.Read, or nil once
// the listing is exhausted. Handlers resumed before an error stay running.
func resumeHandlers(
	ctx context.Context,
	log *slog.Logger,
	svc service.Service,
	replicaIndex uint32,
	handlersLock *sync.Mutex,
	handlerByUrl map[string]handler.Handler,
	handlerFactory handler.Factory,
) (err error) {
	const pageSize = 100
	cursor := ""
	for {
		var page []string
		page, err = svc.List(ctx, pageSize, model.Filter{}, model.OrderAsc, cursor)
		if err != nil {
			return err
		}
		if len(page) == 0 {
			return nil
		}
		cursor = page[len(page)-1]
		for _, streamUrl := range page {
			var stream model.Stream
			stream, err = svc.Read(ctx, streamUrl)
			if err != nil {
				return err
			}
			if stream.Replica != replicaIndex {
				// The stream belongs to another replica.
				continue
			}
			resumeHandler(ctx, log, streamUrl, stream, handlersLock, handlerByUrl, handlerFactory)
		}
	}
}
// resumeHandler builds a handler for the given stream URL with handlerFactory,
// stores it in handlerByUrl under that URL and starts its Handle method in a
// new goroutine with ctx.
//
// handlersLock is held for the whole call, including the log statement. Any
// entry already stored under the same URL is replaced.
func resumeHandler(
	ctx context.Context,
	log *slog.Logger,
	url string,
	str model.Stream,
	handlersLock *sync.Mutex,
	handlerByUrl map[string]handler.Handler,
	handlerFactory handler.Factory,
) {
	handlersLock.Lock()
	defer handlersLock.Unlock()

	resumed := handlerFactory(url, str)
	handlerByUrl[url] = resumed
	go resumed.Handle(ctx)

	msg := fmt.Sprintf("resumed handler for %s", url)
	log.Info(msg)
}