Skip to content

Commit

Permalink
MINOR: Compatibility and upgrade tests for 0.11.0.x
Browse files Browse the repository at this point in the history
Author: Ismael Juma <[email protected]>

Reviewers: Eno Thereska <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes apache#3454 from ijuma/test-upgrades-from-0.11.0.x
  • Loading branch information
ijuma committed Jun 30, 2017
1 parent 342f34a commit 49ed16d
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 13 deletions.
1 change: 1 addition & 0 deletions tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "${MIRROR}kafka/0.9.0.1/kafka_2.11-
RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "${MIRROR}kafka/0.10.0.1/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "${MIRROR}kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "${MIRROR}kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "${MIRROR}kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"

# Set up the ducker user.
RUN useradd -ms /bin/bash ducker && mkdir -p /home/ducker/ && rsync -aiq /root/.ssh/ /home/ducker/.ssh && chown -R ducker /home/ducker/ /mnt/ && echo 'ducker ALL=(ALL) NOPASSWD: ALL' >> /etc/sudoers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from ducktape.tests.test import Test
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, V_0_11_0_0, V_0_10_1_0, KafkaVersion
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, V_0_11_0_0, V_0_10_1_0, KafkaVersion

def get_broker_features(broker_version):
features = {}
if (broker_version < V_0_10_1_0):
if broker_version < V_0_10_1_0:
features["create-topics-supported"] = False
features["offsets-for-times-supported"] = False
features["cluster-id-supported"] = False
Expand All @@ -37,7 +37,7 @@ def get_broker_features(broker_version):
features["offsets-for-times-supported"] = True
features["cluster-id-supported"] = True
features["expect-record-too-large-exception"] = False
if (broker_version < V_0_11_0_0):
if broker_version < V_0_11_0_0:
features["describe-acls-supported"] = False
else:
features["describe-acls-supported"] = True
Expand Down Expand Up @@ -101,6 +101,7 @@ def invoke_compatibility_program(self, features):
@parametrize(broker_version=str(LATEST_0_10_0))
@parametrize(broker_version=str(LATEST_0_10_1))
@parametrize(broker_version=str(LATEST_0_10_2))
@parametrize(broker_version=str(LATEST_0_11_0))
def run_compatibility_test(self, broker_version):
self.zk.start()
self.kafka.set_version(KafkaVersion(broker_version))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int_with_prefix
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, KafkaVersion
from kafkatest.version import DEV_BRANCH, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, KafkaVersion

class ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
"""
Expand Down Expand Up @@ -56,6 +56,7 @@ def min_cluster_size(self):
@parametrize(broker_version=str(LATEST_0_10_0))
@parametrize(broker_version=str(LATEST_0_10_1))
@parametrize(broker_version=str(LATEST_0_10_2))
@parametrize(broker_version=str(LATEST_0_11_0))
def test_produce_consume(self, broker_version):
print("running producer_consumer_compat with broker_version = %s" % broker_version)
self.kafka.set_version(KafkaVersion(broker_version))
Expand Down
13 changes: 7 additions & 6 deletions tests/kafkatest/tests/core/compatibility_test_new_broker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_10_2, LATEST_0_10_1, LATEST_0_10_0, LATEST_0_9, LATEST_0_8_2, DEV_BRANCH, KafkaVersion
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, KafkaVersion

# Compatibility tests for moving to a new broker (e.g., 0.10.x) and using a mix of old and new clients (e.g., 0.9.x)
class ClientCompatibilityTestNewBroker(ProduceConsumeValidateTest):
Expand All @@ -44,17 +44,18 @@ def setUp(self):
self.messages_per_producer = 1000

@cluster(num_nodes=6)
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=None)
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("CreateTime"))
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(LATEST_0_11_0), consumer_version=str(LATEST_0_11_0), compression_types=["gzip"], timestamp_type=str("CreateTime"))
@parametrize(producer_version=str(LATEST_0_10_2), consumer_version=str(LATEST_0_10_2), compression_types=["lz4"], timestamp_type=str("CreateTime"))
@parametrize(producer_version=str(LATEST_0_10_1), consumer_version=str(LATEST_0_10_1), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(LATEST_0_10_0), consumer_version=str(LATEST_0_10_0), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=None)
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(DEV_BRANCH), compression_types=["snappy"], timestamp_type=None)
@parametrize(producer_version=str(LATEST_0_9), consumer_version=str(LATEST_0_9), compression_types=["snappy"], timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(DEV_BRANCH), consumer_version=str(DEV_BRANCH), compression_types=["none"], new_consumer=False, timestamp_type=str("LogAppendTime"))
@parametrize(producer_version=str(LATEST_0_8_2), consumer_version=str(LATEST_0_8_2), compression_types=["none"], new_consumer=False, timestamp_type=None)
def test_compatibility(self, producer_version, consumer_version, compression_types, new_consumer=True, timestamp_type=None):

self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=DEV_BRANCH, topics={self.topic: {
Expand Down
4 changes: 3 additions & 1 deletion tests/kafkatest/tests/core/upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
from kafkatest.utils import is_int
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, DEV_BRANCH, KafkaVersion
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, LATEST_0_10, LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, KafkaVersion

class TestUpgrade(ProduceConsumeValidateTest):

Expand Down Expand Up @@ -62,6 +62,8 @@ def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
self.kafka.start_node(node)

@cluster(num_nodes=6)
@parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["gzip"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_10), compression_types=["snappy"], new_consumer=False)
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["lz4"])
Expand Down
6 changes: 4 additions & 2 deletions vagrant/base.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ get_kafka() {
scala_version=$2

kafka_dir=/opt/kafka-$version
url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_$scala_version-$version.tgz
url=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_$scala_version-$version.tgz
# the .tgz above does not include the streams test jar hence we need to get it separately
url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka-streams-$version-test.jar
url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar
if [ ! -d /opt/kafka-$version ]; then
pushd /tmp
curl -O $url
Expand Down Expand Up @@ -93,6 +93,8 @@ get_kafka 0.10.1.1 2.11
chmod a+rw /opt/kafka-0.10.1.1
get_kafka 0.10.2.1 2.11
chmod a+rw /opt/kafka-0.10.2.1
get_kafka 0.11.0.0 2.11
chmod a+rw /opt/kafka-0.11.0.0


# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
Expand Down

0 comments on commit 49ed16d

Please sign in to comment.