티스토리 뷰

데이터 프레임 작업의 대부분은 데이터를 조건에 따라 빠르게 필터링하는 것에 달려있다할 수 있다.

 

.filter()

  • 데이터의 값들을 조건에 따라 필터링을 할 수 있다.
# 애플 주식 관련 값 로딩
df = spark.read.csv('./appl_stock.csv', inferSchema=True, header=True)

# 데이터 프레임 확인
df.show()

'''
result

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
..
'''

# 시퀄 스타일
df.filter("Close < 500").select(['Open', 'Close']).show()

# 데이터프레임 스타일
df.filter(df['Close'] < 500).select('Volume').show()

# 종착 가격이 200달러 미만이며, 시작 가격이 200달러를 초과하는 데이터
df.filter((df['Open'] > 200) & (df['Close'] < 200)).show()

# 가격이 가장 낮았던 날 - 197.16 달러이던 날
# show 대신 collect를 쓰면 실제 로우 오브젝트가 반환됨
result = df.filter(df['Low'] == 197.16).collect()

# 사전으로 바꾸기
row = result[0]
row.asDict()
'''
result

{'Date': datetime.date(2010, 1, 22),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}
'''

 

 

 

집합 함수

  • 여러 데이터 행들을 하나의 아웃풋으로 합치거나 집합하는 것

.groupBy(column_name_str)

  • 열의 특정 값에 기반해서 행들을 그룹화
  • 그룹화된 데이터 오브젝트 그룹이 리턴됨
  • 메모리상의 위치를 보여줌
df = spark.read.csv('./sales_info.csv', inferSchema=True, header=True)

df.show()
'''
+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
.
.
.
'''

df.groupBy("Company")
# result
# GroupedData[grouping expressions: [Company], value: [Company: string, Person: string ... 1 more field], type: GroupBy]

df.groupBy("Company").mean().show()
'''
result

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+
'''

.agg()

  • groupby대신 그냥 전체의 집계를 알고 싶거나 할 경우에 사용
# 매출을 groupby 하는게 아니라 집합
df.agg({'Sales': 'sum'}).show()
'''
+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+
'''

 

스파크에서 함수 불러오기 & alias

  • alias(): 컬럼명을 내가 원하는대로 명명할 수 있음
from pyspark.sql.functions import countDistinct, avg, stddev

# 선택 불러오기로 합칠 수 있다!
# count distinct : 변수상의 유니크한 갯수를 불러온다
# 즉, Sales의 유일한 값들을 불러온다
df.select(countDistinct('Sales').alias('Average Sales')).show()

 

데이터 포맷하기

from pyspark.sql.functions import format_number

# 원본
df.select(stddev('Sales')).show()
'''
+------------------+
|     stddev(Sales)|
+------------------+
|250.08742410799007|
+------------------+
'''

df.select(stddev('Sales').alias('std')).show()
'''
+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+
'''

sales_std.select(format_number('std', 2).alias('std')).show() # 소수점 2자리까지만
'''
사실 alias는 한번만 해도 되지만 그러면 지저분해지므로 두번 해준다.
+------+
|   std|
+------+
|250.09|
+------+
'''

 

정렬하기

df.orderBy("Sales").show()
'''
+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+
'''

# 내림차순
# 오름차순이 기본이므로 오름차순은 굳이 asc 같은거 안 써도 된다.
df.orderBy(df['Sales'].desc()).show()
반응형
댓글