diff --git a/Cargo.toml b/Cargo.toml index 08bdcbc6..3ea268d2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,3 +62,7 @@ env_logger = "0.10.0" tracing-subscriber = { version = "0.3.17", features = ["fmt", "env-filter"] } itertools = "0.11.0" autosurgeon = "0.8.0" +bolero = { version = "0.10.0", features = ["arbitrary"] } +arbitrary = { version = "1.3.1", features = ["derive"] } +bolero-generator = { version = "0.10.0", features = ["arbitrary"] } +rand = "0.8.5" diff --git a/examples/distributed_bakery.rs b/examples/distributed_bakery.rs index f957c2de..0f1be51f 100644 --- a/examples/distributed_bakery.rs +++ b/examples/distributed_bakery.rs @@ -399,7 +399,7 @@ async fn main() { } // The initial document. - let doc_handle = repo_handle.new_document(); + let doc_handle = repo_handle.new_document().await; doc_handle.with_doc_mut(|doc| { let mut tx = doc.transaction(); reconcile(&mut tx, &bakery).unwrap(); @@ -426,7 +426,11 @@ async fn main() { } assert!(doc_id.is_some()); // Get the document. - repo_handle.request_document(doc_id.unwrap()).await.unwrap() + repo_handle + .request_document(doc_id.unwrap()) + .await + .unwrap() + .expect("document not found") }; // Shutdown signals for background tasks. 
diff --git a/examples/tcp-example.rs b/examples/tcp-example.rs index f6ef2dab..6a72215f 100644 --- a/examples/tcp-example.rs +++ b/examples/tcp-example.rs @@ -43,7 +43,7 @@ async fn request_doc(State(state): State>, Json(document_id): Json #[debug_handler] async fn new_doc(State(state): State>) -> Json { - let doc_handle = state.repo_handle.new_document(); + let doc_handle = state.repo_handle.new_document().await; let our_id = state.repo_handle.get_repo_id(); doc_handle.with_doc_mut(|doc| { let mut tx = doc.transaction(); @@ -59,7 +59,12 @@ async fn get_doc( State(state): State>, Path(doc_id): Path, ) -> (StatusCode, Json) { - let doc_handle = state.repo_handle.request_document(doc_id).await.unwrap(); + let doc_handle = state + .repo_handle + .request_document(doc_id) + .await + .unwrap() + .expect("document not found"); let value = doc_handle.with_doc(|doc| serde_json::to_value(AutoSerde::from(doc)).unwrap()); (StatusCode::OK, Json(value)) } @@ -224,7 +229,8 @@ async fn main() { let doc = repo_handle_clone .request_document(doc_id.clone()) .await - .unwrap(); + .unwrap() + .expect("document not found"); doc.with_doc(|doc| { let val = doc .get(automerge::ROOT, "repo_id") diff --git a/interop-test-server/package-lock.json b/interop-test-server/package-lock.json index f39f7af0..da94253e 100644 --- a/interop-test-server/package-lock.json +++ b/interop-test-server/package-lock.json @@ -9,8 +9,8 @@ "version": "1.0.0", "license": "ISC", "dependencies": { - "@automerge/automerge-repo": "^0.1.2", - "@automerge/automerge-repo-network-websocket": "^0.1.2", + "@automerge/automerge-repo": "^1.0.19", + "@automerge/automerge-repo-network-websocket": "^1.0.19", "@types/ws": "^8.5.5", "express": "^4.18.2", "ws": "^8.13.0" @@ -23,56 +23,47 @@ } }, "node_modules/@automerge/automerge": { - "version": "2.1.0-alpha.8", - "resolved": "https://registry.npmjs.org/@automerge/automerge/-/automerge-2.1.0-alpha.8.tgz", - "integrity": 
"sha512-KyeHWgw8QxZt/Txgfjm7OMFRPomcsYgRoywWpr0l+AzUtQb0mpLi50bWX85dTOIBpJR6UQ8cLotACXY8qJIZHw==", - "peer": true, + "version": "2.1.9", + "resolved": "https://registry.npmjs.org/@automerge/automerge/-/automerge-2.1.9.tgz", + "integrity": "sha512-KcSJaCGDaWU7wuqBoM8pmrnFhsp0g7Bs6Smf0HtaBLS7BjtRK9l/PQNXtSEwAdsfoWhA8XjYDkIpOpKMY1pmpA==", "dependencies": { - "@automerge/automerge-wasm": "^0.2.7", + "@automerge/automerge-wasm": "0.7.0", "uuid": "^9.0.0" } }, "node_modules/@automerge/automerge-repo": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/@automerge/automerge-repo/-/automerge-repo-0.1.2.tgz", - "integrity": "sha512-PODfuP8vRnhTnFIsLPua9DyONxeGiah/zceCKl7XDYHlrsojgyQwUI9Tr0m/4g4Sm0fGRPfOH4NrCEqkZ9nHUQ==", + "version": "1.0.19", + "resolved": "https://registry.npmjs.org/@automerge/automerge-repo/-/automerge-repo-1.0.19.tgz", + "integrity": "sha512-IcfX9xkbk+YY7IhG+ZCoUVccou2MmHGXvu/XGOICFmUIaG1mkQAPzZU2u5r2JUX0cpr/2PcIPOeCletJMssnWw==", "dependencies": { + "@automerge/automerge": "^2.1.7", + "bs58check": "^3.0.1", "cbor-x": "^1.3.0", "debug": "^4.3.4", - "eventemitter3": "^4.0.7", + "eventemitter3": "^5.0.1", + "fast-sha256": "^1.3.0", "tiny-typed-emitter": "^2.1.0", "ts-node": "^10.9.1", - "uuid": "^8.3.2", + "uuid": "^9.0.0", "xstate": "^4.37.0" - }, - "peerDependencies": { - "@automerge/automerge": "^2.1.0-alpha.7" } }, "node_modules/@automerge/automerge-repo-network-websocket": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/@automerge/automerge-repo-network-websocket/-/automerge-repo-network-websocket-0.1.2.tgz", - "integrity": "sha512-mFVKOcLM6IQT95yuQ62OeAWWcb1s0zHavlyt6EKLCHxY2gEsN2s4JUaqVZrzH1MKdtITUXpoXfGTVW6rLOb9NQ==", + "version": "1.0.19", + "resolved": "https://registry.npmjs.org/@automerge/automerge-repo-network-websocket/-/automerge-repo-network-websocket-1.0.19.tgz", + "integrity": "sha512-he/cRcZ9byNHZN0ws/c7rwjT6ocbAdPr2EGvTUp6QeIhRlcICqyTuIJjUSG0MyxckO8FbqcBKnfsiaTRCx/b3Q==", "dependencies": { - 
"@automerge/automerge-repo": "^0.1.2", + "@automerge/automerge-repo": "^1.0.19", "cbor-x": "^1.3.0", - "eventemitter3": "^4.0.7", - "isomorphic-ws": "^5.0.0" - } - }, - "node_modules/@automerge/automerge-repo/node_modules/uuid": { - "version": "8.3.2", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", - "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==", - "bin": { - "uuid": "dist/bin/uuid" + "eventemitter3": "^5.0.1", + "isomorphic-ws": "^5.0.0", + "ws": "^8.7.0" } }, "node_modules/@automerge/automerge-wasm": { - "version": "0.2.7", - "resolved": "https://registry.npmjs.org/@automerge/automerge-wasm/-/automerge-wasm-0.2.7.tgz", - "integrity": "sha512-z0LTg8+6kowLOhv7AO3sIU2l9SL9EOiO/eMPOgujQZtbNjiV74wOIlYptHYSl5lOg1dvUDFcor6sMNq9+l6cWA==", - "peer": true + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/@automerge/automerge-wasm/-/automerge-wasm-0.7.0.tgz", + "integrity": "sha512-nql3kAO0xAYslEfrr2uTQe595fasKsOKxXEkNpilP97F3T0PD7n0YP9MirLiAytNRBXYccMayhN+hP5Af4xprA==" }, "node_modules/@cbor-extract/cbor-extract-darwin-arm64": { "version": "2.1.1", @@ -179,6 +170,17 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "node_modules/@noble/hashes": { + "version": "1.3.2", + "resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.3.2.tgz", + "integrity": "sha512-MVC8EAQp7MvEcm30KWENFjgR+Mkmf+D189XJTkFIlwohU5hcBbn1ZkKq7KVTi2Hme3PMGF390DaL52beVrIihQ==", + "engines": { + "node": ">= 16" + }, + "funding": { + "url": "https://paulmillr.com/funding/" + } + }, "node_modules/@tsconfig/node10": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz", @@ -333,6 +335,11 @@ "dev": true, "peer": true }, + "node_modules/base-x": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/base-x/-/base-x-4.0.0.tgz", + "integrity": "sha512-FuwxlW4H5kh37X/oW59pwTzzTKRzfrrQwhmyspRM7swOEZcHtDZSCt45U6oKgtuFE+WYPblePMVIPR4RZrh/hw==" + }, 
"node_modules/binary-extensions": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.2.0.tgz", @@ -409,6 +416,23 @@ "dev": true, "peer": true }, + "node_modules/bs58": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/bs58/-/bs58-5.0.0.tgz", + "integrity": "sha512-r+ihvQJvahgYT50JD05dyJNKlmmSlMoOGwn1lCcEzanPglg7TxYjioQUYehQ9mAR/+hOSd2jRc/Z2y5UxBymvQ==", + "dependencies": { + "base-x": "^4.0.0" + } + }, + "node_modules/bs58check": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/bs58check/-/bs58check-3.0.1.tgz", + "integrity": "sha512-hjuuJvoWEybo7Hn/0xOrczQKKEKD63WguEjlhLExYs2wUBcebDC1jDNK17eEAD2lYfw82d5ASC1d7K3SWszjaQ==", + "dependencies": { + "@noble/hashes": "^1.2.0", + "bs58": "^5.0.0" + } + }, "node_modules/buffer-from": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.2.tgz", @@ -724,9 +748,9 @@ } }, "node_modules/eventemitter3": { - "version": "4.0.7", - "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", - "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": "sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" }, "node_modules/express": { "version": "4.18.2", @@ -782,6 +806,11 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==" }, + "node_modules/fast-sha256": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/fast-sha256/-/fast-sha256-1.3.0.tgz", + "integrity": "sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ==" + }, "node_modules/fill-range": { "version": "7.0.1", "resolved": 
"https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", @@ -1991,10 +2020,13 @@ } }, "node_modules/uuid": { - "version": "9.0.0", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.0.tgz", - "integrity": "sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==", - "peer": true, + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], "bin": { "uuid": "dist/bin/uuid" } @@ -2152,52 +2184,47 @@ }, "dependencies": { "@automerge/automerge": { - "version": "2.1.0-alpha.8", - "resolved": "https://registry.npmjs.org/@automerge/automerge/-/automerge-2.1.0-alpha.8.tgz", - "integrity": "sha512-KyeHWgw8QxZt/Txgfjm7OMFRPomcsYgRoywWpr0l+AzUtQb0mpLi50bWX85dTOIBpJR6UQ8cLotACXY8qJIZHw==", - "peer": true, + "version": "2.1.9", + "resolved": "https://registry.npmjs.org/@automerge/automerge/-/automerge-2.1.9.tgz", + "integrity": "sha512-KcSJaCGDaWU7wuqBoM8pmrnFhsp0g7Bs6Smf0HtaBLS7BjtRK9l/PQNXtSEwAdsfoWhA8XjYDkIpOpKMY1pmpA==", "requires": { - "@automerge/automerge-wasm": "^0.2.7", + "@automerge/automerge-wasm": "0.7.0", "uuid": "^9.0.0" } }, "@automerge/automerge-repo": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/@automerge/automerge-repo/-/automerge-repo-0.1.2.tgz", - "integrity": "sha512-PODfuP8vRnhTnFIsLPua9DyONxeGiah/zceCKl7XDYHlrsojgyQwUI9Tr0m/4g4Sm0fGRPfOH4NrCEqkZ9nHUQ==", + "version": "1.0.19", + "resolved": "https://registry.npmjs.org/@automerge/automerge-repo/-/automerge-repo-1.0.19.tgz", + "integrity": "sha512-IcfX9xkbk+YY7IhG+ZCoUVccou2MmHGXvu/XGOICFmUIaG1mkQAPzZU2u5r2JUX0cpr/2PcIPOeCletJMssnWw==", "requires": { + "@automerge/automerge": "^2.1.7", + "bs58check": "^3.0.1", "cbor-x": "^1.3.0", "debug": "^4.3.4", - "eventemitter3": "^4.0.7", + "eventemitter3": 
"^5.0.1", + "fast-sha256": "^1.3.0", "tiny-typed-emitter": "^2.1.0", "ts-node": "^10.9.1", - "uuid": "^8.3.2", + "uuid": "^9.0.0", "xstate": "^4.37.0" - }, - "dependencies": { - "uuid": { - "version": "8.3.2", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-8.3.2.tgz", - "integrity": "sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==" - } } }, "@automerge/automerge-repo-network-websocket": { - "version": "0.1.2", - "resolved": "https://registry.npmjs.org/@automerge/automerge-repo-network-websocket/-/automerge-repo-network-websocket-0.1.2.tgz", - "integrity": "sha512-mFVKOcLM6IQT95yuQ62OeAWWcb1s0zHavlyt6EKLCHxY2gEsN2s4JUaqVZrzH1MKdtITUXpoXfGTVW6rLOb9NQ==", + "version": "1.0.19", + "resolved": "https://registry.npmjs.org/@automerge/automerge-repo-network-websocket/-/automerge-repo-network-websocket-1.0.19.tgz", + "integrity": "sha512-he/cRcZ9byNHZN0ws/c7rwjT6ocbAdPr2EGvTUp6QeIhRlcICqyTuIJjUSG0MyxckO8FbqcBKnfsiaTRCx/b3Q==", "requires": { - "@automerge/automerge-repo": "^0.1.2", + "@automerge/automerge-repo": "^1.0.19", "cbor-x": "^1.3.0", - "eventemitter3": "^4.0.7", - "isomorphic-ws": "^5.0.0" + "eventemitter3": "^5.0.1", + "isomorphic-ws": "^5.0.0", + "ws": "^8.7.0" } }, "@automerge/automerge-wasm": { - "version": "0.2.7", - "resolved": "https://registry.npmjs.org/@automerge/automerge-wasm/-/automerge-wasm-0.2.7.tgz", - "integrity": "sha512-z0LTg8+6kowLOhv7AO3sIU2l9SL9EOiO/eMPOgujQZtbNjiV74wOIlYptHYSl5lOg1dvUDFcor6sMNq9+l6cWA==", - "peer": true + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/@automerge/automerge-wasm/-/automerge-wasm-0.7.0.tgz", + "integrity": "sha512-nql3kAO0xAYslEfrr2uTQe595fasKsOKxXEkNpilP97F3T0PD7n0YP9MirLiAytNRBXYccMayhN+hP5Af4xprA==" }, "@cbor-extract/cbor-extract-darwin-arm64": { "version": "2.1.1", @@ -2262,6 +2289,11 @@ "@jridgewell/sourcemap-codec": "^1.4.10" } }, + "@noble/hashes": { + "version": "1.3.2", + "resolved": 
"https://registry.npmjs.org/@noble/hashes/-/hashes-1.3.2.tgz", + "integrity": "sha512-MVC8EAQp7MvEcm30KWENFjgR+Mkmf+D189XJTkFIlwohU5hcBbn1ZkKq7KVTi2Hme3PMGF390DaL52beVrIihQ==" + }, "@tsconfig/node10": { "version": "1.0.9", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.9.tgz", @@ -2386,6 +2418,11 @@ "dev": true, "peer": true }, + "base-x": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/base-x/-/base-x-4.0.0.tgz", + "integrity": "sha512-FuwxlW4H5kh37X/oW59pwTzzTKRzfrrQwhmyspRM7swOEZcHtDZSCt45U6oKgtuFE+WYPblePMVIPR4RZrh/hw==" + }, "binary-extensions": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.2.0.tgz", @@ -2454,6 +2491,23 @@ "dev": true, "peer": true }, + "bs58": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/bs58/-/bs58-5.0.0.tgz", + "integrity": "sha512-r+ihvQJvahgYT50JD05dyJNKlmmSlMoOGwn1lCcEzanPglg7TxYjioQUYehQ9mAR/+hOSd2jRc/Z2y5UxBymvQ==", + "requires": { + "base-x": "^4.0.0" + } + }, + "bs58check": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/bs58check/-/bs58check-3.0.1.tgz", + "integrity": "sha512-hjuuJvoWEybo7Hn/0xOrczQKKEKD63WguEjlhLExYs2wUBcebDC1jDNK17eEAD2lYfw82d5ASC1d7K3SWszjaQ==", + "requires": { + "@noble/hashes": "^1.2.0", + "bs58": "^5.0.0" + } + }, "buffer-from": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.2.tgz", @@ -2682,9 +2736,9 @@ "integrity": "sha512-aIL5Fx7mawVa300al2BnEE4iNvo1qETxLrPI/o05L7z6go7fCw1J6EQmbK4FmJ2AS7kgVF/KEZWufBfdClMcPg==" }, "eventemitter3": { - "version": "4.0.7", - "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", - "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.1.tgz", + "integrity": 
"sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==" }, "express": { "version": "4.18.2", @@ -2739,6 +2793,11 @@ } } }, + "fast-sha256": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/fast-sha256/-/fast-sha256-1.3.0.tgz", + "integrity": "sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ==" + }, "fill-range": { "version": "7.0.1", "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz", @@ -3613,10 +3672,9 @@ "integrity": "sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA==" }, "uuid": { - "version": "9.0.0", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.0.tgz", - "integrity": "sha512-MXcSTerfPa4uqyzStbRoTgt5XIe3x5+42+q1sDuy3R5MDk66URdLMOZe5aPX/SQd+kuYAh0FdP/pO28IkQyTeg==", - "peer": true + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==" }, "v8-compile-cache-lib": { "version": "3.0.1", diff --git a/interop-test-server/package.json b/interop-test-server/package.json index 834e452d..152ec8da 100644 --- a/interop-test-server/package.json +++ b/interop-test-server/package.json @@ -10,8 +10,8 @@ "build": "tsc" }, "dependencies": { - "@automerge/automerge-repo": "^0.1.2", - "@automerge/automerge-repo-network-websocket": "^0.1.2", + "@automerge/automerge-repo": "^1.0.19", + "@automerge/automerge-repo-network-websocket": "^1.0.19", "@types/ws": "^8.5.5", "express": "^4.18.2", "ws": "^8.13.0" diff --git a/interop-test-server/server.ts b/interop-test-server/server.ts index 2b0202dd..a17fd590 100644 --- a/interop-test-server/server.ts +++ b/interop-test-server/server.ts @@ -1,13 +1,12 @@ import express from "express" import { WebSocketServer } from "ws" -import { DocumentId, Repo, StorageAdapter } from "@automerge/automerge-repo" +import { Repo } 
from "@automerge/automerge-repo" import { NodeWSServerAdapter } from "@automerge/automerge-repo-network-websocket" class Server { #socket: WebSocketServer #server: ReturnType - #storage: InMemoryStorageAdapter #repo: Repo @@ -17,11 +16,9 @@ class Server { const PORT = port const app = express() app.use(express.static("public")) - this.#storage = new InMemoryStorageAdapter() const config = { network: [new NodeWSServerAdapter(this.#socket)], - storage: this.#storage, /** @ts-ignore @type {(import("automerge-repo").PeerId)} */ peerId: `storage-server` as PeerId, // Since this is a server, we don't share generously — meaning we only sync documents they already @@ -48,39 +45,11 @@ class Server { } close() { - this.#storage.log() this.#socket.close() this.#server.close() } } -class InMemoryStorageAdapter implements StorageAdapter { - #data: Record = {} - - load(docId: DocumentId) { - return new Promise(resolve => - resolve(this.#data[docId] || null) - ) - } - - save(docId: DocumentId, binary: Uint8Array) { - console.log(`saving ${docId}`) - this.#data[docId] = binary - } - - remove(docId: DocumentId) { - delete this.#data[docId] - } - - keys() { - return Object.keys(this.#data) - } - - log() { - console.log(JSON.stringify(this.#data)) - } -} - const port = process.argv[2] ? 
parseInt(process.argv[2]) : 8080 const server = new Server(port) diff --git a/src/interfaces.rs b/src/interfaces.rs index 8c74bfd5..cb629641 100644 --- a/src/interfaces.rs +++ b/src/interfaces.rs @@ -2,10 +2,15 @@ use futures::future::BoxFuture; use serde::{Deserialize, Serialize}; use std::{ fmt::{Display, Formatter}, + num::NonZeroU64, str::FromStr, }; +#[cfg(test)] +use arbitrary::Arbitrary; + #[derive(Debug, Eq, Hash, PartialEq, Clone)] +#[cfg_attr(test, derive(Arbitrary))] pub struct RepoId(pub String); impl Display for RepoId { @@ -21,6 +26,7 @@ impl<'a> From<&'a str> for RepoId { } #[derive(Eq, Hash, PartialEq, Clone, Deserialize, Serialize)] +#[cfg_attr(test, derive(Arbitrary))] pub struct DocumentId([u8; 16]); impl DocumentId { @@ -37,7 +43,7 @@ impl DocumentId { impl AsRef<[u8]> for DocumentId { fn as_ref(&self) -> &[u8] { - self.0.as_ref() + &self.0 } } @@ -102,6 +108,7 @@ impl Display for NetworkError { } #[derive(Clone, PartialEq)] +#[cfg_attr(test, derive(Arbitrary))] pub enum RepoMessage { /// A sync message for a particular document Sync { @@ -110,12 +117,30 @@ pub enum RepoMessage { document_id: DocumentId, message: Vec, }, + /// A request to begin sync for a document the sender does not have + Request { + sender_id: RepoId, + target_id: RepoId, + /// The document ID to request + document_id: DocumentId, + /// The initial sync message + sync_message: Vec, + }, + /// Notify a peer who has requested a document that we don't have it and + /// none of our peers have it either. + Unavailable { + document_id: DocumentId, + sender_id: RepoId, + target_id: RepoId, + }, /// An ephemeral message for a particular document.
Ephemeral { from_repo_id: RepoId, to_repo_id: RepoId, document_id: DocumentId, message: Vec, + session_id: EphemeralSessionId, + count: NonZeroU64, }, } @@ -132,15 +157,36 @@ impl std::fmt::Debug for RepoMessage { "Sync {{ from_repo_id: {:?}, to_repo_id: {:?}, document_id: {:?} }}", from_repo_id, to_repo_id, document_id ), + RepoMessage::Request { + sender_id, + target_id, + document_id, + sync_message: _, + } => write!( + f, + "Request {{ sender_id: {:?}, target_id: {:?}, document_id: {:?} }}", + sender_id, target_id, document_id + ), RepoMessage::Ephemeral { from_repo_id, to_repo_id, document_id, message: _, + session_id, + count, } => write!( f, - "Ephemeral {{ from_repo_id: {:?}, to_repo_id: {:?}, document_id: {:?} }}", - from_repo_id, to_repo_id, document_id + "Ephemeral {{ from_repo_id: {:?}, to_repo_id: {:?}, document_id: {:?}, session_id: {:?}, count: {:?} }}", + from_repo_id, to_repo_id, document_id, session_id, count, + ), + RepoMessage::Unavailable { + document_id, + sender_id, + target_id, + } => write!( + f, + "Unavailable {{ document_id: {:?}, sender_id: {:?}, target_id: {:?} }}", + document_id, sender_id, target_id ), } } @@ -151,18 +197,27 @@ impl std::fmt::Debug for RepoMessage { /// The multi-doc sync protocol works like this: /// /// 1. The connecting peer sends a `Message::Join` containing its repo ID -/// 2. The accepting peer sends a `Message::Joined` containing its repo ID +/// 2. The accepting peer sends a `Message::Peer` containing its repo ID /// 3. Sync message exchange can proceed, by exchanging Message::Repo(_). -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] +#[cfg_attr(test, derive(Arbitrary))] pub enum Message { /// Sent by the connecting peer on opening a connection to tell the other /// end their repo ID - Join(RepoId), + Join { + sender: RepoId, + supported_protocol_versions: Vec, + }, /// Sent by the accepting peer after having received [`Self::Join`] to tell the /// connecting peer their repo ID. 
- Peer(RepoId), + Peer { + sender: RepoId, + selected_protocol_version: ProtocolVersion, + }, /// A repo message for a particular document Repo(RepoMessage), + /// An error report + Error { message: String }, } /// Errors used by storage. @@ -193,3 +248,66 @@ pub trait Storage: Send { _full_doc: Vec, ) -> BoxFuture<'static, Result<(), StorageError>>; } + +#[derive(Debug, Clone, PartialEq)] +#[cfg_attr(test, derive(Arbitrary))] +pub struct EphemeralSessionId(String); + +impl<'a> From<&'a str> for EphemeralSessionId { + fn from(s: &'a str) -> Self { + Self(s.to_string()) + } +} + +impl AsRef for EphemeralSessionId { + fn as_ref(&self) -> &str { + self.0.as_ref() + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum ProtocolVersion { + V1, + Other(String), +} + +#[cfg(test)] +impl<'a> Arbitrary<'a> for ProtocolVersion { + fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result { + let is_v1 = bool::arbitrary(u)?; + if is_v1 { + Ok(ProtocolVersion::V1) + } else { + let s = String::arbitrary(u)?; + if s == "1" { + Ok(ProtocolVersion::Other("2".to_string())) + } else { + Ok(s.into()) + } + } + } +} + +impl From for ProtocolVersion { + fn from(s: String) -> Self { + match s.as_str() { + "1" => ProtocolVersion::V1, + _ => ProtocolVersion::Other(s), + } + } +} + +impl From for String { + fn from(p: ProtocolVersion) -> Self { + p.as_ref().to_string() + } +} + +impl AsRef for ProtocolVersion { + fn as_ref(&self) -> &str { + match self { + ProtocolVersion::V1 => "1", + ProtocolVersion::Other(s) => s.as_ref(), + } + } +} diff --git a/src/message.rs b/src/message.rs index de86a62c..6eeb7a8b 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,4 +1,11 @@ -use crate::{DocumentId, Message, RepoId, RepoMessage}; +use std::num::NonZeroU64; + +use minicbor::encode::Write; + +use crate::{ + interfaces::{EphemeralSessionId, ProtocolVersion}, + DocumentId, Message, RepoId, RepoMessage, +}; impl Message { pub fn decode(data: &[u8]) -> Result { @@ -7,7 +14,12 @@ impl 
Message { let mut target_id: Option = None; let mut document_id: Option = None; let mut type_name: Option<&str> = None; - let mut message: Option> = None; + let mut data: Option> = None; + let mut ephemeral_session: Option = None; + let mut ephemeral_count: Option = None; + let mut err_message: Option = None; + let mut supported_protocol_versions: Option> = None; + let mut selected_protocol_version: Option = None; let len = decoder.map()?.ok_or(DecodeError::MissingLen)?; for _ in 0..len { let k = decoder.str()?; @@ -15,47 +27,87 @@ impl Message { //match decoder.str()? { "senderId" => sender_id = Some(decoder.str()?.into()), "targetId" => target_id = Some(decoder.str()?.into()), - "channelId" => { + "documentId" => { if decoder.probe().str().is_ok() { - let doc_str = decoder.str()?; - if doc_str == "sync" { - // automerge-repo-network-websocket encodes the channel id as "sync" - // for join messages, we just ignore this - continue; - } + let doc_id_str = decoder.str()?; document_id = Some( - doc_str + doc_id_str .parse() .map_err(|_| DecodeError::InvalidDocumentId)?, ); + } else { + document_id = Some( + DocumentId::try_from(decoder.bytes()?.to_vec()) + .map_err(|_| DecodeError::InvalidDocumentId)?, + ) } } "type" => type_name = Some(decoder.str()?), - "message" => { - message = { - // automerge-repo-network-websocket encodes the message using CBOR tag 64, - // which is a "uint8 typed array". We have to consume this tag or minicbor - // chokes. - decoder.tag()?; - Some(decoder.bytes()?.to_vec()) + "data" => data = Some(decoder.bytes()?.to_vec()), + "sessionId" => ephemeral_session = Some(decoder.str()?.into()), + "count" => { + ephemeral_count = Some( + NonZeroU64::new(decoder.u64()?) 
+ .ok_or(DecodeError::NonPositiveEphemeralCount)?, + ) + } + "selectedProtocolVersion" => { + selected_protocol_version = { + let raw = decoder.str()?; + Some(ProtocolVersion::from(raw.to_string())) } } + "supportedProtocolVersions" => { + let seq = decoder.array_iter::()?; + supported_protocol_versions = Some( + seq.map(|raw| raw.map(ProtocolVersion::from)) + .collect::, _>>()?, + ) + } + "message" => err_message = Some(decoder.str()?.into()), _ => decoder.skip()?, } } match type_name { None => Err(DecodeError::MissingType), - Some("join") => Ok(Self::Join(sender_id.ok_or(DecodeError::MissingSenderId)?)), - // Some JS implementations are a bit confused about which field contains the message. - // This code is compatible with either implementation. - // See https://github.com/automerge/automerge-repo/pull/111 - Some("sync") | Some("message") => Ok(Self::Repo(RepoMessage::Sync { + Some("join") => Ok(Self::Join { + sender: sender_id.ok_or(DecodeError::MissingSenderId)?, + supported_protocol_versions: supported_protocol_versions + .ok_or(DecodeError::MissingSupportedProtocolVersions)?, + }), + Some("peer") => Ok(Self::Peer { + sender: sender_id.ok_or(DecodeError::MissingSenderId)?, + selected_protocol_version: selected_protocol_version + .ok_or(DecodeError::MissingSelectedProtocolVersion)?, + }), + Some("sync") => Ok(Self::Repo(RepoMessage::Sync { + from_repo_id: sender_id.ok_or(DecodeError::MissingSenderId)?, + to_repo_id: target_id.ok_or(DecodeError::MissingTargetId)?, + document_id: document_id.ok_or(DecodeError::MissingDocumentId)?, + message: data.ok_or(DecodeError::MissingData)?, + })), + Some("ephemeral") => Ok(Self::Repo(RepoMessage::Ephemeral { from_repo_id: sender_id.ok_or(DecodeError::MissingSenderId)?, to_repo_id: target_id.ok_or(DecodeError::MissingTargetId)?, document_id: document_id.ok_or(DecodeError::MissingDocumentId)?, - message: message.ok_or(DecodeError::MissingData)?, + session_id: ephemeral_session.ok_or(DecodeError::MissingEphemeralSession)?, + 
count: ephemeral_count.ok_or(DecodeError::MissingEphemeralCount)?, + message: data.ok_or(DecodeError::MissingData)?, + })), + Some("request") => Ok(Self::Repo(RepoMessage::Request { + sender_id: sender_id.ok_or(DecodeError::MissingSenderId)?, + target_id: target_id.ok_or(DecodeError::MissingTargetId)?, + document_id: document_id.ok_or(DecodeError::MissingDocumentId)?, + sync_message: data.ok_or(DecodeError::MissingData)?, })), - Some("peer") => Ok(Self::Peer(sender_id.ok_or(DecodeError::MissingSenderId)?)), + Some("doc-unavailable") => Ok(Self::Repo(RepoMessage::Unavailable { + document_id: document_id.ok_or(DecodeError::MissingDocumentId)?, + sender_id: sender_id.ok_or(DecodeError::MissingSenderId)?, + target_id: target_id.ok_or(DecodeError::MissingTargetId)?, + })), + Some("error") => Ok(Self::Error { + message: err_message.ok_or(DecodeError::MissingErrorMessage)?, + }), Some(other) => Err(DecodeError::UnknownType(other.to_string())), } } @@ -64,14 +116,52 @@ impl Message { let out: Vec = Vec::new(); let mut encoder = minicbor::Encoder::new(out); match self { - Self::Join(repo_id) => { + Self::Join { + sender, + supported_protocol_versions, + } => { encoder.map(3).unwrap(); encoder.str("type").unwrap(); encoder.str("join").unwrap(); encoder.str("senderId").unwrap(); + encoder.str(sender.0.as_str()).unwrap(); + encoder.str("supportedProtocolVersions").unwrap(); + encoder + .array(supported_protocol_versions.len() as u64) + .unwrap(); + for version in supported_protocol_versions { + encoder.str(version.as_ref()).unwrap(); + } + } + Self::Peer { + sender: repo_id, + selected_protocol_version, + } => { + encoder.map(3).unwrap(); + encoder.str("type").unwrap(); + encoder.str("peer").unwrap(); + encoder.str("selectedProtocolVersion").unwrap(); + encoder.str(selected_protocol_version.as_ref()).unwrap(); + encoder.str("senderId").unwrap(); encoder.str(repo_id.0.as_str()).unwrap(); - encoder.str("channelId").unwrap(); - encoder.str("sync").unwrap(); + } + 
Self::Repo(RepoMessage::Request { + sender_id, + target_id, + document_id, + sync_message, + }) => { + encoder.map(5).unwrap(); + encoder.str("type").unwrap(); + encoder.str("request").unwrap(); + encoder.str("senderId").unwrap(); + encoder.str(sender_id.0.as_str()).unwrap(); + encoder.str("targetId").unwrap(); + encoder.str(target_id.0.as_str()).unwrap(); + encoder.str("documentId").unwrap(); + encode_doc_id(&mut encoder, document_id); + encoder.str("data").unwrap(); + encoder.bytes(sync_message.as_slice()).unwrap(); } Self::Repo(RepoMessage::Sync { from_repo_id, @@ -81,30 +171,74 @@ impl Message { }) => { encoder.map(5).unwrap(); encoder.str("type").unwrap(); - encoder.str("message").unwrap(); + encoder.str("sync").unwrap(); encoder.str("senderId").unwrap(); encoder.str(from_repo_id.0.as_str()).unwrap(); encoder.str("targetId").unwrap(); encoder.str(to_repo_id.0.as_str()).unwrap(); - encoder.str("channelId").unwrap(); - encoder.str(document_id.as_uuid_str().as_str()).unwrap(); - encoder.str("message").unwrap(); - encoder.tag(minicbor::data::Tag::Unassigned(64)).unwrap(); + encoder.str("documentId").unwrap(); + encode_doc_id(&mut encoder, document_id); + encoder.str("data").unwrap(); encoder.bytes(message.as_slice()).unwrap(); } - Self::Peer(repo_id) => { - encoder.map(2).unwrap(); + Self::Repo(RepoMessage::Ephemeral { + from_repo_id, + to_repo_id, + document_id, + message, + session_id, + count, + }) => { + encoder.map(7).unwrap(); encoder.str("type").unwrap(); - encoder.str("peer").unwrap(); + encoder.str("ephemeral").unwrap(); encoder.str("senderId").unwrap(); - encoder.str(repo_id.0.as_str()).unwrap(); + encoder.str(from_repo_id.0.as_str()).unwrap(); + encoder.str("targetId").unwrap(); + encoder.str(to_repo_id.0.as_str()).unwrap(); + encoder.str("documentId").unwrap(); + encode_doc_id(&mut encoder, document_id); + encoder.str("sessionId").unwrap(); + encoder.str(session_id.as_ref()).unwrap(); + encoder.str("data").unwrap(); + 
encoder.bytes(message.as_slice()).unwrap(); + encoder.str("count").unwrap(); + encoder.u64(u64::from(*count)).unwrap(); + } + Self::Repo(RepoMessage::Unavailable { + sender_id, + target_id, + document_id, + }) => { + encoder.map(4).unwrap(); + encoder.str("type").unwrap(); + encoder.str("doc-unavailable").unwrap(); + encoder.str("senderId").unwrap(); + encoder.str(sender_id.0.as_str()).unwrap(); + encoder.str("targetId").unwrap(); + encoder.str(target_id.0.as_str()).unwrap(); + encoder.str("documentId").unwrap(); + encode_doc_id(&mut encoder, document_id); + } + Self::Error { message } => { + encoder.map(2).unwrap(); + encoder.str("type").unwrap(); + encoder.str("error").unwrap(); + encoder.str("message").unwrap(); + encoder.str(message).unwrap(); } - _ => todo!(), } encoder.into_writer() } } +fn encode_doc_id(encoder: &mut minicbor::Encoder, doc_id: &DocumentId) +where + W::Error: std::fmt::Debug, +{ + encoder.str(format!("{}", doc_id).as_str()).unwrap(); +} + #[derive(Debug, thiserror::Error)] pub enum DecodeError { #[error("missing len")] @@ -113,20 +247,30 @@ pub enum DecodeError { Minicbor(String), #[error("no type field")] MissingType, - #[error("no channel_id field")] - MissingChannelId, + #[error("no documentId field")] + MissingDocumentId, #[error("no sender_id field")] MissingSenderId, #[error("no target_id field")] MissingTargetId, #[error("no document_id field")] - MissingDocumentId, - #[error("no data field")] MissingData, #[error("no broadcast field")] MissingBroadcast, #[error("unknown type {0}")] UnknownType(String), + #[error("non-positive ephemeral session count")] + NonPositiveEphemeralCount, + #[error("no ephemeral session id")] + MissingEphemeralSession, + #[error("no ephemeral session count")] + MissingEphemeralCount, + #[error("no error message")] + MissingErrorMessage, + #[error("missing selectedProtocolVersion")] + MissingSelectedProtocolVersion, + #[error("missing supportedProtocolVersions")] + MissingSupportedProtocolVersions, 
#[error("invalid document id")] InvalidDocumentId, } @@ -136,3 +280,19 @@ impl From for DecodeError { Self::Minicbor(e.to_string()) } } + +#[cfg(test)] +mod tests { + use bolero::check; + + use super::Message; + + #[test] + fn test_message_encode_decode() { + check!().with_arbitrary::().for_each(|message| { + let encoded = message.encode(); + let decoded = Message::decode(&encoded).unwrap(); + assert_eq!(message, &decoded); + }) + } +} diff --git a/src/network_connect.rs b/src/network_connect.rs index f1d442ce..6361c4e4 100644 --- a/src/network_connect.rs +++ b/src/network_connect.rs @@ -1,4 +1,4 @@ -use crate::interfaces::{Message, NetworkError, RepoId, RepoMessage}; +use crate::interfaces::{Message, NetworkError, ProtocolVersion, RepoId}; use crate::repo::RepoHandle; use futures::{Sink, SinkExt, Stream, StreamExt}; @@ -22,17 +22,17 @@ impl RepoHandle { Str: Stream> + Send + 'static + Unpin, { let other_id = self.handshake(&mut stream, &mut sink, direction).await?; - tracing::trace!(?other_id, repo_id=?self.get_repo_id(), "Handshake complete"); + tracing::trace!(?other_id, repo_id=?self.get_repo_id(), "handshake complete"); let stream = stream.map({ let repo_id = self.get_repo_id().clone(); move |msg| match msg { Ok(Message::Repo(repo_msg)) => { - tracing::trace!(?repo_msg, repo_id=?repo_id, "Received repo message"); + tracing::trace!(?repo_msg, repo_id=?repo_id, "received repo message"); Ok(repo_msg) } Ok(m) => { - tracing::warn!(?m, repo_id=?repo_id, "Received non-repo message"); + tracing::warn!(?m, repo_id=?repo_id, "received non-repo message"); Err(NetworkError::Error( "unexpected non-repo message".to_string(), )) @@ -48,12 +48,9 @@ impl RepoHandle { }); let sink = sink - .with_flat_map::(|msg| match msg { - RepoMessage::Sync { .. 
} => futures::stream::iter(vec![Ok(Message::Repo(msg))]), - _ => futures::stream::iter(vec![]), - }) + .with::<_, _, _, SendErr>(move |msg| futures::future::ready(Ok(Message::Repo(msg)))) .sink_map_err(|e| { - tracing::error!(?e, "Error sending repo message"); + tracing::error!(?e, "error sending repo message"); NetworkError::Error(format!("error sending repo message: {}", e)) }); @@ -78,7 +75,9 @@ impl RepoHandle { ConnDirection::Incoming => { if let Some(msg) = stream.next().await { let other_id = match msg { - Ok(Message::Join(other_id)) => other_id, + Ok(Message::Join { + sender: other_id, .. + }) => other_id, Ok(other) => { return Err(NetworkError::Error(format!( "unexpected message (expecting join): {:?}", @@ -89,7 +88,10 @@ impl RepoHandle { return Err(NetworkError::Error(format!("error reciving: {}", e))) } }; - let msg = Message::Peer(self.get_repo_id().clone()); + let msg = Message::Peer { + sender: self.get_repo_id().clone(), + selected_protocol_version: ProtocolVersion::V1, + }; sink.send(msg) .await .map_err(|e| NetworkError::Error(format!("error sending: {}", e)))?; @@ -101,13 +103,16 @@ impl RepoHandle { } } ConnDirection::Outgoing => { - let msg = Message::Join(self.get_repo_id().clone()); + let msg = Message::Join { + sender: self.get_repo_id().clone(), + supported_protocol_versions: vec![ProtocolVersion::V1], + }; sink.send(msg) .await .map_err(|e| NetworkError::Error(format!("send error: {}", e)))?; let msg = stream.next().await; match msg { - Some(Ok(Message::Peer(sender))) => Ok(sender), + Some(Ok(Message::Peer { sender, .. 
})) => Ok(sender), Some(Ok(other)) => Err(NetworkError::Error(format!( "unexpected message (expecting peer): {:?}", other diff --git a/src/repo.rs b/src/repo.rs index bfafcb92..4d1eacc9 100644 --- a/src/repo.rs +++ b/src/repo.rs @@ -4,7 +4,7 @@ use crate::interfaces::{NetworkError, RepoMessage, Storage, StorageError}; use crate::share_policy::ShareDecision; use crate::{share_policy, SharePolicy, SharePolicyError}; use automerge::sync::{Message as SyncMessage, State as SyncState, SyncDoc}; -use automerge::{Automerge, ChangeHash}; +use automerge::{Automerge, ChangeHash, ReadDoc}; use core::pin::Pin; use crossbeam_channel::{select, unbounded, Receiver, Sender}; use futures::future::{BoxFuture, Future}; @@ -23,6 +23,8 @@ use std::sync::Arc; use std::thread::{self, JoinHandle}; use uuid::Uuid; +mod request; + /// Front-end of the repo. #[derive(Debug, Clone)] pub struct RepoHandle { @@ -59,6 +61,18 @@ enum NetworkEvent { document_id: DocumentId, message: SyncMessage, }, + /// A repo requested a document + Request { + from_repo_id: RepoId, + to_repo_id: RepoId, + document_id: DocumentId, + message: SyncMessage, + }, + Unavailable { + from_repo_id: RepoId, + to_repo_id: RepoId, + document_id: DocumentId, + }, } impl std::fmt::Debug for NetworkEvent { @@ -75,6 +89,27 @@ impl std::fmt::Debug for NetworkEvent { .field("to_repo_id", to_repo_id) .field("document_id", document_id) .finish(), + NetworkEvent::Request { + from_repo_id, + to_repo_id, + document_id, + message: _, + } => f + .debug_struct("NetworkEvent::Request") + .field("from_repo_id", from_repo_id) + .field("to_repo_id", to_repo_id) + .field("document_id", document_id) + .finish(), + NetworkEvent::Unavailable { + from_repo_id, + to_repo_id, + document_id, + } => f + .debug_struct("NetworkEvent::Unavailable") + .field("from_repo_id", from_repo_id) + .field("to_repo_id", to_repo_id) + .field("document_id", document_id) + .finish(), } } } @@ -90,6 +125,55 @@ enum NetworkMessage { document_id: DocumentId, message: 
SyncMessage, }, + Request { + from_repo_id: RepoId, + to_repo_id: RepoId, + document_id: DocumentId, + message: SyncMessage, + }, + Unavailable { + from_repo_id: RepoId, + to_repo_id: RepoId, + document_id: DocumentId, + }, +} + +impl From for RepoMessage { + fn from(msg: NetworkMessage) -> Self { + match msg { + NetworkMessage::Sync { + from_repo_id, + to_repo_id, + document_id, + message, + } => RepoMessage::Sync { + from_repo_id, + to_repo_id, + document_id, + message: message.encode(), + }, + NetworkMessage::Request { + from_repo_id, + to_repo_id, + document_id, + message, + } => RepoMessage::Request { + sender_id: from_repo_id, + target_id: to_repo_id, + document_id, + sync_message: message.encode(), + }, + NetworkMessage::Unavailable { + from_repo_id, + to_repo_id, + document_id, + } => RepoMessage::Unavailable { + document_id, + sender_id: from_repo_id, + target_id: to_repo_id, + }, + } + } } /// Create a pair of repo future and resolver. @@ -134,40 +218,30 @@ impl RepoHandle { } /// Create a new document. - pub fn new_document(&self) -> DocHandle { + pub fn new_document(&self) -> impl Future { let document_id = DocumentId::random(); let document = new_document(); - let doc_info = self.new_document_info(document, DocState::Sync(vec![])); - let handle = DocHandle::new( - self.repo_sender.clone(), - document_id.clone(), - doc_info.document.clone(), - doc_info.handle_count.clone(), - self.repo_id.clone(), - ); + let (future, resolver) = new_repo_future_with_resolver(); self.repo_sender - .send(RepoEvent::NewDoc(document_id, doc_info)) + .send(RepoEvent::NewDoc( + document_id, + SharedDocument { + automerge: document, + }, + resolver, + )) .expect("Failed to send repo event."); - // TODO: return a future to make-up for the unboundedness of the channel. - handle + future } /// Boostrap a document, first from storage, and if not found over the network. 
pub fn request_document( &self, document_id: DocumentId, - ) -> RepoFuture> { - let document = new_document(); + ) -> impl Future, RepoError>> { let (fut, resolver) = new_repo_future_with_resolver(); - let doc_info = self.new_document_info( - document, - DocState::Bootstrap { - resolvers: vec![resolver], - storage_fut: None, - }, - ); self.repo_sender - .send(RepoEvent::NewDoc(document_id, doc_info)) + .send(RepoEvent::RequestDoc(document_id, resolver)) .expect("Failed to send repo event."); fut } @@ -186,15 +260,6 @@ impl RepoHandle { fut } - fn new_document_info(&self, document: Automerge, state: DocState) -> DocumentInfo { - let document = SharedDocument { - automerge: document, - }; - let document = Arc::new(RwLock::new(document)); - let handle_count = Arc::new(AtomicUsize::new(1)); - DocumentInfo::new(state, document, handle_count) - } - /// Add a network adapter, representing a connection with a remote repo. pub fn new_remote_repo( &self, @@ -215,7 +280,12 @@ impl RepoHandle { /// Events sent by repo or doc handles to the repo. pub(crate) enum RepoEvent { /// Start processing a new document. - NewDoc(DocumentId, DocumentInfo), + NewDoc(DocumentId, SharedDocument, RepoFutureResolver), + /// Request a document we don't have + RequestDoc( + DocumentId, + RepoFutureResolver, RepoError>>, + ), /// A document changed. DocChange(DocumentId), /// A document was closed(all doc handles dropped). 
@@ -246,7 +316,8 @@ pub(crate) enum RepoEvent { impl fmt::Debug for RepoEvent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - RepoEvent::NewDoc(_, _) => f.write_str("RepoEvent::NewDoc"), + RepoEvent::NewDoc(_, _, _) => f.write_str("RepoEvent::NewDoc"), + RepoEvent::RequestDoc(_, _) => f.write_str("RepoEvent::RequestDoc"), RepoEvent::DocChange(_) => f.write_str("RepoEvent::DocChange"), RepoEvent::DocClosed(_) => f.write_str("RepoEvent::DocClosed"), RepoEvent::AddChangeObserver(_, _, _) => f.write_str("RepoEvent::AddChangeObserver"), @@ -308,12 +379,12 @@ pub(crate) enum DocState { /// Bootstrapping will resolve into a future doc handle, /// the optional storage fut represents first checking storage before the network. Bootstrap { - resolvers: Vec>>, + resolvers: Vec, RepoError>>>, storage_fut: BootstrapStorageFut, }, /// Pending a load from storage, not attempting to sync over network. LoadPending { - resolver: RepoFutureResolver, RepoError>>, + resolvers: Vec, RepoError>>>, storage_fut: BoxFuture<'static, Result>, StorageError>>, }, /// The doc is syncing(can be edited locally), @@ -348,10 +419,6 @@ impl DocState { matches!(self, DocState::Bootstrap { .. }) } - fn is_pending_load(&self) -> bool { - matches!(self, DocState::LoadPending { .. }) - } - fn should_sync(&self) -> bool { matches!(self, DocState::Sync(_)) || matches!( @@ -371,20 +438,11 @@ impl DocState { } } - fn get_bootstrap_resolvers(&mut self) -> Vec>> { - match self { - DocState::Bootstrap { resolvers, .. } => mem::take(resolvers), - _ => unreachable!( - "Trying to get boostrap resolvers from a document that cannot have any." - ), - } - } - fn resolve_bootstrap_fut(&mut self, doc_handle: Result) { match self { DocState::Bootstrap { resolvers, .. } => { for mut resolver in resolvers.drain(..) 
{ - resolver.resolve_fut(doc_handle.clone()); + resolver.resolve_fut(doc_handle.clone().map(Some)); } } _ => unreachable!( @@ -396,9 +454,13 @@ impl DocState { fn resolve_load_fut(&mut self, doc_handle: Result, RepoError>) { match self { DocState::LoadPending { - resolver, + resolvers, storage_fut: _, - } => resolver.resolve_fut(doc_handle), + } => { + for mut resolver in resolvers.drain(..) { + resolver.resolve_fut(doc_handle.clone()); + } + } _ => unreachable!( "Trying to resolve a load future for a document that does not have one." ), @@ -408,9 +470,13 @@ impl DocState { fn resolve_any_fut_for_shutdown(&mut self) { match self { DocState::LoadPending { - resolver, + resolvers, storage_fut: _, - } => resolver.resolve_fut(Err(RepoError::Shutdown)), + } => { + for mut resolver in resolvers.drain(..) { + resolver.resolve_fut(Err(RepoError::Shutdown)) + } + } DocState::Bootstrap { resolvers, .. } => { for mut resolver in resolvers.drain(..) { resolver.resolve_fut(Err(RepoError::Shutdown)); @@ -427,7 +493,7 @@ impl DocState { assert!(matches!(*waker, RepoWaker::Storage { .. })); match self { DocState::LoadPending { - resolver: _, + resolvers: _, storage_fut, } => { let waker = waker_ref(&waker); @@ -462,38 +528,6 @@ impl DocState { } } - fn add_boostrap_storage_fut( - &mut self, - fut: BoxFuture<'static, Result>, StorageError>>, - ) { - match self { - DocState::Bootstrap { - resolvers: _, - ref mut storage_fut, - } => { - assert!(storage_fut.is_none()); - *storage_fut = Some(fut); - } - _ => unreachable!( - "Trying to add a boostrap load future for a document that does not need one." - ), - } - } - - fn add_boostrap_resolvers( - &mut self, - incoming: &mut Vec>>, - ) { - match self { - DocState::Bootstrap { - ref mut resolvers, .. - } => { - resolvers.append(incoming); - } - _ => unreachable!("Unexpected adding of boostrap resolvers."), - } - } - fn poll_pending_save(&mut self, waker: Arc) { assert!(matches!(*waker, RepoWaker::Storage { .. 
})); match self { @@ -579,7 +613,10 @@ enum PeerConnection { /// we've accepted the peer and are syncing with them Accepted(SyncState), /// We're waiting for a response from the share policy - PendingAuth { received_messages: Vec }, + PendingAuth { + /// Messages received while we were waiting for a response from the share policy + received_messages: Vec, + }, } impl PeerConnection { @@ -589,32 +626,34 @@ impl PeerConnection { } } - fn receive_sync_message( - &mut self, - doc: &mut Automerge, - msg: SyncMessage, - ) -> Result<(), automerge::AutomergeError> { - match self { - PeerConnection::Accepted(sync_state) => doc.receive_sync_message(sync_state, msg), - PeerConnection::PendingAuth { received_messages } => { - received_messages.push(msg); - Ok(()) - } - } - } - - fn generate_sync_message(&mut self, doc: &Automerge) -> Option { - match self { - Self::Accepted(sync_state) => doc.generate_sync_message(sync_state), - Self::PendingAuth { .. } => None, + fn up_to_date(&self, doc: &Automerge) -> bool { + if let Self::Accepted(SyncState { + their_heads: Some(their_heads), + .. + }) = self + { + their_heads + .iter() + .all(|h| doc.get_change_by_hash(h).is_some()) + } else { + false } } } /// A change requested by a peer connection +#[derive(Debug)] enum PeerConnCommand { /// Request authorization from the share policy - RequestAuth(RepoId), + RequestAuth(RepoId, ShareType), + SendRequest { + message: SyncMessage, + to: RepoId, + }, + SendSyncMessage { + message: SyncMessage, + to: RepoId, + }, } impl DocumentInfo { @@ -639,14 +678,6 @@ impl DocumentInfo { } } - fn is_boostrapping(&self) -> bool { - self.state.is_bootstrapping() - } - - fn is_pending_load(&self) -> bool { - self.state.is_pending_load() - } - fn start_pending_removal(&mut self) { self.state = match &mut self.state { DocState::Error | DocState::LoadPending { .. } | DocState::Bootstrap { .. 
} => { @@ -762,12 +793,14 @@ impl DocumentInfo { let count = { let doc = self.document.read(); let changes = doc.automerge.get_changes(&self.last_heads); + let count = changes.len(); tracing::trace!( last_heads=?self.last_heads, current_heads=?doc.automerge.get_heads(), + num_new_changes=count, "checking for changes since last save" ); - changes.len() + count }; let has_patches = count > 0; self.changes_since_last_compact = self.changes_since_last_compact.saturating_add(count); @@ -791,6 +824,7 @@ impl DocumentInfo { } let should_compact = self.changes_since_last_compact > self.allowable_changes_until_compaction; + tracing::trace!(%document_id, will_compact=?should_compact, "saving document"); let (storage_fut, new_heads) = if should_compact { let (to_save, new_heads) = { let doc = self.document.read(); @@ -815,7 +849,7 @@ impl DocumentInfo { DocState::PendingRemoval(ref mut futs) => { futs.push(storage_fut); } - _ => unreachable!("Unexpected doc state on save."), + _ => {} } let waker = Arc::new(RepoWaker::Storage(wake_sender.clone(), document_id)); self.state.poll_pending_save(waker); @@ -826,98 +860,156 @@ impl DocumentInfo { /// /// # Returns /// - /// A tuple of `(has_changes, commands)` where `has_changes` is true if the document changed as - /// a result of applying the sync message and `commands` is a list of changes requested by the - /// peer connections for this document (e.g. requesting authorization from the share policy). - fn receive_sync_message( - &mut self, - per_remote: HashMap>, - ) -> (bool, Vec) { + /// A `Vec` which is a list of changes requested by the peer connections for + /// this document (e.g. requesting authorization from the share policy). 
+ fn receive_sync_message(&mut self, per_remote: P) -> Vec + where + P: IntoIterator, + I: IntoIterator, + { let mut commands = Vec::new(); - let (start_heads, new_heads) = { - let mut document = self.document.write(); - let start_heads = document.automerge.get_heads(); - for (repo_id, messages) in per_remote { - let conn = match self.peer_connections.entry(repo_id.clone()) { - Entry::Vacant(entry) => { - // if this is a new peer, request authorization - commands.push(PeerConnCommand::RequestAuth(repo_id.clone())); - entry.insert(PeerConnection::pending()) + let mut document = self.document.write(); + for (repo_id, messages) in per_remote { + let conn = match self.peer_connections.entry(repo_id.clone()) { + Entry::Vacant(entry) => { + // if this is a new peer, request authorization + commands.push(PeerConnCommand::RequestAuth( + repo_id.clone(), + ShareType::Synchronize, + )); + entry.insert(PeerConnection::pending()) + } + Entry::Occupied(entry) => entry.into_mut(), + }; + match conn { + PeerConnection::PendingAuth { + ref mut received_messages, + } => { + received_messages.extend(messages); + } + PeerConnection::Accepted(ref mut sync_state) => { + for message in messages { + document + .automerge + .receive_sync_message(sync_state, message) + .expect("Failed to receive sync message."); + } + if let Some(msg) = document.automerge.generate_sync_message(sync_state) { + commands.push(PeerConnCommand::SendSyncMessage { + message: msg, + to: repo_id.clone(), + }); } - Entry::Occupied(entry) => entry.into_mut(), - }; - for message in messages { - conn.receive_sync_message(&mut document.automerge, message) - .expect("Failed to receive sync message."); } } - let new_heads = document.automerge.get_heads(); - (start_heads, new_heads) - }; - (start_heads != new_heads, commands) + } + commands } - /// Promote a peer awaiting authorization to a full peer - /// - /// Returns any messages which the peer sent while we were waiting for authorization - fn promote_pending_peer(&mut 
self, repo_id: &RepoId) -> Option> { - if let Some(PeerConnection::PendingAuth { received_messages }) = - self.peer_connections.remove(repo_id) - { - self.peer_connections - .insert(repo_id.clone(), PeerConnection::Accepted(SyncState::new())); - Some(received_messages) - } else { - tracing::warn!(remote=%repo_id, "Tried to promote a peer which was not pending authorization"); - None - } + /// Generate outgoing sync message for all repos we are syncing with. + fn generate_sync_messages(&mut self) -> Vec<(RepoId, SyncMessage)> { + let document = self.document.read(); + self.peer_connections + .iter_mut() + .filter_map(|(repo_id, conn)| { + if let PeerConnection::Accepted(ref mut sync_state) = conn { + let message = document.automerge.generate_sync_message(sync_state); + message.map(|msg| (repo_id.clone(), msg)) + } else { + None + } + }) + .collect() } - /// Potentially generate an outgoing sync message. - fn generate_first_sync_message(&mut self, repo_id: RepoId) -> Option { - match self.peer_connections.entry(repo_id) { + fn begin_request(&mut self, remote: &RepoId) -> BeginRequest { + match self.peer_connections.entry(remote.clone()) { Entry::Vacant(entry) => { - let mut sync_state = SyncState::new(); - let document = self.document.read(); - let message = document.automerge.generate_sync_message(&mut sync_state); - entry.insert(PeerConnection::Accepted(sync_state)); - message + entry.insert(PeerConnection::pending()); + BeginRequest::RequiresAuth } Entry::Occupied(mut entry) => match entry.get_mut() { - PeerConnection::PendingAuth { received_messages } => { - let mut document = self.document.write(); - let mut sync_state = SyncState::new(); - for msg in received_messages.drain(..) 
{ - document - .automerge - .receive_sync_message(&mut sync_state, msg) - .expect("Failed to receive sync message."); - } - let message = document.automerge.generate_sync_message(&mut sync_state); - entry.insert(PeerConnection::Accepted(sync_state)); - message - } + PeerConnection::PendingAuth { .. } => BeginRequest::AwaitingAuth, PeerConnection::Accepted(ref mut sync_state) => { + if sync_state.in_flight || sync_state.have_responded { + return BeginRequest::AlreadySyncing; + } let document = self.document.read(); - document.automerge.generate_sync_message(sync_state) + let message = document.automerge.generate_sync_message(sync_state); + if let Some(msg) = message { + BeginRequest::Request(msg) + } else { + BeginRequest::AlreadySyncing + } } }, } } - /// Generate outgoing sync message for all repos we are syncing with. - fn generate_sync_messages(&mut self) -> Vec<(RepoId, SyncMessage)> { - let document = self.document.read(); + fn begin_requests<'a, I: Iterator + 'a>( + &'a mut self, + to_peers: I, + ) -> impl Iterator + 'a { + to_peers.filter_map(|peer| match self.begin_request(peer) { + BeginRequest::AlreadySyncing => { + tracing::debug!(remote=%peer, "not sending request as we are already syncing"); + None + } + BeginRequest::Request(message) => Some(PeerConnCommand::SendRequest { + message, + to: peer.clone(), + }), + BeginRequest::AwaitingAuth => None, + BeginRequest::RequiresAuth => Some(PeerConnCommand::RequestAuth( + peer.clone(), + ShareType::Request, + )), + }) + } + + fn authorize_peer(&mut self, remote: &RepoId) -> Option { + if let Some(PeerConnection::PendingAuth { received_messages }) = + self.peer_connections.remove(remote) + { + let mut doc = self.document.write(); + let mut sync_state = SyncState::new(); + for msg in received_messages { + doc.automerge + .receive_sync_message(&mut sync_state, msg) + .expect("Failed to receive sync message."); + } + let msg = doc.automerge.generate_sync_message(&mut sync_state); + self.peer_connections + 
.insert(remote.clone(), PeerConnection::Accepted(sync_state)); + msg + } else if !self.peer_connections.contains_key(remote) { + let mut sync_state = SyncState::new(); + let doc = self.document.write(); + let msg = doc.automerge.generate_sync_message(&mut sync_state); + self.peer_connections + .insert(remote.clone(), PeerConnection::Accepted(sync_state)); + msg + } else { + tracing::warn!(remote=%remote, "tried to authorize a peer which was not pending authorization"); + None + } + } + + fn has_up_to_date_peer(&self) -> bool { + let doc = self.document.read(); self.peer_connections - .iter_mut() - .filter_map(|(repo_id, conn)| { - let message = conn.generate_sync_message(&document.automerge); - message.map(|msg| (repo_id.clone(), msg)) - }) - .collect() + .iter() + .any(|(_, conn)| conn.up_to_date(&doc.automerge)) } } +enum BeginRequest { + AlreadySyncing, + Request(SyncMessage), + RequiresAuth, + AwaitingAuth, +} + /// Signal that the stream or sink on the network adapter is ready to be polled. #[derive(Debug)] enum WakeSignal { @@ -1036,6 +1128,9 @@ pub struct Repo { /// Pending share policy futures pending_share_decisions: HashMap>, + + /// Outstanding requests + requests: HashMap, } impl Repo { @@ -1064,6 +1159,7 @@ impl Repo { share_policy, pending_share_decisions: HashMap::new(), share_decisions_to_poll: HashSet::new(), + requests: HashMap::new(), } } @@ -1079,12 +1175,45 @@ impl Repo { /// Save documents that have changed to storage, /// resolve change observers. 
fn process_changed_document(&mut self) { + let mut commands_by_doc = HashMap::new(); for doc_id in mem::take(&mut self.documents_with_changes) { - if let Some(info) = self.documents.get_mut(&doc_id) { + let Some(info) = self.documents.get_mut(&doc_id) else { + continue; + }; + + if info.has_up_to_date_peer() && info.state.is_bootstrapping() { + tracing::trace!(%doc_id, "bootstrapping complete"); + info.handle_count.fetch_add(1, Ordering::SeqCst); + let handle = DocHandle::new( + self.repo_sender.clone(), + doc_id.clone(), + info.document.clone(), + info.handle_count.clone(), + self.repo_id.clone(), + ); + info.state.resolve_bootstrap_fut(Ok(handle)); + info.state = DocState::Sync(vec![]); + + if let Some(req) = self.requests.remove(&doc_id) { + tracing::trace!(%doc_id, "resolving request"); + let awaiting_response = req.fulfilled(); + let commands = info.receive_sync_message( + awaiting_response + .into_iter() + .map(|(repo, msg)| (repo, std::iter::once(msg))), + ); + commands_by_doc.insert(doc_id.clone(), commands); + } + } + + if info.note_changes() { info.resolve_change_observers(Ok(())); - info.save_document(doc_id, self.storage.as_ref(), &self.wake_sender); + info.save_document(doc_id.clone(), self.storage.as_ref(), &self.wake_sender); } } + for (doc_id, commands) in commands_by_doc { + self.dispatch_peer_conn_commands(&doc_id, commands) + } } /// Remove sync states for repos for which we do not have an adapter anymore. @@ -1179,9 +1308,46 @@ impl Repo { break true; } }, - Ok(RepoMessage::Ephemeral { .. 
}) => { - tracing::warn!("received ephemeral message, ignoring."); + Ok(RepoMessage::Request { + sender_id, + target_id, + document_id, + sync_message, + }) => match SyncMessage::decode(&sync_message) { + Ok(message) => { + let event = NetworkEvent::Request { + from_repo_id: sender_id, + to_repo_id: target_id, + document_id, + message, + }; + new_messages.push(event); + } + Err(e) => { + tracing::error!(error = ?e, "error decoding sync message"); + break true; + } + }, + Ok(RepoMessage::Unavailable { + document_id, + sender_id, + target_id, + }) => { + let event = NetworkEvent::Unavailable { + document_id, + from_repo_id: sender_id, + to_repo_id: target_id, + }; + new_messages.push(event); } + Ok(RepoMessage::Ephemeral { + from_repo_id: _, + to_repo_id: _, + document_id: _, + message: _, + session_id: _, + count: _, + }) => {} Err(e) => { tracing::error!(error = ?e, "Error on network stream."); break true; @@ -1255,20 +1421,11 @@ impl Repo { Poll::Pending => break, Poll::Ready(Ok(())) => { let pinned_sink = Pin::new(&mut remote_repo.sink); - let NetworkMessage::Sync { - from_repo_id, - to_repo_id, - document_id, - message, - } = pending_messages + let msg = pending_messages .pop_front() .expect("Empty pending messages."); - let outgoing = RepoMessage::Sync { - from_repo_id, - to_repo_id, - document_id, - message: message.encode(), - }; + let outgoing = RepoMessage::from(msg); + tracing::debug!(message = ?outgoing, remote=%repo_id, "sending message."); let result = pinned_sink.start_send(outgoing); if let Err(e) = result { tracing::error!(error = ?e, "Error on network sink."); @@ -1312,34 +1469,49 @@ impl Repo { fn handle_repo_event(&mut self, event: RepoEvent) { tracing::trace!(event = ?event, "Handling repo event"); match event { - // TODO: simplify handling of `RepoEvent::NewDoc`. - // `NewDoc` could be broken-up into two events: `RequestDoc` and `NewDoc`, - // the doc info could be created here. 
- RepoEvent::NewDoc(document_id, mut info) => { - if info.is_boostrapping() { - tracing::trace!("adding bootstrapping document"); - if let Some(existing_info) = self.documents.get_mut(&document_id) { - if matches!(existing_info.state, DocState::Bootstrap { .. }) { - let mut resolvers = info.state.get_bootstrap_resolvers(); - existing_info.state.add_boostrap_resolvers(&mut resolvers); - } else if matches!(existing_info.state, DocState::Sync(_)) { - existing_info.handle_count.fetch_add(1, Ordering::SeqCst); - let handle = DocHandle::new( - self.repo_sender.clone(), - document_id.clone(), - existing_info.document.clone(), - existing_info.handle_count.clone(), - self.repo_id.clone(), - ); - info.state.resolve_bootstrap_fut(Ok(handle)); - } else { - tracing::warn!(state=?info.state, "newdoc event received for existing document with incorrect state"); - info.state.resolve_bootstrap_fut(Err(RepoError::Incorrect(format!("newdoc event received for existing document with incorrect state: {:?}", info.state)))); - } - return; - } else { + RepoEvent::NewDoc(document_id, document, mut resolver) => { + assert!( + self.documents.get(&document_id).is_none(), + "NewDoc event should be sent with a fresh document ID and only be sent once" + ); + let shared = Arc::new(RwLock::new(document)); + let handle_count = Arc::new(AtomicUsize::new(1)); + let info = + DocumentInfo::new(DocState::Sync(vec![]), shared.clone(), handle_count.clone()); + self.documents.insert(document_id.clone(), info); + resolver.resolve_fut(DocHandle::new( + self.repo_sender.clone(), + document_id.clone(), + shared.clone(), + handle_count.clone(), + self.repo_id.clone(), + )); + Self::enqueue_share_decisions( + self.remote_repos.keys(), + &mut self.pending_share_decisions, + &mut self.share_decisions_to_poll, + self.share_policy.as_ref(), + document_id, + ShareType::Announce, + ); + } + RepoEvent::RequestDoc(document_id, mut resolver) => { + let info = self + .documents + .entry(document_id.clone()) + 
.or_insert_with(|| { + let handle_count = Arc::new(AtomicUsize::new(1)); let storage_fut = self.storage.get(document_id.clone()); - info.state.add_boostrap_storage_fut(storage_fut); + let mut info = DocumentInfo::new( + DocState::Bootstrap { + resolvers: vec![], + storage_fut: Some(storage_fut), + }, + Arc::new(RwLock::new(SharedDocument { + automerge: new_document(), + })), + handle_count.clone(), + ); info.poll_storage_operation( document_id.clone(), &self.wake_sender, @@ -1347,37 +1519,47 @@ impl Repo { &self.repo_id, ); - let share_type = if info.is_boostrapping() { - Some(ShareType::Request) - } else if info.state.should_sync() { - Some(ShareType::Announce) - } else { - None - }; - if let Some(share_type) = share_type { - Self::enqueue_share_decisions( - self.remote_repos.keys(), - &mut self.pending_share_decisions, - &mut self.share_decisions_to_poll, - self.share_policy.as_ref(), - document_id.clone(), - share_type, - ); - } + info + }); + + match &mut info.state { + DocState::Bootstrap { resolvers, .. } => resolvers.push(resolver), + DocState::Sync(_) => { + info.handle_count.fetch_add(1, Ordering::SeqCst); + let handle = DocHandle::new( + self.repo_sender.clone(), + document_id.clone(), + info.document.clone(), + info.handle_count.clone(), + self.repo_id.clone(), + ); + resolver.resolve_fut(Ok(Some(handle))); + } + DocState::LoadPending { resolvers, .. 
} => resolvers.push(resolver), + DocState::PendingRemoval(_) => resolver.resolve_fut(Ok(None)), + DocState::Error => { + resolver.resolve_fut(Err(RepoError::Incorrect( + "request event called for document which is in error state".to_string(), + ))); } } - self.documents.insert(document_id, info); + + let req = self.requests.entry(document_id.clone()).or_insert_with(|| { + tracing::trace!(%document_id, "creating new local request"); + request::Request::new(document_id.clone()) + }); + + if info.state.is_bootstrapping() { + let to_request = req.initiate_local(self.remote_repos.keys()); + let commands = info.begin_requests(to_request.iter()).collect::>(); + self.dispatch_peer_conn_commands(&document_id, commands); + } } RepoEvent::DocChange(doc_id) => { // Handle doc changes: sync the document. let local_repo_id = self.get_repo_id().clone(); if let Some(info) = self.documents.get_mut(&doc_id) { - // only run the documents_with_changes workflow if there - // was a change, but always generate potential sync messages - // (below) - if info.note_changes() { - self.documents_with_changes.push(doc_id.clone()); - } + self.documents_with_changes.push(doc_id.clone()); for (to_repo_id, message) in info.generate_sync_messages().into_iter() { let outgoing = NetworkMessage::Sync { from_repo_id: local_repo_id.clone(), @@ -1435,41 +1617,61 @@ impl Repo { }, RepoEvent::LoadDoc(doc_id, resolver) => { let mut resolver_clone = resolver.clone(); - let info = self.documents.entry(doc_id.clone()).or_insert_with(|| { - let storage_fut = self.storage.get(doc_id.clone()); - let shared_document = SharedDocument { - automerge: new_document(), - }; - let state = DocState::LoadPending { - storage_fut, - resolver, - }; - let document = Arc::new(RwLock::new(shared_document)); - let handle_count = Arc::new(AtomicUsize::new(0)); - DocumentInfo::new(state, document, handle_count) - }); + let entry = self.documents.entry(doc_id.clone()); + let info = match entry { + Entry::Occupied(mut entry) => { + 
let info = entry.get_mut(); + match &mut info.state { + DocState::Error => { + resolver_clone.resolve_fut(Err(RepoError::Incorrect( + "load event called for document which is in error state" + .to_string(), + ))); + } + DocState::LoadPending { resolvers, .. } => { + resolvers.push(resolver_clone); + } + DocState::Bootstrap { resolvers, .. } => { + resolvers.push(resolver_clone); + } + DocState::Sync(_) => { + info.handle_count.fetch_add(1, Ordering::SeqCst); + let handle = DocHandle::new( + self.repo_sender.clone(), + doc_id.clone(), + info.document.clone(), + info.handle_count.clone(), + self.repo_id.clone(), + ); + resolver_clone.resolve_fut(Ok(Some(handle))); + } + DocState::PendingRemoval(_) => resolver_clone.resolve_fut(Ok(None)), + } + entry.into_mut() + } + Entry::Vacant(entry) => { + let storage_fut = self.storage.get(doc_id.clone()); + let shared_document = SharedDocument { + automerge: new_document(), + }; + let state = DocState::LoadPending { + storage_fut, + resolvers: vec![resolver_clone], + }; + let document = Arc::new(RwLock::new(shared_document)); + let handle_count = Arc::new(AtomicUsize::new(0)); + entry.insert(DocumentInfo::new(state, document, handle_count)) + } + }; - // Note: unfriendly API. - // - // API currently assumes client makes a single `load` call, - // for a document that has not been created or requested before. - // - // If the doc is in memory, we could simply create a new handle for it. - // - // If the doc is bootstrapping, - // the resolver could be added to the list of resolvers. - if !info.is_pending_load() { - resolver_clone.resolve_fut(Err(RepoError::Incorrect( - "attempted to load a document which is already loading".to_string(), - ))); - return; + if matches!(info.state, DocState::LoadPending { .. 
}) { + info.poll_storage_operation( + doc_id, + &self.wake_sender, + &self.repo_sender, + &self.repo_id, + ); } - info.poll_storage_operation( - doc_id, - &self.wake_sender, - &self.repo_sender, - &self.repo_id, - ); } RepoEvent::AddChangeObserver(doc_id, last_heads, mut observer) => { if let Some(info) = self.documents.get_mut(&doc_id) { @@ -1528,14 +1730,32 @@ impl Repo { } } + fn new_document_info() -> DocumentInfo { + // Note: since the handle count is zero, + // the document will not be removed from memory until shutdown. + // Perhaps remove this and rely on `request_document` calls. + let shared_document = SharedDocument { + automerge: new_document(), + }; + let state = DocState::Bootstrap { + resolvers: vec![], + storage_fut: None, + }; + let document = Arc::new(RwLock::new(shared_document)); + let handle_count = Arc::new(AtomicUsize::new(0)); + DocumentInfo::new(state, document, handle_count) + } + /// Apply incoming sync messages, and generate outgoing ones. fn sync_documents(&mut self) { // Re-organize messages so as to acquire the write lock // on the document only once per document. let mut per_doc_messages: HashMap>> = Default::default(); + for event in mem::take(&mut self.pending_events) { tracing::trace!(message = ?event, "processing sync message"); + match event { NetworkEvent::Sync { from_repo_id, @@ -1544,38 +1764,99 @@ impl Repo { message, } => { assert_eq!(to_repo_id, self.repo_id); - - // If we don't know about the document, - // create a new sync state and start syncing. - // Note: this is the mirror of sending sync messages for - // all known documents when a remote repo connects. let info = self .documents .entry(document_id.clone()) - .or_insert_with(|| { - // Note: since the handle count is zero, - // the document will not be removed from memory until shutdown. - // Perhaps remove this and rely on `request_document` calls. 
- let shared_document = SharedDocument { - automerge: new_document(), - }; - let state = DocState::Bootstrap { - resolvers: vec![], - storage_fut: None, - }; - let document = Arc::new(RwLock::new(shared_document)); - let handle_count = Arc::new(AtomicUsize::new(0)); - DocumentInfo::new(state, document, handle_count) - }); + .or_insert_with(Self::new_document_info); if !info.state.should_sync() { continue; } - let per_doc = per_doc_messages.entry(document_id).or_default(); - let per_remote = per_doc.entry(from_repo_id).or_default(); + let per_doc = per_doc_messages.entry(document_id.clone()).or_default(); + let per_remote = per_doc.entry(from_repo_id.clone()).or_default(); per_remote.push_back(message.clone()); } + NetworkEvent::Request { + from_repo_id, + to_repo_id, + document_id, + message, + } => { + assert_eq!(to_repo_id, self.repo_id); + let info = self + .documents + .entry(document_id.clone()) + .or_insert_with(Self::new_document_info); + match info.state { + DocState::Sync(_) => { + tracing::trace!( + ?from_repo_id, + "responding to request with sync as we have the doc" + ); + // if we have this document then just start syncing + Self::enqueue_share_decisions( + std::iter::once(&from_repo_id), + &mut self.pending_share_decisions, + &mut self.share_decisions_to_poll, + self.share_policy.as_ref(), + document_id.clone(), + ShareType::Synchronize, + ); + } + _ => { + let req = + self.requests.entry(document_id.clone()).or_insert_with(|| { + tracing::trace!(?from_repo_id, "creating new remote request"); + request::Request::new(document_id.clone()) + }); + + let request_from = req.initiate_remote( + &from_repo_id, + message, + self.remote_repos.keys(), + ); + let commands = + info.begin_requests(request_from.iter()).collect::>(); + + if req.is_complete() { + let req = self.requests.remove(&document_id).unwrap(); + Self::fail_request( + req, + &mut self.documents, + &mut self.pending_messages, + &mut self.sinks_to_poll, + self.repo_id.clone(), + ); + } + + 
self.dispatch_peer_conn_commands(&document_id, commands.into_iter()); + } + } + } + NetworkEvent::Unavailable { + from_repo_id, + to_repo_id: _, + document_id, + } => match self.requests.entry(document_id.clone()) { + Entry::Occupied(mut entry) => { + let req = entry.get_mut(); + req.mark_unavailable(&from_repo_id); + if req.is_complete() { + let req = entry.remove(); + Self::fail_request( + req, + &mut self.documents, + &mut self.pending_messages, + &mut self.sinks_to_poll, + self.repo_id.clone(), + ); + } + } + Entry::Vacant(_) => { + tracing::trace!(?from_repo_id, "received unavailable for request we didnt send or are no longer tracking"); + } + }, } } @@ -1585,57 +1866,10 @@ impl Repo { .get_mut(&document_id) .expect("Doc should have an info by now."); - let (has_changes, peer_conn_commands) = info.receive_sync_message(per_remote); - if has_changes && info.note_changes() { - self.documents_with_changes.push(document_id.clone()); - } + let peer_conn_commands = info.receive_sync_message(per_remote); + self.documents_with_changes.push(document_id.clone()); - for cmd in peer_conn_commands { - match cmd { - PeerConnCommand::RequestAuth(peer_id) => Self::enqueue_share_decisions( - std::iter::once(&peer_id), - &mut self.pending_share_decisions, - &mut self.share_decisions_to_poll, - self.share_policy.as_ref(), - document_id.clone(), - ShareType::Synchronize, - ), - } - } - - // Note: since receiving and generating sync messages is done - // in two separate critical sections, - // local changes could be made in between those, - // which is a good thing(generated messages will include those changes). 
- let mut ready = true; - for (to_repo_id, message) in info.generate_sync_messages().into_iter() { - if message.heads.is_empty() && !message.need.is_empty() { - ready = false; - } - let outgoing = NetworkMessage::Sync { - from_repo_id: self.repo_id.clone(), - to_repo_id: to_repo_id.clone(), - document_id: document_id.clone(), - message, - }; - self.pending_messages - .entry(to_repo_id.clone()) - .or_default() - .push_back(outgoing); - self.sinks_to_poll.insert(to_repo_id); - } - if ready && info.state.is_bootstrapping() { - info.handle_count.fetch_add(1, Ordering::SeqCst); - let handle = DocHandle::new( - self.repo_sender.clone(), - document_id.clone(), - info.document.clone(), - info.handle_count.clone(), - self.repo_id.clone(), - ); - info.state.resolve_bootstrap_fut(Ok(handle)); - info.state = DocState::Sync(vec![]); - } + self.dispatch_peer_conn_commands(&document_id, peer_conn_commands.into_iter()); } } @@ -1666,15 +1900,15 @@ impl Repo { } } - fn collect_sharepolicy_responses(&mut self) { + fn poll_sharepolicy_responses(&mut self) { let mut decisions = Vec::new(); for repo_id in mem::take(&mut self.share_decisions_to_poll) { if let Some(pending) = self.pending_share_decisions.remove(&repo_id) { let mut still_pending = Vec::new(); for PendingShareDecision { doc_id, - mut future, share_type, + mut future, } in pending { let waker = Arc::new(RepoWaker::ShareDecision( @@ -1689,8 +1923,8 @@ impl Repo { Poll::Pending => { still_pending.push(PendingShareDecision { doc_id, - future, share_type, + future, }); } Poll::Ready(Ok(res)) => { @@ -1714,53 +1948,70 @@ impl Repo { return; }; if share_decision == ShareDecision::Share { - match share_type { - ShareType::Announce | ShareType::Request => { - tracing::debug!(%doc, remote=%peer, "sharing document with remote"); - if let Some(pending_messages) = info.promote_pending_peer(&peer) { - tracing::trace!(remote=%peer, %doc, "we already had pending messages for this peer when announcing so we just wait to generate a sync 
message"); - for message in pending_messages { - self.pending_events.push_back(NetworkEvent::Sync { - from_repo_id: peer.clone(), - to_repo_id: our_id.clone(), - document_id: doc.clone(), - message, - }); - } - } else if let Some(message) = info.generate_first_sync_message(peer.clone()) - { - tracing::trace!(remote=%peer, %doc, "sending first sync message"); - let outgoing = NetworkMessage::Sync { - from_repo_id: our_id.clone(), - to_repo_id: peer.clone(), - document_id: doc.clone(), - message, - }; - self.pending_messages - .entry(peer.clone()) - .or_default() - .push_back(outgoing); - self.sinks_to_poll.insert(peer); + let message = info.authorize_peer(&peer); + self.documents_with_changes.push(doc.clone()); + let outgoing = message.map(|message| match share_type { + ShareType::Announce => { + tracing::trace!(remote=%peer, %doc, "announcing document to remote"); + NetworkMessage::Sync { + from_repo_id: our_id.clone(), + to_repo_id: peer.clone(), + document_id: doc.clone(), + message, + } + } + ShareType::Request => { + tracing::trace!(remote=%peer, %doc, "requesting document from remote"); + NetworkMessage::Request { + from_repo_id: our_id.clone(), + to_repo_id: peer.clone(), + document_id: doc.clone(), + message, } } ShareType::Synchronize => { tracing::debug!(%doc, remote=%peer, "synchronizing document with remote"); - if let Some(pending_messages) = info.promote_pending_peer(&peer) { - let events = - pending_messages - .into_iter() - .map(|message| NetworkEvent::Sync { - from_repo_id: peer.clone(), - to_repo_id: our_id.clone(), - document_id: doc.clone(), - message, - }); - self.pending_events.extend(events); + NetworkMessage::Sync { + from_repo_id: our_id.clone(), + to_repo_id: peer.clone(), + document_id: doc.clone(), + message, } } + }); + if let Some(outgoing) = outgoing { + self.pending_messages + .entry(peer.clone()) + .or_default() + .push_back(outgoing); + self.sinks_to_poll.insert(peer); } } else { - tracing::debug!(?doc, ?peer, "refusing to share 
document with remote"); + match share_type { + ShareType::Request => { + tracing::debug!(%doc, remote=%peer, "refusing to request document from remote"); + } + ShareType::Announce => { + tracing::debug!(%doc, remote=%peer, "refusing to announce document to remote"); + } + ShareType::Synchronize => { + tracing::debug!(%doc, remote=%peer, "refusing to synchronize document with remote"); + } + } + if let Some(req) = self.requests.get_mut(&doc) { + tracing::trace!(request=?req, "marking request as unavailable due to rejected authorization"); + req.mark_unavailable(&peer); + if req.is_complete() { + let req = self.requests.remove(&doc).unwrap(); + Self::fail_request( + req, + &mut self.documents, + &mut self.pending_messages, + &mut self.sinks_to_poll, + self.repo_id.clone(), + ); + } + } } } } @@ -1781,7 +2032,7 @@ impl Repo { let handle = thread::spawn(move || { let _entered = span.entered(); loop { - self.collect_sharepolicy_responses(); + self.poll_sharepolicy_responses(); self.collect_network_events(); self.sync_documents(); self.process_outgoing_network_messages(); @@ -1789,6 +2040,9 @@ impl Repo { self.remove_unused_sync_states(); self.remove_unused_pending_messages(); self.gc_docs(); + if !self.share_decisions_to_poll.is_empty() { + continue; + } select! 
{ recv(self.repo_receiver) -> repo_event => { if let Ok(event) = repo_event { @@ -1949,6 +2203,9 @@ impl Repo { share_type: ShareType, ) { let remote_repos = remote_repos.collect::>(); + if remote_repos.is_empty() { + return; + } match share_type { ShareType::Request => { tracing::debug!(remotes=?remote_repos, ?document_id, "checking if we should request this document from remotes"); @@ -1971,10 +2228,98 @@ impl Repo { .or_default() .push(PendingShareDecision { doc_id: document_id.clone(), - future, share_type, + future, }); share_decisions_to_poll.insert(repo_id.clone()); } } + + fn fail_request( + request: request::Request, + documents: &mut HashMap, + pending_messages: &mut HashMap>, + sinks_to_poll: &mut HashSet, + our_repo_id: RepoId, + ) { + tracing::debug!(?request, "request is complete"); + + match documents.entry(request.document_id().clone()) { + Entry::Occupied(entry) => { + if entry.get().state.is_bootstrapping() { + let info = entry.remove(); + if let DocState::Bootstrap { mut resolvers, .. } = info.state { + for mut resolver in resolvers.drain(..) 
{ + tracing::trace!("resolving local process waiting for request to None"); + resolver.resolve_fut(Ok(None)); + } + } + } + } + Entry::Vacant(_) => { + tracing::trace!("no local proess is waiting for this request to complete"); + } + } + + let document_id = request.document_id().clone(); + for repo_id in request.unavailable() { + let outgoing = NetworkMessage::Unavailable { + from_repo_id: our_repo_id.clone(), + to_repo_id: repo_id.clone(), + document_id: document_id.clone(), + }; + pending_messages + .entry(repo_id.clone()) + .or_default() + .push_back(outgoing); + sinks_to_poll.insert(repo_id.clone()); + } + } + + fn dispatch_peer_conn_commands>( + &mut self, + document_id: &DocumentId, + commands: I, + ) { + for command in commands { + match command { + PeerConnCommand::RequestAuth(peer_id, share_type) => { + Self::enqueue_share_decisions( + std::iter::once(&peer_id), + &mut self.pending_share_decisions, + &mut self.share_decisions_to_poll, + self.share_policy.as_ref(), + document_id.clone(), + share_type, + ); + } + PeerConnCommand::SendRequest { message, to } => { + let outgoing = NetworkMessage::Request { + from_repo_id: self.repo_id.clone(), + to_repo_id: to.clone(), + document_id: document_id.clone(), + message, + }; + self.pending_messages + .entry(to.clone()) + .or_default() + .push_back(outgoing); + self.sinks_to_poll.insert(to); + } + PeerConnCommand::SendSyncMessage { message, to } => { + let outgoing = NetworkMessage::Sync { + from_repo_id: self.repo_id.clone(), + to_repo_id: to.clone(), + document_id: document_id.clone(), + message, + }; + self.pending_messages + .entry(to.clone()) + .or_default() + .push_back(outgoing); + self.sinks_to_poll.insert(to); + } + } + } + } } diff --git a/src/repo/request.rs b/src/repo/request.rs new file mode 100644 index 00000000..cdcc1894 --- /dev/null +++ b/src/repo/request.rs @@ -0,0 +1,83 @@ +use std::collections::{HashMap, HashSet}; + +use automerge::sync::Message as SyncMessage; + +use crate::{DocumentId, RepoId}; 
+ +#[derive(Debug)] +pub(super) struct Request { + document_id: DocumentId, + awaiting_response_from: HashSet, + awaiting_our_response: HashMap, +} + +impl Request { + pub(super) fn new(doc_id: DocumentId) -> Self { + Request { + document_id: doc_id, + awaiting_response_from: HashSet::new(), + awaiting_our_response: HashMap::new(), + } + } + + pub(super) fn document_id(&self) -> &DocumentId { + &self.document_id + } + + pub(super) fn mark_unavailable(&mut self, repo_id: &RepoId) { + self.awaiting_our_response.remove(repo_id); + self.awaiting_response_from.remove(repo_id); + } + + pub(super) fn is_complete(&self) -> bool { + self.awaiting_response_from.is_empty() + } + + pub(super) fn initiate_local<'a, I: Iterator>( + &mut self, + connected_peers: I, + ) -> HashSet { + self.initiate_inner(None, connected_peers) + } + + pub(super) fn initiate_remote<'a, I: Iterator>( + &mut self, + from_peer: &RepoId, + request_sync_message: SyncMessage, + connected_peers: I, + ) -> HashSet { + self.initiate_inner(Some((from_peer, request_sync_message)), connected_peers) + } + + fn initiate_inner<'a, I: Iterator>( + &mut self, + from_repo_id: Option<(&RepoId, SyncMessage)>, + connected_peers: I, + ) -> HashSet { + if let Some((from_peer, initial_message)) = from_repo_id { + self.awaiting_our_response + .insert(from_peer.clone(), initial_message); + } + connected_peers + .filter(|remote| { + if self.awaiting_our_response.contains_key(remote) + || self.awaiting_response_from.contains(remote) + { + false + } else { + self.awaiting_response_from.insert((*remote).clone()); + true + } + }) + .cloned() + .collect() + } + + pub(super) fn fulfilled(self) -> HashMap { + self.awaiting_our_response + } + + pub(super) fn unavailable(self) -> impl Iterator { + self.awaiting_our_response.into_keys() + } +} diff --git a/tests/interop/main.rs b/tests/interop/main.rs index 31ea3d92..8bc99fac 100644 --- a/tests/interop/main.rs +++ b/tests/interop/main.rs @@ -2,15 +2,14 @@ use 
std::{panic::catch_unwind, path::PathBuf, process::Child, thread::sleep, tim use automerge::{transaction::Transactable, ReadDoc}; use automerge_repo::{ConnDirection, Repo}; +use test_log::test; use test_utils::storage_utils::InMemoryStorage; -//use test_log::test; const INTEROP_SERVER_PATH: &str = "interop-test-server"; const PORT: u16 = 8099; #[test] fn interop_test() { - env_logger::init(); tracing::trace!("we're starting up"); let mut server_process = start_js_server(); let result = catch_unwind(|| sync_two_repos(PORT)); @@ -27,7 +26,7 @@ fn sync_two_repos(port: u16) { let runtime = tokio::runtime::Runtime::new().unwrap(); runtime.block_on(async { let storage1 = Box::::default(); - let repo1 = Repo::new(None, storage1); + let repo1 = Repo::new(Some("repo1".to_string()), storage1); let repo1_handle = repo1.run(); let (conn, _) = tokio_tungstenite::connect_async(format!("ws://localhost:{}", port)) .await @@ -37,15 +36,15 @@ fn sync_two_repos(port: u16) { .connect_tungstenite(conn, ConnDirection::Outgoing) .await .expect("error connecting connection 1"); - tracing::trace!("connecting conn1"); + tokio::spawn(async { if let Err(e) = conn1_driver.await { tracing::error!("Error running repo 1 connection: {}", e); } + tracing::trace!("conn1 finished"); }); - tracing::trace!("connected conn1"); - let doc_handle_repo1 = repo1_handle.new_document(); + let doc_handle_repo1 = repo1_handle.new_document().await; doc_handle_repo1 .with_doc_mut(|doc| { doc.transact(|tx| { @@ -55,8 +54,10 @@ fn sync_two_repos(port: u16) { }) .unwrap(); + tokio::time::sleep(Duration::from_millis(1000)).await; + let storage2 = Box::::default(); - let repo2 = Repo::new(None, storage2); + let repo2 = Repo::new(Some("repo2".to_string()), storage2); let repo2_handle = repo2.run(); let (conn2, _) = tokio_tungstenite::connect_async(format!("ws://localhost:{}", port)) @@ -71,15 +72,15 @@ fn sync_two_repos(port: u16) { if let Err(e) = conn2_driver.await { tracing::error!("Error running repo 2 connection: 
{}", e); } + tracing::trace!("conn2 finished"); }); - tokio::time::sleep(Duration::from_millis(100)).await; - tracing::info!("Requesting"); //tokio::time::sleep(Duration::from_secs(1)).await; let doc_handle_repo2 = repo2_handle .request_document(doc_handle_repo1.document_id()) .await + .unwrap() .unwrap(); doc_handle_repo2.with_doc(|doc| { assert_eq!( @@ -129,7 +130,7 @@ fn start_js_server() -> Child { println!("Error connecting to server: {}", e); } } - sleep(Duration::from_millis(100)); + sleep(Duration::from_millis(1000)); } child } diff --git a/tests/network/document_changed.rs b/tests/network/document_changed.rs index f427ebe4..14f27855 100644 --- a/tests/network/document_changed.rs +++ b/tests/network/document_changed.rs @@ -22,14 +22,18 @@ async fn test_document_changed_over_sync() { let repo_handle_2_clone = repo_handle_2.clone(); // Create a document for one repo. - let document_handle_1 = repo_handle_1.new_document(); + let document_handle_1 = repo_handle_1.new_document().await; // Spawn a task that awaits the requested doc handle, // and then edits the document. let doc_id = document_handle_1.document_id(); tokio::spawn(async move { // Request the document. - let doc_handle = repo_handle_2.request_document(doc_id).await.unwrap(); + let doc_handle = repo_handle_2 + .request_document(doc_id) + .await + .unwrap() + .expect("document should be found"); doc_handle.with_doc_mut(|doc| { let val = doc .get(automerge::ROOT, "repo_id") @@ -95,7 +99,7 @@ async fn test_document_changed_locally() { let expected_repo_id = repo_handle_1.get_repo_id().clone(); // Create a document for the repo. 
- let doc_handle = repo_handle_1.new_document(); + let doc_handle = repo_handle_1.new_document().await; // spawn a task which edits the document tokio::spawn({ diff --git a/tests/network/document_list.rs b/tests/network/document_list.rs index 959ef3b0..6473648b 100644 --- a/tests/network/document_list.rs +++ b/tests/network/document_list.rs @@ -14,7 +14,7 @@ async fn test_list_all() { let repo_handle = repo.run(); // Create a document for one repo. - let document_handle = repo_handle.new_document(); + let document_handle = repo_handle.new_document().await; let document_id = document_handle.document_id(); // Edit the document. @@ -58,7 +58,7 @@ async fn test_list_all_errors_on_shutdown() { let repo_handle = repo.run(); // Create a document for one repo. - let document_handle = repo_handle.new_document(); + let document_handle = repo_handle.new_document().await; let document_id = document_handle.document_id(); // Edit the document. diff --git a/tests/network/document_load.rs b/tests/network/document_load.rs index a204f001..b0e176a9 100644 --- a/tests/network/document_load.rs +++ b/tests/network/document_load.rs @@ -15,7 +15,7 @@ async fn test_loading_document_found_immediately() { let repo_handle = repo.run(); // Create a document for one repo. - let document_handle = repo_handle.new_document(); + let document_handle = repo_handle.new_document().await; // Edit the document. let doc_data = document_handle.with_doc_mut(|doc| { @@ -66,7 +66,7 @@ async fn test_loading_document_found_async() { let repo_handle = repo.run(); // Create a document for one repo. - let document_handle = repo_handle.new_document(); + let document_handle = repo_handle.new_document().await; // Edit the document. 
let doc_data = document_handle.with_doc_mut(|doc| { diff --git a/tests/network/document_request.rs b/tests/network/document_request.rs index 91b0efa7..38960890 100644 --- a/tests/network/document_request.rs +++ b/tests/network/document_request.rs @@ -2,12 +2,16 @@ extern crate test_utils; use std::time::Duration; -use automerge::transaction::Transactable; -use automerge_repo::{DocumentId, Repo, RepoHandle, RepoId}; +use automerge::{transaction::Transactable, ReadDoc}; +use automerge_repo::{ + share_policy::ShareDecision, DocumentId, Repo, RepoHandle, RepoId, SharePolicy, + SharePolicyError, +}; +use futures::{future::BoxFuture, FutureExt}; use test_log::test; use test_utils::storage_utils::{InMemoryStorage, SimpleStorage}; -use crate::tincans::connect_repos; +use crate::tincans::{connect_repos, connect_to_nowhere}; #[test(tokio::test)] async fn test_requesting_document_connected_peers() { @@ -25,7 +29,7 @@ async fn test_requesting_document_connected_peers() { connect_repos(&repo_handle_1, &repo_handle_2); // Create a document for one repo. - let document_handle_1 = repo_handle_1.new_document(); + let document_handle_1 = repo_handle_1.new_document().await; // Edit the document. document_handle_1.with_doc_mut(|doc| { @@ -42,19 +46,18 @@ async fn test_requesting_document_connected_peers() { // Request the document. 
let doc_handle_future = tokio::spawn(repo_handle_2.request_document(document_handle_1.document_id())); - let load = repo_handle_2.load(document_handle_1.document_id()); + let _load = repo_handle_2.load(document_handle_1.document_id()); - assert_eq!( - tokio::time::timeout(Duration::from_millis(100), doc_handle_future) - .await - .expect("load future timed out") - .unwrap() - .expect("document should be found") - .document_id(), - document_handle_1.document_id() - ); + let doc_handle = tokio::time::timeout(Duration::from_millis(1000), doc_handle_future) + .await + .expect("load timed out") + .expect("doc handle spawn failed") + .expect("doc handle future failed") + .expect("doc handle should exist"); - let _ = tokio::task::spawn(async move { + assert_eq!(doc_handle.document_id(), document_handle_1.document_id()); + + let storage_complete = tokio::task::spawn(async move { // Check that the document has been saved in storage. // TODO: replace the loop with an async notification mechanism. loop { @@ -63,12 +66,11 @@ async fn test_requesting_document_connected_peers() { } tokio::time::sleep(Duration::from_millis(100)).await; } - }) - .await; - - // Load following a request fails, but this API should be improved. - // See comment at handling of `RepoEvent::LoadDoc`. - assert!(load.await.is_err()); + }); + tokio::time::timeout(Duration::from_millis(1000), storage_complete) + .await + .expect("storage complete timed out") + .expect("storage complete spawn failed"); // Stop the repos. tokio::task::spawn_blocking(|| { @@ -91,7 +93,7 @@ async fn test_requesting_document_unconnected_peers() { connect_repos(&repo_handle_1, &repo_handle_2); // Create a document for one repo. - let document_handle_1 = repo_handle_1.new_document(); + let document_handle_1 = repo_handle_1.new_document().await; // Edit the document. 
document_handle_1.with_doc_mut(|doc| { @@ -112,6 +114,7 @@ async fn test_requesting_document_unconnected_peers() { .request_document(document_handle_1.document_id()) .await .unwrap() + .expect("document should be found") .document_id(); assert_eq!(doc_id, document_handle_1.document_id()); @@ -130,7 +133,7 @@ async fn test_requesting_document_unconnected_peers_with_storage_load() { let repo_handle_1 = repo_1.run(); // Create a document for one repo. - let document_handle_1 = repo_handle_1.new_document(); + let document_handle_1 = repo_handle_1.new_document().await; // Edit the document. document_handle_1.with_doc_mut(|doc| { @@ -158,6 +161,7 @@ async fn test_requesting_document_unconnected_peers_with_storage_load() { .request_document(document_handle_1.document_id()) .await .unwrap() + .expect("document should be found") .document_id(); assert_eq!(doc_id, document_handle_1.document_id()); @@ -181,7 +185,7 @@ async fn test_request_with_repo_stop() { let repo_handle_2 = repo_2.run(); // Create a document for one repo. - let document_handle_1 = repo_handle_1.new_document(); + let document_handle_1 = repo_handle_1.new_document().await; // Edit the document. document_handle_1.with_doc_mut(|doc| { @@ -221,7 +225,7 @@ async fn test_request_twice_ok_bootstrap() { let repo_handle_1 = repo_1.run(); // Create a document for one repo. - let document_handle_1 = repo_handle_1.new_document(); + let document_handle_1 = repo_handle_1.new_document().await; // Edit the document. document_handle_1.with_doc_mut(|doc| { @@ -254,7 +258,11 @@ async fn test_request_twice_ok_bootstrap() { // Future should resolve from storage load(no peers are connected). assert_eq!( - doc_handle_future.await.unwrap().document_id(), + doc_handle_future + .await + .unwrap() + .expect("document should be found") + .document_id(), document_handle_1.document_id() ); @@ -276,7 +284,7 @@ async fn test_request_twice_ok() { let repo_handle = repo.run(); // Create a document for one repo. 
- let document_handle = repo_handle.new_document(); + let document_handle = repo_handle.new_document().await; // Edit the document. document_handle.with_doc_mut(|doc| { @@ -299,7 +307,11 @@ async fn test_request_twice_ok() { // Since the request was made twice, // but the document is ready, the future should resolve to ok. assert_eq!( - doc_handle_future.await.unwrap().document_id(), + doc_handle_future + .await + .unwrap() + .expect("document should be found") + .document_id(), document_handle.document_id() ); @@ -321,7 +333,7 @@ async fn test_request_unavailable_point_to_point() { let repo_handle_2 = repo_2.run(); // Create a document for one repo. - let document_handle_1 = repo_handle_1.new_document(); + let document_handle_1 = repo_handle_1.new_document().await; // Edit the document. document_handle_1.with_doc_mut(|doc| { @@ -362,18 +374,233 @@ async fn request_doc_which_is_not_shared_does_not_announce() { connect_repos(&repo_handle_1, &repo_handle_2); - let document_id = create_doc_with_contents(&repo_handle_1, "peer", "repo1"); + let document_id = create_doc_with_contents(&repo_handle_1, "peer", "repo1").await; // Wait for the announcement to have (maybe) taken place - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(1000)).await; // now try and resolve the document from storage of repo 2 let doc_handle = repo_handle_2.load(document_id).await.unwrap(); assert!(doc_handle.is_none()); } -fn create_doc_with_contents(handle: &RepoHandle, key: &str, value: &str) -> DocumentId { - let document_handle = handle.new_document(); +struct DontAnnounce; + +impl SharePolicy for DontAnnounce { + fn should_announce( + &self, + _doc_id: &DocumentId, + _with_peer: &RepoId, + ) -> BoxFuture<'static, Result> { + futures::future::ready(Ok(ShareDecision::DontShare)).boxed() + } + + fn should_sync( + &self, + _document_id: &DocumentId, + _with_peer: &RepoId, + ) -> BoxFuture<'static, Result> { + 
futures::future::ready(Ok(ShareDecision::Share)).boxed() + } + + fn should_request( + &self, + _document_id: &DocumentId, + _from_peer: &RepoId, + ) -> BoxFuture<'static, Result> { + futures::future::ready(Ok(ShareDecision::Share)).boxed() + } +} + +#[test(tokio::test)] +async fn request_document_transitive() { + // Test that requesting a document from a peer who doesn't have that document but who is + // connected to another peer that does have the document eventually resolves + + let repo_1 = Repo::new(Some("repo1".to_string()), Box::new(SimpleStorage)); + let repo_2 = Repo::new(Some("repo2".to_string()), Box::new(SimpleStorage)); + let repo_3 = Repo::new(Some("repo3".to_string()), Box::new(SimpleStorage)) + .with_share_policy(Box::new(DontAnnounce)); + + let repo_handle_1 = repo_1.run(); + let repo_handle_2 = repo_2.run(); + let repo_handle_3 = repo_3.run(); + + let document_id = create_doc_with_contents(&repo_handle_3, "peer", "repo3").await; + + connect_repos(&repo_handle_1, &repo_handle_2); + connect_repos(&repo_handle_2, &repo_handle_3); + + let doc_handle = match tokio::time::timeout( + Duration::from_millis(1000), + repo_handle_1.request_document(document_id), + ) + .await + { + Ok(d) => d.unwrap(), + Err(_e) => { + panic!("Request timed out"); + } + }; + + doc_handle.expect("doc should exist").with_doc(|doc| { + let val = doc.get(&automerge::ROOT, "peer").unwrap(); + assert_eq!(val.unwrap().0.into_string().unwrap(), "repo3"); + }); + + tokio::task::spawn_blocking(|| { + repo_handle_1.stop().unwrap(); + repo_handle_2.stop().unwrap(); + repo_handle_3.stop().unwrap(); + }) + .await + .unwrap(); +} + +#[test(tokio::test)] +async fn request_document_which_no_peer_has_returns_unavailable() { + let repo_1 = Repo::new(Some("repo1".to_string()), Box::new(SimpleStorage)); + let repo_2 = Repo::new(Some("repo2".to_string()), Box::new(SimpleStorage)); + let repo_3 = Repo::new(Some("repo3".to_string()), Box::new(SimpleStorage)); + + let repo_handle_1 = repo_1.run(); + 
let repo_handle_2 = repo_2.run(); + let repo_handle_3 = repo_3.run(); + + connect_repos(&repo_handle_1, &repo_handle_2); + connect_repos(&repo_handle_2, &repo_handle_3); + + let document_id = DocumentId::random(); + + let doc_handle = match tokio::time::timeout( + Duration::from_millis(1000), + repo_handle_1.request_document(document_id), + ) + .await + { + Ok(d) => d.unwrap(), + Err(_e) => { + panic!("Request timed out"); + } + }; + + assert!(doc_handle.is_none()); + + tokio::task::spawn_blocking(|| { + repo_handle_1.stop().unwrap(); + repo_handle_2.stop().unwrap(); + repo_handle_3.stop().unwrap(); + }) + .await + .unwrap(); +} + +#[test(tokio::test)] +async fn request_document_which_no_peer_has_but_peer_appears_after_request_starts_resolves_to_some() +{ + let repo_1 = Repo::new(Some("repo1".to_string()), Box::new(SimpleStorage)); + let repo_2 = Repo::new(Some("repo2".to_string()), Box::new(SimpleStorage)); + let repo_3 = Repo::new(Some("repo3".to_string()), Box::new(SimpleStorage)) + .with_share_policy(Box::new(DontAnnounce)); + + let repo_handle_1 = repo_1.run(); + let repo_handle_2 = repo_2.run(); + let repo_handle_3 = repo_3.run(); + + // note: repo 3 is not connected + connect_repos(&repo_handle_1, &repo_handle_2); + // This connection will never respond and so we will hang around waiting until someone has the + // document + connect_to_nowhere(&repo_handle_1); + + let document_id = create_doc_with_contents(&repo_handle_3, "peer", "repo3").await; + + let doc_handle_fut = repo_handle_1.request_document(document_id); + + // wait a little bit + tokio::time::sleep(Duration::from_millis(1000)).await; + + //connect repo3 + connect_repos(&repo_handle_1, &repo_handle_3); + + let handle = match tokio::time::timeout(Duration::from_millis(1000), doc_handle_fut).await { + Ok(d) => d.unwrap(), + Err(_e) => { + panic!("Request timed out"); + } + }; + + handle.expect("doc should exist").with_doc(|doc| { + let val = doc.get(&automerge::ROOT, "peer").unwrap(); + 
assert_eq!(val.unwrap().0.into_string().unwrap(), "repo3");
+ });
+
+ tokio::task::spawn_blocking(|| {
+ repo_handle_1.stop().unwrap();
+ repo_handle_2.stop().unwrap();
+ repo_handle_3.stop().unwrap();
+ })
+ .await
+ .unwrap();
+}
+
+#[test(tokio::test)]
+async fn request_document_which_no_peer_has_but_transitive_peer_appears_after_request_starts_resolves_to_some(
+) {
+ let repo_1 = Repo::new(Some("repo1".to_string()), Box::new(SimpleStorage));
+ let repo_2 = Repo::new(Some("repo2".to_string()), Box::new(SimpleStorage));
+ let repo_3 = Repo::new(Some("repo3".to_string()), Box::new(SimpleStorage))
+ .with_share_policy(Box::new(DontAnnounce));
+
+ let repo_handle_1 = repo_1.run();
+ let repo_handle_2 = repo_2.run();
+ let repo_handle_3 = repo_3.run();
+
+ // note: repo 3 is not connected
+ connect_repos(&repo_handle_1, &repo_handle_2);
+ // This connection will never respond and so we will hang around waiting until someone has the
+ // document
+ connect_to_nowhere(&repo_handle_2);
+
+ let document_id = create_doc_with_contents(&repo_handle_3, "peer", "repo3").await;
+
+ let doc_handle_fut = repo_handle_1.request_document(document_id);
+
+ // wait a little bit
+ tokio::time::sleep(Duration::from_millis(1000)).await;
+
+ // connect repo3
+ connect_repos(&repo_handle_2, &repo_handle_3);
+
+ let handle = match tokio::time::timeout(Duration::from_millis(1000), doc_handle_fut).await {
+ Ok(d) => d.unwrap(),
+ Err(_e) => {
+ panic!("Request timed out");
+ }
+ };
+
+ let handle = handle.expect("doc should exist");
+
+ // wait for the doc to sync up
+ // TODO: add an API for saying "wait until we're in sync with <peer>"
+ tokio::time::sleep(Duration::from_millis(1000)).await;
+
+ handle.with_doc(|doc| {
+ let val = doc.get(&automerge::ROOT, "peer").unwrap();
+ assert_eq!(val.unwrap().0.into_string().unwrap(), "repo3");
+ });
+
+ tokio::task::spawn_blocking(|| {
+ repo_handle_1.stop().unwrap();
+ repo_handle_2.stop().unwrap();
+ repo_handle_3.stop().unwrap();
+ })
+ .await
+ 
.unwrap(); +} + +async fn create_doc_with_contents(handle: &RepoHandle, key: &str, value: &str) -> DocumentId { + let document_handle = handle.new_document().await; document_handle.with_doc_mut(|doc| { let mut tx = doc.transaction(); tx.put(automerge::ROOT, key, value) diff --git a/tests/network/document_save.rs b/tests/network/document_save.rs index c257279b..2acb1cbd 100644 --- a/tests/network/document_save.rs +++ b/tests/network/document_save.rs @@ -15,7 +15,7 @@ async fn test_simple_save() { let repo_handle = repo.run(); // Create a document for one repo. - let document_handle = repo_handle.new_document(); + let document_handle = repo_handle.new_document().await; let document_id = document_handle.document_id(); // Edit the document. @@ -70,7 +70,7 @@ async fn test_save_on_shutdown() { let repo_handle = repo.run(); // Create a document for one repo. - let document_handle = repo_handle.new_document(); + let document_handle = repo_handle.new_document().await; // Edit the document. let expected_value = format!("{}", repo_handle.get_repo_id()); @@ -111,7 +111,7 @@ async fn test_multiple_save() { let repo_handle = repo.run(); // Create a document for one repo. - let document_handle = repo_handle.new_document(); + let document_handle = repo_handle.new_document().await; let document_id = document_handle.document_id(); // Edit the document, once. @@ -177,7 +177,7 @@ async fn test_compact_save() { let repo_handle = repo.run(); // Create a document for one repo. - let document_handle = repo_handle.new_document(); + let document_handle = repo_handle.new_document().await; let document_id = document_handle.document_id(); // Edit the document, once. diff --git a/tests/network/main.rs b/tests/network/main.rs index 06a4b4a1..0ddca25d 100644 --- a/tests/network/main.rs +++ b/tests/network/main.rs @@ -42,7 +42,7 @@ async fn test_simple_sync() { let repo_handle = repo.run(); // Create a document. 
- let doc_handle = repo_handle.new_document(); + let doc_handle = repo_handle.new_document().await; doc_handle.with_doc_mut(|doc| { let mut tx = doc.transaction(); tx.put( @@ -104,7 +104,7 @@ async fn test_sinks_closed_on_shutdown() { let repo_handle_2 = repo_2.run(); // Create a document for one repo. - let document_handle_1 = repo_handle_1.new_document(); + let document_handle_1 = repo_handle_1.new_document().await; // Edit the document. document_handle_1.with_doc_mut(|doc| { @@ -142,7 +142,11 @@ async fn test_sinks_closed_on_shutdown() { // Request the document. let doc_handle_future = repo_handle_2.request_document(document_handle_1.document_id()); - let id = doc_handle_future.await.unwrap().document_id(); + let id = doc_handle_future + .await + .unwrap() + .expect("document should be found") + .document_id(); assert_eq!(id, document_handle_1.document_id()); // Stop the repos. @@ -176,7 +180,7 @@ async fn test_sinks_closed_on_replacement() { let repo_handle_2 = repo_2.run(); // Create a document for one repo. - let document_handle_1 = repo_handle_1.new_document(); + let document_handle_1 = repo_handle_1.new_document().await; // Edit the document. document_handle_1.with_doc_mut(|doc| { @@ -213,7 +217,11 @@ async fn test_sinks_closed_on_replacement() { // Request the document. let doc_handle_future = repo_handle_2.request_document(document_handle_1.document_id()); - let doc_id = doc_handle_future.await.unwrap().document_id(); + let doc_id = doc_handle_future + .await + .unwrap() + .expect("document should be found") + .document_id(); assert_eq!(doc_id, document_handle_1.document_id()); // Replace the peers. @@ -263,7 +271,7 @@ async fn test_streams_chained_on_replacement() { let repo_handle_2 = repo_2.run(); // Create a document for one repo. - let document_handle_1 = repo_handle_1.new_document(); + let document_handle_1 = repo_handle_1.new_document().await; // Edit the document. 
document_handle_1.with_doc_mut(|doc| { @@ -321,7 +329,12 @@ async fn test_streams_chained_on_replacement() { right_sink, ); - let doc_id = doc_handle_future.await.unwrap().unwrap().document_id(); + let doc_id = doc_handle_future + .await + .unwrap() + .unwrap() + .expect("document should be found") + .document_id(); assert_eq!(doc_id, document_handle_1.document_id()); // Stop the repos. @@ -355,7 +368,7 @@ async fn sync_with_unauthorized_peer_never_occurs() { connect_repos(&repo_handle_1, &repo_handle_2); connect_repos(&repo_handle_1, &repo_handle_3); - let doc_handle_1 = repo_handle_1.new_document(); + let doc_handle_1 = repo_handle_1.new_document().await; doc_handle_1.with_doc_mut(|doc| { let mut tx = doc.transaction(); tx.put( diff --git a/tests/network/tincans.rs b/tests/network/tincans.rs index 209a3b68..599c479a 100644 --- a/tests/network/tincans.rs +++ b/tests/network/tincans.rs @@ -1,6 +1,6 @@ use std::sync::{atomic::AtomicBool, Arc}; -use automerge_repo::{NetworkError, RepoHandle, RepoMessage}; +use automerge_repo::{NetworkError, RepoHandle, RepoId, RepoMessage}; use futures::{Sink, SinkExt, Stream, StreamExt}; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::PollSender; @@ -115,6 +115,13 @@ pub(crate) fn connect_repos(left: &RepoHandle, right: &RepoHandle) { right.new_remote_repo(left.get_repo_id().clone(), right_recv, right_send); } +pub(crate) fn connect_to_nowhere(handle: &RepoHandle) { + let TinCan { send, recv, .. } = tincan_to_nowhere(); + let random_suffix = rand::random::(); + let repo_id = RepoId::from(format!("nowhere-{}", random_suffix).as_str()); + handle.new_remote_repo(repo_id, recv, send); +} + /// A wrapper around a `Sink` which records whether `poll_close` has ever been called struct RecordCloseSink { inner: S,