[Spark] [python] 구조적 API 기본 연산
구조적 API 기본 연산
1) 스키마
-
DataFrame의 컬럼명과 데이터 타입을 결정
-
DataFrame의 컬럼명과 데이터 타입을 정의. 데이터를 추출(Extract), 변환(Transform), 적재(Load)를 수행하는 작업에 스파크를 사용한다면, 스파크를 정의해야 한다. 정의하지 않을 경우, 스키마 추론 과정에서 스키마를 임의로 결정할 수 있다.
-
스키마는 여러 개의 StructField 타입 필드로 구성된 StructType 객체
-
StructType(컬럼의 이름, 데이터 타입, 컬럼의 값이 null값일 수 있는지 지정 True/False 및 메타데이터 지정)
example_of_Schema = StructType([StructType(StructField("Specific_Column_name", StringType, true),
StructType(StructField("Specific_Column_name", LongType, true),...
)])
df = spark.read.format("json").schema(example_of_Schema).load("/data_path/file.json")
2) 컬럼
-
컬럼 생성 및 참조하기 위해서 Column/col 함수 사용
3) 로우
-
각 컬럼에 해당하는 값을 사용하여 Row 객체를 직접 생성 가능함
4) 데이터프레임
-
4-1) json/csv 파일을 불러와서 Dataframe을 생성하는 방법
-
createOrReplaceTempView : 데이터 프레임에 SQL 쿼리 실행 및 SQL 기본 트렌스포메이션 확인 위함. 스파크데이터프레임.createOrReplaceTempView('객체명')의 방식으로 사용함
df.createOrReplaceTempView("dfTable")
-
4-2) Row 객체를 가진 Seq 타입을 변환하여 DataFrame 생성하는 방법
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, SringType, LongType
myManualSchema = StructType([StructField("some", StringType(), True),
StructField("col", StringType(), True),
StructField("names", LongType(), False)])
myRow = Row("Cara", None, 20)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()
-
Dataframe 관련 함수 : SQL사용하는 함수 select, selectExpr(select함수에 expr함수를 같이 사용하는 경우)
-
4-3) 컬럼명 변경하기 : withColumnRenamed
- df.withColumnRenamed("변경_전_컬럼_네임", "변경_후_컬럼_네임").columns
-
4-4) 공백이나 하이픈(-)과 같은 문자는 컬럼명에 사용할 때 유의해야 한다. ' 을 이용해서 이스케이핑 해야 한다.
- dfwithlongcolname = df.withColumn("sample-name", expr("ORIGIN_COUNTRY_NAME")) dfwithlongcolname.select(expr("'sample-name'")).columns
-
4-4) 컬럼 제거하기
- dfwithlongcolname.drop("'sample-name'").columns
-
4-5) 컬럼의 데이터 타입 변경하기
#count의 컬럼을 string 형태의 데이터 타입으로 변환해서 count2에 저장한다.
df.withColumn("count2", col("count").cast("string"))
- 4-6) 로우 필터링 하기
df.filter(col("count")<2).show(2)
df.where("count<2").show(2)
df.where("count<2").where(col("column_name" != 'abc')
\.show(2)
- 4-6) 고유값 얻기
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()