Skip to content

Commit 1925cfe

Browse files
committedAug 7, 2019
chore: adding prettier and improving tooling
1 parent 9e96c6e commit 1925cfe

26 files changed

+4077
-2344
lines changed
 

‎.eslintignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
coverage

‎.eslintrc

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
2+
{
3+
"env": {
4+
"node": true,
5+
"es6": true
6+
},
7+
"plugins": [
8+
"prettier"
9+
],
10+
"extends": [
11+
"standard",
12+
"plugin:prettier/recommended"
13+
]
14+
}

‎.prettierrc

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"semi": false,
3+
"singleQuote": true
4+
}

‎package-lock.json

+3,249-1,799
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎package.json

+42-6
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,37 @@
44
"description": "A libp2p compatible pubsub module of the pulsarcast system",
55
"main": "src/index.js",
66
"scripts": {
7+
"fmt": "prettier --write '{,!(coverage)/**/}*.js' && eslint --fix .",
8+
"fmt:ci": "prettier -l '{,!(coverage)/**/}*.js'",
9+
"lint": "eslint .",
710
"test": "nyc --reporter=html --reporter=text mocha --recursive --exit",
8-
"test:ci": "npm run lint && npm test",
9-
"lint": "standard . | snazzy"
11+
"test:ci": "npm run lint && npm run fmt:ci && npm test"
12+
},
13+
"nyc": {
14+
"exclude": [
15+
"test/**",
16+
"coverage/*",
17+
"npm-debug.log",
18+
".nyc_output"
19+
],
20+
"report-dir": "./coverage",
21+
"cache": true,
22+
"all": true,
23+
"extension": [
24+
".js"
25+
]
26+
},
27+
"husky": {
28+
"hooks": {
29+
"pre-commit": "lint-staged && npm run test"
30+
}
31+
},
32+
"lint-staged": {
33+
"*.js": [
34+
"eslint --fix",
35+
"prettier --write",
36+
"git add"
37+
]
1038
},
1139
"pre-commit": [
1240
"lint",
@@ -49,9 +77,17 @@
4977
"libp2p-spdy": "^0.13.1",
5078
"libp2p-tcp": "jgantunes/js-libp2p-tcp",
5179
"mocha": "^5.2.0",
52-
"nyc": "^13.3.0",
53-
"pre-commit": "^1.2.2",
54-
"snazzy": "^7.1.1",
55-
"standard": "^11.0.1"
80+
"nyc": "^14.0.0",
81+
"eslint": "^5.11.1",
82+
"eslint-config-prettier": "^3.3.0",
83+
"eslint-config-standard": "^12.0.0",
84+
"eslint-plugin-import": "^2.14.0",
85+
"eslint-plugin-node": "^8.0.0",
86+
"eslint-plugin-prettier": "^3.0.1",
87+
"eslint-plugin-promise": "^4.0.1",
88+
"eslint-plugin-standard": "^4.0.0",
89+
"husky": "^1.3.0",
90+
"lint-staged": "^8.1.0",
91+
"prettier": "^1.15.3"
5692
}
5793
}

‎src/dag/event-node.js

+13-14
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,10 @@ const dagCBOR = require('ipld-dag-cbor')
66
const CID = require('cids')
77

88
const config = require('../config')
9-
const {
10-
linkUnmarshalling,
11-
linkMarshalling
12-
} = require('./utils')
9+
const { linkUnmarshalling, linkMarshalling } = require('./utils')
1310

