Skip to content

Commit e26c8cf

Browse files
committed
Uploading directories - first draft
1 parent 6f62afe commit e26c8cf

File tree

14 files changed

+707
-2
lines changed

14 files changed

+707
-2
lines changed

codex/blockexchange/engine/advertiser.nim

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ import ../../stores/blockstore
2525
import ../../logutils
2626
import ../../manifest
2727

28+
# tarballs
29+
import ../../tarballs/[directorymanifest, decoding]
30+
2831
logScope:
2932
topics = "codex discoveryengine advertiser"
3033

@@ -66,7 +69,11 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError])
6669
return
6770

6871
without manifest =? Manifest.decode(blk), err:
69-
error "Unable to decode as manifest", err = err.msg
72+
# Try if it not a directory manifest
73+
without manifest =? DirectoryManifest.decode(blk), err:
74+
error "Unable to decode as manifest", err = err.msg
75+
return
76+
await b.addCidToQueue(cid)
7077
return
7178

7279
# announce manifest cid and tree cid

codex/node.nim

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import std/options
1313
import std/sequtils
1414
import std/strformat
1515
import std/sugar
16+
import std/streams
1617
import times
1718

1819
import pkg/taskpools
@@ -47,6 +48,9 @@ import ./logutils
4748
import ./utils/asynciter
4849
import ./utils/trackedfutures
4950

51+
# tarball
52+
import ./tarballs/[tarballs, encoding, decoding, stdstreamwrapper, directorymanifest]
53+
5054
export logutils
5155

5256
logScope:
@@ -132,6 +136,31 @@ proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.}
132136

133137
return manifest.success
134138

139+
proc fetchDirectoryManifest*(
140+
self: CodexNodeRef, cid: Cid
141+
): Future[?!DirectoryManifest] {.async.} =
142+
## Fetch and decode a manifest block
143+
##
144+
145+
if err =? cid.isManifest.errorOption:
146+
return failure "CID has invalid content type for manifest {$cid}"
147+
148+
trace "Retrieving manifest for cid", cid
149+
150+
without blk =? await self.networkStore.getBlock(BlockAddress.init(cid)), err:
151+
trace "Error retrieve manifest block", cid, err = err.msg
152+
return failure err
153+
154+
trace "Decoding manifest for cid", cid
155+
156+
without manifest =? DirectoryManifest.decode(blk), err:
157+
trace "Unable to decode as manifest", err = err.msg
158+
return failure("Unable to decode as manifest")
159+
160+
trace "Decoded manifest", cid
161+
162+
manifest.success
163+
135164
proc findPeer*(self: CodexNodeRef, peerId: PeerId): Future[?PeerRecord] {.async.} =
136165
## Find peer using the discovery service from the given CodexNode
137166
##
@@ -486,6 +515,83 @@ proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
486515

487516
onManifest(cid, manifest)
488517

