Skip to content

Commit

Permalink
KAFKA-4450; Add upgrade tests for 0.10.1 and rename TRUNK to DEV_BRANCH to reduce confusion
Browse files Browse the repository at this point in the history

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Ismael Juma <[email protected]>

Closes apache#2457 from ewencp/kafka-4450-upgrade-tests
  • Loading branch information
ewencp authored and ijuma committed Jan 28, 2017
1 parent 8827a5b commit 6264cc1
Show file tree
Hide file tree
Showing 29 changed files with 107 additions and 106 deletions.
4 changes: 2 additions & 2 deletions tests/docker/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ docker network inspect knw

for i in $(seq -w 1 ${KAFKA_NUM_CONTAINERS}); do
echo knode${i}
docker exec knode${i} bash -c "(tar xfz /kafka_src/core/build/distributions/kafka_*SNAPSHOT.tgz -C /opt || echo missing kafka tgz did you build kafka tarball) && mv /opt/kafka*SNAPSHOT /opt/kafka-trunk && ls -l /opt"
docker exec knode${i} bash -c "(tar xfz /kafka_src/core/build/distributions/kafka_*SNAPSHOT.tgz -C /opt || echo missing kafka tgz did you build kafka tarball) && mv /opt/kafka*SNAPSHOT /opt/kafka-dev && ls -l /opt"
docker exec knode01 bash -c "ssh knode$i hostname"
done

Expand All @@ -64,7 +64,7 @@ done
(cd ${KAFKA_SRC} && ./gradlew copyDependantTestLibs)
for i in $(seq -w 1 ${KAFKA_NUM_CONTAINERS}); do
echo knode${i}
docker exec knode${i} bash -c "cp /kafka_src/core/build/dependant-testlibs/* /opt/kafka-trunk/libs/"
docker exec knode${i} bash -c "cp /kafka_src/core/build/dependant-testlibs/* /opt/kafka-dev/libs/"
docker exec knode01 bash -c "ssh knode$i hostname"
done

Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
# due to python version naming restrictions, which are enforced by python packaging tools
# (see https://www.python.org/dev/peps/pep-0440/)
#
# Instead, in trunk, the version should have a suffix of the form ".devN"
# Instead, in development branches, the version should have a suffix of the form ".devN"
#
# For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
__version__ = '0.10.3.0.dev0'
18 changes: 9 additions & 9 deletions tests/kafkatest/benchmarks/core/benchmark_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from kafkatest.services.kafka import KafkaService
from kafkatest.services.performance import ProducerPerformanceService, EndToEndLatencyService, ConsumerPerformanceService, throughput, latency, compute_aggregate_throughput
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import TRUNK, KafkaVersion
from kafkatest.version import DEV_BRANCH, KafkaVersion

TOPIC_REP_ONE = "topic-replication-factor-one"
TOPIC_REP_THREE = "topic-replication-factor-three"
Expand Down Expand Up @@ -72,8 +72,8 @@ def start_kafka(self, security_protocol, interbroker_security_protocol, version)
@cluster(num_nodes=7)
@parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)
def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DEFAULT_RECORD_SIZE,
compression_type="none", security_protocol='PLAINTEXT', client_version=str(TRUNK),
broker_version=str(TRUNK)):
compression_type="none", security_protocol='PLAINTEXT', client_version=str(DEV_BRANCH),
broker_version=str(DEV_BRANCH)):
"""
Setup: 1 node zk + 3 node kafka cluster
Produce ~128MB worth of messages to a topic with 6 partitions. Required acks, topic replication factor,
Expand Down Expand Up @@ -104,8 +104,8 @@ def test_producer_throughput(self, acks, topic, num_producers=1, message_size=DE
@parametrize(security_protocol='SSL', interbroker_security_protocol='PLAINTEXT')
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
def test_long_term_producer_throughput(self, compression_type="none", security_protocol='PLAINTEXT',
interbroker_security_protocol=None, client_version=str(TRUNK),
broker_version=str(TRUNK)):
interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
broker_version=str(DEV_BRANCH)):
"""
Setup: 1 node zk + 3 node kafka cluster
Produce 10e6 100 byte messages to a topic with 6 partitions, replication-factor 3, and acks=1.
Expand Down Expand Up @@ -162,8 +162,8 @@ def test_long_term_producer_throughput(self, compression_type="none", security_p
@cluster(num_nodes=6)
@matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], compression_type=["none", "snappy"])
def test_end_to_end_latency(self, compression_type="none", security_protocol="PLAINTEXT",
interbroker_security_protocol=None, client_version=str(TRUNK),
broker_version=str(TRUNK)):
interbroker_security_protocol=None, client_version=str(DEV_BRANCH),
broker_version=str(DEV_BRANCH)):
"""
Setup: 1 node zk + 3 node kafka cluster
Produce (acks = 1) and consume 10e3 messages to a topic with 6 partitions and replication-factor 3,
Expand Down Expand Up @@ -194,7 +194,7 @@ def test_end_to_end_latency(self, compression_type="none", security_protocol="PL
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
def test_producer_and_consumer(self, compression_type="none", security_protocol="PLAINTEXT",
interbroker_security_protocol=None, new_consumer=True,
client_version=str(TRUNK), broker_version=str(TRUNK)):
client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
"""
Setup: 1 node zk + 3 node kafka cluster
Concurrently produce and consume 10e6 messages with a single producer and a single consumer,
Expand Down Expand Up @@ -243,7 +243,7 @@ def test_producer_and_consumer(self, compression_type="none", security_protocol=
@matrix(security_protocol=['PLAINTEXT', 'SSL'], compression_type=["none", "snappy"])
def test_consumer_throughput(self, compression_type="none", security_protocol="PLAINTEXT",
interbroker_security_protocol=None, new_consumer=True, num_consumers=1,
client_version=str(TRUNK), broker_version=str(TRUNK)):
client_version=str(DEV_BRANCH), broker_version=str(DEV_BRANCH)):
"""
Consume 10e6 100-byte messages with 1 or more consumers from a topic with 6 partitions
(using new consumer iff new_consumer == True), and report throughput.
Expand Down
14 changes: 7 additions & 7 deletions tests/kafkatest/directory_layout/kafka_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import importlib
import os

from kafkatest.version import get_version, KafkaVersion, TRUNK
from kafkatest.version import get_version, KafkaVersion, DEV_BRANCH


"""This module serves a few purposes:
Expand All @@ -43,7 +43,7 @@
TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME = "tools-dependant-libs"

