本文共 1992 字,大约阅读时间需要 6 分钟。
RDD转DFtoDF 转为 DataFrame, toDF 的参数是 String 类型的变长参数使用名称数组使用 createDataFrame 创建 指定文件类型创建DF指定文件类型创建DFspark 表示sparkSession,因为J son 自带属性名和属性值,可直接获得 schema指定 schema 创建 dfs对于 csv,txt 格式,可以指定分隔符和 schemaDF的特征方法df.printSchema 显示 schema
1、selectimport spark.implicits._df1.select($"name", $"age"+1).show2、where 和 filterdf.select($"name", $"age").filter($"age">20).show3、groupBydf.select("age").filter($"age">20).groupBy("age").count.show4、orderBy5、head, take,first6、join更多种 join 内容,参见 https://blog.csdn.net/timothyzh/article/details/89919157
DF使用sql 语句,需要创建一个临时视图1、临时视图df1.createOrReplaceTempView("people")spark.sql("select age from people where age is not null group by age").show2、全局临时视图
df.createGlobalTempView("people")spark.newSession().sql("SELECT * FROM global_temp.people").show()全局临时视图可跨session使用,保存在数据库global_temp中
count_distinct 近似值avg mean 平均值collect_list 聚合指定字段的值到 listcollect_set 聚合指定字段的值到 setcountDistinct 去重计数select count(distinct class) 去重计数first 分组第一个元素last 分组最后一个元素grouping sets 等同于group by A,B union group by A,Cmax 最大值min 最小值sum 求和sumDistinctselect sum(distinct class) 非重复值求和
concat(exprs: Column*) 连接多列字符串initcap(e: Column) 单词首字母大写upper(e: Column) 转大写instr(str: Column, substring: String) substring 在 str 中第一次出现的位置length(e: Column) 字符串长度locate(substr: String, str: Column, pos: Int): substring 在 str 中第一次出现位置, 0 表示未找到,从pos 位置后查找trim(e: Column): 剪掉左右两边的空格、空白字符regexp_replace(e: Column, pattern: Column, replacement: Column): 正则替换匹配的部 分,这里参数为列reverse(str: Column) 将 str 反转
窗口函数在是聚合函数后面使用over()标记要使用窗口功能,填写partition by指明分区,使用order by指明排序,over函数外面指明需要返回哪些值rank() 排名,返回数据项在分组中的排名,排名相等会在名次中留下空位 1,2,2,4dense_rank() 排名,返回数据项在分组中的排名,排名相等会在名次中不会留下空位row_number(): 行号,为每条记录返回一个数字 1,2,3,4spark.sql("select name,age rank() over(partition by name order by age) as rank from global_temp.Human").show
二者关系如下图,DataSet 不如 DataFrame 灵活DataSet 是数据类型安全的类。 DataFrame 没有类型,只有字段的定义RDD调用toDS转为DataSet类型