av激情亚洲男人的天堂国语,日韩欧美精品一中文字幕,无码av一区二区三区无码,国产又色又爽又刺激的a片,国产又色又爽又刺激的a片

用Spark實現(xiàn)一列數(shù)據(jù)變成一條數(shù)據(jù)庫記錄(spark一列變一條數(shù)據(jù)庫)

在現(xiàn)代數(shù)據(jù)處理過程中,大規(guī)模數(shù)據(jù)的處理已經(jīng)成為了大數(shù)據(jù)領域的主要任務之一。Spark是當前比較流行的大數(shù)據(jù)處理框架之一,可以用于構建分布式數(shù)據(jù)處理系統(tǒng),具有可擴展性和性能等優(yōu)點。在現(xiàn)實生活中,我們經(jīng)常會遇到需要將一列數(shù)據(jù)轉換成一條數(shù)據(jù)庫記錄的情況,這時我們可以使用Spark來實現(xiàn)這個任務。

在保亭黎族等地區(qū),都構建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務理念,為客戶提供成都網(wǎng)站建設、成都做網(wǎng)站 網(wǎng)站設計制作按需求定制制作,公司網(wǎng)站建設,企業(yè)網(wǎng)站建設,成都品牌網(wǎng)站建設,營銷型網(wǎng)站建設,外貿(mào)網(wǎng)站建設,保亭黎族網(wǎng)站建設費用合理。

Spark是一個分布式計算框架,能夠?qū)Υ笠?guī)模的數(shù)據(jù)進行分布式處理。它的特點是高效、可擴展、易于使用,并能夠處理多種類型的數(shù)據(jù)。實現(xiàn)將一列數(shù)據(jù)轉換成一條數(shù)據(jù)記錄的過程,也可以稱之為“行轉列”操作,Spark提供了map、flatMap、reduce等操作,可以輕松地實現(xiàn)這個功能。

在具體實現(xiàn)中,我們可以使用Spark的分布式計算框架來實現(xiàn)將一列數(shù)據(jù)轉換成一條數(shù)據(jù)庫記錄。整個過程可以分為以下幾個步驟:

1. 數(shù)據(jù)預處理:需要對原始數(shù)據(jù)做一些預處理,比如去除頭部和尾部的空格,轉換編碼格式等。預處理數(shù)據(jù)可以使后續(xù)的處理步驟更加高效和準確。

2. 讀取數(shù)據(jù):Spark支持從多種數(shù)據(jù)源中讀取數(shù)據(jù),包括文本文件、壓縮文件、數(shù)據(jù)庫等。在讀取數(shù)據(jù)之前,需要確定數(shù)據(jù)的文件格式和數(shù)據(jù)編碼格式。

3. 轉換數(shù)據(jù):接下來,需要對讀取到的數(shù)據(jù)進行轉換操作。通常情況下,用逗號或制表符分隔的單行數(shù)據(jù)中,每個字段都是一組重復數(shù)據(jù),包括日期、地點、名稱等信息。我們需要將這些信息轉換成一條數(shù)據(jù)庫記錄。

4. 過濾數(shù)據(jù):在數(shù)據(jù)轉換過程中,需要根據(jù)實際業(yè)務需求過濾不符合條件的數(shù)據(jù)。比如可以使用map和filter操作,根據(jù)指定關鍵字篩選出需要的數(shù)據(jù)。

5. 寫入數(shù)據(jù):將轉換后的數(shù)據(jù)寫入數(shù)據(jù)庫中。Spark可以通過JDBC或ODBC驅(qū)動來向數(shù)據(jù)庫插入數(shù)據(jù)。

對于上述幾個步驟,我們可以使用如下偽代碼來實現(xiàn)將一列數(shù)據(jù)轉換成一條數(shù)據(jù)庫記錄:

“`python

# 定義讀取數(shù)據(jù)路徑

path = “data.csv”

# 讀取數(shù)據(jù)

data = spark.read.csv(path)

# 預處理數(shù)據(jù)

data = data.map(lambda line: line.strip().encode(‘utf-8’))

# 轉換數(shù)據(jù)

data = data.flatMap(lambda line: line.split(‘,’))

# 過濾數(shù)據(jù)

data = data.filter(lambda line: ‘keyword’ in line)

# 寫入數(shù)據(jù)庫

data.write.format(“jdbc”) \

.option(“url”, “jdbc:mysql://host:port/database”) \

.option(“driver”, “com.mysql.jdbc.Driver”) \

.option(“dbtable”, “tablename”) \

.option(“user”, “username”) \

.option(“password”, “password”) \

.mode(“append”) \

.save()

“`

在上述偽代碼中,我們使用Spark讀取了數(shù)據(jù)文件data.csv,對讀取到的數(shù)據(jù)做了預處理和轉換操作,并過濾出符合條件的數(shù)據(jù),最后將數(shù)據(jù)插入到MySQL數(shù)據(jù)庫中。