JARS = {
"trunk": {
"dev": {
CORE_JAR_NAME: "core/build/*/*.jar",
CORE_LIBS_JAR_NAME: "core/build/libs/*.jar",
CORE_DEPENDANT_TEST_LIBS_JAR_NAME: "core/build/dependant-testlibs/*.jar",
Expand Down Expand Up @@ -97,7 +97,7 @@ def path(self):
class KafkaSystemTestPathResolver(object):
"""Path resolver for Kafka system tests which assumes the following layout:
/opt/kafka-trunk # Current version of kafka under test
/opt/kafka-dev # Current version of kafka under test
/opt/kafka-0.9.0.1 # Example of an older version of kafka installed from tarball
/opt/kafka-<version> # Other previous versions of kafka
...
Expand All @@ -106,23 +106,23 @@ def __init__(self, context, project="kafka"):
self.context = context
self.project = project

def home(self, node_or_version=TRUNK):
def home(self, node_or_version=DEV_BRANCH):
version = self._version(node_or_version)
home_dir = self.project
if version is not None:
home_dir += "-%s" % str(version)

return os.path.join(KAFKA_INSTALL_ROOT, home_dir)

def bin(self, node_or_version=TRUNK):
def bin(self, node_or_version=DEV_BRANCH):
version = self._version(node_or_version)
return os.path.join(self.home(version), "bin")

def script(self, script_name, node_or_version=TRUNK):
def script(self, script_name, node_or_version=DEV_BRANCH):
version = self._version(node_or_version)
return os.path.join(self.bin(version), script_name)

def jar(self, jar_name, node_or_version=TRUNK):
def jar(self, jar_name, node_or_version=DEV_BRANCH):
version = self._version(node_or_version)
return os.path.join(self.home(version), JARS[str(version)][jar_name])

Expand Down
6 changes: 3 additions & 3 deletions tests/kafkatest/sanity_checks/test_kafka_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from kafkatest.services.kafka import KafkaService, config_property
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.utils import is_version
from kafkatest.version import LATEST_0_8_2, TRUNK
from kafkatest.version import LATEST_0_8_2, DEV_BRANCH


class KafkaVersionTest(Test):
Expand Down Expand Up @@ -47,12 +47,12 @@ def test_0_8_2(self):
@cluster(num_nodes=3)
def test_multi_version(self):
"""Test kafka service node-versioning api - ensure we can bring up a 2-node cluster, one on version 0.8.2.X,
the other on trunk."""
the other on the current development branch."""
self.kafka = KafkaService(self.test_context, num_nodes=2, zk=self.zk,
topics={self.topic: {"partitions": 1, "replication-factor": 2}})
self.kafka.nodes[1].version = LATEST_0_8_2
self.kafka.nodes[1].config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
self.kafka.start()

assert is_version(self.kafka.nodes[0], [TRUNK.vstring])
assert is_version(self.kafka.nodes[0], [DEV_BRANCH.vstring])
assert is_version(self.kafka.nodes[1], [LATEST_0_8_2])
6 changes: 3 additions & 3 deletions tests/kafkatest/sanity_checks/test_performance_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from kafkatest.services.performance import ProducerPerformanceService, ConsumerPerformanceService, EndToEndLatencyService
from kafkatest.services.performance import latency, compute_aggregate_throughput
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.version import TRUNK, LATEST_0_8_2, LATEST_0_9, KafkaVersion
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, KafkaVersion


class PerformanceServiceTest(Test):
Expand All @@ -42,8 +42,8 @@ def setUp(self):
@parametrize(version=str(LATEST_0_8_2), new_consumer=False)
@parametrize(version=str(LATEST_0_9), new_consumer=False)
@parametrize(version=str(LATEST_0_9))
@parametrize(version=str(TRUNK), new_consumer=False)
@parametrize(version=str(TRUNK))
@parametrize(version=str(DEV_BRANCH), new_consumer=False)
@parametrize(version=str(DEV_BRANCH))
def test_version(self, version=str(LATEST_0_9), new_consumer=True):
"""
Sanity check out producer performance service - verify that we can run the service with a small
Expand Down
16 changes: 8 additions & 8 deletions tests/kafkatest/sanity_checks/test_verifiable_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.utils import is_version
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, TRUNK, KafkaVersion
from kafkatest.version import LATEST_0_8_2, LATEST_0_9, DEV_BRANCH, KafkaVersion


class TestVerifiableProducer(Test):
Expand All @@ -48,10 +48,10 @@ def setUp(self):
@cluster(num_nodes=3)
@parametrize(producer_version=str(LATEST_0_8_2))
@parametrize(producer_version=str(LATEST_0_9))
@parametrize(producer_version=str(TRUNK))
def test_simple_run(self, producer_version=TRUNK):
@parametrize(producer_version=str(DEV_BRANCH))
def test_simple_run(self, producer_version=DEV_BRANCH):
"""
Test that we can start VerifiableProducer on trunk or against the 0.8.2 jar, and
Test that we can start VerifiableProducer on the current branch snapshot version or against the 0.8.2 jar, and
verify that we can produce a small number of messages.
"""
node = self.producer.nodes[0]
Expand All @@ -61,11 +61,11 @@ def test_simple_run(self, producer_version=TRUNK):
err_msg="Producer failed to start in a reasonable amount of time.")

# using version.vstring (distutils.version.LooseVersion) is a tricky way of ensuring
# that this check works with TRUNK
# When running VerifiableProducer 0.8.X, both trunk version and 0.8.X should show up because of the way
# verifiable producer pulls in some trunk directories into its classpath
# that this check works with DEV_BRANCH
# When running VerifiableProducer 0.8.X, both the current branch version and 0.8.X should show up because of the
# way verifiable producer pulls in some development directories into its classpath
if node.version <= LATEST_0_8_2:
assert is_version(node, [node.version.vstring, TRUNK.vstring])
assert is_version(node, [node.version.vstring, DEV_BRANCH.vstring])
else:
assert is_version(node, [node.version.vstring])

Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/services/console_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.version import TRUNK, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0
from kafkatest.version import DEV_BRANCH, LATEST_0_8_2, LATEST_0_9, LATEST_0_10_0, V_0_10_0_0

"""
0.8.2.1 ConsoleConsumer options
Expand Down Expand Up @@ -87,7 +87,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
}

def __init__(self, context, num_nodes, kafka, topic, group_id="test-consumer-group", new_consumer=True,
message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=TRUNK,
message_validator=None, from_beginning=True, consumer_timeout_ms=None, version=DEV_BRANCH,
client_id="console-consumer", print_key=False, jmx_object_names=None, jmx_attributes=None,
enable_systest_events=False, stop_timeout_sec=15, print_timestamp=False):
"""
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.security.minikdc import MiniKdc
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import TRUNK
from kafkatest.version import DEV_BRANCH

Port = collections.namedtuple('Port', ['name', 'number', 'open'])

Expand Down Expand Up @@ -67,7 +67,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):

def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
authorizer_class_name=None, topics=None, version=TRUNK, jmx_object_names=None,
authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=[]):
"""
:type context
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/services/performance/consumer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import TRUNK, V_0_9_0_0, LATEST_0_10_0
from kafkatest.version import DEV_BRANCH, V_0_9_0_0, LATEST_0_10_0


class ConsumerPerformanceService(PerformanceService):
Expand Down Expand Up @@ -70,7 +70,7 @@ class ConsumerPerformanceService(PerformanceService):
"collect_default": True}
}

