-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinit.go
More file actions
128 lines (108 loc) · 3.53 KB
/
init.go
File metadata and controls
128 lines (108 loc) · 3.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package peerdb
import (
"context"
"github.com/elastic/go-elasticsearch/v9"
"github.com/hashicorp/go-cleanhttp"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog"
"gitlab.com/tozd/go/errors"
internalBase "gitlab.com/peerdb/peerdb/internal/base"
internalSearch "gitlab.com/peerdb/peerdb/internal/search"
internalStore "gitlab.com/peerdb/peerdb/internal/store"
)
// init initializes the store, coordinator, storage, and bridge for a specific site.
//
// It can be called multiple times. In that case it will not initialize again if
// the site has already been initialized.
func (s *Site) init(ctx context.Context, logger zerolog.Logger, dbpool *pgxpool.Pool, esClient *elasticsearch.TypedClient, shards int) (func(), errors.E) {
if s.initialized {
return nil, nil //nolint:nilnil
}
s.initialized = true
logger = logger.With().Str("schema", s.Schema).Str("index", s.Index).Logger()
ctx = WithFallbackDBContext(ctx, s.Schema, "init")
ctx = logger.WithContext(ctx)
b, riverClient, onShutdown, errE := internalBase.InitAndStartComponents(ctx, logger, dbpool, esClient, s.Schema, s.Index, shards, s.LanguagePriority)
if errE != nil {
return onShutdown, errE
}
s.Base = b
s.DBPool = dbpool
s.ESClient = esClient
s.RiverClient = riverClient
errE = s.initDebugRiverHandler(ctx, logger)
if errE != nil {
return onShutdown, errE
}
return onShutdown, nil
}
// Init initializes PeerDB for all sites defined in globals.
//
// It establishes connections to PostgreSQL database and ElasticSearch.
// It configures PostgreSQL schemas and ElasticSearch indices.
//
// It can be called multiple times. In that case it will initialize only
// sites which have not been initialized yet.
//
// You have to run site.Start for each site after this call to start the
// base for each site.
func Init(ctx context.Context, globals *Globals) (func(), errors.E) {
var dbpool *pgxpool.Pool
var esClient *elasticsearch.TypedClient
// First we check if any site have them initialized already.
for _, site := range globals.Sites {
if dbpool == nil && site.DBPool != nil {
dbpool = site.DBPool
}
if esClient == nil && site.ESClient != nil {
esClient = site.ESClient
}
if dbpool != nil && esClient != nil {
break
}
}
onShutdown := []func(){}
onShutdownF := func() {
for _, f := range onShutdown {
f()
}
}
// Initialize for the first time.
if dbpool == nil {
var errE errors.E
var dbpoolCleanup func()
// We use context.WithoutCancel here because we want to cancel the pool ourselves and not when context
// is cancelled (so that cleanup code which needs PostgreSQL access can continue to use connections).
dbpool, dbpoolCleanup, errE = internalStore.InitPostgres(
context.WithoutCancel(ctx),
string(globals.Postgres.URL),
globals.Logger,
getRequestWithFallback(),
)
if errE != nil {
return nil, errE
}
// We want dbpoolCleanup to be last.
onShutdown = append(onShutdown, dbpoolCleanup)
}
// Initialize for the first time.
if esClient == nil {
var errE errors.E
esClient, errE = internalSearch.GetClient(cleanhttp.DefaultPooledClient(), globals.Logger, globals.Elastic.URL)
if errE != nil {
return onShutdownF, errE
}
}
for i := range globals.Sites {
site := &globals.Sites[i]
onS, errE := site.init(ctx, globals.Logger, dbpool, esClient, globals.Elastic.Shards)
// We want existing onShutdown functions (e.g., dbpool.Close) to be last.
if onS != nil {
onShutdown = append([]func(){onS}, onShutdown...)
}
if errE != nil {
return onShutdownF, errE
}
}
return onShutdownF, nil
}