518+
proc storeDirectoryManifest*(
519+
self: CodexNodeRef, manifest: DirectoryManifest
520+
): Future[?!bt.Block] {.async.} =
521+
let encodedManifest = manifest.encode()
522+
523+
without blk =? bt.Block.new(data = encodedManifest, codec = ManifestCodec), error:
524+
trace "Unable to create block from manifest"
525+
return failure(error)
526+
527+
if err =? (await self.networkStore.putBlock(blk)).errorOption:
528+
trace "Unable to store manifest block", cid = blk.cid, err = err.msg
529+
return failure(err)
530+
531+
success blk
532+
533+
proc storeTarball*(
534+
self: CodexNodeRef, stream: AsyncStreamReader
535+
): Future[?!string] {.async.} =
536+
info "Storing tarball data"
537+
538+
# Just as a proof of concept, we process tar bar in memory
539+
# Later to see how to do actual streaming to either store
540+
# tarball locally in some tmp folder, or to process the
541+
# tarball incrementally
542+
let tarballBytes = await stream.read()
543+
let stream = newStringStream(string.fromBytes(tarballBytes))
544+
545+
proc onProcessedTarFile(
546+
stream: Stream, fileName: string
547+
): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
548+
try:
549+
echo "onProcessedTarFile:name: ", fileName
550+
let stream = newStdStreamWrapper(stream)
551+
await self.store(stream, filename = some fileName)
552+
except CancelledError as e:
553+
raise e
554+
except CatchableError as e:
555+
error "Error processing tar file", fileName, exc = e.msg
556+
return failure(e.msg)
557+
558+
proc onProcessedTarDir(
559+
name: string, cids: seq[Cid]
560+
): Future[?!Cid] {.gcsafe, async: (raises: [CancelledError]).} =
561+
try:
562+
echo "onProcessedTarDir:name: ", name
563+
echo "onProcessedTarDir:cids: ", cids
564+
let directoryManifest = newDirectoryManifest(name = name, cids = cids)
565+
without manifestBlk =? await self.storeDirectoryManifest(directoryManifest), err:
566+
error "Unable to store manifest"
567+
return failure(err)
568+
manifestBlk.cid.success
569+
except CancelledError as e:
570+
raise e
571+
except CatchableError as e:
572+
error "Error processing tar dir", name, exc = e.msg
573+
return failure(e.msg)
574+
575+
let tarball = Tarball()
576+
if err =? (await tarball.open(stream, onProcessedTarFile)).errorOption:
577+
error "Unable to open tarball", err = err.msg
578+
return failure(err)
579+
echo "tarball = ", $tarball
580+
without root =? tarball.findRootDir(), err:
581+
return failure(err.msg)
582+
echo "root = ", root
583+
let dirs = processDirEntries(tarball)
584+
echo "dirs = ", dirs
585+
without tree =? (await buildTree(root = root, dirs = dirs, onProcessedTarDir)), err:
586+
error "Unable to build tree", err = err.msg
587+
return failure(err)
588+
echo ""
589+
echo "preorderTraversal:"
590+
let json = newJArray()
591+
preorderTraversal(tree, json)
592+
echo "json = ", json
593+
success($json)
594+
489595
proc setupRequest(
490596
self: CodexNodeRef,
491597
cid: Cid,

codex/rest/api.nim

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import ../manifest
3939
import ../streams/asyncstreamwrapper
4040
import ../stores
4141
import ../utils/options
42+
# tarballs
43+
import ../tarballs/directorymanifest
4244

4345
import ./coders
4446
import ./json
@@ -52,6 +54,11 @@ declareCounter(codex_api_downloads, "codex API downloads")
5254
proc validate(pattern: string, value: string): int {.gcsafe, raises: [Defect].} =
5355
0
5456

57+
proc formatDirectoryManifest(
58+
cid: Cid, manifest: DirectoryManifest
59+
): RestDirectoryContent =
60+
return RestDirectoryContent.init(cid, manifest)
61+
5562
proc formatManifest(cid: Cid, manifest: Manifest): RestContent =
5663
return RestContent.init(cid, manifest)
5764

@@ -179,7 +186,76 @@ proc getFilenameFromContentDisposition(contentDisposition: string): ?string =
179186
return filename[0 ..^ 2].some
180187

181188
proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRouter) =
182-
let allowedOrigin = router.allowedOrigin # prevents capture inside of api defintion
189+
let allowedOrigin = router.allowedOrigin # prevents capture inside of api definition
190+
191+
router.api(MethodOptions, "/api/codex/v1/tar") do(
192+
resp: HttpResponseRef
193+
) -> RestApiResponse:
194+
if corsOrigin =? allowedOrigin:
195+
resp.setCorsHeaders("POST", corsOrigin)
196+
resp.setHeader(
197+
"Access-Control-Allow-Headers", "content-type, content-disposition"
198+
)
199+
200+
resp.status = Http204
201+
await resp.sendBody("")
202+
203+
router.rawApi(MethodPost, "/api/codex/v1/tar") do() -> RestApiResponse:
204+
## Upload a file in a streaming manner
205+
##
206+
207+
trace "Handling upload of a tar file"
208+
var bodyReader = request.getBodyReader()
209+
if bodyReader.isErr():
210+
return RestApiResponse.error(Http500, msg = bodyReader.error())
211+
212+
# Attempt to handle `Expect` header
213+
# some clients (curl), wait 1000ms
214+
# before giving up
215+
#
216+
await request.handleExpect()
217+
218+
var mimetype = request.headers.getString(ContentTypeHeader).some
219+
220+
if mimetype.get() != "":
221+
let mimetypeVal = mimetype.get()
222+
var m = newMimetypes()
223+
let extension = m.getExt(mimetypeVal, "")
224+
if extension == "":
225+
return RestApiResponse.error(
226+
Http422, "The MIME type '" & mimetypeVal & "' is not valid."
227+
)
228+
else:
229+
mimetype = string.none
230+
231+
const ContentDispositionHeader = "Content-Disposition"
232+
let contentDisposition = request.headers.getString(ContentDispositionHeader)
233+
let filename = getFilenameFromContentDisposition(contentDisposition)
234+
235+
if filename.isSome and not isValidFilename(filename.get()):
236+
return RestApiResponse.error(Http422, "The filename is not valid.")
237+
238+
# Here we could check if the extension matches the filename if needed
239+
240+
let reader = bodyReader.get()
241+
let stream = AsyncStreamReader(reader)
242+
243+
try:
244+
without json =? (await node.storeTarball(stream = stream)), error:
245+
error "Error uploading tarball", exc = error.msg
246+
return RestApiResponse.error(Http500, error.msg)
247+
248+
codex_api_uploads.inc()
249+
trace "Uploaded tarball", result = json
250+
return RestApiResponse.response(json, contentType = "application/json")
251+
except CancelledError:
252+
trace "Upload cancelled error"
253+
return RestApiResponse.error(Http500)
254+
except AsyncStreamError:
255+
trace "Async stream error"
256+
return RestApiResponse.error(Http500)
257+
finally:
258+
await reader.closeWait()
183259

