From 49ed16daf43b5f28e51fed7cc124cd308de3deca Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 30 Jun 2017 23:36:21 +0200 Subject: [PATCH] MINOR: Compatibility and upgrade tests for 0.11.0.x Author: Ismael Juma Reviewers: Eno Thereska , Ewen Cheslack-Postava Closes #3454 from ijuma/test-upgrades-from-0.11.0.x --- tests/docker/Dockerfile | 1 + .../client/client_compatibility_features_test.py | 7 ++++--- .../client_compatibility_produce_consume_test.py | 3 ++- .../core/compatibility_test_new_broker_test.py | 13 +++++++------ tests/kafkatest/tests/core/upgrade_test.py | 4 +++- vagrant/base.sh | 6 ++++-- 6 files changed, 21 insertions(+), 13 deletions(-) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index add1734459d51..d68deee6a108a 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -45,6 +45,7 @@ RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "${MIRROR}kafka/0.9.0.1/kafka_2.11- RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "${MIRROR}kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1" RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1" RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1" +RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "${MIRROR}kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0" # Set up the ducker user. RUN useradd -ms /bin/bash ducker && mkdir -p /home/ducker/ && rsync -aiq /root/.ssh/ /home/ducker/.ssh && chown -R ducker /home/ducker/ /mnt/ && echo 'ducker ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py b/tests/kafkatest/tests/client/client_compatibility_features_test.py index a10c376b90f4e..c5c2f2dcc540c 100644 --- a/tests/kafkatest/tests/client/client_compatibility_features_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py @@ -23,11 +23,11 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from ducktape.tests.test import Test -from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, V_0_11_0_0, V_0_10_1_0, KafkaVersion +from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, V_0_11_0_0, V_0_10_1_0, KafkaVersion def get_broker_features(broker_version): features = {} - if (broker_version < V_0_10_1_0): + if broker_version < V_0_10_1_0: features["create-topics-supported"] = False features["offsets-for-times-supported"] = False features["cluster-id-supported"] = False @@ -37,7 +37,7 @@ def get_broker_features(broker_version): features["offsets-for-times-supported"] = True features["cluster-id-supported"] = True features["expect-record-too-large-exception"] = False - if (broker_version < V_0_11_0_0): + if broker_version < V_0_11_0_0: features["describe-acls-supported"] = False else: features["describe-acls-supported"] = True @@ -101,6 +101,7 @@ def invoke_compatibility_program(self, features): @parametrize(broker_version=str(LATEST_0_10_0)) @parametrize(broker_version=str(LATEST_0_10_1)) @parametrize(broker_version=str(LATEST_0_10_2)) + @parametrize(broker_version=str(LATEST_0_11_0)) def run_compatibility_test(self, broker_version): self.zk.start() self.kafka.set_version(KafkaVersion(broker_version)) diff --git a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py index 24f35c5d87d4f..d3aa83b12e005 100644 --- a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py +++ b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py @@ -22,7 +22,7 @@ from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int_with_prefix -from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, KafkaVersion +from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, KafkaVersion class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest): """ @@ -56,6 +56,7 @@ def min_cluster_size(self): @parametrize(broker_version=str(LATEST_0_10_0)) @parametrize(broker_version=str(LATEST_0_10_1)) @parametrize(broker_version=str(LATEST_0_10_2)) + @parametrize(broker_version=str(LATEST_0_11_0)) def test_produce_consume(self, broker_version): print("running producer_consumer_compat with broker_version = %s" % broker_version) self.kafka.set_version(KafkaVersion(broker_version)) diff --git a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py index 3747c43a264f4..0d6ad7d0e2c52 100644 --- a/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py +++ b/tests/kafkatest/tests/core/compatibility_test_new_broker_test.py @@ -23,7 +23,7 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -from kafkatest.version import LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, DEV_BRANCH, KafkaVersion +from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, KafkaVersion # Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x) class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest): @@ -44,17 +44,18 @@ def setUp(self): self.messages_per_producer = 1000 @cluster(num_nodes=6) - @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=None) + @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) + @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime")) @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None) - @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=None) @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime")) - @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) + @parametrize(producer_version=str(LATEST_0_11_0), consumer_version=str(LATEST_0_11_0), compression_types=["gzip"], timestamp_type=str("CreateTime")) @parametrize(producer_version=str(LATEST_0_10_2), consumer_version=str(LATEST_0_10_2), compression_types=["lz4"], timestamp_type=str("CreateTime")) @parametrize(producer_version=str(LATEST_0_10_1), consumer_version=str(LATEST_0_10_1), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) @parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) + @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=None) + @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=None) @parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime")) - @parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime")) + @parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None) def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None): self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: { diff --git a/tests/kafkatest/tests/core/upgrade_test.py b/tests/kafkatest/tests/core/upgrade_test.py index 5b7756d607562..3e88755b88718 100644 --- a/tests/kafkatest/tests/core/upgrade_test.py +++ b/tests/kafkatest/tests/core/upgrade_test.py @@ -25,7 +25,7 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest from kafkatest.utils import is_int -from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, DEV_BRANCH, KafkaVersion +from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, KafkaVersion class TestUpgrade(ProduceConsumeValidateTest): @@ -62,6 +62,8 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None): self.kafka.start_node(node) @cluster(num_nodes=6) + @parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["gzip"], new_consumer=False) + @parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["lz4"]) @parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_9), compression_types=["none"]) @parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_10), compression_types=["snappy"], new_consumer=False) @parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["lz4"]) diff --git a/vagrant/base.sh b/vagrant/base.sh index 7c0b5ed852f10..2ebebf9adaacf 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -63,9 +63,9 @@ get_kafka() { scala_version=$2 kafka_dir=/opt/kafka-$version - url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_$scala_version-$version.tgz + url=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_$scala_version-$version.tgz # the .tgz above does not include the streams test jar hence we need to get it separately - url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka-streams-$version-test.jar + url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar if [ ! -d /opt/kafka-$version ]; then pushd /tmp curl -O $url @@ -93,6 +93,8 @@ get_kafka 0.10.1.1 2.11 chmod a+rw /opt/kafka-0.10.1.1 get_kafka 0.10.2.1 2.11 chmod a+rw /opt/kafka-0.10.2.1 +get_kafka 0.11.0.0 2.11 +chmod a+rw /opt/kafka-0.11.0.0 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local