Click here to Skip to main content
15,888,527 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


class streaming:
    """Read files from ``./json/`` as a Spark structured stream and run a query on it.

    The reader format and schema are supplied at construction time.
    ``connection`` applies a caller-supplied transformation to the stream and
    writes the result to an in-memory table named ``qcount1``. After waiting,
    it shows that table with ``spark.sql``.
    """

    def __init__(self, inputformat, input_json_schema):
        """Store the stream reader configuration.

        Args:
            inputformat: source format name passed to
                ``DataStreamReader.format`` (for example ``"json"``).
            input_json_schema: schema passed to ``DataStreamReader.schema``
                (for example a ``StructType``).
        """
        # No trailing comma: ``x = value,`` would silently store a 1-tuple.
        self.inputformat = inputformat
        self.input_json_schema = input_json_schema

    def connection(self, query=None):
        """Start the stream, apply *query*, and show the collected results.

        Args:
            query: a callable that receives the streaming DataFrame and returns
                the DataFrame to write out (e.g. a chain of ``withColumn`` /
                ``groupby`` / ``agg`` calls). Pass ``None`` to write the input
                stream unchanged.

        Raises:
            TypeError: if *query* is neither callable nor ``None``.
        """
        # Validate before creating a Spark session, so a bad argument fails fast.
        if query is not None and not callable(query):
            raise TypeError(
                "query must be a callable taking and returning a DataFrame, "
                "got %s" % type(query).__name__
            )

        spark = (
            SparkSession.builder
            .appName("File Streaming Application - JSON")
            .master("local[*]")
            .getOrCreate()
        )

        spark.sparkContext.setLogLevel('ERROR')
        print(self.inputformat)
        print(self.input_json_schema)

        stream_df = (
            spark.readStream
            .format(self.inputformat)
            .schema(self.input_json_schema)
            .load(path="./json/")
        )
        print(stream_df.isStreaming)

        # printSchema() prints the schema itself and returns None.
        stream_df.printSchema()
        print(query)

        # Apply the caller's transformation to the streaming DataFrame.
        # Transformations such as withColumn must be called on a DataFrame,
        # so the query is passed in as a function of that DataFrame.
        filt = stream_df if query is None else query(stream_df)

        write_stream_query = (
            filt.writeStream
            .format("memory")
            .outputMode("update")
            .queryName("qcount1")
            .trigger(processingTime="10 second")
            .start()
        )
        # Wait up to 50 seconds so that some micro-batches reach the table.
        write_stream_query.awaitTermination(50)
        spark.sql("SELECT * from qcount1").show()


# Schema of the incoming JSON records; every field is read as a nullable string.
input_json_schema = StructType([
    StructField("id", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("def", StringType(), True),
])
inputformat = "json"


def query(df):
    """Return the aggregation to run over the DataFrame *df*.

    ``withColumn`` is a DataFrame method, so it must be called on a DataFrame.
    A bare ``withColumn(...)`` raises ``NameError``. The query is therefore
    written as a function of the input DataFrame.
    """
    return (
        df.withColumn("contains_km", col("def").rlike("2km"))
        .withColumn("ts", to_timestamp(col("timestamp")))
        .withColumn("week_num", weekofyear(col("ts")))
        .withColumn("month", month(col("ts")))
        .withColumn("year", year(col("ts")))
        .withColumn("hour", hour(col("ts")))
        .groupby("id", "contains_km", "month", "def", "ts")
        .agg(count("contains_km").alias("countresult"))
        .select("id", "def", "countresult", "month", "contains_km", "ts")
    )


obj = streaming(inputformat, input_json_schema)
obj.connection(query)


What I have tried:

How do I pass this query as a parameter to a method of my class? When I try, I get the following error:
NameError: name 'withColumn' is not defined
Posted
Updated 12-Jan-21 22:05pm

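I don't know Python well, but `withColumn` is a DataFrame method, so it has to be called on a DataFrame. Your chain has nothing in front of the first `withColumn`, which is why Python reports that the name is not defined. Put a DataFrame in front of the first `withColumn` and adjust the code accordingly.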
def query(df):
    """Build the aggregation over *df*; withColumn needs a DataFrame to be called on."""
    return (
        df.withColumn("contains_km", col("def").rlike("2km"))
        .withColumn("ts", to_timestamp(col("timestamp")))
        .withColumn("week_num", weekofyear(col("ts")))
        .withColumn("month", month(col("ts")))
        .withColumn("year", year(col("ts")))
        .withColumn("hour", hour(col("ts")))
        .groupby("id", "contains_km", "month", "def", "ts")
        .agg(count("contains_km").alias("countresult"))
        .select("id", "def", "countresult", "month", "contains_km", "ts")
    )
 
Share this answer
 
I guess you copied this code from somewhere. See the SparkByExamples article "Spark DataFrame withColumn".
 
Share this answer
 

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900