Skip to content

Spark and Minio

Jitse-Jan edited this page Jun 30, 2019 · 1 revision
from pyspark import SparkContext, SparkConf, SQLContext
import os

# Point this process (and the JVM that PySpark launches from it) at the local
# Hadoop, Java and Spark installations. These values overwrite whatever the
# calling shell had exported for the same variables.
os.environ.update({
    # Interpreter used by the driver and by the workers.
    'PYSPARK_DRIVER_PYTHON': 'python3',
    'PYSPARK_PYTHON': 'python3',
    # Installation roots.
    'JAVA_HOME': '/usr/lib/jvm/java-8-openjdk-amd64',
    'SPARK_HOME': '/opt/spark/',
    'HADOOP_HOME': '/opt/hadoop/',
    # Hadoop native libraries.
    'LD_LIBRARY_PATH': '/opt/hadoop/lib/native',
    # Hadoop config directory and jars that Spark should put on its classpath.
    'SPARK_DIST_CLASSPATH': "/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/lib/*:/opt/hadoop/share/hadoop/common/*:/opt/hadoop/share/hadoop/hdfs:/opt/hadoop/share/hadoop/hdfs/lib/*:/opt/hadoop/share/hadoop/hdfs/*:/opt/hadoop/share/hadoop/mapreduce/lib/*:/opt/hadoop/share/hadoop/mapreduce/*:/opt/hadoop/share/hadoop/yarn:/opt/hadoop/share/hadoop/yarn/lib/*:/opt/hadoop/share/hadoop/yarn/*",
})

# Spark configuration that points the Hadoop S3A connector at a Minio server.
#
# Fail fast when the Minio credentials are absent: os.environ.get() would
# return None, which PySpark converts with str(), so Spark would silently try
# to authenticate with the literal key "None" and fail much later with an
# opaque S3 error.
_missing_minio_vars = [
    name
    for name in ("MINIO_ACCESS_KEY", "MINIO_SECRET_KEY")
    if not os.environ.get(name)
]
if _missing_minio_vars:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_minio_vars)
    )

conf = (
    SparkConf()
    .setAppName("Spark Minio Test")
    # Minio endpoint (plain HTTP on localhost).
    .set("spark.hadoop.fs.s3a.endpoint", "http://localhost:9091")
    .set("spark.hadoop.fs.s3a.access.key", os.environ["MINIO_ACCESS_KEY"])
    .set("spark.hadoop.fs.s3a.secret.key", os.environ["MINIO_SECRET_KEY"])
    # Spark/Hadoop properties are strings; pass the canonical lowercase value
    # instead of relying on str(True) == "True" being accepted.
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)
# getOrCreate is a classmethod: call it on the class so an already-running
# SparkContext is reused. The previous form, SparkContext(conf=conf).getOrCreate(),
# always constructed a new context first, which raises when one is already
# active (e.g. in a notebook or on a re-run in the same interpreter).
sc = SparkContext.getOrCreate(conf=conf)
# Keep this even though sqlContext is not referenced again below: creating it
# initializes the Spark SQL session, which is what makes rdd.toDF() available.
sqlContext = SQLContext(sc)

# Read check: fetch an existing object from the "datalake" bucket and print
# the collected (path, content) pairs.
text_files = sc.wholeTextFiles('s3a://datalake/test.txt')
print(text_files.collect())
# Returns: [('s3a://datalake/test.txt', 'Some text\nfor testing\n')]

# Write check: turn a small in-memory dataset into a DataFrame and store it
# as CSV under the target bucket prefix.
path = "s3a://user-jitsejan/mario-colors-two/"
character_colors = [('Mario', 'Red'), ('Luigi', 'Green'), ('Princess', 'Pink')]
rdd = sc.parallelize(character_colors)
colors_df = rdd.toDF(['name', 'color'])
colors_df.write.csv(path)
Clone this wiki locally