diff --git a/.github/workflows/php-package.yml b/.github/workflows/php-package.yml index 148c234..f1a454a 100644 --- a/.github/workflows/php-package.yml +++ b/.github/workflows/php-package.yml @@ -1,4 +1,7 @@ name: PHP Package +env: + PHAR_TOOL_VERSION: 1.4.0 + PHAR_TOOL_REPOSITORY: clue/phar-composer on: push: pull_request: @@ -26,19 +29,24 @@ jobs: php-version: - 8.2 - 8.3 - io-driver: - - eio - - uv + - 8.4 steps: - uses: actions/checkout@v4 - name: Setup PHP uses: shivammathur/setup-php@v2 with: php-version: ${{ matrix.php-version }} - extensions: ${{ matrix.io-driver }} tools: composer:v2 coverage: xdebug3 - run: composer install shell: bash - run: composer test shell: bash + - name: Download build package + run: gh release download v${{ env.PHAR_TOOL_VERSION }} -R=${{ env.PHAR_TOOL_REPOSITORY }} + env: + GH_TOKEN: ${{ github.token }} + - name: Build package + run: | + chmod +x ./phar-composer-${{ env.PHAR_TOOL_VERSION }}.phar + ./phar-composer-${{ env.PHAR_TOOL_VERSION }}.phar build ./ mysql2json diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml index 7f44357..63b0a5f 100644 --- a/.github/workflows/release-please.yml +++ b/.github/workflows/release-please.yml @@ -2,6 +2,9 @@ on: push: branches: - main +env: + PHAR_TOOL_VERSION: 1.4.0 + PHAR_TOOL_REPOSITORY: clue/phar-composer permissions: contents: write @@ -15,7 +18,36 @@ jobs: release-please: needs: verify-release runs-on: ubuntu-24.04 + outputs: + releases_created: ${{ steps.release.outputs.release_created }} + tag: ${{ steps.release.outputs.tag_name }} steps: - uses: googleapis/release-please-action@v4 + id: release with: - release-type: php \ No newline at end of file + release-type: php + upload_phar: + needs: release-please + runs-on: ubuntu-24.04 + if: ${{ needs.release-please.outputs.releases_created == true }} + steps: + - uses: actions/checkout@v4 + with: + ref: ${{ needs.release-please.outputs.tag }} + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: 8.2 + tools: composer:v2 + - name: Download build package + run: gh release download v${{ env.PHAR_TOOL_VERSION }} -R=${{ env.PHAR_TOOL_REPOSITORY }} + - name: Build package + run: | + chmod +x ./phar-composer-${{ env.PHAR_TOOL_VERSION }}.phar + ./phar-composer-${{ env.PHAR_TOOL_VERSION }}.phar build ./ mysql2json + env: + GH_TOKEN: ${{ github.token }} + - name: Upload package + run: gh release upload ${{ needs.release-please.outputs.tag }} mysql2json + env: + GH_TOKEN: ${{ github.token }} diff --git a/bin/mysql2jsonl b/bin/mysql2jsonl old mode 100644 new mode 100755 index 1101123..d729068 --- a/bin/mysql2jsonl +++ b/bin/mysql2jsonl @@ -1,4 +1,20 @@ -#!env php +#!/usr/bin/env php addCommands([ + new ExportCommand(), + new ImportCommand(), +]); + +$application->run(); diff --git a/composer.json b/composer.json index 275cb54..4dd4ace 100644 --- a/composer.json +++ b/composer.json @@ -4,17 +4,18 @@ "type": "library", "require": { "php": "~8.2", - "amphp/file": "~3.2", - "amphp/mysql": "~3.0", "amphp/amp": "~3.0", + "amphp/parallel": "~v2.3.1", "revolt/event-loop": "~1.0", - "symfony/console": "~7.2" + "symfony/console": "~7.2", + "justinrainbow/json-schema": "^6.0", + "ext-pdo": "*" }, "require-dev": { "squizlabs/php_codesniffer": "^3.0", "phpunit/phpunit": "^11.5", "brianium/paratest": "^7.7", - "ecomdev/testcontainers-magento-data":"~1.1" + "ecomdev/testcontainers-magento-data":"~1.2" }, "license": [ "MIT" @@ -24,16 +25,20 @@ ], "autoload": { "psr-4": { - "EcomDev\\MySQL2JSONL\\": "src" + "EcomDev\\MySQL2JSONL\\": "src/" } }, "autoload-dev": { + "files": [ + "tests/fixtures.php" + ], "psr-4": { - "EcomDev\\MySQL2JSONL\\": "tests" + "EcomDev\\MySQL2JSONL\\": "tests/" } }, "scripts": { "test": "XDEBUG_MODE=coverage paratest --coverage-text", + "test:single-threaded": "XDEBUG_MODE=coverage phpunit --coverage-text", "format:check": "phpcs", "format:write": "phpcbf" }, diff --git a/phpunit.xml b/phpunit.xml index 358ee77..fc66dfe 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -1,5 +1,12 @@ - + tests diff --git a/schema.json b/schema.json new file mode 100644 index 0000000..da6633c --- /dev/null +++ b/schema.json @@ -0,0 +1,186 @@ +{ + "$schema": "https://json-schema.org/draft-04/schema#", + "title": "MySQL2JSONL configuration file", + "type": "object", + "properties": { + "config": { + "$ref": "#/definitions/configuration", + "description": "Configuration settings" + } + }, + "required": ["config"], + "definitions": { + "configuration": { + "properties": { + "connection": { + "$ref": "#/definitions/connection", + "description": "MySQL connections settings" + }, + "concurrency": { + "type": "integer", + "description": "Maximum concurrency used by tool", + "default": 4 + }, + "batchSize": { + "type": "integer", + "description": "Batch size for importing data into database", + "default": 1000 + }, + "includeTables": { + "$ref": "#/definitions/tableConditions", + "description": "List of tables to include from the database dump" + }, + "excludeTables": { + "$ref": "#/definitions/tableConditions", + "description": "List of tables to exclude from the database dump" + }, + "importMode": { + "$ref": "#/definitions/importMode" + } + }, + "required": ["connection"], + "additionalProperties": false + }, + "importMode": { + "enum": ["truncate", "update"], + "default": "truncate" + }, + "matchExpression": { + "type": "object", + "description": "Match condition for table name", + "properties": { + "regexp": { + "type": "string", + "description": "Valid regular expression for a table name, # sign is prohibited as used as delimiter", + "pattern": "^[^#]+$" + }, + "startsWith": { + "type": "string", + "description": "Table name starts with" + }, + "endsWith": { + "type": "string", + "description": "Table name ends with" + }, + "contains": { + "type": "string", + "description": "Table name contains" + } + }, + "minProperties": 1, + "maxProperties": 1 + }, + "rowCount": { + "type": "object", + "properties": { + "min": { + "type": "integer", + "description": "Minimum number of rows for table to be exported" + }, + "max": { + "type": "integer", + "description": "Maximum number of rows for table to be exported" + } + }, + "minProperties": 1, + "maxProperties": 1 + } , + "tableName": { + "type": "string", + "pattern": "^[0-9a-zA-Z$_\u0080-\uFFFF]+$" + }, + "andCondition": { + "type": "array", + "description": "Table matching conditions", + "items": { + "oneOf": [ + { "$ref": "#/definitions/matchExpression" }, + { "$ref": "#/definitions/rowCount" }, + { "$ref": "#/definitions/tableName" } + ] + } + }, + "tableConditions": { + "type": "array", + "description": "Table matching conditions", + "default": [], + "items": { + "anyOf": [ + { "$ref": "#/definitions/matchExpression" }, + { "$ref": "#/definitions/rowCount" }, + { "$ref": "#/definitions/tableName" }, + { + "type": "object", + "properties": { + "and": {"$ref": "#/definitions/andCondition"} + } + } + ] + } + }, + "connection": { + "type": "object", + "properties": { + "host": { + "type": "string", + "description": "Host for MySQL host" + }, + "unixSocket": { + "type": "string", + "description": "Unix socket path for MySQL connection" + }, + "port": { + "type": "integer", + "description": "Port for MySQL connection", + "default": 3306 + }, + "user": { + "type": "string", + "description": "Username for MySQL connection", + "default": "root" + }, + "password": { + "type": "string", + "description": "Password for MySQL connection", + "default": "" + }, + "database": { + "type": "string", + "description": "Database for MySQL connection" + }, + "charset": { + "type": "string", + "description": "Charset for MySQL connection", + "default": "utf8mb4" + }, + "key": { + "type": "string", + "description": "Path to public key to use for sha256_password auth method in MySQL connection" + } + }, + "allOf": [ + { + "required": [ + "database" + ] + }, + { + "oneOf": [ + { + "required": [ + "host" + ], + "errorMessage": "For connection host or unixSocket must be specified" + }, + { + "required": [ + "unixSocket" + ], + "errorMessage": "For connection host or unixSocket must be specified" + } + ] + } + ] + } + } +} \ No newline at end of file diff --git a/src/Command/ExportCommand.php b/src/Command/ExportCommand.php new file mode 100644 index 0000000..339f299 --- /dev/null +++ b/src/Command/ExportCommand.php @@ -0,0 +1,95 @@ +addOption( + 'config', + 'c', + InputOption::VALUE_REQUIRED, + 'Configuration file', + 'config.json' + ); + + $this->addOption( + 'concurrency', + 'p', + InputOption::VALUE_REQUIRED, + 'Number of concurrent workers, by default taken from config file', + ); + + $this->addArgument( + 'directory', + InputArgument::OPTIONAL, + 'Directory to export data to', + './data-dump' + ); + } + + public function execute(InputInterface $input, OutputInterface $output): int + { + $file = $input->getOption('config'); + if (!file_exists($file)) { + /* @var FormatterHelper $formatter*/ + $formatter = $this->getHelper('formatter'); + $output->write($formatter->formatSection('Error', sprintf( + 'Configuration file %s does not exist', + $file + ), 'error')); + return 1; + } + try { + $config = Configuration::fromJSON(file_get_contents($file)); + if ($input->getOption('concurrency')) { + $config = $config->withConcurrency((int)$input->getOption('concurrency')); + } + } catch (ConfigurationException $error) { + $error->output($output); + return 1; + } + + $start = microtime(true); + $progressNotifier = new ConsoleProgressNotifier($output, 'Exporting', 'Exported'); + $exporter = WorkerExportServiceFactory::fromConfiguration( + $config, + $input->getArgument('directory') + )->create(); + $tables = (new TableListService($config))->tablesToExport(); + + foreach ($tables as $table) { + $exporter->exportTable($table, $progressNotifier); + } + + $exporter->await(); + $timeTaken = Helper::formatTime(microtime(true) - $start, 3); + $output->writeln( + "Export finished in {$timeTaken}" + ); + return 0; + } +} diff --git a/src/Command/ImportCommand.php b/src/Command/ImportCommand.php new file mode 100644 index 0000000..457d22a --- /dev/null +++ b/src/Command/ImportCommand.php @@ -0,0 +1,114 @@ +addOption( + 'config', + 'c', + InputOption::VALUE_REQUIRED, + 'Configuration file', + 'config.json' + ); + + $this->addOption( + 'concurrency', + 'p', + InputOption::VALUE_REQUIRED, + 'Number of concurrent workers, by default taken from config file', + ); + + $this->addOption( + 'batch-size', + 'b', + InputOption::VALUE_REQUIRED, + 'Number of rows to process per batch, by default taken from config file ', + ); + + $this->addArgument( + 'directory', + InputArgument::OPTIONAL, + 'Directory to import data from', + './data-dump' + ); + } + + public function execute(InputInterface $input, OutputInterface $output): int + { + $file = $input->getOption('config'); + if (!file_exists($file)) { + /* @var FormatterHelper $formatter*/ + $formatter = $this->getHelper('formatter'); + $output->write($formatter->formatSection('Error', sprintf( + 'Configuration file %s does not exist', + $file + ), 'error')); + return 1; + } + + try { + $config = Configuration::fromJSON(file_get_contents($file)); + if ($input->getOption('concurrency')) { + $config = $config->withConcurrency((int)$input->getOption('concurrency')); + } + if ($input->getOption('batch-size')) { + $config = $config->withBatchSize((int)$input->getOption('batch-size')); + } + } catch (ConfigurationException $error) { + $error->output($output); + return 1; + } + + $start = microtime(true); + + $progressNotifier = new ConsoleProgressNotifier($output, 'Importing', 'Imported'); + $importer = WorkerImportServiceFactory::fromConfiguration( + $config, + $input->getArgument('directory') + )->create(); + + $inputSource = new JsonImportSourceFactory($input->getArgument('directory'), $config); + + foreach ($inputSource->listTables() as $table) { + $importer->importTable($table, $progressNotifier); + } + + $importer->await(); + + $timeTaken = Helper::formatTime(microtime(true) - $start, 3); + $output->writeln( + "Import finished in {$timeTaken}" + ); + return 0; + } +} diff --git a/src/Condition/AndTableCondition.php b/src/Condition/AndTableCondition.php new file mode 100644 index 0000000..0e863bb --- /dev/null +++ b/src/Condition/AndTableCondition.php @@ -0,0 +1,41 @@ +conditions, + fn ($carry, TableCondition $condition) => $carry && $condition->isSatisfiedBy($table), + true + ); + } + + public function withCondition(TableCondition $condition): TableCondition + { + $conditions = $this->conditions; + $conditions[] = $condition; + return new self($conditions); + } + + public static function from(TableCondition...$conditions): TableCondition + { + return new self($conditions); + } +} diff --git a/src/Condition/AnyTableCondition.php b/src/Condition/AnyTableCondition.php new file mode 100644 index 0000000..9c4e2da --- /dev/null +++ b/src/Condition/AnyTableCondition.php @@ -0,0 +1,36 @@ +conditions, + fn ($carry, TableCondition $condition) => $carry || $condition->isSatisfiedBy($table), + false + ); + } + + public function withCondition(TableCondition $condition): TableCondition + { + $conditions = $this->conditions; + $conditions[] = $condition; + return new self($conditions); + } +} diff --git a/src/Condition/ComposeAndCondition.php b/src/Condition/ComposeAndCondition.php new file mode 100644 index 0000000..e5c9e4d --- /dev/null +++ b/src/Condition/ComposeAndCondition.php @@ -0,0 +1,20 @@ + new AnyTableCondition([$condition]), + default => new AnyTableCondition([$this, $condition]), + }; + } +} diff --git a/src/Condition/InitialTableCondition.php b/src/Condition/InitialTableCondition.php new file mode 100644 index 0000000..7905384 --- /dev/null +++ b/src/Condition/InitialTableCondition.php @@ -0,0 +1,38 @@ +result; + } +} diff --git a/src/Condition/TableNameCondition.php b/src/Condition/TableNameCondition.php new file mode 100644 index 0000000..2fa2863 --- /dev/null +++ b/src/Condition/TableNameCondition.php @@ -0,0 +1,66 @@ +matchType) { + self::EXACT_MATCH => $table->name === $this->tableName, + self::STARTS_WITH => str_starts_with($table->name, $this->tableName), + self::ENDS_WITH => str_ends_with($table->name, $this->tableName), + self::CONTAINS => str_contains($table->name, $this->tableName), + self::REGEXP => !!preg_match($this->tableName, $table->name), + default => false + }; + } +} diff --git a/src/Condition/TableRowsCondition.php b/src/Condition/TableRowsCondition.php new file mode 100644 index 0000000..92fd0c1 --- /dev/null +++ b/src/Condition/TableRowsCondition.php @@ -0,0 +1,46 @@ +type) { + self::MIN_ROWS => $table->rowCount >= $this->rowsCount, + self::MAX_ROWS => $table->rowCount <= $this->rowsCount, + default => false, + }; + } +} diff --git a/src/Configuration.php b/src/Configuration.php new file mode 100644 index 0000000..24f52f1 --- /dev/null +++ b/src/Configuration.php @@ -0,0 +1,210 @@ +validate($data, $schema, Constraint::CHECK_MODE_APPLY_DEFAULTS); + + if (!$validator->isValid()) { + $errors = array_reduce( + $validator->getErrors(), + function ($errors, $error) { + $errors[$error['property']] = $error['message']; + return $errors; + } + ); + + throw ConfigurationException::fromErrors($errors); + } + + $data = $data->config; + + $mysqlConfig = !empty($data->connection->unixSocket) ? + MySQLConfiguration::fromUnixSocket( + $data->connection->unixSocket, + $data->connection->database + ) : + MySQLConfiguration::fromHost( + $data->connection->host, + $data->connection->database + ); + + $mysqlConfig = $mysqlConfig + ->withPort($data->connection->port) + ->withCharset($data->connection->charset) + ->withCredentials($data->connection->user, $data->connection->password); + + if (isset($data->connection->key)) { + $mysqlConfig = $mysqlConfig->withServerKey($data->connection->key); + } + + $includeCondition = InitialTableCondition::alwaysTrue(); + $excludeCondition = InitialTableCondition::alwaysFalse(); + + foreach ($data->includeTables as $include) { + $includeCondition = $includeCondition->withCondition(self::buildCondition($include)); + } + + foreach ($data->excludeTables as $include) { + $excludeCondition = $excludeCondition->withCondition(self::buildCondition($include)); + } + + return new self( + $mysqlConfig, + $includeCondition, + $excludeCondition, + $data->importMode === 'update' ? ImportMode::Update : ImportMode::Truncate, + $data->concurrency, + $data->batchSize + ); + } + public static function fromMySQLConfiguration(MySQLConfiguration $mysqlConfiguration, int $concurrency): self + { + return new self( + $mysqlConfiguration, + InitialTableCondition::alwaysTrue(), + InitialTableCondition::alwaysFalse(), + ImportMode::Truncate, + $concurrency, + 1000 + ); + } + + public function withIncludeCondition(TableCondition $condition): self + { + return new self( + $this->connection, + $this->includeCondition->withCondition($condition), + $this->excludeCondition, + $this->importMode, + $this->concurrency, + $this->batchSize + ); + } + + public function withExcludeCondition(TableCondition $condition): self + { + return new self( + $this->connection, + $this->includeCondition, + $this->excludeCondition->withCondition($condition), + $this->importMode, + $this->concurrency, + $this->batchSize + ); + } + + private static function buildCondition(string|object $condition): TableCondition + { + if (is_string($condition)) { + return TableNameCondition::exactMatch($condition); + } + + foreach ($condition as $key => $value) { + return match ($key) { + 'startsWith' => TableNameCondition::startsWith($value), + 'endsWith' => TableNameCondition::endsWith($value), + 'contains' => TableNameCondition::contains($value), + 'regexp' => TableNameCondition::regexp($value), + 'minRows' => TableRowsCondition::minRows($value), + 'maxRows' => TableRowsCondition::maxRows($value), + 'and' => AndTableCondition::from( + ...array_map(fn ($condition) => self::buildCondition($condition), $value) + ) + }; + } + } + + public function createPDOConnection(): PDO + { + return new PDO( + $this->connection->generateDSN(), + $this->connection->user, + $this->connection->password, + $this->connection->generateDriverOptions() + ); + } + + public function withImportMode(Import\ImportMode $mode): self + { + return new self( + $this->connection, + $this->includeCondition, + $this->excludeCondition, + $mode, + $this->concurrency, + $this->batchSize + ); + } + + public function withBatchSize(int $batchSize): self + { + return new self( + $this->connection, + $this->includeCondition, + $this->excludeCondition, + $this->importMode, + $this->concurrency, + $batchSize + ); + } + + public function withConcurrency(int $concurrency): self + { + return new self( + $this->connection, + $this->includeCondition, + $this->excludeCondition, + $this->importMode, + $concurrency, + $this->batchSize + ); + } +} diff --git a/src/ConfigurationException.php b/src/ConfigurationException.php new file mode 100644 index 0000000..9a8ac89 --- /dev/null +++ b/src/ConfigurationException.php @@ -0,0 +1,50 @@ +errors as $path => $error) { + $messages[] = sprintf('%s: %s', $path, $error); + } + + $output->getErrorOutput()->writeln( + $helper->formatSection($this->getMessage(), PHP_EOL . implode(PHP_EOL, $messages), 'error') + ); + } +} diff --git a/src/Export/BlockingExportTable.php b/src/Export/BlockingExportTable.php new file mode 100644 index 0000000..9ed9b02 --- /dev/null +++ b/src/Export/BlockingExportTable.php @@ -0,0 +1,49 @@ +outputFactory->create($table->name); + $result = $this->connection->prepare(sprintf('SELECT SQL_NO_CACHE * FROM `%s`', $table->name)); + $result->execute(); + + $progressNotifier->start($table->name, $table->rowCount); + + $header = []; + for ($i = 0; $i < $result->columnCount(); $i++) { + $header[] = $result->getColumnMeta($i)['name']; + } + $output->open($header); + + $index = 0; + foreach ($result as $row) { + $output->write($row); + $progressNotifier->update($table->name, ++$index); + } + + $output->close(); + $progressNotifier->finish($table->name); + } +} diff --git a/src/Export/BlockingExportTableFactory.php b/src/Export/BlockingExportTableFactory.php new file mode 100644 index 0000000..f6bf1aa --- /dev/null +++ b/src/Export/BlockingExportTableFactory.php @@ -0,0 +1,29 @@ +configuration->createPDOConnection(), $this->outputFactory); + } +} diff --git a/src/Export/ExportOutput.php b/src/Export/ExportOutput.php new file mode 100644 index 0000000..2950333 --- /dev/null +++ b/src/Export/ExportOutput.php @@ -0,0 +1,19 @@ +header = $header; + + $this->file->fwrite(json_encode($header) . PHP_EOL); + } + + public function write(array $row): void + { + if ($this->header === null) { + throw new ExportOutputWrittenBeforeOpenException(); + } + + if (count($this->header) !== count($row)) { + throw new ExportOutputColumCountDoesNotMatchException(); + } + + $this->file->fwrite(json_encode($row, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . PHP_EOL); + } + + public function close(): void + { + unset($this->file); + } +} diff --git a/src/Export/JsonExportOutputFactory.php b/src/Export/JsonExportOutputFactory.php new file mode 100644 index 0000000..c9f158a --- /dev/null +++ b/src/Export/JsonExportOutputFactory.php @@ -0,0 +1,33 @@ +path)) { + mkdir($this->path, 0777, true); + } + + return new JsonExportOutput( + new SplFileObject( + $this->path . DIRECTORY_SEPARATOR . $tableName . '.jsonl', + 'w' + ) + ); + } +} diff --git a/src/Export/NullExportOutput.php b/src/Export/NullExportOutput.php new file mode 100644 index 0000000..b15a553 --- /dev/null +++ b/src/Export/NullExportOutput.php @@ -0,0 +1,28 @@ +configuration->createPDOConnection(); + $connection->prepare('SET information_schema_stats_expiry=0')->execute(); + + $result = $connection->prepare( + 'SELECT TABLE_NAME, TABLE_ROWS FROM information_schema.tables' + . ' WHERE TABLE_SCHEMA = SCHEMA() AND TABLE_TYPE = ?' + ); + $result->execute(['BASE TABLE']); + + $tables = []; + foreach ($result as [$tableName, $rowCount]) { + $tableMetadata = new TableEntry($tableName, $rowCount); + $isSkipped = !$this->configuration->includeCondition->isSatisfiedBy($tableMetadata) + || $this->configuration->excludeCondition->isSatisfiedBy($tableMetadata); + if ($isSkipped) { + continue; + } + $tables[] = $tableMetadata; + } + return $tables; + } +} diff --git a/src/Export/WorkerExportService.php b/src/Export/WorkerExportService.php new file mode 100644 index 0000000..9a66add --- /dev/null +++ b/src/Export/WorkerExportService.php @@ -0,0 +1,54 @@ +pendingExecution->waitIfLimited($this->configuration->concurrency); + $this->pendingExecution->pushFuture( + async(fn() => $this->processTask($table, $progressNotifier)) + ); + } + + private function processTask(TableEntry $table, ProgressNotifier $progressNotifier): void + { + $executor = $this->workerPool->submit( + WorkerTask::taskExportTable($table, $this->configuration, $this->outputDirectory) + ); + $notifier = new WorkerProgressConsumer($progressNotifier); + $notifier->process($executor->getChannel()); + $this->pendingExecution->pushExecution($executor); + } + + public function await(): void + { + $this->pendingExecution->await(); + $this->workerPool->shutdown(); + } +} diff --git a/src/Export/WorkerExportServiceFactory.php b/src/Export/WorkerExportServiceFactory.php new file mode 100644 index 0000000..f0c6b13 --- /dev/null +++ b/src/Export/WorkerExportServiceFactory.php @@ -0,0 +1,39 @@ +concurrency), + $outputDirectory, + ); + } + + public function create(): WorkerExportService + { + return new WorkerExportService($this->configuration, $this->workerPool, $this->outputDirectory, new PendingExecution()); + } +} diff --git a/src/Export/WorkerTask.php b/src/Export/WorkerTask.php new file mode 100644 index 0000000..5e21c33 --- /dev/null +++ b/src/Export/WorkerTask.php @@ -0,0 +1,57 @@ +exportTable($channel); + return true; + } + + private function initWorker(Configuration $configuration, string $outputPath): void + { + self::$exporter = (new BlockingExportTableFactory( + $configuration, + new JsonExportOutputFactory($outputPath) + ))->create(); + } + + private function exportTable(Channel $channel): void + { + if (!self::$exporter) { + $this->initWorker($this->configuration, $this->outputPath); + } + + self::$exporter->exportTable($this->table, new WorkerProgressNotifier($channel)); + } +} diff --git a/src/ExportTableFactory.php b/src/ExportTableFactory.php deleted file mode 100644 index 1a9915d..0000000 --- a/src/ExportTableFactory.php +++ /dev/null @@ -1,7 +0,0 @@ -sourceFactory->create($table); + + $columns = $source->header(); + + if ($this->mode->isTruncate()) { + $this->connection->prepare(sprintf('TRUNCATE TABLE `%s`', $table->name))->execute(); + } + + $statementCache = InsertStatementCache::create( + $this->connection, + $this->insertOnDuplicate, + $table->name, + $columns, + ); + + $params = []; + $rows = 0; + $notifier->start($table->name, $table->rowCount); + $this->connection->beginTransaction(); + foreach ($source->rows() as $row) { + $rows++; + array_push($params, ...$row); + + if ($this->batchSize === $rows) { + $statement = $statementCache->prepareStatement($rows); + $statement->execute($params); + $notifier->update($table->name, $rows); + $rows = 0; + $params = []; + } + } + + if ($rows > 0) { + $statement = $statementCache->prepareStatement($rows); + $statement->execute($params); + $notifier->update($table->name, $rows); + } + + $this->connection->commit(); + $notifier->finish($table->name); + } +} diff --git a/src/Import/BlockingImportServiceFactory.php b/src/Import/BlockingImportServiceFactory.php new file mode 100644 index 0000000..fe96be5 --- /dev/null +++ b/src/Import/BlockingImportServiceFactory.php @@ -0,0 +1,32 @@ +configuration->createPDOConnection(), + $this->insertOnDuplicate, + $this->importSourceFactory, + $this->configuration->importMode, + $this->configuration->batchSize + ); + } +} diff --git a/src/Import/ImportMode.php b/src/Import/ImportMode.php new file mode 100644 index 0000000..9dc9850 --- /dev/null +++ b/src/Import/ImportMode.php @@ -0,0 +1,14 @@ +statements[$rows])) { + $this->statements[$rows] = $this->connection->prepare( + $this->insertOnDuplicate->generate($this->tableName, $this->columns, $rows, $this->columns) + ); + } + + return $this->statements[$rows]; + } +} diff --git a/src/Import/JsonImportSource.php b/src/Import/JsonImportSource.php new file mode 100644 index 0000000..ddc6ae0 --- /dev/null +++ b/src/Import/JsonImportSource.php @@ -0,0 +1,59 @@ +header = $this->readJsonLine(); + } + + public function header(): array + { + return $this->header; + } + + public function rows(): iterable + { + $rowNumber = 0; + while ($row = $this->readJsonLine(false)) { + $rowNumber++; + if (count($row) != count($this->header)) { + throw new \LogicException( + sprintf( + 'Column count does not match header on row %s in %s', + $rowNumber, + $this->fileObject->getRealPath() + ) + ); + } + yield $row; + } + } + + private function readJsonLine(bool $strict = true): mixed + { + try { + $line = $this->fileObject->fgets(); + if (!$line && !$strict) { + return false; + } + return json_decode( + $line, + flags: JSON_THROW_ON_ERROR + ); + } catch (\JsonException $e) { + throw new \LogicException( + sprintf('Failed to parse JSON file: %s', $this->fileObject->getRealPath()), + $e->getCode(), + $e + ); + } + } +} diff --git a/src/Import/JsonImportSourceFactory.php b/src/Import/JsonImportSourceFactory.php new file mode 100644 index 0000000..248078c --- /dev/null +++ b/src/Import/JsonImportSourceFactory.php @@ -0,0 +1,41 @@ +inputPath)) { + throw new \InvalidArgumentException("Input source is not valid directory"); + } + } + public function create(TableEntry $table): ImportSource + { + $file = sprintf('%s/%s.jsonl', $this->inputPath, $table->name); + return new JsonImportSource(new SplFileObject($file, 'r')); + } + + public function listTables(): iterable + { + foreach (new DirectoryIterator($this->inputPath) as $file) { + if ($file->isFile() && $file->getExtension() === 'jsonl') { + $file = new SplFileObject($file->getRealPath(), 'r'); + $file->seek(PHP_INT_MAX); + $rowCount = $file->key() - 1; + $table = new TableEntry($file->getBasename('.jsonl'), $rowCount); + $isSkipped = !$this->configuration->includeCondition->isSatisfiedBy($table) + || $this->configuration->excludeCondition->isSatisfiedBy($table); + if ($isSkipped) { + continue; + } + yield $table; + } + } + } +} diff --git a/src/Import/WorkerImportService.php b/src/Import/WorkerImportService.php new file mode 100644 index 0000000..a9447dc --- /dev/null +++ b/src/Import/WorkerImportService.php @@ -0,0 +1,54 @@ +pendingExecution->waitIfLimited($this->configuration->concurrency); + $this->pendingExecution->pushFuture( + async(fn() => $this->processTask($table, $notifier)) + ); + } + + private function processTask(TableEntry $table, ProgressNotifier $progressNotifier): void + { + $executor = $this->workerPool->submit( + WorkerTask::taskImportTable($table, $this->configuration, $this->inputDirectory) + ); + $notifier = new WorkerProgressConsumer($progressNotifier); + $notifier->process($executor->getChannel()); + $this->pendingExecution->pushExecution($executor); + } + + public function await(): void + { + $this->pendingExecution->await(); + $this->workerPool->shutdown(); + } +} diff --git a/src/Import/WorkerImportServiceFactory.php b/src/Import/WorkerImportServiceFactory.php new file mode 100644 index 0000000..3f079fa --- /dev/null +++ b/src/Import/WorkerImportServiceFactory.php @@ -0,0 +1,44 @@ +concurrency), + $outputDirectory, + ); + } + + public function create(): WorkerImportService + { + return new WorkerImportService( + $this->configuration, + $this->workerPool, + $this->inputDirectory, + new PendingExecution() + ); + } +} diff --git a/src/Import/WorkerTask.php b/src/Import/WorkerTask.php new file mode 100644 index 0000000..38daddd --- /dev/null +++ b/src/Import/WorkerTask.php @@ -0,0 +1,60 @@ +importTable($channel); + return true; + } + + private function initWorker(Configuration $configuration, string $inputPath): void + { + self::$importer = BlockingImportServiceFactory::createFromConfiguration( + $configuration, + new JsonImportSourceFactory($inputPath, $configuration) + )->create(); + } + + private function importTable(Channel $channel): void + { + if (!self::$importer) { + $this->initWorker($this->configuration, $this->outputPath); + } + + self::$importer->importTable($this->table, new WorkerProgressNotifier($channel, 1)); + } +} diff --git a/src/MySQLConfiguration.php b/src/MySQLConfiguration.php new file mode 100644 index 0000000..2303f0b --- /dev/null +++ b/src/MySQLConfiguration.php @@ -0,0 +1,119 @@ +unixSocket) { + return sprintf( + 'mysql:unix_socket=%s;dbname=%s;charset=%s', + $this->unixSocket, + $this->database, + $this->charset + ); + } + + return sprintf( + 'mysql:host=%s;port=%d;dbname=%s;charset=%s', + $this->host, + $this->port, + $this->database, + $this->charset + ); + } + + public function withPort(int $port): self + { + return new self( + $this->database, + $this->host, + $this->unixSocket, + $port, + $this->user, + $this->password, + $this->charset, + $this->key + ); + } + + public function withCredentials(string $user, string $password): self + { + return new self( + $this->database, + $this->host, + $this->unixSocket, + $this->port, + $user, + $password, + $this->charset, + $this->key + ); + } + + public function generateDriverOptions() + { + return [ + PDO::MYSQL_ATTR_INIT_COMMAND => + "SET SQL_MODE='NO_AUTO_VALUE_ON_ZERO',FOREIGN_KEY_CHECKS=0,NAMES $this->charset", + PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_NUM, + PDO::ATTR_STRINGIFY_FETCHES => false, + PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => false, + ] + ($this->key ? [PDO::MYSQL_ATTR_SERVER_PUBLIC_KEY => $this->key] : []); + } + + public function withCharset(string $charset): self + { + return new self( + $this->database, + $this->host, + $this->unixSocket, + $this->port, + $this->user, + $this->password, + $charset, + $this->key + ); + } + + public function withServerKey(string $key): self + { + return new self( + $this->database, + $this->host, + $this->unixSocket, + $this->port, + $this->user, + $this->password, + $this->charset, + $key + ); + } +} diff --git a/src/PendingExecution.php b/src/PendingExecution.php new file mode 100644 index 0000000..22f12dc --- /dev/null +++ b/src/PendingExecution.php @@ -0,0 +1,59 @@ +executors[] = $execution; + } + + public function pushFuture(Future $future): void + { + $this->futures[] = $future; + } + + public function await(): void + { + $futures = $this->futures; + $this->futures = []; + Future\awaitAll($futures); + $executors = $this->executors; + foreach ($executors as $executor) { + $executor->await(); + } + } + + public function waitIfLimited(int $maxExecutors): void + { + if ($maxExecutors === count($this->futures)) { + foreach ($this->futures as $index => $future) { + if ($future->isComplete()) { + unset($this->futures[$index]); + return; + } + } + foreach (Future::iterate($this->futures) as $future) { + $future->await(); + break; + }; + } + } +} diff --git a/src/Progress/ConsoleProgressNotifier.php b/src/Progress/ConsoleProgressNotifier.php new file mode 100644 index 0000000..ff02a8a --- /dev/null +++ b/src/Progress/ConsoleProgressNotifier.php @@ -0,0 +1,92 @@ +sectionOutput = $this->output->section(); + } + + public function withRealtime(): self + { + return new self($this->output, $this->inProgressMessage, $this->finishedMessage, 0); + } + + public function start(string $name, int $total): void + { + $section = $this->reusableSections ? array_shift($this->reusableSections) : $this->output->section(); + $progressBar = new ProgressBar($section, $total, $this->redrawFreq); + $progressBar->setProgressCharacter('🚀'); + $progressBar->setFormat('%message% `%table%` [%bar%] %percent%% '); + $progressBar->setMessage($name, 'table'); + $progressBar->setMessage($this->inProgressMessage); + $this->progressBars[$name] = $progressBar; + $this->progressBarSections[$name] = $section; + $progressBar->start(); + } + + public function update(string $name, int $current): void + { + $this->progressBars[$name]->advance($current); + } + + public function finish(string $name): void + { + $total = $this->progressBars[$name]->getMaxSteps(); + $this->progressBars[$name]->finish(); + $this->progressBarSections[$name]->clear(); + $this->reusableSections[] = $this->progressBarSections[$name]; + unset($this->progressBars[$name]); + unset($this->progressBarSections[$name]); + + $this->sectionOutput->writeln( + sprintf('%s %d rows in `%s`', $this->finishedMessage, $total, $name), + ); + } +} diff --git a/src/Progress/WorkerMessage.php b/src/Progress/WorkerMessage.php new file mode 100644 index 0000000..b8d10ac --- /dev/null +++ b/src/Progress/WorkerMessage.php @@ -0,0 +1,26 @@ +type->process($progressNotifier, $this->data); + } +} diff --git a/src/Progress/WorkerMessageType.php b/src/Progress/WorkerMessageType.php new file mode 100644 index 0000000..cd2c135 --- /dev/null +++ b/src/Progress/WorkerMessageType.php @@ -0,0 +1,28 @@ + $notifier->start(...$payload), + self::Update => $notifier->update(...$payload), + self::Finish => $notifier->finish(...$payload), + }; + } +} diff --git a/src/Progress/WorkerProgressConsumer.php b/src/Progress/WorkerProgressConsumer.php new file mode 100644 index 0000000..b261f81 --- /dev/null +++ b/src/Progress/WorkerProgressConsumer.php @@ -0,0 +1,31 @@ +receive()) { + $message->process($this->notifier); + } + } catch (ChannelException $e) { + } + } +} diff --git a/src/Progress/WorkerProgressNotifier.php b/src/Progress/WorkerProgressNotifier.php new file mode 100644 index 0000000..03ed8dc --- /dev/null +++ b/src/Progress/WorkerProgressNotifier.php @@ -0,0 +1,52 @@ +total = $total; + $this->channel->send(new WorkerMessage( + WorkerMessageType::Start, + [$name, $total] + )); + } + + public function update(string $name, int $current): void + { + if ($this->total > 0 && $this->freq < 1 && $current % ceil($this->total * $this->freq) > 0) { + return; + } + + $this->channel->send(new WorkerMessage( + WorkerMessageType::Update, + [$name, $current] + )); + } + + public function finish(string $name): void + { + $this->channel->send(new WorkerMessage( + WorkerMessageType::Finish, + [$name] + )); + } +} diff --git a/src/ProgressNotifier.php b/src/ProgressNotifier.php new file mode 100644 index 0000000..5644edf --- /dev/null +++ b/src/ProgressNotifier.php @@ -0,0 +1,19 @@ + "`$column` = VALUES(`$column`)", $onUpdate) + ) + ); + } + + $rowLine = rtrim(str_repeat('?,', count($columns)), ','); + $rowLines = str_repeat( + "($rowLine),", + $rowCount + ); + + $sql = sprintf( + 'INSERT INTO `%s` (%s) VALUES %s%s', + $tableName, + implode(',', array_map(fn ($column) => "`$column`", $columns)), + rtrim( + $rowLines, + ',' + ), + $sqlOnUpdate + ); + + return $sql; + } +} diff --git a/src/TableCondition.php b/src/TableCondition.php new file mode 100644 index 0000000..3a2f123 --- /dev/null +++ b/src/TableCondition.php @@ -0,0 +1,17 @@ +name, $rows); + } +} diff --git a/tests/Condition/AndTableConditionTest.php b/tests/Condition/AndTableConditionTest.php new file mode 100644 index 0000000..bb0f489 --- /dev/null +++ b/tests/Condition/AndTableConditionTest.php @@ -0,0 +1,37 @@ +assertTrue(InitialTableCondition::alwaysFalse() + ->withCondition(InitialTableCondition::alwaysFalse()) + ->withCondition(InitialTableCondition::alwaysTrue()) + ->withCondition(InitialTableCondition::alwaysFalse()) + ->isSatisfiedBy(TableEntry::fromName('some_table'))); + } + + #[Test] + public function returnsTrueWhenNoneReturnedTrue() + { + $this->assertFalse(InitialTableCondition::alwaysFalse() + ->withCondition(InitialTableCondition::alwaysFalse()) + ->withCondition(InitialTableCondition::alwaysFalse()) + ->withCondition(InitialTableCondition::alwaysFalse()) + ->isSatisfiedBy(TableEntry::fromName('some_table'))); + } +} diff --git a/tests/Condition/AnyTableConditionTest.php b/tests/Condition/AnyTableConditionTest.php new file mode 100644 index 0000000..bb0f64c --- /dev/null +++ b/tests/Condition/AnyTableConditionTest.php @@ -0,0 +1,37 @@ +assertTrue(InitialTableCondition::alwaysFalse() + ->withCondition(InitialTableCondition::alwaysFalse()) + ->withCondition(InitialTableCondition::alwaysTrue()) + ->withCondition(InitialTableCondition::alwaysFalse()) + ->isSatisfiedBy(TableEntry::fromName('some_table'))); + } + + #[Test] + public function returnsTrueWhenNoneReturnedTrue() + { + $this->assertFalse(InitialTableCondition::alwaysFalse() + ->withCondition(InitialTableCondition::alwaysFalse()) + ->withCondition(InitialTableCondition::alwaysFalse()) + ->withCondition(InitialTableCondition::alwaysFalse()) + ->isSatisfiedBy(TableEntry::fromName('some_table'))); + } +} diff --git a/tests/Condition/TableNameConditionTest.php b/tests/Condition/TableNameConditionTest.php new file mode 100644 index 0000000..db9b549 --- /dev/null +++ b/tests/Condition/TableNameConditionTest.php @@ -0,0 +1,133 @@ +assertFalse( + TableNameCondition::exactMatch("table_name") + ->isSatisfiedBy(TableEntry::fromName('table_name_!')), + 'Matches prefix' + ); + + $this->assertTrue( + TableNameCondition::exactMatch("table_name") + ->isSatisfiedBy(TableEntry::fromName('table_name')), + 'Does not match exact value' + ); + } + + #[Test] + public function startsWithType(): void + { + $condition = TableNameCondition::startsWith("table_name"); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('table_name_1')), + "Failed to match prefix" + ); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('table_name')), + "Failed to match exact value" + ); + + $this->assertFalse( + $condition->isSatisfiedBy(TableEntry::fromName('table_another')), + "Failed to ignore wrong prefix" + ); + } + + #[Test] + public function endsWithType(): void + { + $condition = TableNameCondition::endsWith("_suffix"); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('table_name_suffix')), + "Failed to match _suffix" + ); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('_suffix')), + "Failed to match exact value" + ); + + $this->assertFalse( + $condition->isSatisfiedBy(TableEntry::fromName('table_suffix_another')), + "Failed to ignore _suffix in the middle" + ); + } + + #[Test] + public function containsType(): void + { + $condition = TableNameCondition::contains("_catalog_"); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('table_catalog_name')), + "Failed to match in the middle" + ); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('_catalog_')), + "Failed to match exact value" + ); + + $this->assertFalse( + $condition->isSatisfiedBy(TableEntry::fromName('table_cat_another')), + "Failed to ignore wrong name" + ); + } + + #[Test] + public function regexpType(): void + { + $condition = TableNameCondition::regexp("_cat.*?_"); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('table_catalog_name')), + "Failed to match in the middle" + ); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('_catalog_')), + "Failed to match exact value" + ); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('table_cat_another')), + "Failed to match valid expression" + ); + + $this->assertFalse( + $condition->isSatisfiedBy(TableEntry::fromName('table_ca_t_another')), + "Failed to ignore wrong name" + ); + } + + #[Test] + public function combineType(): void + { + $condition = TableNameCondition::endsWith("_suffix") + ->withCondition(TableNameCondition::startsWith("catalog_")) + ->withCondition(TableNameCondition::contains("_name")); + + $this->assertTrue($condition->isSatisfiedBy(TableEntry::fromName('catalog_table_name_suffix'))); + $this->assertFalse($condition->isSatisfiedBy(TableEntry::fromName('catalog_table_name'))); + } +} diff --git a/tests/Condition/TableRowConditionTest.php b/tests/Condition/TableRowConditionTest.php new file mode 100644 index 0000000..0cdc94f --- /dev/null +++ b/tests/Condition/TableRowConditionTest.php @@ -0,0 +1,77 @@ +assertFalse( + $condition->isSatisfiedBy(TableEntry::fromName('table_name_')), + 'Matches lower amount' + ); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('table_name_')->withRows(1)), + 'Does not matches exact amount' + ); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('table_name_')->withRows(20)), + 'Does not match larger amount' + ); + } + + #[Test] + public function maxRowCount(): void + { + $condition = TableRowsCondition::maxRows(100); + + $this->assertFalse( + $condition->isSatisfiedBy(TableEntry::fromName('table_name_')->withRows(101)), + 'Matches lower amount' + ); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('table_name_')->withRows(100)), + 'Does not matches exact amount' + ); + + $this->assertTrue( + $condition->isSatisfiedBy(TableEntry::fromName('table_name_')->withRows(10)), + 'Does not match smaller amount' + ); + } + + #[Test] + public function combineType(): void + { + $condition = TableRowsCondition::maxRows(100) + ->withCondition(TableRowsCondition::minRows(10)); + + $this->assertTrue($condition->isSatisfiedBy(TableEntry::fromName('__')->withRows(11))); + ; + $this->assertTrue($condition->isSatisfiedBy(TableEntry::fromName('__')->withRows(99))); + ; + $this->assertTrue($condition->isSatisfiedBy(TableEntry::fromName('__')->withRows(100))); + ; + $this->assertFalse($condition->isSatisfiedBy(TableEntry::fromName('__')->withRows(101))); + ; + $this->assertFalse($condition->isSatisfiedBy(TableEntry::fromName('__')->withRows(9))); + ; + } +} diff --git a/tests/ConfigurationExceptionTest.php b/tests/ConfigurationExceptionTest.php new file mode 100644 index 0000000..e209776 --- /dev/null +++ b/tests/ConfigurationExceptionTest.php @@ -0,0 +1,39 @@ + 'Host is a required field', + 'connection.username' => 'Username is empty', + ], + ); + + $output = TestableConsoleOutput::create(); + + $error->output($output); + $this->assertEquals( + [ + '[Configuration validation failed] ', + 'connection.host: Host is a required field', + 'connection.username: Username is empty' + ], + $output->flushError() + ); + } +} diff --git a/tests/ConfigurationTest.php b/tests/ConfigurationTest.php new file mode 100644 index 0000000..be43f3d --- /dev/null +++ b/tests/ConfigurationTest.php @@ -0,0 +1,323 @@ +assertEquals( + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost("db", "magento"), + 4 + ), + Configuration::fromJSON( + <<assertEquals( + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost("db", "magento2") + ->withCredentials("magento", "magento") + ->withPort(3307) + ->withCharset('utf8mb3') + ->withServerKey('path/to/some-secret-key'), + 4 + ), + Configuration::fromJSON( + <<assertEquals( + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromUnixSocket("/var/run/db.sock", "magento2") + ->withCredentials("magento", "magento") + ->withCharset('utf8mb3') + ->withServerKey('path/to/some-secret-key'), + 4 + ), + Configuration::fromJSON( + <<assertConfigException( + << 'The property host is required', + 'config.connection.unixSocket' => 'The property unixSocket is required', + 'config.connection' => 'Failed to match all schemas', + ]) + ); + } + + private function assertConfigException(string $json, ConfigurationException $error): void + { + $this->expectException(ConfigurationException::class); + + try { + Configuration::fromJSON($json); + } catch (ConfigurationException $e) { + $this->assertEquals($error, $e); + throw $e; + } + } + + #[Test] + public function createsIncludeConditions() + { + $this->assertEquals( + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost("db", "magento"), + 4 + ) + ->withIncludeCondition(TableNameCondition::exactMatch("table_name")) + ->withIncludeCondition(TableNameCondition::startsWith("catalog_")) + ->withIncludeCondition(TableNameCondition::endsWith("_index")) + ->withIncludeCondition(TableNameCondition::contains("_tmp_")) + ->withIncludeCondition(TableNameCondition::regexp("_.*_")) + ->withIncludeCondition(TableRowsCondition::minRows(10)) + ->withIncludeCondition(TableRowsCondition::maxRows(100)) + ->withIncludeCondition(AndTableCondition::from( + TableNameCondition::regexp("_.*_"), + TableRowsCondition::minRows(10), + TableRowsCondition::maxRows(100) + )), + Configuration::fromJSON( + <<assertEquals( + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost("db", "magento"), + 4, + ) + ->withExcludeCondition(TableNameCondition::exactMatch("table_name")) + ->withExcludeCondition(TableNameCondition::startsWith("catalog_")) + ->withExcludeCondition(TableNameCondition::endsWith("_index")) + ->withExcludeCondition(TableNameCondition::contains("_tmp_")) + ->withExcludeCondition(TableNameCondition::regexp("_.*_")) + ->withExcludeCondition(TableRowsCondition::minRows(10)) + ->withExcludeCondition(TableRowsCondition::maxRows(100)) + ->withExcludeCondition(AndTableCondition::from( + TableNameCondition::regexp("_.*_"), + TableRowsCondition::minRows(10), + TableRowsCondition::maxRows(100) + )), + Configuration::fromJSON( + <<assertEquals( + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost("db", "magento"), + 40 + ), + Configuration::fromJSON( + <<assertEquals( + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost("db", "magento"), + 4 + )->withImportMode(ImportMode::Update), + Configuration::fromJSON( + <<assertEquals( + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost("db", "magento"), + 4 + )->withBatchSize(400), + Configuration::fromJSON( + <<createPDOConnection(); + + $connection = $configuration->createPDOConnection(); + + $this->assertSame( + 2040, + $connection->query('SELECT COUNT(*) FROM `catalog_product_entity`')->fetch()[0] + ); + } +} diff --git a/tests/Export/ExportTableTest.php b/tests/Export/ExportTableTest.php new file mode 100644 index 0000000..ae053f9 --- /dev/null +++ b/tests/Export/ExportTableTest.php @@ -0,0 +1,103 @@ +export = (new BlockingExportTableFactory(magentoWithSampleData("export"), $this))->create(); + } + + #[Test] + public function exportsTableRowByRow() + { + $this->export->exportTable((TableEntry::fromName('store')->withRows(2)), $this); + $this->assertEquals( + [ + ['open', ['store_id', 'code', 'website_id', 'group_id', 'name', 'sort_order', 'is_active']], + ['row', [0, 'admin', 0, 0, 'Admin', 0, 1]], + ['row', [1, 'default', 1, 1, 'Default Store View', 0, 1]], + ['close'] + ], + $this->output + ); + } + + #[Test] + public function exportsHeadersForEmptyTable() + { + $this->export->exportTable((TableEntry::fromName('customer_visitor')->withRows(0)), $this); + $this->assertEquals( + [ + ['open', [ + 'visitor_id', + 'customer_id', + 'session_id', + 'created_at', + 'last_visit_at', + ]], + ['close'] + ], + $this->output + ); + } + + + #[Test] + public function executesNotifications() + { + $this->export->exportTable((TableEntry::fromName('store')->withRows(2)), $this); + + $this->assertEquals( + [ + ['start', 'store', 2], + ['update', 'store', 1], + ['update', 'store', 2], + ['finish', 'store'], + ], + $this->progress + ); + } + + public function open(array $header): void + { + $this->output[] = ['open', $header]; + } + + public function write(array $row): void + { + $this->output[] = ['row', $row]; + } + + public function close(): void + { + $this->output[] = ['close']; + } + + public function create(string $tableName): ExportOutput + { + return $this; + } +} diff --git a/tests/Export/JsonExportOutputFactoryTest.php b/tests/Export/JsonExportOutputFactoryTest.php new file mode 100644 index 0000000..bd33735 --- /dev/null +++ b/tests/Export/JsonExportOutputFactoryTest.php @@ -0,0 +1,43 @@ +path); + $factory->create('test'); + + $this->assertTrue( + is_dir($this->path) + ); + } + + #[Test] + public function createsWritableFileInDirectoryWithTableName() + { + $factory = new JsonExportOutputFactory($this->path); + $factory->create('test'); + $this->assertTrue( + is_file($this->path . DIRECTORY_SEPARATOR . 'test.jsonl') + && is_writable($this->path . DIRECTORY_SEPARATOR . 'test.jsonl') + ); + } +} diff --git a/tests/Export/JsonExportOutputTest.php b/tests/Export/JsonExportOutputTest.php new file mode 100644 index 0000000..cfc3c5c --- /dev/null +++ b/tests/Export/JsonExportOutputTest.php @@ -0,0 +1,99 @@ +file()); + $output->open(['one', 'two', 'three']); + + $this->assertEquals( + '["one","two","three"]' . PHP_EOL, + $this->readFile() + ); + } + #[Test] + public function appendsRowsToTheFile() + { + $output = new JsonExportOutput($this->file()); + $output->open(['one', 'two', 'three']); + $output->write([1.1, 2.1, 3.1]); + $output->write([1.2, 2.2, 3.2]); + $output->write([1.3, 2.3, 3.3]); + + $this->assertEquals( + implode( + PHP_EOL, + [ + '["one","two","three"]', + '[1.1,2.1,3.1]', + '[1.2,2.2,3.2]', + '[1.3,2.3,3.3]', + ] + ) . PHP_EOL, + $this->readFile() + ); + } + + #[Test] + public function errorsWhenWrittenBeforeOpen() + { + $output = new JsonExportOutput($this->file()); + $this->expectExceptionObject(new ExportOutputWrittenBeforeOpenException()); + + $output->write([1.1, 2.1, 3.1]); + } + + #[Test] + public function errorsWhenColumnCountDoesNotMatchHeader() + { + $output = new JsonExportOutput($this->file()); + $this->expectExceptionObject(new ExportOutputColumCountDoesNotMatchException()); + $output->open(['one', 'two']); + $output->write([1.1, 2.1, 3.1]); + } + + #[Test] + public function closesFile() + { + $file = $this->file(); + $file->flock(LOCK_EX); + + $output = new JsonExportOutput($file); + unset($file); + + $output->open(['one', 'two', 'three']); + $output->write([1.1, 2.1, 3.1]); + $output->close(); + + $this->assertTrue( + flock(fopen($this->file, 'r'), LOCK_EX | LOCK_NB) + ); + } + + private function readFile(): string + { + return file_get_contents($this->file); + } +} diff --git a/tests/Export/TableListServiceTest.php b/tests/Export/TableListServiceTest.php new file mode 100644 index 0000000..9aaf262 --- /dev/null +++ b/tests/Export/TableListServiceTest.php @@ -0,0 +1,40 @@ +withIncludeCondition(TableNameCondition::startsWith('catalog_product_entity_')) + ->withExcludeCondition(TableNameCondition::endsWith('_varchar')) + ->withExcludeCondition(TableNameCondition::contains('gallery')); + + $this->assertEquals( + [ + TableEntry::fromName('catalog_product_entity_datetime')->withRows(6), + TableEntry::fromName('catalog_product_entity_decimal')->withRows(3903), + TableEntry::fromName('catalog_product_entity_int')->withRows(12480), + TableEntry::fromName('catalog_product_entity_text')->withRows(2652), + TableEntry::fromName('catalog_product_entity_tier_price')->withRows(0) + ], + (new TableListService($config))->tablesToExport() + ); + } +} diff --git a/tests/Export/WorkerExportServiceTest.php b/tests/Export/WorkerExportServiceTest.php new file mode 100644 index 0000000..67af031 --- /dev/null +++ b/tests/Export/WorkerExportServiceTest.php @@ -0,0 +1,64 @@ +withIncludeCondition(TableNameCondition::exactMatch('store')) + ->withIncludeCondition(TableNameCondition::startsWith('catalog_product_entity')); + + $tableService = new TableListService($config); + $exportService = WorkerExportServiceFactory::fromConfiguration($config, $this->path)->create(); + + foreach ($tableService->tablesToExport() as $table) { + $exportService->exportTable( + $table, + $this + ); + } + + $exportService->await(); + + $this->assertEquals( + [ + 'catalog_product_entity.jsonl', + 'catalog_product_entity_datetime.jsonl', + 'catalog_product_entity_decimal.jsonl', + 'catalog_product_entity_gallery.jsonl', + 'catalog_product_entity_int.jsonl', + 'catalog_product_entity_media_gallery.jsonl', + 'catalog_product_entity_media_gallery_value.jsonl', + 'catalog_product_entity_media_gallery_value_to_entity.jsonl', + 'catalog_product_entity_media_gallery_value_video.jsonl', + 'catalog_product_entity_text.jsonl', + 'catalog_product_entity_tier_price.jsonl', + 'catalog_product_entity_varchar.jsonl', + 'store.jsonl' + ], + array_map(fn ($v) => basename($v), glob($this->path . DIRECTORY_SEPARATOR . '*.jsonl')) + ); + } +} diff --git a/tests/ExportTableTest.php b/tests/ExportTableTest.php deleted file mode 100644 index b0a5e6a..0000000 --- a/tests/ExportTableTest.php +++ /dev/null @@ -1,15 +0,0 @@ -assertTrue(true); - } -} diff --git a/tests/Import/FakeImportSource.php b/tests/Import/FakeImportSource.php new file mode 100644 index 0000000..fd6c46e --- /dev/null +++ b/tests/Import/FakeImportSource.php @@ -0,0 +1,66 @@ +table, $header, $this->rows); + } + + public function withRow(array $row): FakeImportSource + { + $rows = $this->rows; + $rows[] = $row; + + return new self($this->table->withRows(count($rows)), $this->header, $rows); + } + + public static function fromTableName(string $tableName): FakeImportSource + { + return new self(TableEntry::fromName($tableName), [], []); + } + + public function header(): array + { + return $this->header; + } + + public function rows(): iterable + { + return $this->rows; + } + + public function create(TableEntry $table): FakeImportSource + { + if ($this->table->name === $table->name) { + return $this; + } + + return new self($table, [], []); + } + + public function listTables(): iterable + { + return [ + $this->table, + ]; + } +} diff --git a/tests/Import/ImportTableTest.php b/tests/Import/ImportTableTest.php new file mode 100644 index 0000000..edd2239 --- /dev/null +++ b/tests/Import/ImportTableTest.php @@ -0,0 +1,156 @@ +containerWithConfig = magentoWithoutData(); + } + + #[Test] + public function importsTableDataFromSourceWithTruncate() + { + $source = FakeImportSource::fromTableName('store') + ->withHeader(['store_id', 'code', 'name', 'website_id']) + ->withRow([2, 'english', 'English Store View', '1']) + ->withRow([3, 'french', 'French Store View', '1']); + + $factory = BlockingImportServiceFactory::createFromConfiguration( + $this->containerWithConfig->configuration, + $source + ); + + $import = $factory->create(); + $import->importTable($source->table, $this); + + $connection = $this->containerWithConfig->configuration->createPDOConnection(); + $result = $connection->prepare('SELECT store_id, code, name, website_id FROM store'); + $result->execute(); + + $this->assertEquals( + [ + [2, 'english', 'English Store View', 1], + [3, 'french', 'French Store View', 1], + ], + $result->fetchAll() + ); + } + + #[Test] + public function appendsExistingStoresWhenUpdateStrategyIsUsed() + { + $source = FakeImportSource::fromTableName('store') + ->withHeader(['store_id', 'code', 'name', 'website_id']) + ->withRow([1, 'custom', 'Custom Store View', '1']) + ->withRow([2, 'english', 'English Store View', '1']) + ->withRow([3, 'french', 'French Store View', '1']); + + $factory = BlockingImportServiceFactory::createFromConfiguration( + $this->containerWithConfig->configuration->withImportMode(ImportMode::Update), + $source + ); + + $import = $factory->create(); + $import->importTable($source->table, $this); + + $connection = $this->containerWithConfig->configuration->createPDOConnection(); + $result = $connection->prepare('SELECT store_id, code, name, website_id FROM store'); + $result->execute(); + + $this->assertEquals( + [ + [0, 'admin', 'Admin', 0], + [1, 'custom', 'Custom Store View', 1], + [2, 'english', 'English Store View', 1], + [3, 'french', 'French Store View', 1], + ], + $result->fetchAll() + ); + } + + #[Test] + public function notifiesOfTheImportPorgress() + { + $source = FakeImportSource::fromTableName('store') + ->withHeader(['store_id', 'code', 'name', 'website_id']) + ->withRow([1, 'custom', 'Custom Store View', '1']) + ->withRow([2, 'english', 'English Store View', '1']) + ->withRow([3, 'french', 'French Store View', '1']); + + $factory = BlockingImportServiceFactory::createFromConfiguration( + $this->containerWithConfig->configuration->withImportMode(ImportMode::Update), + $source + ); + + $import = $factory->create(); + $import->importTable($source->table, $this); + + $this->assertEquals( + [ + ['start', 'store', 3], + ['update', 'store', 3], + ['finish', 'store'] + ], + $this->progress + ); + } + + #[Test] + public function batchesLongInputs() + { + $source = FakeImportSource::fromTableName('store') + ->withHeader(['store_id', 'code', 'name', 'website_id']); + + $expectedResult = []; + for ($i = 2; $i < 10000; $i++) { + $row = [$i, 'custom' . $i, 'Custom Store View', 4]; + $source = $source->withRow($row); + $expectedResult[] = $row; + } + + $factory = BlockingImportServiceFactory::createFromConfiguration( + $this->containerWithConfig->configuration + ->withImportMode(ImportMode::Update) + ->withBatchSize(100), + $source + ); + + $import = $factory->create(); + $import->importTable($source->table, $this); + + $connection = $this->containerWithConfig->configuration->createPDOConnection(); + $result = $connection->prepare('SELECT store_id, code, name, website_id FROM store'); + $result->execute(); + + $this->assertEquals( + [ + [0, 'admin', 'Admin', 0], + [1, 'default', 'Default Store View', 1], + ...$expectedResult, + ], + $result->fetchAll() + ); + } +} diff --git a/tests/Import/JsonImportSourceFactoryTest.php b/tests/Import/JsonImportSourceFactoryTest.php new file mode 100644 index 0000000..3560696 --- /dev/null +++ b/tests/Import/JsonImportSourceFactoryTest.php @@ -0,0 +1,120 @@ +expectException(\InvalidArgumentException::class); + new JsonImportSourceFactory( + $this->path, + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost('localhost', 'magento'), + 10 + ) + ); + } + + #[Test] + public function returnsAllFilesWithRowCountInDirectoryThatEndInJsonl() + { + $this->writeFiles([ + 'catalog_product_entity_datetime.jsonl' => [[],[],[]], + 'catalog_product_entity_int.txt' => [[],[],[]], + 'catalog_product_entity.jsonl' => [[],[],[],[]], + 'catalog_product_entity_int.jsonl' => [[],[]], + ]); + + $factory = new JsonImportSourceFactory( + $this->path, + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost('localhost', 'magento'), + 10 + ) + ); + + $this->assertEqualsCanonicalizing( + [ + TableEntry::fromName('catalog_product_entity')->withRows(3), + TableEntry::fromName('catalog_product_entity_int')->withRows(1), + TableEntry::fromName('catalog_product_entity_datetime')->withRows(2) + ], + iterator_to_array($factory->listTables()) + ); + } + + #[Test] + public function filtersOutputFilesBasedOnConfiguration() + { + $this->writeFiles([ + 'catalog_product_entity_int.txt' => [[],[],[]], + 'catalog_product_entity.jsonl' => [[],[],[],[]], + 'catalog_product_entity_int.jsonl' => [[],[]], + ]); + + $factory = new JsonImportSourceFactory( + $this->path, + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost('localhost', 'magento'), + 10 + ) + ->withIncludeCondition(TableNameCondition::contains('product')) + ->withExcludeCondition(TableNameCondition::endsWith('_int')) + ); + + $this->assertEqualsCanonicalizing( + [ + TableEntry::fromName('catalog_product_entity')->withRows(3), + ], + iterator_to_array($factory->listTables()) + ); + } + + #[Test] + public function createsInputSourceFromExistingFile() + { + $this->writeFiles([ + 'catalog_product_entity.jsonl' => [["entity_id", "sku", "type_id"]], + ]); + + $factory = new JsonImportSourceFactory( + $this->path, + Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost('localhost', 'magento'), + 10 + ) + ); + + $inputSource = $factory->create(TableEntry::fromName('catalog_product_entity')->withRows(0)); + $this->assertEquals( + ['entity_id', 'sku', 'type_id'], + $inputSource->header() + ); + } + + private function writeFiles(array $files): void + { + foreach ($files as $file => $content) { + $this->writeFile($file, $content); + } + } +} diff --git a/tests/Import/JsonImportSourceTest.php b/tests/Import/JsonImportSourceTest.php new file mode 100644 index 0000000..f1c60f3 --- /dev/null +++ b/tests/Import/JsonImportSourceTest.php @@ -0,0 +1,71 @@ +file(); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Failed to parse JSON file: ' . $this->file); + new JsonImportSource($file); + } + + #[Test] + public function readsLineByLineAfterHeader() + { + $file = $this->file(); + $file->fwrite(implode(PHP_EOL, [ + '["header","column"]', + '["value1","value2"]', + '["value3","value4"]', + '["value5","value6"]', + '' + ])); + + + $source = new JsonImportSource(new \SplFileObject($file->getRealPath(), 'r')); + $this->assertEquals( + [ + ['value1', 'value2'], + ['value3', 'value4'], + ['value5', 'value6'], + ], + iterator_to_array($source->rows()) + ); + } + + #[Test] + public function errorsOutOnColumnsNotMatchingRows() + { + $file = $this->file(); + $file->fwrite(implode(PHP_EOL, [ + '["header","column"]', + '["value1","value2"]', + '["value3"]', + '["value5","value6"]', + '' + ])); + + + $source = new JsonImportSource(new \SplFileObject($file->getRealPath(), 'r')); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Column count does not match header on row 2 in ' . $this->file); + iterator_to_array($source->rows()); + } +} diff --git a/tests/Import/WorkerImportTableTest.php b/tests/Import/WorkerImportTableTest.php new file mode 100644 index 0000000..3adf87d --- /dev/null +++ b/tests/Import/WorkerImportTableTest.php @@ -0,0 +1,56 @@ +writeFile('store.jsonl', [ + ['store_id', 'code', 'name', 'website_id'], + [2, 'english', 'English Store View', '1'], + [3, 'french', 'French Store View', '1'] + ]); + + $service = WorkerImportServiceFactory::fromConfiguration($container->configuration, $this->path) + ->create(); + + $service->importTable(TableEntry::fromName('store')->withRows(2), $this); + $service->await(); + + $connection = $container->configuration->createPDOConnection(); + $result = $connection->prepare('SELECT store_id, code, name, website_id FROM store'); + $result->execute(); + + $this->assertEquals( + [ + [2, 'english', 'English Store View', 1], + [3, 'french', 'French Store View', 1], + ], + $result->fetchAll() + ); + } +} diff --git a/tests/MySQLConfigurationTest.php b/tests/MySQLConfigurationTest.php new file mode 100644 index 0000000..3029e81 --- /dev/null +++ b/tests/MySQLConfigurationTest.php @@ -0,0 +1,108 @@ +assertEquals( + 'mysql:unix_socket=/var/run/mysqld/mysqld.sock;dbname=magento_db;charset=utf8mb4', + $configuration->generateDSN() + ); + } + + #[Test] + public function createsFromHost() + { + $configuration = MySQLConfiguration::fromHost( + 'db', + 'db_name' + ); + + $this->assertEquals( + 'mysql:host=db;port=3306;dbname=db_name;charset=utf8mb4', + $configuration->generateDSN() + ); + } + + #[Test] + public function allowsToSpecifyPort() + { + $configuration = MySQLConfiguration::fromHost('db', 'magento') + ->withPort(3307); + + $this->assertEquals( + 'mysql:host=db;port=3307;dbname=magento;charset=utf8mb4', + $configuration->generateDSN() + ); + } + + #[Test] + public function specifiesDefaultCredentials() + { + $configuration = MySQLConfiguration::fromHost('db', 'magento'); + $this->assertEquals('root', $configuration->user); + $this->assertEquals('', $configuration->password); + } + + #[Test] + public function overridesDefaultCredentials() + { + $configuration = MySQLConfiguration::fromHost('db', 'magento') + ->withCredentials('magento', 'password'); + + $this->assertEquals('magento', $configuration->user); + $this->assertEquals('password', $configuration->password); + } + + #[Test] + public function generatesDefaultDriverOptions() + { + $configuration = MySQLConfiguration::fromHost('db', 'magento'); + $this->assertEquals( + [ + PDO::MYSQL_ATTR_INIT_COMMAND => 'SET SQL_MODE=\'NO_AUTO_VALUE_ON_ZERO\',FOREIGN_KEY_CHECKS=0,NAMES utf8mb4', + PDO::ATTR_STRINGIFY_FETCHES => false, + PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_NUM, + PDO::MYSQL_ATTR_USE_BUFFERED_QUERY => false + ], + $configuration->generateDriverOptions() + ); + } + + #[Test] + public function customCharsetAffectsBothDnsAndDriverOptions() + { + $configuration = MySQLConfiguration::fromHost('db', 'magento') + ->withCharset('utf8mb3'); + + $this->assertStringContainsString('charset=utf8mb3', $configuration->generateDSN()); + $this->assertEquals( + 'SET SQL_MODE=\'NO_AUTO_VALUE_ON_ZERO\',FOREIGN_KEY_CHECKS=0,NAMES utf8mb3', + $configuration->generateDriverOptions()[PDO::MYSQL_ATTR_INIT_COMMAND] + ); + } + + #[Test] + public function settingsServerKeyAddsItToDriverOptions() + { + $configuration = MySQLConfiguration::fromHost('db', 'magento') + ->withServerKey('path/to/server/key.ca'); + + $this->assertEquals( + 'path/to/server/key.ca', + $configuration->generateDriverOptions()[PDO::MYSQL_ATTR_SERVER_PUBLIC_KEY] + ); + } +} diff --git a/tests/Progress/ConsoleProgressNotifierTest.php b/tests/Progress/ConsoleProgressNotifierTest.php new file mode 100644 index 0000000..fc0e911 --- /dev/null +++ b/tests/Progress/ConsoleProgressNotifierTest.php @@ -0,0 +1,79 @@ +createNotifier($output); + $notifier->start('table_one', 100); + $notifier->start('table_two', 99); + + $this->assertEquals( + [ + 'Exporting `table_one` [🚀---------------------------] 0% ', + 'Exporting `table_two` [🚀---------------------------] 0% ', + ], + $output->flushOutput() + ); + } + + #[Test] + public function updatesProgressBar() + { + $output = TestableConsoleOutput::create(); + $notifier = $this->createNotifier($output); + + $notifier->start('table_one', 100); + $notifier->start('table_two', 99); + $notifier->update('table_two', 40); + + $this->assertEquals( + [ + 'Exporting `table_one` [🚀---------------------------] 0% ', + 'Exporting `table_two` [===========🚀----------------] 40% ', + ], + $output->flushOutput() + ); + } + + #[Test] + public function notifiesWhenUploadHasFinished() + { + $output = TestableConsoleOutput::create(); + $notifier = $this->createNotifier($output)->withRealtime(); + + $notifier->start('table_one', 100); + $notifier->start('table_two', 99); + $notifier->update('table_two', 40); + $notifier->finish('table_two'); + + $this->assertEquals( + [ + 'Exported 99 rows in `table_two`', + 'Exporting `table_one` [🚀---------------------------] 0% ', + ], + $output->flushOutput() + ); + } + + public function createNotifier(TestableConsoleOutput $output, string $inProgress = 'Exporting', string $finished = 'Exported'): ConsoleProgressNotifier + { + return (new ConsoleProgressNotifier($output, $inProgress, $finished))->withRealtime(); + } +} diff --git a/tests/Progress/FakeChannel.php b/tests/Progress/FakeChannel.php new file mode 100644 index 0000000..faa475e --- /dev/null +++ b/tests/Progress/FakeChannel.php @@ -0,0 +1,43 @@ +messages)) { + throw new ChannelException('No messages in channel'); + } + + return array_shift($this->messages); + } + + public function send(mixed $data): void + { + $this->messages[] = $data; + } + + public function close(): void + { + throw new \LogicException('Should not be called'); + } + + public function isClosed(): bool + { + throw new \LogicException('Should not be called'); + } + + public function onClose(\Closure $onClose): void + { + throw new \LogicException('Should not be called'); + } +} diff --git a/tests/Progress/ProgressNotifierAware.php b/tests/Progress/ProgressNotifierAware.php new file mode 100644 index 0000000..1861914 --- /dev/null +++ b/tests/Progress/ProgressNotifierAware.php @@ -0,0 +1,23 @@ +progress[] = ['start', $name, $total]; + } + + public function update(string $name, int $current): void + { + $this->progress[] = ['update', $name, $current]; + } + + public function finish(string $name): void + { + $this->progress[] = ['finish', $name]; + } +} diff --git a/tests/Progress/WorkerProgressConsumerTest.php b/tests/Progress/WorkerProgressConsumerTest.php new file mode 100644 index 0000000..517f62c --- /dev/null +++ b/tests/Progress/WorkerProgressConsumerTest.php @@ -0,0 +1,32 @@ +process(new FakeChannel([ + new WorkerMessage(WorkerMessageType::Start, ['test1', 10]), + new WorkerMessage(WorkerMessageType::Update, ['test2', 10]), + new WorkerMessage(WorkerMessageType::Finish, ['test1']) + ])); + + $this->assertEquals( + [ + ['start', 'test1', 10], + ['update', 'test2', 10], + ['finish', 'test1'] + ], + $this->progress + ); + } +} diff --git a/tests/Progress/WorkerProgressNotifierTest.php b/tests/Progress/WorkerProgressNotifierTest.php new file mode 100644 index 0000000..0760377 --- /dev/null +++ b/tests/Progress/WorkerProgressNotifierTest.php @@ -0,0 +1,62 @@ +start('test1', 10); + $notifier->start('test2', 100); + $notifier->update('test1', 100); + $notifier->finish('test1'); + $notifier->update('test2', 5); + $notifier->finish('test2'); + + $this->assertEquals( + [ + new WorkerMessage(WorkerMessageType::Start, ['test1', 10]), + new WorkerMessage(WorkerMessageType::Start, ['test2', 100]), + new WorkerMessage(WorkerMessageType::Update, ['test1', 100]), + new WorkerMessage(WorkerMessageType::Finish, ['test1']), + new WorkerMessage(WorkerMessageType::Update, ['test2', 5]), + new WorkerMessage(WorkerMessageType::Finish, ['test2']) + ], + $channel->messages + ); + } + + + public function receive(?Cancellation $cancellation = null): mixed + { + throw new \LogicException('Should not be called'); + } + + public function send(mixed $data): void + { + $this->progress[] = $data; + } + + public function close(): void + { + throw new \LogicException('Should not be called'); + } + + public function isClosed(): bool + { + throw new \LogicException('Should not be called'); + } + + public function onClose(\Closure $onClose): void + { + throw new \LogicException('Should not be called'); + } +} diff --git a/tests/Sql/InsertOnDuplicateTest.php b/tests/Sql/InsertOnDuplicateTest.php new file mode 100644 index 0000000..52f70c7 --- /dev/null +++ b/tests/Sql/InsertOnDuplicateTest.php @@ -0,0 +1,51 @@ +assertEquals( + 'INSERT INTO `table1` (`column_one`,`column_two`) VALUES (?,?)', + (new InsertOnDuplicate()) + ->generate('table1', ['column_one', 'column_two'], 1) + ); + } + + #[Test] + public function generatesMultipleRows() + { + $this->assertEquals( + 'INSERT INTO `table1` (`column_one`,`column_two`) VALUES (?,?),(?,?),(?,?)', + (new InsertOnDuplicate()) + ->generate('table1', ['column_one', 'column_two'], 3) + ); + } + + #[Test] + public function generatesSingleRowWithOnDuplicate() + { + $this->assertEquals( + 'INSERT INTO `table1` (`column_one`,`column_two`) VALUES (?,?) ON DUPLICATE KEY UPDATE `column_two` = VALUES(`column_two`)', + (new InsertOnDuplicate()) + ->generate( + 'table1', + ['column_one', 'column_two'], + 1, + ['column_two'] + ) + ); + } +} diff --git a/tests/TempDirectoryFixture.php b/tests/TempDirectoryFixture.php new file mode 100644 index 0000000..dcfb8c3 --- /dev/null +++ b/tests/TempDirectoryFixture.php @@ -0,0 +1,51 @@ +path = implode(DIRECTORY_SEPARATOR, [sys_get_temp_dir(), uniqid('test-directory')]); + } + + private function file(): SplFileObject + { + mkdir($this->path, recursive: true); + $this->file = $this->path . DIRECTORY_SEPARATOR . uniqid('test-file'); + return new SplFileObject( + $this->file, + 'w+' + ); + } + + private function writeFile(string $name, array $content): SplFileObject + { + if (!is_dir($this->path)) { + mkdir($this->path, recursive: true); + } + $path = $this->path . DIRECTORY_SEPARATOR . $name; + $file = new SplFileObject($path, 'w'); + + foreach ($content as $line) { + $file->fwrite(json_encode($line) . PHP_EOL); + } + + return new SplFileObject($path, 'r'); + } + + protected function tearDown(): void + { + if (is_dir($this->path)) { + foreach (glob($this->path . DIRECTORY_SEPARATOR . '*') as $file) { + unlink($file); + } + rmdir($this->path); + } + } +} diff --git a/tests/TestableConsoleOutput.php b/tests/TestableConsoleOutput.php new file mode 100644 index 0000000..dd687bb --- /dev/null +++ b/tests/TestableConsoleOutput.php @@ -0,0 +1,108 @@ +stdOutput, decorated: true); + $this->getFormatter()->setStyle('error', new NullOutputFormatterStyle()); + $this->getFormatter()->setStyle('info', new NullOutputFormatterStyle()); + $this->getFormatter()->setStyle('warning', new NullOutputFormatterStyle()); + $this->errorOutput = new StreamOutput($this->stdError, decorated: true, formatter: $this->getFormatter()); + } + + public static function create(): self + { + return new self( + fopen('php://temp/maxmemory:8192', 'r+'), + fopen('php://temp/maxmemory:8192', 'r+') + ); + } + + public function getErrorOutput(): OutputInterface + { + return $this->errorOutput; + } + + public function setErrorOutput(OutputInterface $error): void + { + $this->errorOutput = $error; + } + + public function section(): ConsoleSectionOutput + { + return new ConsoleSectionOutput( + $this->stdOutput, + $this->consoleSectionOutputs, + $this->getVerbosity(), + $this->isDecorated(), + $this->getFormatter() + ); + } + + public function flushError(): array + { + return $this->readValue($this->stdError); + } + + public function flushOutput(): array + { + return $this->readValue($this->stdOutput); + } + + private function readValue($input): array + { + $position = ftell($input); + rewind($input); + + $lines = (object)['lines' => [], 'cursor' => 0]; + while ($line = fgets($input)) { + $lines->cursor = count($lines->lines); + $line = preg_replace_callback( + "/\x1b\[(\d+A|0J)/", + fn ($matches) => $this->processEscapeCommand($matches[1], $lines), + $line + ); + $lines->lines[] = rtrim($line, "\n"); + } + + fseek($input, $position); + return $lines->lines; + } + + private function processEscapeCommand(string $command, object $lines): string + { + if (str_ends_with($command, 'J')) { + array_splice($lines->lines, -$lines->cursor); + } + + if (str_ends_with($command, 'A')) { + $lines->cursor -= (int)substr($command, 0, -1); + } + + return ''; + } +} diff --git a/tests/fixtures.php b/tests/fixtures.php new file mode 100644 index 0000000..9c16b3c --- /dev/null +++ b/tests/fixtures.php @@ -0,0 +1,68 @@ +withMagentoVersion('2.4.7-p3') + ->withSampleData() + ->shared($sharedId) + ->getConnectionSettings(); + + return Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost( + $connection->host, + $connection->database, + ) + ->withPort($connection->port) + ->withCredentials( + $connection->user, + $connection->password + ), + 10 + ); +} + + +readonly class ContainerWithConfig +{ + public function __construct( + public Configuration $configuration, + public DbContainer $container + ) + { + + } +} + +function magentoWithoutData(): ContainerWithConfig +{ + $container = DbContainerBuilder::mysql() + ->withMagentoVersion('2.4.7-p3') + ->withSampleData() + ->build(); + + $connection = $container->getConnectionSettings(); + + $config = Configuration::fromMySQLConfiguration( + MySQLConfiguration::fromHost( + $connection->host, + $connection->database, + ) + ->withPort($connection->port) + ->withCredentials( + $connection->user, + $connection->password + ), + 10 + ); + + return new ContainerWithConfig( + $config, + $container, + ); +} \ No newline at end of file