-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathparamserve.go
124 lines (103 loc) · 2.92 KB
/
paramserve.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package fastparamfetch
import (
"context"
"fmt"
"net/http"
"sync"
"github.com/filecoin-project/curio/deps"
"github.com/filecoin-project/curio/harmony/harmonydb"
"github.com/gorilla/mux"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
)
type ParamServe struct {
db *harmonydb.DB
lk sync.Mutex
allow map[string]bool // file CIDs
cidToFile map[string]string // mapping from CID string to file path
machineID int
}
func NewParamServe(db *harmonydb.DB, machineID int) *ParamServe {
return &ParamServe{
db: db,
allow: make(map[string]bool),
cidToFile: make(map[string]string),
machineID: machineID,
}
}
func (ps *ParamServe) allowCid(ctx context.Context, c cid.Cid, path string) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.allow[c.String()] = true
ps.cidToFile[c.String()] = path
// Insert into the database that this machine has this CID
err := ps.insertCidForMachine(ctx, c.String())
if err != nil {
log.Errorf("Failed to insert CID %s for machine: %v", c.String(), err)
}
}
func (ps *ParamServe) insertCidForMachine(ctx context.Context, cidStr string) error {
// Insert into paramfetch_urls (machine, cid)
_, err := ps.db.Exec(ctx, `INSERT INTO paramfetch_urls (machine, cid) VALUES ($1, $2) ON CONFLICT DO NOTHING`, ps.machineID, cidStr)
return err
}
func (ps *ParamServe) urlsForCid(ctx context.Context, c cid.Cid) ([]string, error) {
rows, err := ps.db.Query(ctx, `SELECT harmony_machines.host_and_port FROM harmony_machines
JOIN paramfetch_urls ON harmony_machines.id = paramfetch_urls.machine
WHERE paramfetch_urls.cid = $1`, c.String())
if err != nil {
return nil, err
}
defer rows.Close()
var urls []string
for rows.Next() {
var hostAndPort string
if err := rows.Scan(&hostAndPort); err != nil {
return nil, err
}
urls = append(urls, hostAndPort)
}
if err := rows.Err(); err != nil {
return nil, err
}
return urls, nil
}
func (ps *ParamServe) getFilePathForCid(c cid.Cid) (string, error) {
ps.lk.Lock()
defer ps.lk.Unlock()
filePath, ok := ps.cidToFile[c.String()]
if !ok {
return "", xerrors.Errorf("file path for CID %s not found", c.String())
}
return filePath, nil
}
func (ps *ParamServe) ServeHTTP(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
cidStr := vars["cid"]
if cidStr == "" {
http.Error(w, "CID not specified", http.StatusBadRequest)
return
}
// Parse the CID
c, err := cid.Parse(cidStr)
if err != nil {
http.Error(w, "Invalid CID", http.StatusBadRequest)
return
}
ps.lk.Lock()
allowed := ps.allow[c.String()]
ps.lk.Unlock()
if !allowed {
http.Error(w, "CID not allowed", http.StatusNotFound)
return
}
filePath, err := ps.getFilePathForCid(c)
if err != nil {
http.Error(w, "File not found", http.StatusNotFound)
return
}
http.ServeFile(w, r, filePath)
}
func Routes(r *mux.Router, deps *deps.Deps, serve *ParamServe) {
r.Methods("GET", "HEAD").Path("/params/ipfs/{cid}").HandlerFunc(serve.ServeHTTP)
}