From c966e5f445fcf232930e1653bc2918c66f31aaba Mon Sep 17 00:00:00 2001 From: Raman <24293550+ramank775@users.noreply.github.com> Date: Sun, 31 Jul 2022 17:58:36 +0530 Subject: [PATCH] feat add support for nats (#84) * feat: add support for nats jetstream as event store --- .env.tmpl | 3 +- .gitpod.yml | 11 +- .gitpod/Dockerfile | 22 +- .vscode/launch.json | 184 ++++++++++++++--- deployment/.env.tmpl | 12 +- deployment/docker-compose.yml | 14 +- deployment/scripts/init-nats.bash | 27 +++ libs/event-store/index.js | 4 +- libs/event-store/nats.js | 268 +++++++++++++++++++++++++ package.json | 23 ++- services/connection-gateway/gateway.js | 11 +- www/main.js | 9 +- yarn.lock | 19 ++ 13 files changed, 547 insertions(+), 60 deletions(-) create mode 100755 deployment/scripts/init-nats.bash create mode 100644 libs/event-store/nats.js diff --git a/.env.tmpl b/.env.tmpl index 8030547..dfde0ff 100644 --- a/.env.tmpl +++ b/.env.tmpl @@ -57,4 +57,5 @@ S3_BUCKET_NAME= # Service Discovery SERVICE_DISCOVERY_PATH=./deployment/config/discovery_service/services.json - +# NATS +NATS_SERVER_LIST=nats://127.0.0.1:4222 diff --git a/.gitpod.yml b/.gitpod.yml index fe0d55d..e6af663 100644 --- a/.gitpod.yml +++ b/.gitpod.yml @@ -1,4 +1,3 @@ - image: file: .gitpod/Dockerfile @@ -9,17 +8,19 @@ ports: tasks: - init: yarn install - name: Kafka - init: chmod +x deployment/scripts/*.bash && ./deployment/scripts/setup.bash - command: > + init: chmod +x deployment/scripts/*.bash && ./deployment/scripts/setup.bash + command: > KAFKA_HOME=/kafka && ./deployment/scripts/init-kafka.bash $KAFKA_HOME .env + - name: Nats + init: chmod +x deployment/scripts/*.bash && ./deployment/scripts/setup.bash + command: ./deployment/scripts/init-nats.bash .env - name: Nginx command: export NGINX_DOCROOT="${GITPOD_REPO_ROOT}/www" && nginx - name: mongodb command: mkdir -p /workspace/data && mongod --dbpath /workspace/data - name: redis - command: redis-server - + command: redis-server vscode: extensions: diff --git a/.gitpod/Dockerfile b/.gitpod/Dockerfile index 36d1c1a..20b9b21 100644 --- a/.gitpod/Dockerfile +++ b/.gitpod/Dockerfile @@ -1,10 +1,24 @@ FROM gitpod/workspace-mongodb +ARG NATS_VERSION="v2.8.3" +ARG KAFKA_VERSION="3.2.0" +ARG NATS_CLI_VERSION="0.0.33" + COPY .gitpod/load.nginx.conf /etc/nginx/nginx.conf RUN sudo apt-get update && sudo apt-get install redis-server -y -RUN wget https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz && \ - tar xzf kafka_2.13-3.0.0.tgz && \ - sudo mv kafka_2.13-3.0.0 /kafka && \ - rm kafka_2.13-3.0.0.tgz +RUN wget https://downloads.apache.org/kafka/${KAFKA_VERSION}/kafka_2.13-${KAFKA_VERSION}.tgz && \ + tar xzf kafka_2.13-${KAFKA_VERSION}.tgz && \ + sudo mv kafka_2.13-${KAFKA_VERSION} /kafka && \ + rm kafka_2.13-${KAFKA_VERSION}.tgz + +RUN curl -L https://github.com/nats-io/nats-server/releases/download/${NATS_VERSION}/nats-server-${NATS_VERSION}-linux-amd64.zip -o nats-server.zip && \ + unzip nats-server.zip -d nats-server && \ + sudo cp nats-server/nats-server-${NATS_VERSION}-linux-amd64/nats-server /usr/bin && \ + rm -rf nats-server.zip nats-server + +RUN curl -L https://github.com/nats-io/natscli/releases/download/v${NATS_CLI_VERSION}/nats-${NATS_CLI_VERSION}-linux-amd64.zip -o nats-cli.zip && \ + unzip nats-cli.zip -d nats-cli && \ + sudo cp nats-cli/nats-${NATS_CLI_VERSION}-linux-amd64/nats /usr/bin && \ + rm -rf nats-cli.zip nats-cli diff --git a/.vscode/launch.json b/.vscode/launch.json index 81e3d24..6a6caa7 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -6,7 +6,7 @@ { "type": "node", "request": "launch", - "name": "Launch Gateway MS", + "name": "Launch Gateway MS - Kafka", "program": "${workspaceFolder}/services/connection-gateway/gateway.js", "args": [ "--app-name=ws-gateway-1", @@ -27,7 +27,7 @@ { "type": "node", "request": "launch", - "name": "Launch Message Delivery MS", + "name": "Launch Message Delivery MS - Kafka", "program": "${workspaceFolder}/services/message-delivery/message-delivery-ms.js", "args": [ "--app-name=message-delivery", @@ -54,7 +54,7 @@ { "type": "node", "request": "launch", - "name": "Launch Message Router MS", + "name": "Launch Message Router MS - Kafka", "program": "${workspaceFolder}/services/message-router-ms/message-router-ms.js", "args": [ "--app-name=message-router-1", @@ -77,7 +77,7 @@ { "type": "node", "request": "launch", - "name": "Launch profile MS", + "name": "Launch profile MS - Kafka", "program": "${workspaceFolder}/services/profile-ms/profile-ms.js", "args": [ "--app-name=profile-ms-1", @@ -99,7 +99,70 @@ { "type": "node", "request": "launch", - "name": "Launch file MS", + "name": "Launch group MS - Kafka", + "program": "${workspaceFolder}/services/group-ms/group-ms.js", + "args": [ + "--app-name=group-ms-1", + "--debug", + "--port=4002", + "--mongo-url=mongodb://localhost:27017/chat", + "--event-store=kafka", + "--kafka-client-id=group-1", + "--kafka-broker-list=${BROKER_LIST}", + "--new-group-message-topic=${TOPIC_NEW_GROUP_MS}" + ], + "envFile": "${workspaceFolder}/.env", + "autoAttachChildProcesses": true + }, + { + "type": "node", + "request": "launch", + "name": "Launch Group Message Router MS - Kafka", + "program": "${workspaceFolder}/services/group-ms/group-message-router-ms.js", + "args": [ + "--app-name=group-message-router-1", + "--debug", + "--event-store=kafka", + "--kafka-client-id=group-message-router-1", + "--send-message-topic=${TOPIC_SEND_MESSAGE}", + "--kafka-consumer-group=${CONSUMER_GROUP_GROUP_MESSAGE_ROUTER}", + "--new-group-message-topic=${TOPIC_NEW_GROUP_MS}", + "--system-message-topic=${TOPIC_SYSTEM_MESSAGE}", + "--kafka-broker-list=${BROKER_LIST}", + "--mongo-url=${MONGO_URL}", + "--persistence-message-topic=${TOPIC_DB_MESSAGE}" + ], + "envFile": "${workspaceFolder}/.env", + "autoAttachChildProcesses": true + }, + { + "type": "node", + "request": "launch", + "name": "Launch notification MS - Kafka", + "program": "${workspaceFolder}/services/notification-ms/notification-ms.js", + "args": [ + "--app-name=notification-ms-1", + "--debug", + "--mongo-url=${MONGO_URL}", + "--pn-service=mock", + "--firebase-admin-credential-json-path=${workspaceFolder}/${FIREBASE_ADMIN_CRED_JSON_PATH}", + "--event-store=kafka", + "--kafka-broker-list=${BROKER_LIST}", + "--kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL}", + "--kafka-sasl-username=${KAFKA_SASL_USERNAME}", + "--kafka-sasl-password=${KAFKA_SASL_PASSWORD}", + "--kafka-consumer-group=notification-ms", + "--new-login-topic=${TOPIC_NEW_LOGIN}", + "--offline-message-topic=${TOPIC_OFFLINE_MESSAGE}", + "--db-app-initial=${PERSISTENCE_MESSAGE_MS_INITIAL}" + ], + "envFile": "${workspaceFolder}/.env", + "autoAttachChildProcesses": true + }, + { + "type": "node", + "request": "launch", + "name": "Launch file MS - Common", "program": "${workspaceFolder}/services/file-ms/file-ms.js", "args": [ "--app-name=file-ms-1", @@ -119,17 +182,94 @@ { "type": "node", "request": "launch", - "name": "Launch group MS", + "name": "Launch Gateway MS - Nats", + "program": "${workspaceFolder}/services/connection-gateway/gateway.js", + "args": [ + "--app-name=ws-gateway-1", + "--debug", + "--gateway-name=gateway-1", + "--event-store=nats", + "--user-connection-state-topic=${TOPIC_USER_CONNECTION_STATE}", + "--new-message-topic=${TOPIC_NEW_MESSAGE}", + "--nats-server-list=${NATS_SERVER_LIST}" + ], + "envFile": "${workspaceFolder}/.env", + "autoAttachChildProcesses": true, + }, + { + "type": "node", + "request": "launch", + "name": "Launch Message Delivery MS - NATS", + "program": "${workspaceFolder}/services/message-delivery/message-delivery-ms.js", + "args": [ + "--app-name=message-delivery", + "--debug", + "--mongo-url=${MONGO_URL}", + "--event-store=nats", + "--user-connection-state-topic=${TOPIC_USER_CONNECTION_STATE}", + "--send-message-topic=${TOPIC_SEND_MESSAGE}", + "--system-message-topic=${TOPIC_SYSTEM_MESSAGE}", + "--offline-message-topic=${TOPIC_OFFLINE_MESSAGE}", + "--service-discovery-path=${SERVICE_DISCOVERY_PATH}", + "--nats-server-list=${NATS_SERVER_LIST}", + "--nats-consumer-group=${CONSUMER_GROUP_MESSAGE_DELIVERY}", + "--cache-type=redis", + "--redis-endpoint=127.0.0.1:6379" + ], + "envFile": "${workspaceFolder}/.env", + "autoAttachChildProcesses": true + }, + { + "type": "node", + "request": "launch", + "name": "Launch Message Router MS - NATS", + "program": "${workspaceFolder}/services/message-router-ms/message-router-ms.js", + "args": [ + "--app-name=message-router-1", + "--debug", + "--event-store=nats", + "--send-message-topic=${TOPIC_SEND_MESSAGE}", + "--new-message-topic=${TOPIC_NEW_MESSAGE}", + "--system-message-topic=${TOPIC_SYSTEM_MESSAGE}", + "--group-message-topic=${TOPIC_NEW_GROUP_MS}", + "--nats-server-list=${NATS_SERVER_LIST}", + "--nats-consumer-group=${CONSUMER_GROUP_MESSAGE_ROUTER}", + ], + "envFile": "${workspaceFolder}/.env", + "autoAttachChildProcesses": true + }, + { + "type": "node", + "request": "launch", + "name": "Launch profile MS - NATS", + "program": "${workspaceFolder}/services/profile-ms/profile-ms.js", + "args": [ + "--app-name=profile-ms-1", + "--debug", + "--port=4000", + "--auth-provider=mock", + "--mongo-url=${MONGO_URL}", + "--firebase-project-id=${FIREBASE_PROJECT_ID}", + "--event-store=nats", + "--new-login-topic=${TOPIC_NEW_LOGIN}", + "--nats-server-list=${NATS_SERVER_LIST}", + ], + "envFile": "${workspaceFolder}/.env", + "autoAttachChildProcesses": true + }, + { + "type": "node", + "request": "launch", + "name": "Launch group MS - NATS", "program": "${workspaceFolder}/services/group-ms/group-ms.js", "args": [ "--app-name=group-ms-1", "--debug", "--port=4002", "--mongo-url=mongodb://localhost:27017/chat", - "--event-store=kafka", - "--kafka-client-id=group-1", - "--kafka-broker-list=${BROKER_LIST}", - "--new-group-message-topic=${TOPIC_NEW_GROUP_MS}" + "--event-store=nats", + "--new-group-message-topic=${TOPIC_NEW_GROUP_MS}", + "--nats-server-list=${NATS_SERVER_LIST}", ], "envFile": "${workspaceFolder}/.env", "autoAttachChildProcesses": true @@ -137,20 +277,19 @@ { "type": "node", "request": "launch", - "name": "Launch Group Message Router MS", + "name": "Launch Group Message Router MS - NATS", "program": "${workspaceFolder}/services/group-ms/group-message-router-ms.js", "args": [ "--app-name=group-message-router-1", "--debug", - "--event-store=kafka", - "--kafka-client-id=group-message-router-1", + "--event-store=nats", "--send-message-topic=${TOPIC_SEND_MESSAGE}", - "--kafka-consumer-group=${CONSUMER_GROUP_GROUP_MESSAGE_ROUTER}", "--new-group-message-topic=${TOPIC_NEW_GROUP_MS}", "--system-message-topic=${TOPIC_SYSTEM_MESSAGE}", - "--kafka-broker-list=${BROKER_LIST}", + "--persistence-message-topic=${TOPIC_DB_MESSAGE}", + "--nats-server-list=${NATS_SERVER_LIST}", + "--nats-consumer-group=${CONSUMER_GROUP_GROUP_MESSAGE_ROUTER}", "--mongo-url=${MONGO_URL}", - "--persistence-message-topic=${TOPIC_DB_MESSAGE}" ], "envFile": "${workspaceFolder}/.env", "autoAttachChildProcesses": true @@ -158,7 +297,7 @@ { "type": "node", "request": "launch", - "name": "Launch notification MS", + "name": "Launch notification MS - NATS", "program": "${workspaceFolder}/services/notification-ms/notification-ms.js", "args": [ "--app-name=notification-ms-1", @@ -166,18 +305,15 @@ "--mongo-url=${MONGO_URL}", "--pn-service=mock", "--firebase-admin-credential-json-path=${workspaceFolder}/${FIREBASE_ADMIN_CRED_JSON_PATH}", - "--event-store=kafka", - "--kafka-broker-list=${BROKER_LIST}", - "--kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL}", - "--kafka-sasl-username=${KAFKA_SASL_USERNAME}", - "--kafka-sasl-password=${KAFKA_SASL_PASSWORD}", - "--kafka-consumer-group=notification-ms", + "--event-store=nats", + "--nats-server-list=${NATS_SERVER_LIST}", + "--nats-consumer-group=notification-ms", "--new-login-topic=${TOPIC_NEW_LOGIN}", "--offline-message-topic=${TOPIC_OFFLINE_MESSAGE}", "--db-app-initial=${PERSISTENCE_MESSAGE_MS_INITIAL}" ], "envFile": "${workspaceFolder}/.env", "autoAttachChildProcesses": true - } + }, ] } diff --git a/deployment/.env.tmpl b/deployment/.env.tmpl index e5d4fa6..5a772c3 100644 --- a/deployment/.env.tmpl +++ b/deployment/.env.tmpl @@ -2,6 +2,7 @@ STAGE=dev CHAT_SERVER_TAG=v2.2.6 CONFIG_DIR=./config +EVENT_STORE=kafka # KAFKA TOPIC TOPIC_USER_CONNECTION_STATE=user-connection-state @@ -31,7 +32,7 @@ KAFKA_SECURITY_PROTOCOL= KAFKA_SASL_USERNAME= KAFKA_SASL_PASSWORD= -# KAFKA CONSUMER GROUP +# EVENT STORE CONSUMER GROUP CONSUMER_GROUP_MESSAGE_ROUTER=message-router CONSUMER_GROUP_MESSAGE_DELIVERY=message-delivery CONSUMER_GROUP_GATEWAY=gateway @@ -54,3 +55,12 @@ PN_TTL=3600 S3_ACCESS_KEY_ID= S3_SECRET_ACCESS_KEY= S3_BUCKET_NAME= + +# NATS +NATS_SERVER_LIST=nats://127.0.0.1:4222 +NATS_AUTH= +NATS_USER= +NATS_PASS= +NATS_AUTH_TOKEN= +NATS_NKEY= +NATS_USER_JWT= diff --git a/deployment/docker-compose.yml b/deployment/docker-compose.yml index 24ec491..b84da68 100644 --- a/deployment/docker-compose.yml +++ b/deployment/docker-compose.yml @@ -9,41 +9,41 @@ services: gateway: image: ramank775/chatserver:${CHAT_SERVER_TAG} entrypoint: node - command: /app/services/connection-gateway/gateway.js --host=0.0.0.0 --app-name=ws-gateway --gateway-name=gateway-1 --user-connection-state-topic=${TOPIC_USER_CONNECTION_STATE} --new-message-topic=${TOPIC_NEW_MESSAGE} --event-store=kafka --kafka-client-id=${GATEWAY_CLIENT_ID:-gateway-1} --kafka-consumer-group=${CONSUMER_GROUP_GATEWAY} --kafka-broker-list='${BROKER_LIST}' --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} + command: /app/services/connection-gateway/gateway.js --host=0.0.0.0 --app-name=ws-gateway --gateway-name=gateway-1 --user-connection-state-topic=${TOPIC_USER_CONNECTION_STATE} --new-message-topic=${TOPIC_NEW_MESSAGE} --event-store=${EVENT_STORE} --kafka-client-id=${GATEWAY_CLIENT_ID:-gateway-1} --kafka-consumer-group=${CONSUMER_GROUP_GATEWAY} --kafka-broker-list='${BROKER_LIST}' --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --nats-server-list=${NATS_SERVER_LIST} --nats-auth-type=${NATS_AUTH} --nats-auth-user=${NATS_USER} --nats-auth-pass=${NATS_PASS} --nats-auth-token=${NATS_AUTH_TOKEN} --nats-auth-nkey=${NATS_NKEY} --nats-auth-jwt=${NATS_USER_JWT} message-delivery: image: ramank775/chatserver:${CHAT_SERVER_TAG} volumes: - '${CONFIG_DIR}/discovery_service/services.json:/app/discovery_service.json' entrypoint: node - command: /app/services/message-delivery/message-delivery-ms.js --app-name=message-delivery --user-connection-state-topic=${TOPIC_USER_CONNECTION_STATE} --send-message-topic=${TOPIC_SEND_MESSAGE} --system-message-topic=${TOPIC_SYSTEM_MESSAGE} --offline-message-topic=${TOPIC_OFFLINE_MESSAGE} --service-discovery-path=/app/discovery_service.json --event-store=kafka --kafka-client-id=${MESSAGE_DELIVERY_CLIENT_ID:-message-delivery-1} --kafka-consumer-group=${CONSUMER_GROUP_MESSAGE_DELIVERY} --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --cache-type=redis --redis-endpoint=${REDIS_ENDPOINT} --message-db=mongo --mongo-url=${MONGO_URL} --mongo-user=${MONGO_USER} --mongo-password=${MONGO_PASSWORD} --mongo-auth + command: /app/services/message-delivery/message-delivery-ms.js --app-name=message-delivery --user-connection-state-topic=${TOPIC_USER_CONNECTION_STATE} --send-message-topic=${TOPIC_SEND_MESSAGE} --system-message-topic=${TOPIC_SYSTEM_MESSAGE} --offline-message-topic=${TOPIC_OFFLINE_MESSAGE} --service-discovery-path=/app/discovery_service.json --event-store=${EVENT_STORE} --kafka-client-id=${MESSAGE_DELIVERY_CLIENT_ID:-message-delivery-1} --kafka-consumer-group=${CONSUMER_GROUP_MESSAGE_DELIVERY} --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --cache-type=redis --redis-endpoint=${REDIS_ENDPOINT} --message-db=mongo --mongo-url=${MONGO_URL} --mongo-user=${MONGO_USER} --mongo-password=${MONGO_PASSWORD} --mongo-auth --nats-server-list=${NATS_SERVER_LIST} --nats-auth-type=${NATS_AUTH} --nats-auth-user=${NATS_USER} --nats-auth-pass=${NATS_PASS} --nats-auth-token=${NATS_AUTH_TOKEN} --nats-auth-nkey=${NATS_NKEY} --nats-auth-jwt=${NATS_USER_JWT} --nats-consumer-group=${CONSUMER_GROUP_MESSAGE_DELIVERY} message_router: image: ramank775/chatserver:${CHAT_SERVER_TAG} entrypoint: node - command: /app/services/message-router-ms/message-router-ms.js --app-name=message-router-1 --send-message-topic=${TOPIC_SEND_MESSAGE} --new-message-topic=${TOPIC_NEW_MESSAGE} --system-message-topic=${TOPIC_SYSTEM_MESSAGE} --group-message-topic=${TOPIC_NEW_GROUP_MS} --event-store=kafka --kafka-client-id=${MESSAGE_ROUTER_CLIENT_ID:-message-router-1} --kafka-consumer-group=${CONSUMER_GROUP_MESSAGE_ROUTER} --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} + command: /app/services/message-router-ms/message-router-ms.js --app-name=message-router-1 --send-message-topic=${TOPIC_SEND_MESSAGE} --new-message-topic=${TOPIC_NEW_MESSAGE} --system-message-topic=${TOPIC_SYSTEM_MESSAGE} --group-message-topic=${TOPIC_NEW_GROUP_MS} --event-store=${EVENT_STORE} --kafka-client-id=${MESSAGE_ROUTER_CLIENT_ID:-message-router-1} --kafka-consumer-group=${CONSUMER_GROUP_MESSAGE_ROUTER} --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --nats-server-list=${NATS_SERVER_LIST} --nats-auth-type=${NATS_AUTH} --nats-auth-user=${NATS_USER} --nats-auth-pass=${NATS_PASS} --nats-auth-token=${NATS_AUTH_TOKEN} --nats-auth-nkey=${NATS_NKEY} --nats-auth-jwt=${NATS_USER_JWT} --nats-consumer-group=${CONSUMER_GROUP_MESSAGE_ROUTER} profile: image: ramank775/chatserver:${CHAT_SERVER_TAG} entrypoint: node - command: /app/services/profile-ms/profile-ms.js --app-name=profile-ms-1 --host=0.0.0.0 --port=4000 --new-login-topic=${TOPIC_NEW_LOGIN} --auth-provider=firebase --firebase-project-id=${FIREBASE_PROJECT_ID} --auth-db=mongo --profile-db=mongo --mongo-url=${MONGO_URL} --mongo-user=${MONGO_USER} --mongo-password=${MONGO_PASSWORD} --mongo-auth --event-store=kafka --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --kafka-client-id=${PROFILE_CLIENT_ID:-profile-1} + command: /app/services/profile-ms/profile-ms.js --app-name=profile-ms-1 --host=0.0.0.0 --port=4000 --new-login-topic=${TOPIC_NEW_LOGIN} --auth-provider=firebase --firebase-project-id=${FIREBASE_PROJECT_ID} --auth-db=mongo --profile-db=mongo --mongo-url=${MONGO_URL} --mongo-user=${MONGO_USER} --mongo-password=${MONGO_PASSWORD} --mongo-auth --event-store=${EVENT_STORE} --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --kafka-client-id=${PROFILE_CLIENT_ID:-profile-1} --nats-server-list=${NATS_SERVER_LIST} --nats-auth-type=${NATS_AUTH} --nats-auth-user=${NATS_USER} --nats-auth-pass=${NATS_PASS} --nats-auth-token=${NATS_AUTH_TOKEN} --nats-auth-nkey=${NATS_NKEY} --nats-auth-jwt=${NATS_USER_JWT} group: image: ramank775/chatserver:${CHAT_SERVER_TAG} entrypoint: node - command: /app/services/group-ms/group-ms.js --app-name=group-ms-1 --host=0.0.0.0 --port=4002 --new-group-message-topic=${TOPIC_NEW_GROUP_MS} --group-db=mongo --mongo-url=${MONGO_URL} --mongo-user=${MONGO_USER} --mongo-password=${MONGO_PASSWORD} --mongo-auth --event-store=kafka --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --kafka-client-id=${GROUP_CLIENT_ID:-group-1} + command: /app/services/group-ms/group-ms.js --app-name=group-ms-1 --host=0.0.0.0 --port=4002 --new-group-message-topic=${TOPIC_NEW_GROUP_MS} --group-db=mongo --mongo-url=${MONGO_URL} --mongo-user=${MONGO_USER} --mongo-password=${MONGO_PASSWORD} --mongo-auth --event-store=${EVENT_STORE} --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --kafka-client-id=${GROUP_CLIENT_ID:-group-1} --nats-server-list=${NATS_SERVER_LIST} --nats-auth-type=${NATS_AUTH} --nats-auth-user=${NATS_USER} --nats-auth-pass=${NATS_PASS} --nats-auth-token=${NATS_AUTH_TOKEN} --nats-auth-nkey=${NATS_NKEY} --nats-auth-jwt=${NATS_USER_JWT} group_message_router: image: ramank775/chatserver:${CHAT_SERVER_TAG} entrypoint: node - command: /app/services/group-ms/group-message-router-ms.js --app-name=group-message-router-1 --new-group-message-topic=${TOPIC_NEW_GROUP_MS} --send-message-topic=${TOPIC_SEND_MESSAGE} --system-message-topic=${TOPIC_SYSTEM_MESSAGE} --event-store=kafka --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --kafka-client-id=${GROUP_MESSAGE_ROUTER_CLIENT_ID:-group-message-router-1} --kafka-consumer-group=${CONSUMER_GROUP_GROUP_MESSAGE_ROUTER} --group-db=mongo --mongo-url=${MONGO_URL} --mongo-user=${MONGO_USER} --mongo-password=${MONGO_PASSWORD} --mongo-auth + command: /app/services/group-ms/group-message-router-ms.js --app-name=group-message-router-1 --new-group-message-topic=${TOPIC_NEW_GROUP_MS} --send-message-topic=${TOPIC_SEND_MESSAGE} --system-message-topic=${TOPIC_SYSTEM_MESSAGE} --event-store=${EVENT_STORE} --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --kafka-client-id=${GROUP_MESSAGE_ROUTER_CLIENT_ID:-group-message-router-1} --kafka-consumer-group=${CONSUMER_GROUP_GROUP_MESSAGE_ROUTER} --group-db=mongo --mongo-url=${MONGO_URL} --mongo-user=${MONGO_USER} --mongo-password=${MONGO_PASSWORD} --mongo-auth --nats-server-list=${NATS_SERVER_LIST} --nats-auth-type=${NATS_AUTH} --nats-auth-user=${NATS_USER} --nats-auth-pass=${NATS_PASS} --nats-auth-token=${NATS_AUTH_TOKEN} --nats-auth-nkey=${NATS_NKEY} --nats-auth-jwt=${NATS_USER_JWT} --nats-consumer-group=${CONSUMER_GROUP_GROUP_MESSAGE_ROUTER} notification: image: ramank775/chatserver:${CHAT_SERVER_TAG} volumes: - ${CONFIG_DIR}/firebase/service.json:/app/.service.json:ro entrypoint: node - command: /app/services/notification-ms/notification-ms.js --app-name=notification-ms-1 --new-login-topic=${TOPIC_NEW_LOGIN} --offline-message-topic=${TOPIC_OFFLINE_MESSAGE} --notification-db=mongo --mongo-url=${MONGO_URL} --mongo-user=${MONGO_USER} --mongo-password=${MONGO_PASSWORD} --mongo-auth --pn-service=firebase --firebase-admin-credential-json-path=/app/.service.json --event-store=kafka --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --kafka-consumer-group=${CONSUMER_GROUP_NOTIFICATION} --offline-msg-initial=${PERSISTENCE_MESSAGE_MS_INITIAL} --kafka-client-id=${NOTIFICATION_CLIENT_ID:-notification-1} --firebase-pn-ttl=${PN_TTL:-86400} + command: /app/services/notification-ms/notification-ms.js --app-name=notification-ms-1 --new-login-topic=${TOPIC_NEW_LOGIN} --offline-message-topic=${TOPIC_OFFLINE_MESSAGE} --notification-db=mongo --mongo-url=${MONGO_URL} --mongo-user=${MONGO_USER} --mongo-password=${MONGO_PASSWORD} --mongo-auth --pn-service=firebase --firebase-admin-credential-json-path=/app/.service.json --event-store=${EVENT_STORE} --kafka-broker-list=${BROKER_LIST} --kafka-security-protocol=${KAFKA_SECURITY_PROTOCOL} --kafka-sasl-username=${KAFKA_SASL_USERNAME} --kafka-sasl-password=${KAFKA_SASL_PASSWORD} --kafka-consumer-group=${CONSUMER_GROUP_NOTIFICATION} --offline-msg-initial=${PERSISTENCE_MESSAGE_MS_INITIAL} --kafka-client-id=${NOTIFICATION_CLIENT_ID:-notification-1} --firebase-pn-ttl=${PN_TTL:-86400} --nats-server-list=${NATS_SERVER_LIST} --nats-auth-type=${NATS_AUTH} --nats-auth-user=${NATS_USER} --nats-auth-pass=${NATS_PASS} --nats-auth-token=${NATS_AUTH_TOKEN} --nats-auth-nkey=${NATS_NKEY} --nats-auth-jwt=${NATS_USER_JWT} --nats-consumer-group=${CONSUMER_GROUP_NOTIFICATION} file: image: ramank775/chatserver:${CHAT_SERVER_TAG} diff --git a/deployment/scripts/init-nats.bash b/deployment/scripts/init-nats.bash new file mode 100755 index 0000000..9b415dd --- /dev/null +++ b/deployment/scripts/init-nats.bash @@ -0,0 +1,27 @@ +#!/bin/bash + +start_nats() { + nats-server -js > /dev/null 2>&1 & + echo "Nats server started" +} + +create_stream () { + nats str add --subjects="$1.>" --storage=memory --replicas=1 --ack --retention=limits --discard=old --max-msgs=-1 --max-msgs-per-subject=-1 --max-bytes=-1 --max-age=-1 --max-msg-size=-1 --dupe-window=2m0s --allow-rollup --no-deny-delete --no-deny-purge $1 +} + +ENV_FILE=${1:-.env} + +echo "ENV FILE" $ENV_FILE + +source $ENV_FILE + +start_nats; + +sleep 2s; + +for var in "${!TOPIC_@}"; do + create_stream ${!var}; + sleep 1s; +done + +nats str ls diff --git a/libs/event-store/index.js b/libs/event-store/index.js index b54fa93..d99e394 100644 --- a/libs/event-store/index.js +++ b/libs/event-store/index.js @@ -1,8 +1,10 @@ const { IEventStore } = require('./iEventStore') const Kafka = require('./kafka'); +const Nats = require('./nats'); const EVENT_STORE = [ - Kafka + Kafka, + Nats, ]; /** diff --git a/libs/event-store/nats.js b/libs/event-store/nats.js new file mode 100644 index 0000000..7a7030d --- /dev/null +++ b/libs/event-store/nats.js @@ -0,0 +1,268 @@ +const nats = require('nats'); +const { shortuuid } = require('../../helper'); +const { IEventStore } = require('./iEventStore'); + +function parseAuthOptions(options) { + const authOptions = {} + switch (options.natsAuthType) { + case 'pass': + authOptions.user = options.natsAuthUser; + authOptions.pass = options.natsAuthPass; + break; + case 'token': + authOptions.token = options.natsAuthToken; + break; + case 'nkey': + { + const seed = new TextEncoder().encode(options.natsAuthNkey); + authOptions.authenticator = nats.nkeyAuthenticator(seed); + } + break + case 'jwt': + { + const creds = new TextEncoder().encode(` +-----BEGIN NATS USER JWT----- +${options.natsAuthJwt} +------END NATS USER JWT------ +************************* IMPORTANT ************************* +NKEY Seed printed below can be used sign and prove identity. +NKEYs are sensitive and should be treated as secrets. +-----BEGIN USER NKEY SEED----- +${options.natsAuthNkey} +------END USER NKEY SEED------ +` + ); + authOptions.authenticator = nats.credsAuthenticator(creds); + } + break; + default: + break; + } + return authOptions; +} +function parseNatsOptions(options) { + const servers = options.natsServerList.split(',') + const authOptions = parseAuthOptions(options) + return { + servers, + ...authOptions + }; +} + +function getConsumerOpts(options) { + const opts = nats.consumerOpts(); + opts.queue(options.natsConsumerGroup); + opts.durable(options.natsConsumerGroup); + return opts; +} + +class NatsEventStore extends IEventStore { + /** @type {import('../logger').Logger} */ + #logger; + + #options = {}; + + /** @type {nats.NatsConnection} */ + #nc; + + /** @type {import('node:async_hooks').AsyncLocalStorage} */ + #asyncStorage; + + /** @type {nats.JetStreamClient} */ + #jsc; + + /** @type {Map} */ + #psub = new Map(); + + #isDisconnect = false; + + /** @type { string[] } */ + #subjects + + /** @type {nats.Codec} */ + #codec + + constructor(context) { + super(); + this.#options = context.options; + this.#logger = context.log; + this.#subjects = context.listenerEvents; + this.#asyncStorage = context.asyncStorage; + this.#codec = nats.JSONCodec() + } + + /** + * Get kafka instance + * @param {Object} context + * @return {Promise} + */ + async #getNatsInstance() { + if (!this.#nc) { + const options = parseNatsOptions(this.#options); + this.#logger.info('connecting nats server'); + this.#nc = await nats.connect(options) + } + return this.#nc; + } + + async #getJetStreamClient() { + if (!this.#jsc) { + const nc = await this.#getNatsInstance(); + this.#jsc = nc.jetstream() + } + return this.#jsc; + } + + async #createNatsConsumer() { + nats.createInbox() + const js = await this.#getJetStreamClient(); + + try { + this.#logger.info('subscribing to consumer.'); + const promises = this.#subjects.map((subject) => { + const opts = getConsumerOpts(this.#options); + opts.callback(async (err, msg) => { + if (err) { + this.#logger.error('Error while processing nats message', err); + } + if (!msg) { + return; + } + await this.#eachMessage(msg); + this.#psub.get(subject).pull(); + }) + const subp = js.pullSubscribe(`${subject}.>`, opts); + subp.then((sub) => { + this.#psub.set(subject, sub); + }) + return subp; + }) + await Promise.all(promises) + this.#logger.info('Consumer subscribe sucessfully.'); + } catch (error) { + this.#logger.error(`Error while subscribing consumer. ${error}`); + throw error; + } + + setTimeout(async () => { + this.#psub.forEach((sub) => { + sub.pull(); + }) + }, 500); + + return this.#jsc; + } + + async #eachMessage(msg) { + const start = Date.now(); + const trackId = msg.headers.get('track_id') || shortuuid(); + const [topic, key, partition] = msg.subject.split('.', 3); + + await this.#asyncStorage.run(trackId, async () => { + const data = { + key, + value: this.#codec.decode(msg.data) + }; + const logInfo = { + topic, + partition, + offset: msg.seq, + key: data.key + }; + this.#logger.info(`new data received`, { ...logInfo, ...(data.value.META || {}) }); + const sConsume = Date.now(); + try { + await this.on(topic, data.value); + msg.ack(); + } catch (e) { + this.#logger.error(`Error while processing message`, { err: e }); + // TODO: wait to msg to have retryCount + if (msg.redelivered) + msg.term(); + else + msg.nak(); + } + logInfo.latency = Date.now() - start; + logInfo.consume_latency = Date.now() - sConsume; + this.#logger.info('message consumed', logInfo); + }); + } + + /** + * Initialize Nats event store + * @param {import('./iEventStore').InitOptions} options + */ + async init(options) { + await this.#getJetStreamClient(); + if (options.consumer) { + await this.#createNatsConsumer(); + } + } + + /** + * Emit an new event to event store + * @param {string} event Name of the event + * @param {*} args Event arguments + * @param {string} key + */ + async emit(event, args, key) { + const trackId = this.#asyncStorage.getStore() || shortuuid(); + try { + const start = Date.now(); + const jc = await this.#getJetStreamClient() + const data = this.#codec.encode(args) + const headers = nats.headers() + headers.append('track_id', trackId) + const response = await jc.publish(`${event}.${key}`, data, { + msgID: trackId, + headers, + }); + const elasped = Date.now() - start; + this.#logger.info(`Sucessfully produced message`, { + event, + stream: response.stream, + offset: response.seq, + key, + produceIn: elasped + }); + } catch (error) { + this.#logger.error(`Error while producing message`, { error }); + throw error; + } + } + + async dispose() { + super.dispose(); + if (this.#psub) { + this.#psub.unsubscribe(); + } + } +} + +/** + * Add Nats server options + * @param {import('commander').Command} cmd + */ +function initOptions(cmd) { + return cmd + .option('--nats-server-list ', 'List of nats server endpoints') + .option('--nats-auth-type ', 'Nats client auth options ') + .option('--nats-auth-user ', 'Nats client username for pass authentication') + .option('--nats-auth-pass ', 'Nats client password for pass authentication') + .option('--nats-auth-token ', 'Nats client authentication token for token authentication') + .option('--nats-auth-nkey ', 'Nats client secret nkey for nkey/jwt authentication') + .option('--nats-auth-jwt ', 'Nats client user jwt token for jwt authentication') + .option('--nats-consumer-group ', 'Nats consumer group name for durable consumer'); +} + +async function initialize(context, options) { + const store = new NatsEventStore(context); + await store.init(options); + return store; +} + +module.exports = { + code: 'nats', + initOptions, + initialize +} diff --git a/package.json b/package.json index 8357e69..99fb40d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "chat-server", - "version": "2.6.2", + "version": "2.7.0", "description": "A chat server based on the microservice architecture to ensure high availability, high throughput, horizontal scalability using Kafka", "main": "", "repository": "https://github.com/ramank775/chat-server.git", @@ -12,34 +12,35 @@ "prepare": "husky install" }, "engines": { - "node": ">12.0.0" + "node": ">14.0.0" }, "dependencies": { "@hapi/boom": "^10.0.0", "@hapi/hapi": "^20.2.2", "@pm2/io": "^5.0.0", - "aws-sdk": "^2.1176.0", + "aws-sdk": "^2.1185.0", "commander": "^9.4.0", - "firebase-admin": "^11.0.0", - "ioredis": "^5.2.0", + "firebase-admin": "^11.0.1", + "ioredis": "^5.2.2", "joi": "^17.6.0", "kafkajs": "2.1.0", "moment": "^2.29.4", - "mongodb": "^4.8.0", + "mongodb": "^4.8.1", + "nats": "^2.7.1", "node-fetch": "^2.6.6", "short-uuid": "^4.2.0", "winston": "^3.8.1", "ws": "^8.8.1" }, "devDependencies": { - "@babel/core": "^7.18.6", - "@babel/eslint-parser": "^7.18.2", - "@babel/preset-env": "7.18.6", - "eslint": "^8.19.0", + "@babel/core": "^7.18.9", + "@babel/eslint-parser": "^7.18.9", + "@babel/preset-env": "7.18.9", + "eslint": "^8.20.0", "eslint-config-airbnb": "^19.0.4", "eslint-config-prettier": "^8.5.0", "eslint-plugin-import": "^2.26.0", - "eslint-plugin-jsx-a11y": "^6.6.0", + "eslint-plugin-jsx-a11y": "^6.6.1", "eslint-plugin-node": "^11.1.0", "eslint-plugin-prettier": "^4.2.1", "eslint-plugin-react": "^7.30.1", diff --git a/services/connection-gateway/gateway.js b/services/connection-gateway/gateway.js index d876769..8b452f2 100644 --- a/services/connection-gateway/gateway.js +++ b/services/connection-gateway/gateway.js @@ -132,10 +132,15 @@ class Gateway extends HttpServiceBase { userSocketMapping[user] = ws; ws.user = user; userEvents.onConnect(user); - ws.on('message', function onMessage(msg) { + ws.on('message', function onMessage(rawmsg) { + const msg = rawmsg.toString(); + if (msg === "ping") { + ws.send("pong"); + return + } const trackId = shortuuid(); asyncStorage.run(trackId, () => { - messageEvents.onNewMessage(msg.toString(), this.user); + messageEvents.onNewMessage(msg, this.user); }); }); ws.on('close', function onClose(_code, _reason) { @@ -192,7 +197,7 @@ class Gateway extends HttpServiceBase { 'post', this.newMessage.bind(this), { - validate:{ + validate: { headers: schemas.authHeaders, payload: Joi.array().items(Joi.string()).min(1).required() } diff --git a/www/main.js b/www/main.js index 40f232b..3a5278d 100644 --- a/www/main.js +++ b/www/main.js @@ -228,9 +228,11 @@ function connectSocket() { if (window.WebSocket) { console.log('WebSocket object is supported in your browser'); - const host = window.location.hostname; + const { protocol, hostname } = window.location; + console.log('protocol', protocol) + const socketUrl = `${protocol === 'https:' ? 'wss' : 'ws'}://${hostname}/v1.0/wss/` const startTime = Date.now(); - ws = new WebSocket(`wss://${host}/v1.0/wss/`); + ws = new WebSocket(socketUrl); ws.onopen = function onopen() { console.log('connection time', Date.now() - startTime); @@ -238,6 +240,7 @@ function connectSocket() { console.log('onopen'); }; ws.onmessage = function onmessage(e) { + if (e.data === "pong") return; const msgSpace = document.getElementById('message'); const newMsgItem = document.createElement('li'); @@ -261,7 +264,7 @@ function connectSocket() { timer = null; }; timer = setInterval(() => { - ws.send(0x9); + ws.send("ping"); }, 30000) } else { console.log('WebSocket object is not supported in your browser'); diff --git a/yarn.lock b/yarn.lock index c49861f..c892893 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3468,11 +3468,25 @@ ms@^2.1.1: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== +nats@^2.7.1: + version "2.7.1" + resolved "https://registry.yarnpkg.com/nats/-/nats-2.7.1.tgz#d390a48df1a348a335143abc09afad03b2eb9687" + integrity sha512-aH0OXxasfLCTG+LQCFRaWoL1kqejCQg7B+t4z++JgLPgfdpQMET1Rqo95I06DEQyIJGTTgYpxkI/zC0ul8V3pw== + dependencies: + nkeys.js "^1.0.0-9" + natural-compare@^1.4.0: version "1.4.0" resolved "https://registry.yarnpkg.com/natural-compare/-/natural-compare-1.4.0.tgz#4abebfeed7541f2c27acfb29bdbbd15c8d5ba4f7" integrity sha512-OWND8ei3VtNC9h7V60qff3SVobHr996CTwgxubgyQYEpg290h9J0buyECNNJexkFm5sOajh5G116RYA1c8ZMSw== +nkeys.js@^1.0.0-9: + version "1.0.3" + resolved "https://registry.yarnpkg.com/nkeys.js/-/nkeys.js-1.0.3.tgz#487688a6f4f36f4a2796eee000cc6e54e122cb08" + integrity sha512-p5Bpb/acPaQmCrbe4gNmMBY/naZJV8Q7m2B9UkXT8BQRC6wjX8zqD2ya8eZu9mpSXQffodV46HCP9OckmxcwYA== + dependencies: + tweetnacl "1.0.3" + node-fetch@^2.6.1, node-fetch@^2.6.6, node-fetch@^2.6.7: version "2.6.7" resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.7.tgz#24de9fba827e3b4ae44dc8b20256a379160052ad" @@ -4173,6 +4187,11 @@ tslib@^2.1.0: resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.4.0.tgz#7cecaa7f073ce680a05847aa77be941098f36dc3" integrity sha512-d6xOpEDfsi2CZVlPQzGeux8XMwLT9hssAsaPYExaQMuYskwb+x1x7J371tWlbBdWHroy99KnVB6qIkUbs5X3UQ== +tweetnacl@1.0.3: + version "1.0.3" + resolved "https://registry.yarnpkg.com/tweetnacl/-/tweetnacl-1.0.3.tgz#ac0af71680458d8a6378d0d0d050ab1407d35596" + integrity sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw== + type-check@^0.4.0, type-check@~0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/type-check/-/type-check-0.4.0.tgz#07b8203bfa7056c0657050e3ccd2c37730bab8f1"