def __init__(self, context, num_nodes, kafka, topic, messages, version=TRUNK, new_consumer=True, settings={}):
def __init__(self, context, num_nodes, kafka, topic, messages, version=DEV_BRANCH, new_consumer=True, settings={}):
super(ConsumerPerformanceService, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/services/performance/end_to_end_latency.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import TRUNK, V_0_9_0_0
from kafkatest.version import DEV_BRANCH, V_0_9_0_0



Expand Down Expand Up @@ -45,7 +45,7 @@ class EndToEndLatencyService(PerformanceService):
"collect_default": True}
}

def __init__(self, context, num_nodes, kafka, topic, num_records, compression_type="none", version=TRUNK, acks=1):
def __init__(self, context, num_nodes, kafka, topic, num_records, compression_type="none", version=DEV_BRANCH, acks=1):
super(EndToEndLatencyService, self).__init__(context, num_nodes)
self.kafka = kafka
self.security_config = kafka.security_config.client_config()
Expand Down
12 changes: 6 additions & 6 deletions tests/kafkatest/services/performance/producer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.performance import PerformanceService
from kafkatest.services.security.security_config import SecurityConfig
from kafkatest.version import TRUNK, V_0_9_0_0
from kafkatest.version import DEV_BRANCH, V_0_9_0_0