184260
router.api(MethodOptions, "/api/codex/v1/data") do(
185261
resp: HttpResponseRef
@@ -363,6 +439,24 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
363439
let json = %formatManifest(cid.get(), manifest)
364440
return RestApiResponse.response($json, contentType = "application/json")
365441

442+
router.api(MethodGet, "/api/codex/v1/data/{cid}/network/dirmanifest") do(
443+
cid: Cid, resp: HttpResponseRef
444+
) -> RestApiResponse:
445+
## Download only the directory manifest.
446+
##
447+
448+
var headers = buildCorsHeaders("GET", allowedOrigin)
449+
450+
if cid.isErr:
451+
return RestApiResponse.error(Http400, $cid.error(), headers = headers)
452+
453+
without manifest =? (await node.fetchDirectoryManifest(cid.get())), err:
454+
error "Failed to fetch directory manifest", err = err.msg
455+
return RestApiResponse.error(Http404, err.msg, headers = headers)
456+
457+
let json = %formatDirectoryManifest(cid.get(), manifest)
458+
return RestApiResponse.response($json, contentType = "application/json")
459+
366460
router.api(MethodGet, "/api/codex/v1/space") do() -> RestApiResponse:
367461
let json =
368462
%RestRepoStore(

codex/rest/json.nim

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import ../utils/json
99
import ../manifest
1010
import ../units
1111

12+
import ../tarballs/directorymanifest
13+
1214
export json
1315

1416
type
@@ -47,6 +49,10 @@ type
4749
cid* {.serialize.}: Cid
4850
manifest* {.serialize.}: Manifest
4951

52+
RestDirectoryContent* = object
53+
cid* {.serialize.}: Cid
54+
manifest* {.serialize.}: DirectoryManifest
55+
5056
RestContentList* = object
5157
content* {.serialize.}: seq[RestContent]
5258

@@ -81,6 +87,11 @@ proc init*(_: type RestContentList, content: seq[RestContent]): RestContentList
8187
proc init*(_: type RestContent, cid: Cid, manifest: Manifest): RestContent =
8288
RestContent(cid: cid, manifest: manifest)
8389

90+
proc init*(
91+
_: type RestDirectoryContent, cid: Cid, manifest: DirectoryManifest
92+
): RestDirectoryContent =
93+
RestDirectoryContent(cid: cid, manifest: manifest)
94+
8495
proc init*(_: type RestNode, node: dn.Node): RestNode =
8596
RestNode(
8697
nodeId: RestNodeId.init(node.id),

codex/tarballs/decoding.nim

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
## Nim-Codex
2+
## Copyright (c) 2021 Status Research & Development GmbH
3+
## Licensed under either of
4+
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
5+
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
6+
## at your option.
7+
## This file may not be copied, modified, or distributed except according to
8+
## those terms.
9+
10+
{.push raises: [].}
11+
12+
import pkg/libp2p/cid
13+
import pkg/libp2p/multihash
14+
import pkg/libp2p/protobuf/minprotobuf
15+
16+
import pkg/questionable/results
17+
18+
import ../blocktype
19+
import ./directorymanifest
20+
21+
func decode*(_: type DirectoryManifest, data: openArray[byte]): ?!DirectoryManifest =
22+
# ```protobuf
23+
# Message DirectoryManifest {
24+
# Message Cid {
25+
# bytes data = 1;
26+
# }
27+
# string name = 1;
28+
# repeated Cid cids = 2;
29+
# ```
30+
31+
var
32+
pbNode = initProtoBuffer(data)
33+
pbInfo: ProtoBuffer
34+
name: string
35+
cids: seq[Cid]
36+
cidsBytes: seq[seq[byte]]
37+
38+
if pbNode.getField(1, name).isErr:
39+
return failure("Unable to decode `name` from DirectoryManifest")
40+
41+
if ?pbNode.getRepeatedField(2, cidsBytes).mapFailure:
42+
for cidEntry in cidsBytes:
43+
var pbCid = initProtoBuffer(cidEntry)
44+
var dataBuf = newSeq[byte]()
45+
if pbCid.getField(1, dataBuf).isErr:
46+
return failure("Unable to decode piece `data` to Cid")
47+
without cid =? Cid.init(dataBuf).mapFailure, err:
48+
return failure(err.msg)
49+
cids.add(cid)
50+
51+
DirectoryManifest(name: name, cids: cids).success
52+
53+
func decode*(_: type DirectoryManifest, blk: Block): ?!DirectoryManifest =
54+
## Decode a directory manifest using `decoder`
55+
##
56+
57+
if not ?blk.cid.isManifest:
58+
return failure "Cid is not a Directory Manifest Cid"
59+
60+
DirectoryManifest.decode(blk.data)

0 commit comments

Comments
 (0)