來說,使用Spark實現(xiàn)將一列數(shù)據(jù)轉換成一條數(shù)據(jù)庫記錄功能,需要經(jīng)過數(shù)據(jù)預處理、數(shù)據(jù)讀取、數(shù)據(jù)轉換、數(shù)據(jù)過濾和數(shù)據(jù)寫入等多個步驟。使用Spark的分布式計算框架結合JDBC或ODBC驅(qū)動,可以輕松地實現(xiàn)這個功能,并且支持多種數(shù)據(jù)源和數(shù)據(jù)格式。

相關問題拓展閱讀:

  • Spark SQL(十):Hive On Spark
  • 創(chuàng)建sparksqltable代碼

Spark SQL(十):Hive On Spark

Hive是目前大數(shù)據(jù)領域,事實上的SQL標準。其底層默認是基于MapReduce實現(xiàn)的,但是由于MapReduce速度實在比較慢,因此這幾年,陸續(xù)出來了新的SQL查詢引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。

Spark SQL與Hive On Spark是不一樣的。Spark SQL是Spark自己研發(fā)出來的針對各種數(shù)據(jù)源,包括Hive、ON、Parquet、JDBC、RDD等都可以執(zhí)行查詢的,一套基于Spark計算引擎的查詢引擎。因此它是Spark的一個項目,只不過提供了逗閉針對Hive執(zhí)行查詢的工功能而已,適合在一些使用Spark技術棧的大數(shù)據(jù)應用類系統(tǒng)中使用。

而Hive On Spark,是Hive的一個項目,它是將Spark作為底層的查詢引擎(不通過MapReduce作為唯一的查詢引擎)。Hive On Spark,只適用于Hive,在可預見的未來,很有可能Hive默認的底層引擎就從MapReduce切換為Spark了;適合于將原有早粗的Hive數(shù)據(jù)倉庫以及數(shù)據(jù)統(tǒng)計分析替山睜裂換為Spark引擎,作為全公司通用的大數(shù)據(jù)統(tǒng)計分析引擎。

Hive On Spark做了一些優(yōu)化:

1、Map Join

Spark SQL默認對join是支持使用broadcast機制將小表廣播到各個節(jié)點上,以進行join的。但是問題是,這會給Driver和Worker帶來很大的內(nèi)存開銷。因為廣播的數(shù)據(jù)要一直保留在Driver內(nèi)存中。所以目前采取的是,類似乎MapReduce的Distributed Cache機制,即提高HDFS replica factor的復制因子,以讓數(shù)據(jù)在每個計算節(jié)點上都有一個備份,從而可以在本地進行數(shù)據(jù)讀取。

2、Cache Table

對于某些需要對一張表執(zhí)行多次操作的場景,Hive On Spark內(nèi)部做了優(yōu)化,即將要多次操作的表cache到內(nèi)存中,以便于提升性能。但是這里要注意,并不是對所有的情況都會自動進行cache。所以說,Hive On Spark還有很多不完善的地方。

Hive QL語句 =>

語法分析 => AST =>

生成邏輯執(zhí)行計劃 => Operator Tree =>

優(yōu)化邏輯執(zhí)行計劃 => Optimized Operator Tree =>

生成物理執(zhí)行計劃 => Task Tree =>

優(yōu)化物理執(zhí)行計劃 => Optimized Task Tree =>

執(zhí)行優(yōu)化后的Optimized Task Tree

創(chuàng)建sparksqltable代碼

SQLContext具體的執(zhí)行過程告友如下:

(1)SQL | HQL語句經(jīng)過SqlParse解析成UnresolvedLogicalPlan。

(2)使用yzer結合數(shù)據(jù)字典(catalog)進行綁定,生成resolvedLogicalPlan,在這個過程中,Catalog提取出SchemRDD,并注冊類似case class的對象,然后把表注冊進內(nèi)存中。

(3)Analyzed Logical Plan經(jīng)過Catalyst Optimizer優(yōu)化器優(yōu)化處理后,生成Optimized Logical Plan,該過程完成以后,以下的部分在Spark core中完成。

(4)Optimized Logical Plan的結果交給SparkPlanner,然后SparkPlanner處理后交給PhysicalPlan,經(jīng)過該過程后生成Spark Plan。

(5)使用SparkPlan將LogicalPlan轉換成PhysicalPlan。

(6)使用prepareForExecution()將PhysicalPlan轉換成可執(zhí)行物理計滾友搜劃。

(7)使用execute()執(zhí)行可執(zhí)行物理計劃。

(8)生成DataFrame。

登錄后復制

在整個運行過程中涉及到多個SparkSQL的組件,如SqlParse、yzer、optimizer、SparkPlan等等

某電商平臺,需要對訂單數(shù)據(jù)進行分析,已知訂單數(shù)據(jù)包括兩個文件,分別為訂單數(shù)據(jù)orders和訂單明細數(shù)據(jù)order_items,orders記錄了用戶購買商品的訂單ID,訂單號,用戶ID及下單日期。order_items記錄了商品ID,訂單ID以及明細ID。它們的結構與關系如下圖所示:

