ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spark] [python] 구조적 API 기본 연산
    Data miner/Development log 2021. 3. 1. 17:32
    728x90

    구조적 API 기본 연산

     

    1) 스키마 

    • DataFrame의 컬럼명과 데이터 타입을 결정

    • DataFrame의 컬럼명과 데이터 타입을 정의. 데이터를 추출(Extract), 변환(Transform), 적재(Load)를 수행하는 작업에 스파크를 사용한다면, 스파크를 정의해야 한다. 정의하지 않을 경우, 스키마 추론 과정에서 스키마를 임의로 결정할 수 있다. 

    • 스키마는 여러 개의 StructField 타입 필드로 구성된 StructType 객체

    • StructType(컬럼의 이름, 데이터 타입, 컬럼의 값이 null값일 수 있는지 지정 True/False 및 메타데이터 지정)

    example_of_Schema = StructType([StructType(StructField("Specific_Column_name", StringType, true), 
    StructType(StructField("Specific_Column_name", LongType, true),...
    )])
    
    df = spark.read.format("json").schema(example_of_Schema).load("/data_path/file.json")

     

     

    2) 컬럼

    • 컬럼 생성 및 참조하기 위해서 Column/col 함수 사용

     

    3) 로우

    • 각 컬럼에 해당하는 값을 사용하여 Row 객체를 직접 생성 가능함

     

    4) 데이터프레임

    • 4-1) json/csv 파일을 불러와서 Dataframe을 생성하는 방법

    • createOrReplaceTempView : 데이터 프레임에 SQL 쿼리 실행 및 SQL 기본 트렌스포메이션 확인 위함. 스파크데이터프레임.createOrReplaceTempView('객체명')의 방식으로 사용함

    df.createOrReplaceTempView("dfTable")

     

    • 4-2) Row 객체를 가진 Seq 타입을 변환하여 DataFrame 생성하는 방법

    from pyspark.sql import Row
    from pyspark.sql.types import StructField, StructType, SringType, LongType
    
    myManualSchema = StructType([StructField("some", StringType(), True),
    StructField("col", StringType(), True),
    StructField("names", LongType(), False)])
    
    myRow = Row("Cara", None, 20)
    myDf = spark.createDataFrame([myRow], myManualSchema)
    myDf.show()

     

      • Dataframe 관련 함수 : SQL사용하는 함수 select, selectExpr(select함수에 expr함수를 같이 사용하는 경우) 

      • 4-3) 컬럼명 변경하기 : withColumnRenamed 

      • df.withColumnRenamed("변경_전_컬럼_네임", "변경_후_컬럼_네임").columns
      • 4-4) 공백이나 하이픈(-)과 같은 문자는 컬럼명에 사용할 때 유의해야 한다. ' 을 이용해서 이스케이핑 해야 한다. 

      • dfwithlongcolname = df.withColumn("sample-name", expr("ORIGIN_COUNTRY_NAME")) dfwithlongcolname.select(expr("'sample-name'")).columns
      • 4-4) 컬럼 제거하기 

      • dfwithlongcolname.drop("'sample-name'").columns
    •  4-5) 컬럼의 데이터 타입 변경하기 

    #count의 컬럼을 string 형태의 데이터 타입으로 변환해서 count2에 저장한다. 
    
    df.withColumn("count2", col("count").cast("string")) 

     

    • 4-6) 로우 필터링 하기
    df.filter(col("count")<2).show(2)
    df.where("count<2").show(2)
    
    df.where("count<2").where(col("column_name" != 'abc')
    \.show(2)
    

     

    • 4-6) 고유값 얻기
    df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

     

Designed by Tistory.