-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspark_commands.txt
47 lines (32 loc) · 1.94 KB
/
spark_commands.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
Reading data from Apache Kafka with Apache Spark (Structured Streaming)
// Streaming step: read coffee-sale events from Kafka, aggregate price and count
// per product per one-hour window, and append the results as CSV files.
// Types and SQL functions are imported first so everything below resolves.
import org.apache.spark.sql.types.{StringType, LongType, StructField, StructType, DoubleType, IntegerType}
import org.apache.spark.sql.functions.{col, count, from_json, lit, sum, window}

// Subscribe to the topic, starting from the newest offsets.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-mysql-jdbc-coffee")
  .option("startingOffsets", "latest")
  .load()

// Flat description of the four payload fields selected below. It is kept for
// reference only: parsing uses the schema inferred from the sample JSON file,
// because the selected fields sit under a nested `payload` struct.
val schema = new StructType()
  .add("id", IntegerType)
  .add("product", StringType)
  .add("price", DoubleType)
  .add("ptime", LongType)

// This batch read is only used to obtain the message schema.
// NOTE(review): presumably data.json holds one sample Kafka message - confirm.
val coffes = spark.sqlContext.read.json("/home/ozhan/Downloads/kafka/data/data.json")

// Decode the Kafka value as JSON and flatten the payload fields.
val kdf = df.selectExpr("CAST(value AS STRING) as json")
  .select(from_json(col("json"), schema = coffes.schema).as("data"))
  .select("data.payload")
  .select("payload.id", "payload.price", "payload.product", "payload.ptime")

// Hourly totals per product. Append mode on an aggregation requires a
// watermark on the same event-time column that the window is built from.
// NOTE(review): assumes `ptime` is epoch SECONDS; if it is milliseconds, use
// (col("ptime") / 1000).cast("timestamp"). The 10-minute lateness bound is a
// placeholder - tune it to the real arrival delay.
val coffeeWin = kdf
  .withColumn("event_time", col("ptime").cast("timestamp"))
  .withWatermark("event_time", "10 minutes")
  .groupBy(window(col("event_time"), "1 hour"), col("product"))
  .agg(sum("price").as("hourly_sum"), count(lit(1)).as("hourly_cnt"))

// Emit the window start as epoch seconds under the name `ptime`.
val coffeeNewSUM = coffeeWin.selectExpr(
  "CAST(window.start AS LONG) AS ptime",
  "product",
  "hourly_sum",
  "hourly_cnt"
)

// The file sink only supports append mode and needs a checkpoint directory.
// A header row is written because the batch read of this directory uses
// header=true and refers to the columns by name.
// awaitTermination() blocks until the query is stopped.
coffeeNewSUM.writeStream
  .format("csv")
  .outputMode("append")
  .option("header", "true")
  .option("checkpointLocation", "/home/ozhan/coffeesumdata_checkpoint/")
  .start("/home/ozhan/coffeesumdata/")
  .awaitTermination()
// Sample rows of the output at this stage (ptime, product, hourly_sum, hourly_cnt):
'1598346000', 'COFFEE', 50.00, 10
'1598346000', 'TEA', 18.00, 6
'1598353200', 'PIE', 20.00, 2
Reading the data back from the local file system to run BI queries
// Batch step: load the hourly CSV output, register it as a temp view, and
// roll it up per product and calendar day with Spark SQL.
val coffeeReadedDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/home/ozhan/coffeesumdata/")

// Make the DataFrame queryable by name from SQL.
coffeeReadedDF.createOrReplaceTempView("coffeeReadedDFTemp")

// Sums of hourly_sum and hourly_cnt per product and day (ptime formatted as yyyyMMdd), ordered by day.
val dailyRollupSql = "select product,from_unixtime(ptime,'yyyyMMdd') as mtime,sum(hourly_sum),sum(hourly_cnt) from coffeeReadedDFTemp group by product,from_unixtime(ptime,'yyyyMMdd') order by mtime"
val resDF = spark.sql(dailyRollupSql)

// Write the result as comma-separated CSV with a header row.
resDF.write
  .option("sep", ",")
  .option("header", "true")
  .csv("/home/ozhan/coffeeresdata/")