diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py index ac75e242b..6afa00dae 100644 --- a/blockchainetl/jobs/exporters/kafka_exporter.py +++ b/blockchainetl/jobs/exporters/kafka_exporter.py @@ -13,14 +13,21 @@ def __init__(self, output, item_type_to_topic_mapping, converters=()): self.item_type_to_topic_mapping = item_type_to_topic_mapping self.converter = CompositeItemConverter(converters) self.connection_url = self.get_connection_url(output) - print(self.connection_url) + self.topic_prefix = self.get_topic_prefix(output) + print(self.connection_url, self.topic_prefix) self.producer = KafkaProducer(bootstrap_servers=self.connection_url) def get_connection_url(self, output): try: return output.split('/')[1] - except KeyError: - raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092"') + except IndexError: + raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092" or "kafka/127.0.0.1:9092/"') + + def get_topic_prefix(self, output): + try: + return output.split('/')[2] + "." + except IndexError: + return '' def open(self): pass @@ -34,7 +41,7 @@ def export_item(self, item): if item_type is not None and item_type in self.item_type_to_topic_mapping: data = json.dumps(item).encode('utf-8') print(data) - return self.producer.send(self.item_type_to_topic_mapping[item_type], value=data) + return self.producer.send(self.topic_prefix + self.item_type_to_topic_mapping[item_type], value=data) else: logging.warning('Topic for item type "{}" is not configured.'.format(item_type)) @@ -45,7 +52,6 @@ def convert_items(self, items): def close(self): pass - def group_by_item_type(items): result = collections.defaultdict(list) for item in items: diff --git a/docs/commands.md b/docs/commands.md index 2be620cda..e8b1124d7 100644 --- a/docs/commands.md +++ b/docs/commands.md @@ -213,7 +213,7 @@ e.g. 
`-e block,transaction,log,token_transfer,trace,contract,token`. - For Postgres: `--output=postgresql+pg8000://<user>:<password>@<host>:<port>/<database_name>`, e.g. `--output=postgresql+pg8000://postgres:admin@127.0.0.1:5432/ethereum`. - For GCS: `--output=gs://<bucket_name>`. Make sure to install and initialize `gcloud` cli. - - For Kafka: `--output=kafka/<host>:<port>`, e.g. `--output=kafka/127.0.0.1:9092` + - For Kafka: `--output=kafka/<host>:<port>/<topic_prefix>`, e.g. `--output=kafka/127.0.0.1:9092` or `--output=kafka/127.0.0.1:9092/crypto_ethereum`. - Those output types can be combined with a comma e.g. `--output=gs://<bucket_name>,projects/<your-project>/topics/crypto_ethereum` The [schema](https://github.com/blockchain-etl/ethereum-etl-postgres/tree/master/schema)