class ProducerPerformanceService(JmxMixin, PerformanceService):
Expand All @@ -34,7 +34,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
LOG_FILE = os.path.join(LOG_DIR, "producer_performance.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")

def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=TRUNK, settings=None,
def __init__(self, context, num_nodes, kafka, topic, num_records, record_size, throughput, version=DEV_BRANCH, settings=None,
intermediate_stats=False, client_id="producer-performance", jmx_object_names=None, jmx_attributes=None):

JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes or [])
Expand Down Expand Up @@ -89,11 +89,11 @@ def start_cmd(self, node):

cmd = ""

if node.version < TRUNK:
if node.version < DEV_BRANCH:
# In order to ensure more consistent configuration between versions, always use the ProducerPerformance
# tool from trunk
tools_jar = self.path.jar(TOOLS_JAR_NAME, TRUNK)
tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, TRUNK)
# tool from the development branch
tools_jar = self.path.jar(TOOLS_JAR_NAME, DEV_BRANCH)
tools_dependant_libs_jar = self.path.jar(TOOLS_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)

cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_jar
cmd += "for file in %s; do CLASSPATH=$CLASSPATH:$file; done; " % tools_dependant_libs_jar
Expand Down
6 changes: 3 additions & 3 deletions tests/kafkatest/services/security/minikdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from ducktape.services.service import Service

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin, CORE_LIBS_JAR_NAME, CORE_DEPENDANT_TEST_LIBS_JAR_NAME
from kafkatest.version import TRUNK
from kafkatest.version import DEV_BRANCH


class MiniKdc(KafkaPathResolverMixin, Service):
Expand Down Expand Up @@ -103,8 +103,8 @@ def start_node(self, node):
principals = 'client ' + kafka_principals + ' ' + self.extra_principals
self.logger.info("Starting MiniKdc with principals " + principals)

core_libs_jar = self.path.jar(CORE_LIBS_JAR_NAME, TRUNK)
core_dependant_test_libs_jar = self.path.jar(CORE_DEPENDANT_TEST_LIBS_JAR_NAME, TRUNK)
core_libs_jar = self.path.jar(CORE_LIBS_JAR_NAME, DEV_BRANCH)
core_dependant_test_libs_jar = self.path.jar(CORE_DEPENDANT_TEST_LIBS_JAR_NAME, DEV_BRANCH)

cmd = "for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_libs_jar
cmd += " for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_dependant_test_libs_jar
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/services/verifiable_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import TopicPartition
from kafkatest.version import TRUNK
from kafkatest.version import DEV_BRANCH


class ConsumerState:
Expand Down Expand Up @@ -136,7 +136,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, BackgroundThreadService):
def __init__(self, context, num_nodes, kafka, topic, group_id,
max_messages=-1, session_timeout_sec=30, enable_autocommit=False,
assignment_strategy="org.apache.kafka.clients.consumer.RangeAssignor",
version=TRUNK, stop_timeout_sec=30):
version=DEV_BRANCH, stop_timeout_sec=30):
super(VerifiableConsumer, self).__init__(context, num_nodes)
self.log_level = "TRACE"

Expand Down
Loading

0 comments on commit 6264cc1

Please sign in to comment.