概述
流數(shù)據(jù)是一個(gè)在機(jī)器學(xué)習(xí)領(lǐng)域蓬勃發(fā)展的概念
學(xué)習(xí)如何使用PySpark來利用機(jī)器學(xué)習(xí)模型對流數(shù)據(jù)進(jìn)行預(yù)測
我們將介紹流數(shù)據(jù)和Spark Streaming的基礎(chǔ)知識,然后深入到實(shí)現(xiàn)部分
引言
想象一下——每一秒都有8,500多條推文發(fā)布,900多張照片被上傳到Instagram,4,200多個(gè)Skype呼叫,78,000多次Google搜索,以及200多萬封電子郵件被發(fā)送(數(shù)據(jù)來自InternetLive Stats)。
我們正在以前所未有的速度和規(guī)模生產(chǎn)數(shù)據(jù)。這是在數(shù)據(jù)科學(xué)領(lǐng)域工作的大好時(shí)候!但是有了大量的數(shù)據(jù)后,接踵而至的是復(fù)雜的挑戰(zhàn)。
首要,如何收集大規(guī)模的數(shù)據(jù)?如何確保一旦生成并收集數(shù)據(jù),機(jī)器學(xué)習(xí)管道就會繼續(xù)產(chǎn)生結(jié)果?這些都是業(yè)界面臨的重大挑戰(zhàn),以及為什么流數(shù)據(jù)的概念在企業(yè)中越來越受到關(guān)注。
增加處理流數(shù)據(jù)的能力將極大地?cái)U(kuò)展當(dāng)前的數(shù)據(jù)科學(xué)產(chǎn)品投資組合。這是業(yè)界急需的技能,若能熟練掌握它,將幫助你擔(dān)負(fù)起下一個(gè)數(shù)據(jù)科學(xué)角色。
因此,在本文中,我們將學(xué)習(xí)什么是流數(shù)據(jù),了解Spark Streaming的基礎(chǔ)知識,然后在一個(gè)業(yè)界相關(guān)的數(shù)據(jù)集上使用Spark實(shí)現(xiàn)流數(shù)據(jù)。
什么是流數(shù)據(jù)?
社交媒體產(chǎn)生的數(shù)據(jù)是驚人的。你敢于想象存儲所有數(shù)據(jù)需要些什么嗎?這是一個(gè)復(fù)雜的過程!因此,在深入探討本文的Spark方面之前,先來理解什么是流數(shù)據(jù)。
流數(shù)據(jù)沒有離散的開始或結(jié)束。這些數(shù)據(jù)是每秒從數(shù)千個(gè)數(shù)據(jù)源中生成的,它們需要盡快進(jìn)行處理和分析。大量流數(shù)據(jù)需要實(shí)時(shí)處理,例如Google搜索結(jié)果。
我們知道,在事件剛發(fā)生時(shí)一些見解會更有價(jià)值,而隨著時(shí)間的流逝它們會逐漸失去價(jià)值。以體育賽事為例——我們希望看到即時(shí)分析,即時(shí)統(tǒng)計(jì)見解,在那一刻真正享受比賽,對吧?
例如,假設(shè)你正在觀看一場羅杰·費(fèi)德勒(Roger Federer)對戰(zhàn)諾瓦克·喬科維奇(Novak Djokovic)的激動人心的網(wǎng)球比賽。
這場比賽兩局打平,你想了解與費(fèi)德勒的職業(yè)平均水平相比,其反手發(fā)球的百分比。是在幾天之后看到有意義,還是在決勝局開始前的那一刻看到有意義呢?
Spark Streaming的基礎(chǔ)知識
Spark Streaming是核心Spark API的擴(kuò)展,可實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)流的可伸縮和容錯(cuò)流處理。
在轉(zhuǎn)到實(shí)現(xiàn)部分之前,先了解一下Spark Streaming的不同組成部分。
離散流
離散流(Dstream)是一個(gè)連續(xù)的數(shù)據(jù)流。對于離散流,其數(shù)據(jù)流可以直接從數(shù)據(jù)源接收,也可以在對原始數(shù)據(jù)進(jìn)行一些處理后接收。
構(gòu)建流應(yīng)用程序的第一步是定義要從中收集數(shù)據(jù)的數(shù)據(jù)資源的批處理持續(xù)時(shí)間。如果批處理持續(xù)時(shí)間為2秒,則將每2秒收集一次數(shù)據(jù)并將其存儲在RDD中。這些RDD的連續(xù)序列鏈?zhǔn)且粋€(gè)DStream,它是不可變的,可以通過Spark用作一個(gè)分布式數(shù)據(jù)集。
考慮一個(gè)典型的數(shù)據(jù)科學(xué)項(xiàng)目。在數(shù)據(jù)預(yù)處理階段,我們需要轉(zhuǎn)換變量,包括將分類變量轉(zhuǎn)換為數(shù)字變量,創(chuàng)建分箱,去除異常值和很多其他的事。Spark保留了在數(shù)據(jù)上定義的所有轉(zhuǎn)換的歷史記錄。因此,無論何時(shí)發(fā)生故障,它都可以追溯轉(zhuǎn)換的路徑并重新生成計(jì)算結(jié)果。
我們希望Spark應(yīng)用程序7 x 24小時(shí)持續(xù)運(yùn)行。并且每當(dāng)故障發(fā)生時(shí),我們都希望它能盡快恢復(fù)。但是,在大規(guī)模處理數(shù)據(jù)的同時(shí),Spark需要重新計(jì)算所有轉(zhuǎn)換以防出現(xiàn)故障??梢韵胂?,這樣做的代價(jià)可能會非常昂貴。
緩存
這是應(yīng)對該挑戰(zhàn)的一種方法。我們可以暫時(shí)存儲已計(jì)算(緩存)的結(jié)果,以維護(hù)在數(shù)據(jù)上定義的轉(zhuǎn)換的結(jié)果。這樣,當(dāng)發(fā)生故障時(shí),就不必一次又一次地重新計(jì)算這些轉(zhuǎn)換。
DStreams允許將流數(shù)據(jù)保留在內(nèi)存中。當(dāng)我們要對同一數(shù)據(jù)執(zhí)行多種運(yùn)算時(shí),這很有用。
檢查點(diǎn)
高速緩存在正常使用時(shí)非常有用,但是它需要大量內(nèi)存。并不是每個(gè)人都有數(shù)百臺具有128 GB內(nèi)存的計(jì)算機(jī)來緩存所有內(nèi)容。
檢查點(diǎn)的概念能夠有所幫助。
檢查點(diǎn)是另一種保留轉(zhuǎn)換后的數(shù)據(jù)框結(jié)果的技術(shù)。它將不時(shí)地將正在運(yùn)行的應(yīng)用程序的狀態(tài)保存在任何可靠的存儲介質(zhì)(如HDFS)上。但是,它比緩存慢,靈活性也更差。
在擁有流數(shù)據(jù)時(shí)可以使用檢查點(diǎn)。轉(zhuǎn)換結(jié)果取決于先前的轉(zhuǎn)換結(jié)果,并且需要保存以供使用。此外,我們還存儲檢查點(diǎn)元數(shù)據(jù)信息,例如用于創(chuàng)建流數(shù)據(jù)的配置以及一系列DStream操作的結(jié)果等。
流數(shù)據(jù)的共享變量
有時(shí)候需要為必須在多個(gè)集群上執(zhí)行的Spark應(yīng)用程序定義諸如map,reduce或filter之類的函數(shù)。在函數(shù)中使用的變量會被復(fù)制到每臺機(jī)器(集群)中。
在這種情況下,每個(gè)集群都有一個(gè)不同的執(zhí)行器,我們想要一些可以賦予這些變量之間關(guān)系的東西。
例如:假設(shè)Spark應(yīng)用程序在100個(gè)不同的集群上運(yùn)行,它們捕獲了來自不同國家的人發(fā)布的Instagram圖片。
現(xiàn)在,每個(gè)集群的執(zhí)行者將計(jì)算該特定集群上的數(shù)據(jù)的結(jié)果。但是我們需要一些幫助這些集群進(jìn)行交流的東西,以便獲得匯總結(jié)果。在Spark中,我們擁有共享變量,這些變量使此問題得以克服。
累加器變量
用例包括發(fā)生錯(cuò)誤的次數(shù),空白日志的數(shù)量,我們從特定國家收到請求的次數(shù)——所有這些都可以使用累加器解決。
每個(gè)集群上的執(zhí)行程序?qū)?shù)據(jù)發(fā)送回驅(qū)動程序進(jìn)程,以更新累加器變量的值。 累加器僅適用于關(guān)聯(lián)和可交換的運(yùn)算。例如,對求和和求最大值有用,而求平均值不起作用。
廣播變量
當(dāng)我們使用位置數(shù)據(jù)(例如城市名稱和郵政編碼的映射)時(shí),這些是固定變量,是吧?現(xiàn)在,如果每次在任意集群上的特定轉(zhuǎn)換都需要這種類型的數(shù)據(jù),我們不需要向驅(qū)動程序發(fā)送請求,因?yàn)樗鼤嘿F。
相反,可以在每個(gè)集群上存儲此數(shù)據(jù)的副本。這些類型的變量稱為廣播變量。
廣播變量允許程序員在每臺計(jì)算機(jī)上保留一個(gè)只讀變量。通常,Spark使用高效的廣播算法自動分配廣播變量,但是如果有任務(wù)需要多個(gè)階段的相同數(shù)據(jù),也可以定義它們。
使用PySpark對流數(shù)據(jù)進(jìn)行情感分析
是時(shí)候啟動你最喜歡的IDE了!讓我們在本節(jié)中進(jìn)行編碼,并以實(shí)踐的方式理解流數(shù)據(jù)。
理解問題陳述
在本節(jié)我們將使用真實(shí)數(shù)據(jù)集。我們的目標(biāo)是檢測推文中的仇恨言論。為了簡單起見,如果一條推文包含帶有種族主義或性別歧視情緒的言論,我們就認(rèn)為該推文包含仇恨言論。
因此,任務(wù)是將種族主義或性別歧視的推文從其他推文中區(qū)分出來。我們將使用包含推文和標(biāo)簽的訓(xùn)練樣本,其中標(biāo)簽“1”表示推文是種族主義/性別歧視的,標(biāo)簽“0”則表示其他種類。
為什么這是一個(gè)與主題相關(guān)的項(xiàng)目?因?yàn)樯缃幻襟w平臺以評論和狀態(tài)更新的形式接收龐大的流數(shù)據(jù)。該項(xiàng)目將幫助我們審核公開發(fā)布的內(nèi)容。
設(shè)置項(xiàng)目工作流程
1. 模型構(gòu)建:構(gòu)建邏輯回歸模型管道,對推文中是否包含仇恨言論進(jìn)行分類。在這里,我們的重點(diǎn)不是建立一個(gè)完全準(zhǔn)確的分類模型,而是了解如何在流數(shù)據(jù)上使用任意模型并返回結(jié)果
2. 初始化Spark Streaming的環(huán)境:一旦模型構(gòu)建完成,需要定義獲取流數(shù)據(jù)的主機(jī)名和端口號
3. 流數(shù)據(jù):接下來,從定義的端口添加來自netcat服務(wù)器的推文,SparkStreaming API將在指定的持續(xù)時(shí)間后接收數(shù)據(jù)
4. 預(yù)測并返回結(jié)果:一旦接收到推文,就將數(shù)據(jù)傳遞到創(chuàng)建的機(jī)器學(xué)習(xí)管道中,并從模型中返回預(yù)測的情緒
這是對工作流程的簡潔說明:
訓(xùn)練數(shù)據(jù)以建立邏輯回歸模型
我們在一個(gè)CSV文件中存儲推文數(shù)據(jù)及其相應(yīng)的標(biāo)簽。使用邏輯回歸模型來預(yù)測推文是否包含仇恨言論。如果是,則模型預(yù)測標(biāo)簽為1(否則為0)。你可以參考“面向初學(xué)者的PySpark”來設(shè)置Spark環(huán)境。
可以在這里下載數(shù)據(jù)集和代碼。
首先,需要定義CSV文件的模式。否則,Spark會將每列數(shù)據(jù)的類型都視為字符串。讀取數(shù)據(jù)并檢查模式是否符合定義:
# importing required libraries
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
# initializing spark session
sc = SparkContext(appName=“PySparkShell”)
spark = SparkSession(sc)
# define the schema
my_schema = tp.StructType([
tp.StructField(name=‘id’, dataType= tp.IntegerType(), nullable=True),
tp.StructField(name=‘label’, dataType= tp.IntegerType(), nullable=True),
tp.StructField(name=‘tweet’, dataType= tp.StringType(), nullable=True)
])
# read the dataset
my_data = spark.read.csv(‘twitter_sentiments.csv’,
schema=my_schema,
header=True)
# view the data
my_data.show(5)
# print the schema of the file
my_data.printSchema()
定義機(jī)器學(xué)習(xí)管道的各個(gè)階段
現(xiàn)在已經(jīng)將數(shù)據(jù)保存在Spark數(shù)據(jù)框中,需要定義轉(zhuǎn)換數(shù)據(jù)的不同階段,然后使用它從模型中獲取預(yù)測的標(biāo)簽。
在第一階段,使用RegexTokenizer將推特文本轉(zhuǎn)換為單詞列表。然后,從單詞列表中刪除停用詞并創(chuàng)建詞向量。在最后階段,使用這些詞向量來構(gòu)建邏輯回歸模型并獲得預(yù)測的情緒。
記住——重點(diǎn)不是建立一個(gè)完全準(zhǔn)確的分類模型,而是要看看如何在流數(shù)據(jù)上使用預(yù)測模型來獲取結(jié)果。
# define stage 1: tokenize the tweet text
stage_1 = RegexTokenizer(inputCol=‘tweet’ , outputCol=‘tokens’, pattern=‘\\W’)
# define stage 2: remove the stop words
stage_2 = StopWordsRemover(inputCol=‘tokens’, outputCol=‘filtered_words’)
# define stage 3: create a word vector of the size 100
stage_3 = Word2Vec(inputCol=‘filtered_words’, outputCol=‘vector’, vectorSize=100)
# define stage 4: Logistic Regression Model
model = LogisticRegression(featuresCol=‘vector’, labelCol=‘label’)
設(shè)置機(jī)器學(xué)習(xí)管道
讓我們在Pipeline對象中添加階段,然后按順序執(zhí)行這些轉(zhuǎn)換。用訓(xùn)練數(shù)據(jù)集擬合管道,現(xiàn)在,每當(dāng)有了新的推文,只需要將其傳遞給管道對象并轉(zhuǎn)換數(shù)據(jù)即可獲取預(yù)測:
# setup the pipeline
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])
# fit the pipeline model with the training data
pipelineFit = pipeline.fit(my_data)
流數(shù)據(jù)和返回結(jié)果
假設(shè)每秒收到數(shù)百條評論,我們希望通過阻止用戶發(fā)布仇恨言論來保持平臺整潔。因此,每當(dāng)我們收到新文本,都會將其傳遞到管道中并獲得預(yù)測的情緒。
我們將定義一個(gè)函數(shù)get_prediction,該函數(shù)將刪除空白句子并創(chuàng)建一個(gè)數(shù)據(jù)框,其中每一行都包含一條推文。
初始化Spark Streaming的環(huán)境并定義3秒的批處理持續(xù)時(shí)間。這意味著我們將對每3秒收到的數(shù)據(jù)進(jìn)行預(yù)測:
# define a function to compute sentiments of the received tweets
defget_prediction(tweet_text):
try:
# filter the tweets whose length is greater than 0
tweet_text = tweet_text.filter(lambda x: len(x) 》0)
# create a dataframe with column name ‘tweet’ and each row will contain the tweet
rowRdd = tweet_text.map(lambda w: Row(tweet=w))
# create a spark dataframe
wordsDataFrame = spark.createDataFrame(rowRdd)
# transform the data using the pipeline and get the predicted sentiment
pipelineFit.transform(wordsDataFrame).select(‘tweet’,‘prediction’).show()
except :
print(‘No data’)
# initialize the streaming context
ssc = StreamingContext(sc, batchDuration=3)
# Create a DStream that will connect to hostname:port, like localhost:9991
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
# split the tweet text by a keyword ‘TWEET_APP’ so that we can identify which set of words is from a single tweet
words = lines.flatMap(lambda line : line.split(‘TWEET_APP’))
# get the predicted sentiments for the tweets received
words.foreachRDD(get_prediction)
# Start the computation
ssc.start()
# Wait for the computation to terminate
ssc.awaitTermination()
在一個(gè)終端上運(yùn)行該程序,然后使用Netcat(用于將數(shù)據(jù)發(fā)送到定義的主機(jī)名和端口號的實(shí)用工具)。你可以使用以下命令啟動TCP連接:
nc -lk port_number
最后,在第二個(gè)終端中鍵入文本,你將在另一個(gè)終端中實(shí)時(shí)獲得預(yù)測。
完美!
結(jié)語
流數(shù)據(jù)在未來幾年只會越來越熱門,因此應(yīng)該真正開始熟悉這一主題。請記住,數(shù)據(jù)科學(xué)不只是建立模型——整個(gè)流程都需要關(guān)注。
本文介紹了SparkStreaming的基礎(chǔ)知識以及如何在真實(shí)的數(shù)據(jù)集上實(shí)現(xiàn)它。我鼓勵大家使用另一個(gè)數(shù)據(jù)集或抓取實(shí)時(shí)數(shù)據(jù)來實(shí)現(xiàn)剛剛介紹的內(nèi)容(你也可以嘗試其他模型)。
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
6760瀏覽量
88619 -
機(jī)器學(xué)習(xí)
+關(guān)注
關(guān)注
66文章
8323瀏覽量
132165
發(fā)布評論請先 登錄
相關(guān)推薦
評論