1411
class EventNode {
15-
constructor (topicCID, author, payload, options = {}) {
12+
constructor(topicCID, author, payload, options = {}) {
1613
// TODO check it is a CID maybe?
1714
assert(topicCID, 'Need a topicCID object to create an event node')
1815
assert(author, 'Need an author to create an event node')
@@ -27,9 +24,11 @@ class EventNode {
2724
this.metadata = createMetadata(options.metadata)
2825
}
2926

30-
static deserialize (event) {
27+
static deserialize(event) {
3128
const topicCID = linkUnmarshalling(event.topic)
32-
const publisher = event.publisher ? PeerId.createFromBytes(event.publisher) : null
29+
const publisher = event.publisher
30+
? PeerId.createFromBytes(event.publisher)
31+
: null
3332
const author = PeerId.createFromBytes(event.author)
3433
const payload = event.payload
3534
const parent = linkUnmarshalling(event.parent)
@@ -41,18 +40,18 @@ class EventNode {
4140
})
4241
}
4342

44-
static deserializeCBOR (event, cb) {
43+
static deserializeCBOR(event, cb) {
4544
dagCBOR.util.deserialize(event, (err, result) => {
4645
if (err) return cb(err)
4746
cb(null, EventNode.deserialize(result))
4847
})
4948
}
5049

51-
get isPublished () {
50+
get isPublished() {
5251
return Boolean(this.publisher)
5352
}
5453

55-
getReadableFormat () {
54+
getReadableFormat() {
5655
return {
5756
topicCID: this.topicCID.toBaseEncodedString(),
5857
author: this.author.toB58String(),
@@ -64,11 +63,11 @@ class EventNode {
6463
}
6564
}
6665

67-
getCID (cb) {
66+
getCID(cb) {
6867
dagCBOR.util.cid(this.serialize(), cb)
6968
}
7069

71-
serialize () {
70+
serialize() {
7271
return {
7372
topic: linkMarshalling(this.topicCID),
7473
publisher: this.isPublished ? this.publisher.toBytes() : null,
@@ -82,13 +81,13 @@ class EventNode {
8281
}
8382
}
8483

85-
serializeCBOR (cb) {
84+
serializeCBOR(cb) {
8685
const serialized = this.serialize()
8786
dagCBOR.util.serialize(serialized, cb)
8887
}
8988
}
9089

91-
function createMetadata ({
90+
function createMetadata({
9291
created = new Date(),
9392
protocolVersion = config.protocol
9493
} = {}) {

‎src/dag/event-tree.js

+7-10
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const log = require('../utils/logger')
77
// TODO right now memory usage grows indefinitely
88
class EventTree {
99
// TODO probably need access to the DHT?
10-
constructor (topicNode) {
10+
constructor(topicNode) {
1111
// TODO check it is a CID maybe?
1212
assert(topicNode, 'Need a topicNode object to create an event tree')
1313
log.trace(`New event tree for topic ${topicNode.name}`)
@@ -21,24 +21,22 @@ class EventTree {
2121
})
2222
}
2323

24-
addNew (eventNode, options, cb) {
24+
addNew(eventNode, options, cb) {
2525
const done = cb || options
26-
const {parent} = options
26+
const { parent } = options
2727
const eventLinking = this.topicNode.metadata.eventLinking
2828

2929
if (eventLinking === 'custom' && !parent) {
3030
return done(new Error('Event requires custom parent to be provided'))
3131
}
3232

3333
// Set the parent link
34-
eventNode.parent = eventLinking === 'custom'
35-
? parent
36-
: this.mostRecent
34+
eventNode.parent = eventLinking === 'custom' ? parent : this.mostRecent
3735

3836
this.add(eventNode, done)
3937
}
4038

41-
add (eventNode, cb) {
39+
add(eventNode, cb) {
4240
eventNode.getCID((err, eventCID) => {
4341
if (err) return cb(err)
4442

@@ -59,14 +57,13 @@ class EventTree {
5957
})
6058
}
6159

62-
get (eventCID) {
60+
get(eventCID) {
6361
return this.eventTree.get(eventCID.toBaseEncodedString())
6462
}
6563

6664
// TODO to should be optional and could be a CID or a number
6765
// defaults to 1 level of resolution
68-
resolve (fromCID, to) {
69-
}
66+
resolve(fromCID, to) {}
7067
}
7168

7269
module.exports = EventTree

‎src/dag/topic-node.js

+41-30
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@ const dagCBOR = require('ipld-dag-cbor')
66
const CID = require('cids')
77

88
const config = require('../config')
9-
const eventLinkTypes = require('../messages/protobuffers').TopicDescriptor.MetaData.EventLinking
10-
const {
11-
linkUnmarshalling,
12-
linkMarshalling
13-
} = require('./utils')
9+
const eventLinkTypes = require('../messages/protobuffers').TopicDescriptor
10+
.MetaData.EventLinking
11+
const { linkUnmarshalling, linkMarshalling } = require('./utils')
1412

1513
class TopicNode {
16-
constructor (name, author, options = {}) {
14+
constructor(name, author, options = {}) {
1715
assert(author, 'Need an author to create a topic node')
1816

1917
this.name = name
@@ -23,19 +21,21 @@ class TopicNode {
2321
this.parent = options.parent ? new CID(options.parent) : null
2422
if (options.subTopics) {
2523
this.subTopics = Object.entries(options.subTopics)
26-
.map(([name, topicB58Str]) => ({[name]: new CID(topicB58Str)}))
27-
.reduce((topics, topic) => ({...topic, ...topics}), {})
24+
.map(([name, topicB58Str]) => ({ [name]: new CID(topicB58Str) }))
25+
.reduce((topics, topic) => ({ ...topic, ...topics }), {})
2826
}
2927

3028
this.metadata = createMetadata(options.metadata)
3129
}
3230

33-
static deserialize (topic) {
31+
static deserialize(topic) {
3432
const author = PeerId.createFromBytes(topic.author)
3533
const parent = linkUnmarshalling(topic.parent)
36-
const subTopics = Object.entries(topic['#']).map(([name, dagLink]) => {
37-
return {[name]: linkUnmarshalling(dagLink)}
38-
}).reduce((topics, topic) => ({...topic, ...topics}), {})
34+
const subTopics = Object.entries(topic['#'])
35+
.map(([name, dagLink]) => {
36+
return { [name]: linkUnmarshalling(dagLink) }
37+
})
38+
.reduce((topics, topic) => ({ ...topic, ...topics }), {})
3939

4040
return new TopicNode(topic.name, author, {
4141
subTopics,
@@ -44,14 +44,14 @@ class TopicNode {
4444
})
4545
}
4646

47-
static deserializeCBOR (topic, cb) {
47+
static deserializeCBOR(topic, cb) {
4848
dagCBOR.util.deserialize(topic, (err, result) => {
4949
if (err) return cb(err)
5050
cb(null, TopicNode.deserialize(result))
5151
})
5252
}
5353

54-
getReadableFormat () {
54+
getReadableFormat() {
5555
const allowedPublishers = Array.isArray(this.metadata.allowedPublishers)
5656
? this.metadata.allowedPublishers.map(p => p.toB58String())
5757
: this.metadata.allowedPublishers
@@ -70,40 +70,44 @@ class TopicNode {
7070
}
7171
}
7272

73-
getCID (cb) {
73+
getCID(cb) {
7474
dagCBOR.util.cid(this.serialize(), cb)
7575
}
7676

77-
serialize () {
77+
serialize() {
7878
return {
7979
name: this.name,
8080
author: this.author.toBytes(),
8181
parent: linkMarshalling(this.parent),
82-
'#': Object.entries(this.subTopics).map(([name, cid]) => {
83-
return {[name]: linkMarshalling(cid)}
84-
}).reduce((topics, topic) => ({...topic, ...topics}), {}),
82+
'#': Object.entries(this.subTopics)
83+
.map(([name, cid]) => {
84+
return { [name]: linkMarshalling(cid) }
85+
})
86+
.reduce((topics, topic) => ({ ...topic, ...topics }), {}),
8587
metadata: serializeMetadata(this.metadata)
8688
}
8789
}
8890

89-
serializeCBOR (cb) {
91+
serializeCBOR(cb) {
9092
const serialized = this.serialize()
9193
dagCBOR.util.serialize(serialized, cb)
9294
}
9395
}
9496

95-
function serializeMetadata (metadata) {
96-
const allowedPublishers = {enabled: false, peers: []}
97+
function serializeMetadata(metadata) {
98+
const allowedPublishers = { enabled: false, peers: [] }
9799
if (metadata.allowedPublishers) {
98100
allowedPublishers.enabled = true
99-
allowedPublishers.peers = metadata.allowedPublishers.map((peer) => peer.toBytes())
101+
allowedPublishers.peers = metadata.allowedPublishers.map(peer =>
102+
peer.toBytes()
103+
)
100104
}
101105

102-
const requestToPublish = {enabled: false, peers: []}
106+
const requestToPublish = { enabled: false, peers: [] }
103107
if (metadata.requestToPublish) {
104108
requestToPublish.enabled = true
105109
requestToPublish.peers = Array.isArray(metadata.requestToPublish)
106-
? metadata.requestToPublish.map((peer) => peer.toBytes())
110+
? metadata.requestToPublish.map(peer => peer.toBytes())
107111
: []
108112
}
109113

@@ -116,15 +120,22 @@ function serializeMetadata (metadata) {
116120
}
117121
}
118122

119-
function deserializeMetadata (metadata) {
123+
function deserializeMetadata(metadata) {
120124
const allowedPublishers = metadata.allowedPublishers.enabled
121-
? metadata.allowedPublishers.peers.map((peer) => PeerId.createFromBytes(peer))
125+
? metadata.allowedPublishers.peers.map(peer => PeerId.createFromBytes(peer))
122126
: false
123127

124128
let requestToPublish
125129
if (!metadata.requestToPublish.enabled) requestToPublish = false
126-
if (metadata.requestToPublish.enabled && !metadata.requestToPublish.peers.length) requestToPublish = true
127-
else requestToPublish = metadata.requestToPublish.peers.map((peer) => PeerId.createFromBytes(peer))
130+
if (
131+
metadata.requestToPublish.enabled &&
132+
!metadata.requestToPublish.peers.length
133+
)
134+
requestToPublish = true
135+
else
136+
requestToPublish = metadata.requestToPublish.peers.map(peer =>
137+
PeerId.createFromBytes(peer)
138+
)
128139

129140
return {
130141
...metadata,
@@ -136,7 +147,7 @@ function deserializeMetadata (metadata) {
136147
}
137148
}
138149

139-
function createMetadata ({
150+
function createMetadata({
140151
allowedPublishers = false,
141152
requestToPublish = true,
142153
eventLinking = 'LAST_SEEN',

‎src/dag/topic-tree.js

+4-5
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
// TODO right now memory usage grows indefinitely
1010
class TopicTree {
1111
// TODO probably need access to the DHT?
12-
constructor () {
12+
constructor() {
1313
this.topicTree = new Map()
1414
}
1515

16-
add (topicNode, cb) {
16+
add(topicNode, cb) {
1717
topicNode.getCID((err, topicCID) => {
1818
if (err) return cb(err)
1919

@@ -22,14 +22,13 @@ class TopicTree {
2222
})
2323
}
2424

25-
get (topicCID) {
25+
get(topicCID) {
2626
return this.eventTree.get(topicCID.toBaseEncodedString())
2727
}
2828

2929
// TODO to should be optional and could be a CID or a number
3030
// defaults to 1 level of resolution
31-
resolve (fromCID, to) {
32-
}
31+
resolve(fromCID, to) {}
3332
}
3433

3534
module.exports = TopicTree

‎src/dag/utils.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@
22

33
const CID = require('cids')
44

5-
function linkMarshalling (link) {
5+
function linkMarshalling(link) {
66
// No link
77
if (!link) return {}
88
// It's a CID already
9-
if (CID.isCID(link)) return {'/': link.buffer}
9+
if (CID.isCID(link)) return { '/': link.buffer }
1010
// It's an object but empty
1111
if (!link['/'] && typeof link === 'object') return {}
1212
// It can be a link already or just the multihash
1313
const newCID = new CID(link['/'] || link)
14-
return {'/': newCID.buffer}
14+
return { '/': newCID.buffer }
1515
}
1616

17-
function linkUnmarshalling (link) {
17+
function linkUnmarshalling(link) {
1818
// Link is already a CID
1919
// happens when dagcbor utils is used directly
2020
if (CID.isCID(link)) return link

‎src/index.js

+184-122
Large diffs are not rendered by default.

‎src/messages/create-rpc.js

+7-7
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const config = require('../config')
66

77
// Update RPC message will handle neighbourhood
88
// updates
9-
function update (topic, {parents, children}) {
9+
function update(topic, { parents, children }) {
1010
const metadata = createMetadata()
1111
return {
1212
op: 'UPDATE',
@@ -19,47 +19,47 @@ function update (topic, {parents, children}) {
1919
}
2020
}
2121

22-
function publish (eventNode) {
22+
function publish(eventNode) {
2323
return {
2424
op: 'PUBLISH_EVENT',
2525
metadata: createMetadata(),
2626
event: eventNode
2727
}
2828
}
2929

30-
function requestToPublish (eventNode) {
30+
function requestToPublish(eventNode) {
3131
return {
3232
op: 'REQUEST_TO_PUBLISH',
3333
metadata: createMetadata(),
3434
event: eventNode
3535
}
3636
}
3737

38-
function joinTopic (topic) {
38+
function joinTopic(topic) {
3939
return {
4040
op: 'JOIN_TOPIC',
4141
topicId: new CID(topic),
4242
metadata: createMetadata()
4343
}
4444
}
4545

46-
function leaveTopic (topic) {
46+
function leaveTopic(topic) {
4747
return {
4848
op: 'LEAVE_TOPIC',
4949
topicId: new CID(topic),
5050
metadata: createMetadata()
5151
}
5252
}
5353

54-
function newTopic (topicNode) {
54+
function newTopic(topicNode) {
5555
return {
5656
op: 'NEW_TOPIC',
5757
topic: topicNode,
5858
metadata: createMetadata()
5959
}
6060
}
6161

62-
function createMetadata () {
62+
function createMetadata() {
6363
const now = new Date()
6464
return {
6565
protocolVersion: config.protocol,

‎src/messages/marshalling.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const EventNode = require('../dag/event-node')
77
const TopicNode = require('../dag/topic-node')
88
const ops = require('./protobuffers').RPC.Operation
99

10-
function unmarshall (message) {
10+
function unmarshall(message) {
1111
// TODO improve code unmarshalling
1212
const result = {}
1313
result.metadata = message.metadata
@@ -41,7 +41,7 @@ function unmarshall (message) {
4141
return result
4242
}
4343

44-
function marshall (message) {
44+
function marshall(message) {
4545
// TODO improve code marshalling
4646
const result = {}
4747
result.metadata = message.metadata

‎src/messages/schemas/event-descriptor.js

+22-12
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,32 @@
22

33
const Joi = require('joi')
44

5-
const metadata = Joi.object().keys({
6-
created: Joi.date().iso().required(),
7-
protocolVersion: Joi.string().required()
8-
// signature: Joi.binary()
9-
}).required()
5+
const metadata = Joi.object()
6+
.keys({
7+
created: Joi.date()
8+
.iso()
9+
.required(),
10+
protocolVersion: Joi.string().required()
11+
// signature: Joi.binary()
12+
})
13+
.required()
1014

1115
const eventDescriptor = Joi.object().keys({
12-
publisher: Joi.binary().required().allow(null),
16+
publisher: Joi.binary()
17+
.required()
18+
.allow(null),
1319
author: Joi.binary().required(),
14-
parent: Joi.object().keys({
15-
'/': Joi.binary().allow(null)
16-
}).required(),
20+
parent: Joi.object()
21+
.keys({
22+
'/': Joi.binary().allow(null)
23+
})
24+
.required(),
1725
payload: Joi.binary().required(),
18-
topic: Joi.object().keys({
19-
'/': Joi.binary().required()
20-
}).required(),
26+
topic: Joi.object()
27+
.keys({
28+
'/': Joi.binary().required()
29+
})
30+
.required(),
2131
metadata
2232
})
2333

‎src/messages/schemas/peer-tree.js

+6-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@ const Joi = require('joi')
44

55
const peerTree = Joi.object().keys({
66
topic: Joi.binary().required(),
7-
parents: Joi.array().items(Joi.binary()).required(),
8-
children: Joi.array().items(Joi.binary()).required()
7+
parents: Joi.array()
8+
.items(Joi.binary())
9+
.required(),
10+
children: Joi.array()
11+
.items(Joi.binary())
12+
.required()
913
})
1014

1115
module.exports = peerTree

‎src/messages/schemas/rpc.js

+19-12
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,25 @@ const eventDescriptor = require('./event-descriptor')
66
const peerTree = require('./peer-tree')
77
const ops = require('../protobuffers').RPC.Operation
88

9-
const metadata = Joi.object().keys({
10-
created: Joi.date().iso(),
11-
protocolVersion: Joi.string()
12-
}).required()
9+
const metadata = Joi.object()
10+
.keys({
11+
created: Joi.date().iso(),
12+
protocolVersion: Joi.string()
13+
})
14+
.required()
1315

14-
const rpc = Joi.object().keys({
15-
op: Joi.number().integer().valid(Object.values(ops)).required(),
16-
metadata,
17-
topicId: Joi.binary(),
18-
event: eventDescriptor,
19-
topic: topicDescriptor,
20-
peerTree: peerTree
21-
}).required()
16+
const rpc = Joi.object()
17+
.keys({
18+
op: Joi.number()
19+
.integer()
20+
.valid(Object.values(ops))
21+
.required(),
22+
metadata,
23+
topicId: Joi.binary(),
24+
event: eventDescriptor,
25+
topic: topicDescriptor,
26+
peerTree: peerTree
27+
})
28+
.required()
2229

2330
module.exports = rpc

‎src/messages/schemas/topic-descriptor.js

+44-22
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,55 @@
11
'use strict'
22

33
const Joi = require('joi')
4-
const eventLinking = require('../protobuffers').TopicDescriptor.MetaData.EventLinking
4+
const eventLinking = require('../protobuffers').TopicDescriptor.MetaData
5+
.EventLinking
56

6-
const metadata = Joi.object().keys({
7-
created: Joi.date().iso().required(),
8-
protocolVersion: Joi.string().required(),
9-
allowedPublishers: Joi.object().keys({
10-
enabled: Joi.boolean(),
11-
peers: Joi.alternatives()
12-
.when('enabled', {is: true, then: Joi.array().items(Joi.binary()).min(1)})
13-
}).required(),
14-
requestToPublish: Joi.object().keys({
15-
enabled: Joi.boolean(),
16-
peers: Joi.array().items(Joi.binary())
17-
}).required(),
18-
eventLinking: Joi.string().valid(Object.values(eventLinking)).required()
19-
// signature: Joi.binary()
20-
}).required()
7+
const metadata = Joi.object()
8+
.keys({
9+
created: Joi.date()
10+
.iso()
11+
.required(),
12+
protocolVersion: Joi.string().required(),
13+
allowedPublishers: Joi.object()
14+
.keys({
15+
enabled: Joi.boolean(),
16+
peers: Joi.alternatives().when('enabled', {
17+
is: true,
18+
then: Joi.array()
19+
.items(Joi.binary())
20+
.min(1)
21+
})
22+
})
23+
.required(),
24+
requestToPublish: Joi.object()
25+
.keys({
26+
enabled: Joi.boolean(),
27+
peers: Joi.array().items(Joi.binary())
28+
})
29+
.required(),
30+
eventLinking: Joi.string()
31+
.valid(Object.values(eventLinking))
32+
.required()
33+
// signature: Joi.binary()
34+
})
35+
.required()
2136

2237
const topicDescriptor = Joi.object().keys({
2338
name: Joi.string().required(),
2439
author: Joi.binary().required(),
25-
parent: Joi.object().keys({
26-
'/': Joi.binary()
27-
}).required(),
28-
'#': Joi.object().pattern(Joi.string(), Joi.object().keys({
29-
'/': Joi.binary().required()
30-
})).required(),
40+
parent: Joi.object()
41+
.keys({
42+
'/': Joi.binary()
43+
})
44+
.required(),
45+
'#': Joi.object()
46+
.pattern(
47+
Joi.string(),
48+
Joi.object().keys({
49+
'/': Joi.binary().required()
50+
})
51+
)
52+
.required(),
3153
metadata
3254
})
3355

‎src/peer.js

+24-22
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ const assert = require('assert')
99
const log = require('./utils/logger')
1010

1111
class Peer extends EventEmitter {
12-
constructor (peerInfo, conn = null) {
13-
log.trace('New peer registered %j', {peer: peerInfo.id.toB58String()})
12+
constructor(peerInfo, conn = null) {
13+
log.trace('New peer registered %j', { peer: peerInfo.id.toB58String() })
1414
assert(peerInfo, 'Need a peerInfo object to initiate the peer')
1515
super()
1616

@@ -24,11 +24,11 @@ class Peer extends EventEmitter {
2424
}
2525
}
2626

27-
isConnected () {
27+
isConnected() {
2828
return !!this.conn
2929
}
3030

31-
attachConnection (conn) {
31+
attachConnection(conn) {
3232
if (this.conn) {
3333
// TODO close previously existing connection
3434
}
@@ -50,31 +50,33 @@ class Peer extends EventEmitter {
5050
this.emit('connection')
5151
}
5252

53-
sendMessages (messages) {
54-
log.trace('Pushing message to peer %j', {peer: this.info.id.toB58String()})
53+
sendMessages(messages) {
54+
log.trace('Pushing message to peer %j', {
55+
peer: this.info.id.toB58String()
56+
})
5557
this.stream.push(messages)
5658
}
5759

58-
updateTree (topic, {parents = [], children = []}) {
59-
this.trees.set(topic, {parents, children})
60+
updateTree(topic, { parents = [], children = [] }) {
61+
this.trees.set(topic, { parents, children })
6062
}
6163

62-
removeTree (topic) {
64+
removeTree(topic) {
6365
const tree = this.trees.get(topic)
6466
if (!tree) return
6567

6668
this.trees.delete(topic)
6769
return tree
6870
}
6971

70-
addChildren (topic, children) {
72+
addChildren(topic, children) {
7173
const tree = this.trees.get(topic)
7274
if (!tree) {
73-
this.trees.set(topic, {children, parents: []})
75+
this.trees.set(topic, { children, parents: [] })
7476
return
7577
}
76-
children.forEach((child) => {
77-
const exists = tree.children.find((peer) => {
78+
children.forEach(child => {
79+
const exists = tree.children.find(peer => {
7880
return peer.info.id.isEqual(child.info.id)
7981
})
8082
if (!exists) {
@@ -83,14 +85,14 @@ class Peer extends EventEmitter {
8385
})
8486
}
8587

86-
addParents (topic, parents) {
88+
addParents(topic, parents) {
8789
const tree = this.trees.get(topic)
8890
if (!tree) {
89-
this.trees.set(topic, {parents, children: []})
91+
this.trees.set(topic, { parents, children: [] })
9092
return
9193
}
92-
parents.forEach((parent) => {
93-
const exists = tree.parents.find((peer) => {
94+
parents.forEach(parent => {
95+
const exists = tree.parents.find(peer => {
9496
return peer.info.id.isEqual(parent.info.id)
9597
})
9698
if (!exists) {
@@ -99,18 +101,18 @@ class Peer extends EventEmitter {
99101
})
100102
}
101103

102-
removePeer (topic, peerId) {
104+
removePeer(topic, peerId) {
103105
const tree = this.trees.get(topic)
104106
const newParents = []
105107
const newChildren = []
106108
if (!tree) return
107109

108-
tree.parents.forEach((parent) => {
110+
tree.parents.forEach(parent => {
109111
if (parent.info.id.isEqual(peerId)) return
110112
newParents.push(parent)
111113
})
112114

113-
tree.children.forEach((child) => {
115+
tree.children.forEach(child => {
114116
if (child.info.id.isEqual(peerId)) return
115117
newChildren.push(child)
116118
})
@@ -119,7 +121,7 @@ class Peer extends EventEmitter {
119121
tree.parents = newParents
120122
}
121123

122-
close (callback) {
124+
close(callback) {
123125
// End the pushable
124126
if (this.stream) {
125127
this.stream.end()
@@ -135,7 +137,7 @@ class Peer extends EventEmitter {
135137

136138
// Only close the connection if no other
137139
// topic subscriptions are kept by this peer
138-
gracefulClose (callback) {
140+
gracefulClose(callback) {
139141
if (this.trees.size === 0) this.close(callback)
140142
}
141143
}

‎src/rpc/index.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ const receive = require('./receive')
55

66
module.exports = rpc
77

8-
function rpc (pulsarcastNode) {
8+
function rpc(pulsarcastNode) {
99
return {
1010
send: send(pulsarcastNode),
1111
receive: receive(pulsarcastNode)

‎src/rpc/receive.js

+78-36
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ const { protobuffers, schemas, marshalling } = require('../messages')
99

1010
const ops = protobuffers.RPC.Operation
1111

12-
function createRPCHandlers (pulsarcastNode) {
12+
function createRPCHandlers(pulsarcastNode) {
1313
const dht = pulsarcastNode.libp2p._dht
1414

1515
return {
@@ -25,10 +25,10 @@ function createRPCHandlers (pulsarcastNode) {
2525
genericHandler
2626
}
2727

28-
function publish (idB58Str, eventNode, callback) {
28+
function publish(idB58Str, eventNode, callback) {
2929
// Only consider the message if we have data
3030
if (!eventNode) return callback()
31-
const {me} = pulsarcastNode
31+
const { me } = pulsarcastNode
3232
const myId = me.info.id
3333

3434
// Publish is from this node so it's a new event
@@ -38,67 +38,94 @@ function createRPCHandlers (pulsarcastNode) {
3838
getTopic(dht, eventNode.topicCID, (err, topicNode) => {
3939
if (err) return callback(err)
4040

41-
const {
42-
allowedPublishers
43-
} = topicNode.metadata
41+
const { allowedPublishers } = topicNode.metadata
4442

4543
// New event published at this node and not allowed to publish
46-
if (newEvent && allowedPublishers && !allowedPublishers.find((peer) => peer.isEqual(myId))) {
47-
return requestToPublish(myId.toB58String(), eventNode, (err, topicNode, eventNode) => {
48-
callback(err, null, topicNode, eventNode)
49-
})
44+
if (
45+
newEvent &&
46+
allowedPublishers &&
47+
!allowedPublishers.find(peer => peer.isEqual(myId))
48+
) {
49+
return requestToPublish(
50+
myId.toB58String(),
51+
eventNode,
52+
(err, topicNode, eventNode) => {
53+
callback(err, null, topicNode, eventNode)
54+
}
55+
)
5056
}
5157

5258
// TODO check publisher is allowed
5359

5460
const options = { isNewEvent: newEvent }
55-
pulsarcastNode.rpc.send.event.publish(topicNode, eventNode, idB58Str, options, callback)
61+
pulsarcastNode.rpc.send.event.publish(
62+
topicNode,
63+
eventNode,
64+
idB58Str,
65+
options,
66+
callback
67+
)
5668
})
5769
}
5870

59-
function requestToPublish (idB58Str, eventNode, callback) {
71+
function requestToPublish(idB58Str, eventNode, callback) {
6072
// Only consider the message if we have data
6173
if (!eventNode) return
62-
const {me} = pulsarcastNode
74+
const { me } = pulsarcastNode
6375
const myId = me.info.id
6476

6577
log.trace('Got request to publish %j', eventNode)
6678

6779
getTopic(dht, eventNode.topicCID, (err, topicNode) => {
6880
if (err) return callback(err)
6981

70-
const {
71-
allowedPublishers,
72-
requestToPublish
73-
} = topicNode.metadata
82+
const { allowedPublishers, requestToPublish } = topicNode.metadata
7483

7584
// TODO better handling for this case
7685
if (!requestToPublish) return callback(null, topicNode, eventNode)
7786
// TODO better handling for this case
78-
if (Array.isArray(requestToPublish) && !requestToPublish.find((peer) => peer.isEqual(topicNode.author))) return callback(null, topicNode, eventNode)
87+
if (
88+
Array.isArray(requestToPublish) &&
89+
!requestToPublish.find(peer => peer.isEqual(topicNode.author))
90+
)
91+
return callback(null, topicNode, eventNode)
7992

8093
// Publish if I'm allowed to
81-
if (!allowedPublishers || allowedPublishers.find((peer) => peer.isEqual(myId))) {
82-
return pulsarcastNode.rpc.send.event.publish(topicNode, eventNode, myId.toB58String(), {isNewEvent: true}, callback)
94+
if (
95+
!allowedPublishers ||
96+
allowedPublishers.find(peer => peer.isEqual(myId))
97+
) {
98+
return pulsarcastNode.rpc.send.event.publish(
99+
topicNode,
100+
eventNode,
101+
myId.toB58String(),
102+
{ isNewEvent: true },
103+
callback
104+
)
83105
}
84106
// Propagate request to publish
85-
pulsarcastNode.rpc.send.event.requestToPublish(topicNode, eventNode, idB58Str, callback)
107+
pulsarcastNode.rpc.send.event.requestToPublish(
108+
topicNode,
109+
eventNode,
110+
idB58Str,
111+
callback
112+
)
86113
})
87114
}
88115

89-
function update (idB58Str, peerTree) {
116+
function update(idB58Str, peerTree) {
90117
// Only consider the emessage if we have data
91118
if (!peerTree) return
92119
const topicCID = peerTree.topicId.toBaseEncodedString()
93120

94121
log.trace('Got update %j', peerTree)
95122

96-
const {peers} = pulsarcastNode
123+
const { peers } = pulsarcastNode
97124
peers.get(idB58Str).updateTree(topicCID, peerTree)
98125
}
99126

100-
function join (idB58Str, topicCID, callback) {
101-
const {me, peers} = pulsarcastNode
127+
function join(idB58Str, topicCID, callback) {
128+
const { me, peers } = pulsarcastNode
102129
const topicB58Str = topicCID.toBaseEncodedString()
103130

104131
// The peer should already be in the list given that
@@ -117,17 +144,19 @@ function createRPCHandlers (pulsarcastNode) {
117144
child.addParents(topicB58Str, [me])
118145
// TODO take care of delivering initial state
119146
// This node is the root node for the topic
120-
if (me.info.id.isEqual(topicNode.author)) return callback(null, topicNode)
147+
if (me.info.id.isEqual(topicNode.author))
148+
return callback(null, topicNode)
121149
// Check if we have a set of parents for this topic
122-
if (me.trees.get(topicB58Str).parents > 0) return callback(null, topicNode)
150+
if (me.trees.get(topicB58Str).parents > 0)
151+
return callback(null, topicNode)
123152
}
124153

125154
pulsarcastNode.rpc.send.topic.join(topicNode, callback)
126155
})
127156
}
128157

129-
function leave (idB58Str, topicCID, callback) {
130-
const {me, peers} = pulsarcastNode
158+
function leave(idB58Str, topicCID, callback) {
159+
const { me, peers } = pulsarcastNode
131160
const topicB58Str = topicCID.toBaseEncodedString()
132161
const myId = me.info.id
133162

@@ -154,9 +183,14 @@ function createRPCHandlers (pulsarcastNode) {
154183

155184
// Need to forward the leave rpc
156185
const peers = tree.children.concat(tree.parents)
157-
return eachLimit(peers, 20, (peer, done) => {
158-
pulsarcastNode.rpc.send.topic.leave(topicNode, peer, done)
159-
}, err => callback(err))
186+
return eachLimit(
187+
peers,
188+
20,
189+
(peer, done) => {
190+
pulsarcastNode.rpc.send.topic.leave(topicNode, peer, done)
191+
},
192+
err => callback(err)
193+
)
160194
}
161195

162196
// Remove the peer from our tree
@@ -170,14 +204,18 @@ function createRPCHandlers (pulsarcastNode) {
170204
if (err) return callback(err)
171205

172206
// If this node is a child, we need to rejoin the topic
173-
if (me.trees.get(topicB58Str).children.find((peer) => peer.info.id.isEqual(myId))) {
207+
if (
208+
me.trees
209+
.get(topicB58Str)
210+
.children.find(peer => peer.info.id.isEqual(myId))
211+
) {
174212
pulsarcastNode.rpc.send.topic.join(topicNode, callback)
175213
}
176214
})
177215
})
178216
}
179217

180-
function genericHandler (idB58Str, message) {
218+
function genericHandler(idB58Str, message) {
181219
const result = Joi.validate(message, schemas.rpc, {
182220
abortEarly: true,
183221
allowUnknown: false,
@@ -191,8 +229,12 @@ function createRPCHandlers (pulsarcastNode) {
191229
// with type coercion
192230
const jsonMessage = marshalling.unmarshall(result.value)
193231

194-
log.trace('Received rpc %j', {handler: 'in', op: jsonMessage.op, from: idB58Str})
195-
const errorHandler = (err) => {
232+
log.trace('Received rpc %j', {
233+
handler: 'in',
234+
op: jsonMessage.op,
235+
from: idB58Str
236+
})
237+
const errorHandler = err => {
196238
if (err) log.error('%j', err)
197239
}
198240

‎src/rpc/send.js

+73-57
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ const { closestPeerToPeer, store } = require('../utils/dht-helpers')
99

1010
const RPC = protobuffers.RPC
1111

12-
function createRPCHandlers (pulsarcastNode) {
12+
function createRPCHandlers(pulsarcastNode) {
1313
const dht = pulsarcastNode.libp2p._dht
1414

1515
return {
@@ -24,13 +24,13 @@ function createRPCHandlers (pulsarcastNode) {
2424
}
2525
}
2626

27-
function publish (topicNode, eventNode, fromIdB58Str, options, callback) {
27+
function publish(topicNode, eventNode, fromIdB58Str, options, callback) {
2828
if (!callback) {
2929
callback = options
3030
options = {}
3131
}
3232

33-
const {me} = pulsarcastNode
33+
const { me } = pulsarcastNode
3434
const myId = me.info.id
3535
const isNewEvent = options.isNewEvent
3636

@@ -39,43 +39,52 @@ function createRPCHandlers (pulsarcastNode) {
3939
eventNode.publisher = myId
4040
}
4141

42-
waterfall([
43-
topicNode.getCID.bind(topicNode),
44-
(topicCID, cb) => {
45-
addEvent(topicCID, topicNode, eventNode, {createLink: isNewEvent}, cb)
46-
},
47-
(linkedEvent, cb) => {
48-
if (!isNewEvent) return cb(null, null, linkedEvent)
49-
50-
// Publish is being created at this node, not just forwardind,
51-
// so add it to DHT and propagate it through our whole topic tree
52-
store(dht, linkedEvent, cb)
53-
}
54-
], (err, eventCID, linkedEvent) => {
55-
if (err) return callback(err)
56-
57-
const topicB58Str = linkedEvent.topicCID.toBaseEncodedString()
58-
const rpc = createRPC.event.publish(linkedEvent)
59-
const trees = pulsarcastNode.me.trees.get(topicB58Str)
42+
waterfall(
43+
[
44+
topicNode.getCID.bind(topicNode),
45+
(topicCID, cb) => {
46+
addEvent(
47+
topicCID,
48+
topicNode,
49+
eventNode,
50+
{ createLink: isNewEvent },
51+
cb
52+
)
53+
},
54+
(linkedEvent, cb) => {
55+
if (!isNewEvent) return cb(null, null, linkedEvent)
56+
57+
// Publish is being created at this node, not just forwardind,
58+
// so add it to DHT and propagate it through our whole topic tree
59+
store(dht, linkedEvent, cb)
60+
}
61+
],
62+
(err, eventCID, linkedEvent) => {
63+
if (err) return callback(err)
6064

61-
// We're subscribed to this topic, emit the message
62-
if (pulsarcastNode.subscriptions.has(topicB58Str)) {
63-
pulsarcastNode.emit(topicB58Str, linkedEvent)
65+
const topicB58Str = linkedEvent.topicCID.toBaseEncodedString()
66+
const rpc = createRPC.event.publish(linkedEvent)
67+
const trees = pulsarcastNode.me.trees.get(topicB58Str)
68+
69+
// We're subscribed to this topic, emit the message
70+
if (pulsarcastNode.subscriptions.has(topicB58Str)) {
71+
pulsarcastNode.emit(topicB58Str, linkedEvent)
72+
}
73+
// TODO handle publishing to an event we're not subscribed to
74+
if (!trees) return callback(null, eventCID, topicNode, linkedEvent)
75+
const { parents, children } = trees
76+
77+
const peers = [...parents, ...children]
78+
peers.forEach(peer => {
79+
// Don't forward the message back
80+
if (peer.info.id.toB58String() !== fromIdB58Str) send(peer, rpc)
81+
return callback(null, eventCID, topicNode, linkedEvent)
82+
})
6483
}
65-
// TODO handle publishing to an event we're not subscribed to
66-
if (!trees) return callback(null, eventCID, topicNode, linkedEvent)
67-
const { parents, children } = trees
68-
69-
const peers = [...parents, ...children]
70-
peers.forEach(peer => {
71-
// Don't forward the message back
72-
if (peer.info.id.toB58String() !== fromIdB58Str) send(peer, rpc)
73-
return callback(null, eventCID, topicNode, linkedEvent)
74-
})
75-
})
84+
)
7685
}
7786

78-
function requestToPublish (topicNode, eventNode, fromIdB58Str, callback) {
87+
function requestToPublish(topicNode, eventNode, fromIdB58Str, callback) {
7988
const rpc = createRPC.event.requestToPublish(eventNode)
8089

8190
topicNode.getCID((err, topicCID) => {
@@ -98,30 +107,33 @@ function createRPCHandlers (pulsarcastNode) {
98107

99108
// Join finds the closest peer to the topic CID
100109
// and sends the rpc join message
101-
function joinTopic (topicNode, callback) {
110+
function joinTopic(topicNode, callback) {
102111
const { me } = pulsarcastNode
103112
topicNode.getCID((err, topicCID) => {
104113
if (err) return callback(err)
105114

106115
const rpc = createRPC.topic.join(topicCID)
107116
// Get the closest peer to the topic author stored locally
108-
waterfall([
109-
closestPeerToPeer.bind(null, dht, topicNode.author),
110-
pulsarcastNode._getPeer.bind(pulsarcastNode)
111-
], (err, peer) => {
112-
if (err) return callback(err)
113-
// Add peer to my tree
114-
me.addParents(topicCID.toBaseEncodedString(), [peer])
115-
// Add me to peer's tree
116-
peer.addChildren(topicCID.toBaseEncodedString(), [me])
117-
send(peer, rpc)
118-
119-
callback(null, topicNode)
120-
})
117+
waterfall(
118+
[
119+
closestPeerToPeer.bind(null, dht, topicNode.author),
120+
pulsarcastNode._getPeer.bind(pulsarcastNode)
121+
],
122+
(err, peer) => {
123+
if (err) return callback(err)
124+
// Add peer to my tree
125+
me.addParents(topicCID.toBaseEncodedString(), [peer])
126+
// Add me to peer's tree
127+
peer.addChildren(topicCID.toBaseEncodedString(), [me])
128+
send(peer, rpc)
129+
130+
callback(null, topicNode)
131+
}
132+
)
121133
})
122134
}
123135

124-
function leaveTopic (topicNode, toPeer, callback) {
136+
function leaveTopic(topicNode, toPeer, callback) {
125137
topicNode.getCID((err, topicCID) => {
126138
if (err) return callback(err)
127139

@@ -132,7 +144,7 @@ function createRPCHandlers (pulsarcastNode) {
132144
}
133145

134146
// TODO for now only store topic descriptor
135-
function newTopic (topicNode, options, callback) {
147+
function newTopic(topicNode, options, callback) {
136148
// check if options exist
137149
if (!callback) {
138150
callback = options
@@ -142,19 +154,23 @@ function createRPCHandlers (pulsarcastNode) {
142154
store(dht, topicNode, callback)
143155
}
144156

145-
function send (peer, rpc) {
146-
log.trace('Sending rpc %j', {handler: 'out', op: rpc.op, to: peer.info.id.toB58String()})
157+
function send(peer, rpc) {
158+
log.trace('Sending rpc %j', {
159+
handler: 'out',
160+
op: rpc.op,
161+
to: peer.info.id.toB58String()
162+
})
147163

148164
const rpcToSend = marshalling.marshall(rpc)
149-
const encodedMessage = RPC.encode({msgs: [rpcToSend]})
165+
const encodedMessage = RPC.encode({ msgs: [rpcToSend] })
150166

151167
peer.sendMessages(encodedMessage)
152168
}
153169

154170
// Helper funcs
155-
function addEvent (topicCID, topicNode, eventNode, {createLink}, cb) {
171+
function addEvent(topicCID, topicNode, eventNode, { createLink }, cb) {
156172
const topicB58Str = topicCID.toBaseEncodedString()
157-
const {eventTrees} = pulsarcastNode
173+
const { eventTrees } = pulsarcastNode
158174
let eventTree
159175

160176
// Add event tree if it does not exist

‎src/utils/dht-helpers.js

+29-25
Original file line numberDiff line numberDiff line change
@@ -8,42 +8,46 @@ const log = require('./logger')
88
const TopicNode = require('../dag/topic-node')
99
const EventNode = require('../dag/event-node')
1010

11-
function closestPeerToPeer (dht, peerId, cb) {
11+
function closestPeerToPeer(dht, peerId, cb) {
1212
convertPeerId(peerId, (err, key) => {
1313
if (err) cb(err)
1414
cb(null, dht.routingTable.closestPeer(key))
1515
})
1616
}
1717

1818
// Store a event node or topic node
19-
function store (dht, dagNode, cb) {
20-
waterfall([
21-
(done) => parallel([
22-
dagNode.getCID.bind(dagNode),
23-
dagNode.serializeCBOR.bind(dagNode)
24-
], done),
25-
([cid, serialized], done) => {
26-
log.trace('Storing node %j', {
27-
cid: cid.toBaseEncodedString(),
28-
...dagNode.getReadableFormat()
29-
})
30-
dht.put(cid.buffer, serialized, (err) => done(err, cid))
31-
}
32-
], (err, cid) => cb(err, cid, dagNode))
19+
function store(dht, dagNode, cb) {
20+
waterfall(
21+
[
22+
done =>
23+
parallel(
24+
[dagNode.getCID.bind(dagNode), dagNode.serializeCBOR.bind(dagNode)],
25+
done
26+
),
27+
([cid, serialized], done) => {
28+
log.trace('Storing node %j', {
29+
cid: cid.toBaseEncodedString(),
30+
...dagNode.getReadableFormat()
31+
})
32+
dht.put(cid.buffer, serialized, err => done(err, cid))
33+
}
34+
],
35+
(err, cid) => cb(err, cid, dagNode)
36+
)
3337
}
3438

35-
function getTopic (dht, topicCID, cb) {
36-
waterfall([
37-
dht.get.bind(dht, topicCID.buffer, null),
38-
TopicNode.deserializeCBOR
39-
], cb)
39+
function getTopic(dht, topicCID, cb) {
40+
waterfall(
41+
[dht.get.bind(dht, topicCID.buffer, null), TopicNode.deserializeCBOR],
42+
cb
43+
)
4044
}
4145

42-
function getEvent (dht, eventCID, cb) {
43-
waterfall([
44-
dht.get.bind(dht, eventCID.buffer, null),
45-
EventNode.deserializeCBOR
46-
], cb)
46+
function getEvent(dht, eventCID, cb) {
47+
waterfall(
48+
[dht.get.bind(dht, eventCID.buffer, null), EventNode.deserializeCBOR],
49+
cb
50+
)
4751
}
4852

4953
module.exports = {

‎test/integration/2-nodes.js

+64-51
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,27 @@ describe('2 nodes', () => {
1717
let topic
1818
let topicCID
1919

20-
before((done) => {
20+
before(done => {
2121
createNodes(2, (err, p2pNodes) => {
2222
expect(err).not.to.exist
23-
nodes = p2pNodes.map((node) => new Pulsarcast(node))
23+
nodes = p2pNodes.map(node => new Pulsarcast(node))
2424
done()
2525
})
2626
})
2727

28-
it('starts both nodes', (done) => {
29-
parallel([
30-
nodes[0].start.bind(nodes[0]),
31-
nodes[1].start.bind(nodes[1])
32-
], (err) => {
33-
expect(err).to.not.exist
34-
expect(nodes[0].started).to.be.true
35-
expect(nodes[1].started).to.be.true
36-
done()
37-
})
28+
it('starts both nodes', done => {
29+
parallel(
30+
[nodes[0].start.bind(nodes[0]), nodes[1].start.bind(nodes[1])],
31+
err => {
32+
expect(err).to.not.exist
33+
expect(nodes[0].started).to.be.true
34+
expect(nodes[1].started).to.be.true
35+
done()
36+
}
37+
)
3838
})
3939

40-
it('creates a topic', (done) => {
40+
it('creates a topic', done => {
4141
nodes[0].createTopic('test', (err, savedCID, topicNode) => {
4242
expect(err).to.not.exist
4343
expect(topicNode).to.be.an.instanceof(TopicNode)
@@ -56,26 +56,31 @@ describe('2 nodes', () => {
5656
})
5757
})
5858

59-
it('creates a new topic with a parent', (done) => {
60-
nodes[0].createTopic('test-2.0', {parent: topicCID.toBaseEncodedString()}, (err, savedCID, childTopicNode) => {
61-
expect(err).to.not.exist
62-
expect(childTopicNode).to.be.an.instanceof(TopicNode)
63-
expect(childTopicNode.subTopics.meta).to.be.an.instanceof(CID)
64-
expect(childTopicNode.parent.equals(topicCID)).to.be.true
65-
childTopicNode.getCID((err, cid) => {
59+
it('creates a new topic with a parent', done => {
60+
nodes[0].createTopic(
61+
'test-2.0',
62+
{ parent: topicCID.toBaseEncodedString() },
63+
(err, savedCID, childTopicNode) => {
6664
expect(err).to.not.exist
67-
const topicB58Str = cid.toBaseEncodedString()
68-
expect(cid.equals(savedCID)).to.be.true
69-
expect(nodes[0].subscriptions.size).to.equal(4)
70-
expect(nodes[0].subscriptions.has(topicB58Str)).to.be.true
71-
done()
72-
})
73-
})
65+
expect(childTopicNode).to.be.an.instanceof(TopicNode)
66+
expect(childTopicNode.subTopics.meta).to.be.an.instanceof(CID)
67+
expect(childTopicNode.parent.equals(topicCID)).to.be.true
68+
childTopicNode.getCID((err, cid) => {
69+
expect(err).to.not.exist
70+
const topicB58Str = cid.toBaseEncodedString()
71+
expect(cid.equals(savedCID)).to.be.true
72+
expect(nodes[0].subscriptions.size).to.equal(4)
73+
expect(nodes[0].subscriptions.has(topicB58Str)).to.be.true
74+
done()
75+
})
76+
}
77+
)
7478
})
7579

76-
it('creates a new topic with a subTopic', (done) => {
77-
nodes[0].createTopic('test-with-subtopic',
78-
{subTopics: {'some-topic': topicCID.toBaseEncodedString()}},
80+
it('creates a new topic with a subTopic', done => {
81+
nodes[0].createTopic(
82+
'test-with-subtopic',
83+
{ subTopics: { 'some-topic': topicCID.toBaseEncodedString() } },
7984
(err, savedCID, newTopicNode) => {
8085
expect(err).to.not.exist
8186
expect(newTopicNode.subTopics['some-topic'].equals(topicCID)).to.be.true
@@ -89,28 +94,30 @@ describe('2 nodes', () => {
8994
expect(nodes[0].subscriptions.has(topicB58Str)).to.be.true
9095
done()
9196
})
92-
})
97+
}
98+
)
9399
})
94100

95-
it('subscribes to the previously created topic', (done) => {
101+
it('subscribes to the previously created topic', done => {
96102
const topicB58Str = topicCID.toBaseEncodedString()
97103

98104
nodes[1].subscribe(topicB58Str, (err, topicNode) => {
99105
expect(err).to.not.exist
100106
expect(topicNode).to.be.an.instanceof(TopicNode)
101107
expect(nodes[1].subscriptions.size).to.equal(1)
102-
expect(nodes[1].subscriptions.has(topicCID.toBaseEncodedString())).to.be.true
108+
expect(nodes[1].subscriptions.has(topicCID.toBaseEncodedString())).to.be
109+
.true
103110
expect(topicNode.serialize()).to.deep.equal(topic.serialize())
104111

105112
// Check tree
106-
const {parents} = nodes[1].me.trees.get(topicB58Str)
113+
const { parents } = nodes[1].me.trees.get(topicB58Str)
107114
expect(parents).to.have.lengthOf.above(0)
108115
expect(parents[0].trees.get(topicB58Str).children).to.include(nodes[1].me)
109116
done()
110117
})
111118
})
112119

113-
it('publishes a message from the non author node', (done) => {
120+
it('publishes a message from the non author node', done => {
114121
const topicB58Str = topicCID.toBaseEncodedString()
115122
const message = 'foobar'
116123
// Helper func to run all the expects
@@ -121,7 +128,7 @@ describe('2 nodes', () => {
121128
}
122129
let firstEventNode
123130
// Event listener
124-
const listener = (eventNode) => {
131+
const listener = eventNode => {
125132
// Compare serializes of both events
126133
if (!firstEventNode) {
127134
firstEventNode = eventNode
@@ -144,30 +151,36 @@ describe('2 nodes', () => {
144151
nodes[1].once(topicB58Str, listener)
145152
nodes[0].once(topicB58Str, listener)
146153

147-
nodes[1].publish(topicB58Str, message, (err, eventCID, topicNode, eventNode) => {
148-
expect(err).to.not.exist
149-
expect(eventNode).to.be.an.instanceof(EventNode)
150-
expect(topicNode).to.be.an.instanceof(TopicNode)
151-
expect(eventNode.topicCID.equals(topicCID)).to.be.true
152-
expect(eventNode.payload.toString()).to.be.equal(message)
153-
expect(eventNode.author.isEqual(nodes[1].me.info.id)).to.be.true
154-
// Should be a request to publish
155-
expect(eventCID).to.be.null
156-
expect(eventNode.isPublished).to.be.false
157-
checkAllDone()
158-
})
154+
nodes[1].publish(
155+
topicB58Str,
156+
message,
157+
(err, eventCID, topicNode, eventNode) => {
158+
expect(err).to.not.exist
159+
expect(eventNode).to.be.an.instanceof(EventNode)
160+
expect(topicNode).to.be.an.instanceof(TopicNode)
161+
expect(eventNode.topicCID.equals(topicCID)).to.be.true
162+
expect(eventNode.payload.toString()).to.be.equal(message)
163+
expect(eventNode.author.isEqual(nodes[1].me.info.id)).to.be.true
164+
// Should be a request to publish
165+
expect(eventCID).to.be.null
166+
expect(eventNode.isPublished).to.be.false
167+
checkAllDone()
168+
}
169+
)
159170
})
160171

161-
it('unsubscribe from the topic', (done) => {
172+
it('unsubscribe from the topic', done => {
162173
const topicB58Str = topicCID.toBaseEncodedString()
163-
nodes[1].unsubscribe(topicB58Str, (err) => {
174+
nodes[1].unsubscribe(topicB58Str, err => {
164175
expect(err).to.not.exist
165176

166177
expect(nodes[1].subscriptions.size).to.equal(0)
167178

168179
eventually(() => {
169180
const topicChildren = nodes[0].me.trees.get(topicB58Str).children
170-
expect(topicChildren.find(peer => peer.info.id.isEqual(nodes[1].me.info.id))).to.not.exist
181+
expect(
182+
topicChildren.find(peer => peer.info.id.isEqual(nodes[1].me.info.id))
183+
).to.not.exist
171184
}, done)
172185
})
173186
})

‎test/integration/multiple-nodes.js

+116-84
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const TopicNode = require('../../src/dag/topic-node')
1212
const EventNode = require('../../src/dag/event-node')
1313
const { eventually, createNodes } = require('../utils')
1414

15-
describe('multiple nodes', function () {
15+
describe('multiple nodes', function() {
1616
this.timeout(50000)
1717
let nodes
1818
// Topics
@@ -27,72 +27,85 @@ describe('multiple nodes', function () {
2727
const subscriber = 80
2828
const subscriberNum = 5
2929

30-
before((done) => {
30+
before(done => {
3131
createNodes(nodeNumber, (err, p2pNodes) => {
3232
expect(err).not.to.exist
33-
nodes = p2pNodes.map((node) => new Pulsarcast(node))
33+
nodes = p2pNodes.map(node => new Pulsarcast(node))
3434
done()
3535
})
3636
})
3737

38-
it(`starts ${nodeNumber} nodes`, (done) => {
39-
each(nodes, (node, cb) => node.start(cb), (err) => {
40-
expect(err).to.not.exist
41-
nodes.forEach(node => {
42-
expect(node.started).to.be.true
43-
})
44-
done()
45-
})
38+
it(`starts ${nodeNumber} nodes`, done => {
39+
each(
40+
nodes,
41+
(node, cb) => node.start(cb),
42+
err => {
43+
expect(err).to.not.exist
44+
nodes.forEach(node => {
45+
expect(node.started).to.be.true
46+
})
47+
done()
48+
}
49+
)
4650
})
4751

48-
it('creates a simple parent topic', (done) => {
49-
nodes[publisher + 1].createTopic('test-parent', (err, savedCID, topicNode) => {
50-
expect(err).to.not.exist
51-
expect(topicNode).to.be.an.instanceof(TopicNode)
52-
expect(topicNode.subTopics.meta).to.be.an.instanceof(CID)
53-
topicNode.getCID((err, cid) => {
52+
it('creates a simple parent topic', done => {
53+
nodes[publisher + 1].createTopic(
54+
'test-parent',
55+
(err, savedCID, topicNode) => {
5456
expect(err).to.not.exist
55-
const topicB58Str = cid.toBaseEncodedString()
56-
expect(cid.equals(savedCID)).to.be.true
57-
// Topic and meta topic
58-
expect(nodes[publisher + 1].subscriptions.size).to.equal(2)
59-
expect(nodes[publisher + 1].subscriptions.has(topicB58Str)).to.be.true
60-
parentTopicCID = cid
61-
done()
62-
})
63-
})
57+
expect(topicNode).to.be.an.instanceof(TopicNode)
58+
expect(topicNode.subTopics.meta).to.be.an.instanceof(CID)
59+
topicNode.getCID((err, cid) => {
60+
expect(err).to.not.exist
61+
const topicB58Str = cid.toBaseEncodedString()
62+
expect(cid.equals(savedCID)).to.be.true
63+
// Topic and meta topic
64+
expect(nodes[publisher + 1].subscriptions.size).to.equal(2)
65+
expect(nodes[publisher + 1].subscriptions.has(topicB58Str)).to.be.true
66+
parentTopicCID = cid
67+
done()
68+
})
69+
}
70+
)
6471
})
6572

66-
it('creates a simple sub-topic', (done) => {
67-
nodes[publisher + 1].createTopic('test-subtopic', (err, savedCID, topicNode) => {
68-
expect(err).to.not.exist
69-
expect(topicNode).to.be.an.instanceof(TopicNode)
70-
expect(topicNode.subTopics.meta).to.be.an.instanceof(CID)
71-
topicNode.getCID((err, cid) => {
73+
it('creates a simple sub-topic', done => {
74+
nodes[publisher + 1].createTopic(
75+
'test-subtopic',
76+
(err, savedCID, topicNode) => {
7277
expect(err).to.not.exist
73-
const topicB58Str = cid.toBaseEncodedString()
74-
expect(cid.equals(savedCID)).to.be.true
75-
expect(nodes[publisher + 1].subscriptions.size).to.equal(4)
76-
expect(nodes[publisher + 1].subscriptions.has(topicB58Str)).to.be.true
77-
subTopicCID = cid
78-
done()
79-
})
80-
})
78+
expect(topicNode).to.be.an.instanceof(TopicNode)
79+
expect(topicNode.subTopics.meta).to.be.an.instanceof(CID)
80+
topicNode.getCID((err, cid) => {
81+
expect(err).to.not.exist
82+
const topicB58Str = cid.toBaseEncodedString()
83+
expect(cid.equals(savedCID)).to.be.true
84+
expect(nodes[publisher + 1].subscriptions.size).to.equal(4)
85+
expect(nodes[publisher + 1].subscriptions.has(topicB58Str)).to.be.true
86+
subTopicCID = cid
87+
done()
88+
})
89+
}
90+
)
8191
})
8292

83-
it('creates a new topic with a parent and sub-topic', (done) => {
84-
nodes[publisher].createTopic('test-2.0',
93+
it('creates a new topic with a parent and sub-topic', done => {
94+
nodes[publisher].createTopic(
95+
'test-2.0',
8596
{
8697
parent: parentTopicCID.toBaseEncodedString(),
8798
subTopics: {
8899
'test-subtopic': subTopicCID.toBaseEncodedString()
89100
}
90-
}, (err, savedCID, topicNode) => {
101+
},
102+
(err, savedCID, topicNode) => {
91103
expect(err).to.not.exist
92104
expect(topicNode).to.be.an.instanceof(TopicNode)
93105
expect(topicNode.subTopics.meta).to.be.an.instanceof(CID)
94106
expect(topicNode.parent.equals(parentTopicCID)).to.be.true
95-
expect(topicNode.subTopics['test-subtopic'].equals(subTopicCID)).to.be.true
107+
expect(topicNode.subTopics['test-subtopic'].equals(subTopicCID)).to.be
108+
.true
96109
topicNode.getCID((err, cid) => {
97110
expect(err).to.not.exist
98111
const topicB58Str = cid.toBaseEncodedString()
@@ -103,31 +116,37 @@ describe('multiple nodes', function () {
103116
topicCID = cid
104117
done()
105118
})
106-
})
119+
}
120+
)
107121
})
108122

109-
it(`subscribes ${subscriberNum} nodes to the created topic`, (done) => {
123+
it(`subscribes ${subscriberNum} nodes to the created topic`, done => {
110124
const topicB58Str = topicCID.toBaseEncodedString()
111125
const subscriberNodes = nodes.slice(subscriber, subscriberNum + subscriber)
112126

113-
each(subscriberNodes, (node, cb) => {
114-
node.subscribe(topicB58Str, (err, topicNode) => {
115-
expect(err).to.not.exist
116-
expect(topicNode).to.be.an.instanceof(TopicNode)
117-
expect(node.subscriptions.size).to.equal(1)
118-
expect(node.subscriptions.has(topicCID.toBaseEncodedString())).to.be.true
119-
expect(topicNode.serialize()).to.deep.equal(topic.serialize())
120-
121-
// Check tree
122-
const {parents} = node.me.trees.get(topicB58Str)
123-
expect(parents).to.have.lengthOf.above(0)
124-
expect(parents[0].trees.get(topicB58Str).children).to.include(node.me)
125-
cb()
126-
})
127-
}, done)
127+
each(
128+
subscriberNodes,
129+
(node, cb) => {
130+
node.subscribe(topicB58Str, (err, topicNode) => {
131+
expect(err).to.not.exist
132+
expect(topicNode).to.be.an.instanceof(TopicNode)
133+
expect(node.subscriptions.size).to.equal(1)
134+
expect(node.subscriptions.has(topicCID.toBaseEncodedString())).to.be
135+
.true
136+
expect(topicNode.serialize()).to.deep.equal(topic.serialize())
137+
138+
// Check tree
139+
const { parents } = node.me.trees.get(topicB58Str)
140+
expect(parents).to.have.lengthOf.above(0)
141+
expect(parents[0].trees.get(topicB58Str).children).to.include(node.me)
142+
cb()
143+
})
144+
},
145+
done
146+
)
128147
})
129148

130-
it('publishes a message from the non author node', (done) => {
149+
it('publishes a message from the non author node', done => {
131150
const topicB58Str = topicCID.toBaseEncodedString()
132151
const message = 'foobar'
133152
// Helper func to run all the expects
@@ -138,7 +157,7 @@ describe('multiple nodes', function () {
138157
}
139158
let firstEventNode
140159
// Event listener
141-
const listener = (eventNode) => {
160+
const listener = eventNode => {
142161
// Compare serializes of both events
143162
if (!firstEventNode) {
144163
firstEventNode = eventNode
@@ -153,44 +172,53 @@ describe('multiple nodes', function () {
153172
// Should match subscribed author
154173
expect(eventNode.author.isEqual(nodes[subscriber].me.info.id)).to.be.true
155174
// Should match topic author
156-
expect(eventNode.publisher.isEqual(nodes[publisher].me.info.id)).to.be.true
175+
expect(eventNode.publisher.isEqual(nodes[publisher].me.info.id)).to.be
176+
.true
157177

158178
checkAllDone()
159179
}
160180
// Setup event listeners
161181
nodes[subscriber].once(topicB58Str, listener)
162182
nodes[publisher].once(topicB58Str, listener)
163183

164-
nodes[subscriber].publish(topicB58Str, message, (err, eventCID, topicNode, eventNode) => {
165-
expect(err).to.not.exist
166-
expect(eventNode).to.be.an.instanceof(EventNode)
167-
expect(topicNode).to.be.an.instanceof(TopicNode)
168-
expect(eventNode.topicCID.equals(topicCID)).to.be.true
169-
expect(eventNode.payload.toString()).to.be.equal(message)
170-
expect(eventNode.author.isEqual(nodes[subscriber].me.info.id)).to.be.true
171-
// Should be a request to publish
172-
expect(eventCID).to.be.null
173-
expect(eventNode.isPublished).to.be.false
174-
175-
checkAllDone()
176-
})
184+
nodes[subscriber].publish(
185+
topicB58Str,
186+
message,
187+
(err, eventCID, topicNode, eventNode) => {
188+
expect(err).to.not.exist
189+
expect(eventNode).to.be.an.instanceof(EventNode)
190+
expect(topicNode).to.be.an.instanceof(TopicNode)
191+
expect(eventNode.topicCID.equals(topicCID)).to.be.true
192+
expect(eventNode.payload.toString()).to.be.equal(message)
193+
expect(eventNode.author.isEqual(nodes[subscriber].me.info.id)).to.be
194+
.true
195+
// Should be a request to publish
196+
expect(eventCID).to.be.null
197+
expect(eventNode.isPublished).to.be.false
198+
199+
checkAllDone()
200+
}
201+
)
177202
})
178203

179-
it('unsubscribe from the topic', (done) => {
204+
it('unsubscribe from the topic', done => {
180205
const topicB58Str = topicCID.toBaseEncodedString()
181-
nodes[subscriber].unsubscribe(topicB58Str, (err) => {
206+
nodes[subscriber].unsubscribe(topicB58Str, err => {
182207
expect(err).to.not.exist
183208

184209
expect(nodes[subscriber].subscriptions.size).to.equal(0)
185210

186211
eventually(() => {
187-
const topicChildren = nodes[publisher].me.trees.get(topicB58Str).children
188-
expect(topicChildren.find(peer => peer.info.id.isEqual(nodes[1].me.info.id))).to.not.exist
212+
const topicChildren = nodes[publisher].me.trees.get(topicB58Str)
213+
.children
214+
expect(
215+
topicChildren.find(peer => peer.info.id.isEqual(nodes[1].me.info.id))
216+
).to.not.exist
189217
}, done)
190218
})
191219
})
192220

193-
it('dissemination tress are cleaned up on connection close', (done) => {
221+
it('dissemination tress are cleaned up on connection close', done => {
194222
const topicB58Str = topicCID.toBaseEncodedString()
195223
const droppingNode = nodes[subscriber + 1]
196224
const droppingNodeId = droppingNode.me.info.id.toB58String()
@@ -211,10 +239,14 @@ describe('multiple nodes', function () {
211239
eventually(() => {
212240
// Check dropping node is not present in any tree
213241
childrenNodes.forEach(child => {
214-
expect(child.me.trees.get(topicB58Str).parents).to.not.include(child.peers.get(droppingNodeId))
242+
expect(child.me.trees.get(topicB58Str).parents).to.not.include(
243+
child.peers.get(droppingNodeId)
244+
)
215245
})
216246
parentNodes.forEach(parent => {
217-
expect(parent.me.trees.get(topicB58Str).children).to.not.include(parent.peers.get(droppingNodeId))
247+
expect(parent.me.trees.get(topicB58Str).children).to.not.include(
248+
parent.peers.get(droppingNodeId)
249+
)
218250
})
219251
}, done)
220252
})

‎test/test-node.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const DHT = require('libp2p-kad-dht')
77
const libp2p = require('libp2p')
88

99
class Node extends libp2p {
10-
constructor ({ peerInfo, peerBook }) {
10+
constructor({ peerInfo, peerBook }) {
1111
const modules = {
1212
transport: [TCP],
1313
streamMuxer: [spdy],

‎test/utils.js

+28-20
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,22 @@ const { waterfall, eachOfLimit, mapLimit } = require('async')
66

77
const Node = require('./test-node')
88

9-
function createNode (maddr, callback) {
10-
waterfall([
11-
(cb) => PeerId.create({ bits: 1024 }, cb),
12-
(id, cb) => PeerInfo.create(id, cb),
13-
(peerInfo, cb) => {
14-
peerInfo.multiaddrs.add(maddr)
15-
cb(null, new Node({ peerInfo }))
16-
},
17-
(node, cb) => node.start((err) => cb(err, node))
18-
], callback)
9+
function createNode(maddr, callback) {
10+
waterfall(
11+
[
12+
cb => PeerId.create({ bits: 1024 }, cb),
13+
(id, cb) => PeerInfo.create(id, cb),
14+
(peerInfo, cb) => {
15+
peerInfo.multiaddrs.add(maddr)
16+
cb(null, new Node({ peerInfo }))
17+
},
18+
(node, cb) => node.start(err => cb(err, node))
19+
],
20+
callback
21+
)
1922
}
2023

21-
function createNodes (nodeNumber, callback) {
24+
function createNodes(nodeNumber, callback) {
2225
const maddrs = []
2326
for (let i = 0; i < nodeNumber; i++) {
2427
maddrs.push('/ip4/127.0.0.1/tcp/0')
@@ -27,18 +30,23 @@ function createNodes (nodeNumber, callback) {
2730
if (err) return callback(err)
2831

2932
// Connect nodes sequentially, essentially creating a ring
30-
eachOfLimit(nodes, 10, (node, index, cb) => {
31-
let nextNode = nodes[index + 1]
32-
// End of node list
33-
if (!nextNode) return cb()
34-
node.dial(nextNode.peerInfo, cb)
35-
}, (err) => {
36-
callback(err, nodes)
37-
})
33+
eachOfLimit(
34+
nodes,
35+
10,
36+
(node, index, cb) => {
37+
let nextNode = nodes[index + 1]
38+
// End of node list
39+
if (!nextNode) return cb()
40+
node.dial(nextNode.peerInfo, cb)
41+
},
42+
err => {
43+
callback(err, nodes)
44+
}
45+
)
3846
})
3947
}
4048

41-
function eventually (func, callback) {
49+
function eventually(func, callback) {
4250
const polling = setInterval(() => {
4351
try {
4452
func()

0 commit comments

Comments
 (0)
Please sign in to comment.