orders表:(order_id,order_number,buyer_id,create_dt)

訂單大歷ID 訂單號用戶ID 下單日期

-15 04:58:21

-15 04:45:31

-15 03:12:23

-15 02:37:32

-15 02:18:56

-15 01:33:46

-15 01:04:41

-15 01:02:20

-15 00:38:02

-15 00:18:43

order_items表:(item_id,order_id,goods_id )

明細ID 訂單ID 商品ID

登錄后復制

?

創(chuàng)建orders表和order_items表,并統(tǒng)計該電商網(wǎng)站都有哪些用戶購買了什么商品。

操作

在spark-shell下,使用case class方式定義RDD,創(chuàng)建orders表

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._

case class Orders(order_id:String,order_number:String,buyer_id:String,create_dt:String)

val dforders = sc.textFile(“/myspark5/orders”).map(_.split(‘\t’)).map(line=>Orders(line(0),line(1),line(2),line(3))).toDF()

dforders.registerTempTable(“orders”)

登錄后復制

驗證創(chuàng)建的表是否成功。

sqlContext.sql(“show tables”).map(t=>”tableName is:”+t(0)).collect().foreach(println)

sqlContext.sql(“select order_id,buyer_id from orders”).collect

登錄后復制

在Spark Shell下,使用applyScheme方式定義RDD,創(chuàng)建order_items表。

import org.apache.spark.sql._

import org.apache.spark.sql.types._

val rddorder_items = sc.textFile(“/myspark5/order_items”)

val roworder_items = rddorder_items.map(_.split(“\t”)).map( p=>Row(p(0),p(1),p(2) ) )

val schemaorder_items = “item_id order_id goods_id”

val schema = StructType(schemaorder_items.split(” “).map(fieldName=>StructField(fieldName,StringType,true)) )

val dforder_items = sqlContext.applySchema(roworder_items, schema)

dforder_items.registerTempTable(“order_items”)

登錄后復制

驗證創(chuàng)建表是否成功

sqlContext.sql(“show tables”).map(t=>”tableName is:”+t(0)).collect().foreach(println)

sqlContext.sql(“select order_id,goods_id from order_items “).collect

登錄后復制

將order表及order_items表進行join操作,統(tǒng)計該電商網(wǎng)站,都有哪些用戶購買了什么商品

sqlContext.sql(“select orders.buyer_id, order_items.goods_id from order_items join orders on order_items.order_id=orders.order_id “).collect

登錄后復制

Spark SQL

spark-sql

創(chuàng)建表orders及表order_items。

create table orders (order_id string,order_number string,buyer_id string,create_dt string)

row format delimited fields terminated by ‘\t’ stored as textfile;

create table order_items(item_id string,order_id string,goods_id string)

row format delimited fields terminated by ‘\t’ stored as textfile;

登錄后復制

查看已創(chuàng)建的表。

show tables;

登錄后復制

表名后的false意思是該表不是臨時表。

將HDFS中/myspark5下的orders表和order_items表中數(shù)據(jù)加載進剛創(chuàng)建的兩個表中。

load data inpath ‘/myspark5/orders’ into table orders;

load data inpath ‘/myspark5/order_items’ into table order_items;

登錄后復制

14.驗證數(shù)據(jù)是否加載成功。

select * from orders;

select * from order_items;

登錄后復制

15.處理文件,將order表及order_items表進行join操作,統(tǒng)計該電商網(wǎng)站,都有哪些用戶購買了什么商品。

spark 一列變一條數(shù)據(jù)庫的介紹就聊到這里吧,感謝你花時間閱讀本站內(nèi)容,更多關于spark 一列變一條數(shù)據(jù)庫,用Spark實現(xiàn)一列數(shù)據(jù)變成一條數(shù)據(jù)庫記錄,Spark SQL(十):Hive On Spark,創(chuàng)建sparksqltable代碼的信息別忘了在本站進行查找喔。

香港服務器選創(chuàng)新互聯(lián),2H2G首月10元開通。
創(chuàng)新互聯(lián)(www.cdcxhl.com)互聯(lián)網(wǎng)服務提供商,擁有超過10年的服務器租用、服務器托管、云服務器、虛擬主機、網(wǎng)站系統(tǒng)開發(fā)經(jīng)驗。專業(yè)提供云主機、虛擬主機、域名注冊、VPS主機、云服務器、香港云服務器、免備案服務器等。


分享題目:用Spark實現(xiàn)一列數(shù)據(jù)變成一條數(shù)據(jù)庫記錄(spark一列變一條數(shù)據(jù)庫)
網(wǎng)址分享:http://uogjgqi.cn/article/djcsjeg.html
掃二維碼與項目經(jīng)理溝通

我們在微信上24小時期待你的聲音

解答本文疑問/技術咨詢/運營咨詢/技術建議/互聯(lián)網(wǎng)交流