hadoop實例python
Ⅰ 如何使用python為Hadoop編寫一個簡單的MapRece程序
使用Python編寫MapRece代碼技巧於我使用 HadoopStreaming 幫助螞賀我Map Rece間傳遞數據通STDIN (標首型准輸入)STDOUT (標准輸).我僅僅使用Pythonsys.stdin輸入悶芹派數據使用sys.stdout輸數據做HadoopStreaming幫我辦其事真別相信!
Ⅱ hadoop用python寫的Map部分哪裡有問題啊
這個item.txt和'user_profile.txt'是什麼文件?
如果是數據文件那應該放到HDFS上,或者自己實現inputformat來提供訪問方式。程序中從標准輸入獲取數據。
如果是運行中的一些參數信息,那應該使用-files選項讓Hadoop框架幫你把文件發送到目標機器上,和maprece的jar包放到相同的臨時目錄下,你才能找到。-files要加在前面,例如:
hadoop jar \$HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar -files item.txt -mapper ./python/map.py -recer ./python/rece.py -input /home/hadoop/hello -output /home/hadoop/outpath
如果保證每台主機的相同路徑下都存在這個文件,也可以使用絕對路徑。
命令寫的也有問題,沒有指定輸入輸出目錄。
hadoop jar \$HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar -mapper ./python/map.py -recer ./python/rece.py -input /home/hadoop/hello -output /home/hadoop/outpath
其中輸出路徑/home/hadoop/outpath需要是一個之前不存在的路徑,執行maprece的時候會校驗並創建。
Ⅲ hadoop2.6.0下運行Python出現 subprocess failed with code 2
hadoop streaming pyhton 時候出唯伏顫現廳隱:subprocess failed with code 127,code 後面錯誤碼不管是什麼,都是 腳本環境的問題
在python腳本裡面聲明的:#!/usr/bin/env python,這里聲明指敗了位置,所以必須要放到 /usr/bin 下面,才能有效。
python 執行程序必須放到 /usr/bin 下面,在環境變數的其他位置不管用。
Ⅳ 如何使用Python為Hadoop編寫一個簡單的MapRece程序
轉載:我們將編寫一個簡單的 MapRece 程序,使用的是C-Python,而不是Jython編寫後打包成jar包的程序。
我們的這個例子將模仿 WordCount 並使用Python來實現,例子通過讀取文本文件來統計出單詞的出現次數。結果也以文本形式輸出,每一行包含一個單詞和單詞出現的次數,兩者中間使用製表符來想間隔。
先決條件
編寫這個程序之前,你學要架設好Hadoop 集群,這樣才能不會在後期工作抓瞎。如果你沒有架設好,那麼在後面有個簡明教程來教你在Ubuntu linux 上搭建(同樣適用於其他發行版linux、unix)
如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立單悄中節點的 Hadoop 集群
如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多節點的 Hadoop 集群
Python的MapRece代碼
使用Python編寫MapRece代碼的技巧就在於我們使用了 HadoopStreaming 來幫助啟拍山我們在Map 和 Rece間傳遞數據通過STDIN (標准輸入)和STDOUT (標准輸出).我們僅僅使用Python的sys.stdin來輸入數據,使用sys.stdout輸出數據,這樣做是因為HadoopStreaming會幫我們辦好其他事。這是真的,別不相信!
Map: mapper.py
將下列的代碼保存在/home/hadoop/mapper.py中,他將從STDIN讀取數據並將單詞成行分隔開,生成一個列表映射單詞與發生次數的關系:
注意:要確保這個腳本有足夠許可權(chmod +x /home/hadoop/mapper.py)。
#!/usr/bin/env python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Rece step, i.e. the input for recer.py
#
# tab-delimited; the trivial word count is 1
print '%s\\t%s' % (word, 1)在這個腳本中,並不計算出單詞出現的總數,它將輸出 "<word> 1" 迅速地,盡管<word>可能會在輸入中出現多次,計算是留給後來的Rece步驟(或叫做程序)來實現。當然你可以改變下編碼風格,完全尊重你的習慣。
Rece: recer.py
將代碼存儲在/home/hadoop/recer.py 中,這個腳本的作用是從mapper.py 的STDIN中讀取結果,然後計算每個單詞出現次數的總和,並輸出結果到STDOUT。
同樣,要注意腳本許可權:chmod +x /home/hadoop/recer.py
#!/usr/bin/env python
from operator import itemgetter
import sys
# maps words to their counts
word2count = {}
賀困# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\\t%s'% (word, count)
測試你的代碼(cat data | map | sort | rece)
我建議你在運行MapRece job測試前嘗試手工測試你的mapper.py 和 recer.py腳本,以免得不到任何返回結果
這里有一些建議,關於如何測試你的Map和Rece的功能:
——————————————————————————————————————————————
\r\n
# very basic test
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
——————————————————————————————————————————————
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/recer.py
bar 1
foo 3
labs 1
——————————————————————————————————————————————
# using one of the ebooks as example input
# (see below on where to get the ebooks)
hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.py
The 1
Project 1
Gutenberg 1
EBook 1
of 1
[...]
(you get the idea)
quux 2
quux 1
——————————————————————————————————————————————
在Hadoop平台上運行Python腳本
為了這個例子,我們將需要三種電子書:
The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson\r\n
The Notebooks of Leonardo Da Vinci\r\n
Ulysses by James Joyce
下載他們,並使用us-ascii編碼存儲 解壓後的文件,保存在臨時目錄,比如/tmp/gutenberg.
hadoop@ubuntu:~$ ls -l /tmp/gutenberg/
total 3592
-rw-r--r-- 1 hadoop hadoop 674425 2007-01-22 12:56 20417-8.txt
-rw-r--r-- 1 hadoop hadoop 1423808 2006-08-03 16:36 7ldvc10.txt
-rw-r--r-- 1 hadoop hadoop 1561677 2004-11-26 09:48 ulyss12.txt
hadoop@ubuntu:~$
復制本地數據到HDFS
在我們運行MapRece job 前,我們需要將本地的文件復制到HDFS中:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -FromLocal /tmp/gutenberg gutenberg
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
/user/hadoop/gutenberg <dir>
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg
Found 3 items
/user/hadoop/gutenberg/20417-8.txt <r 1> 674425
/user/hadoop/gutenberg/7ldvc10.txt <r 1> 1423808
/user/hadoop/gutenberg/ulyss12.txt <r 1> 1561677
執行 MapRece job
現在,一切准備就緒,我們將在運行Python MapRece job 在Hadoop集群上。像我上面所說的,我們使用的是
HadoopStreaming 幫助我們傳遞數據在Map和Rece間並通過STDIN和STDOUT,進行標准化輸入輸出。
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-mapper /home/hadoop/mapper.py -recer /home/hadoop/recer.py -input gutenberg/*
-output gutenberg-output
在運行中,如果你想更改Hadoop的一些設置,如增加Rece任務的數量,你可以使用「-jobconf」選項:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-jobconf mapred.rece.tasks=16 -mapper ...
一個重要的備忘是關於Hadoop does not honor mapred.map.tasks
這個任務將會讀取HDFS目錄下的gutenberg並處理他們,將結果存儲在獨立的結果文件中,並存儲在HDFS目錄下的
gutenberg-output目錄。
之前執行的結果如下:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-mapper /home/hadoop/mapper.py -recer /home/hadoop/recer.py -input gutenberg/*
-output gutenberg-output
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/usr/local/hadoop-datastore/hadoop-hadoop/hadoop-unjar54543/]
[] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-hadoop/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob: map 0% rece 0%
[...] INFO streaming.StreamJob: map 43% rece 0%
[...] INFO streaming.StreamJob: map 86% rece 0%
[...] INFO streaming.StreamJob: map 100% rece 0%
[...] INFO streaming.StreamJob: map 100% rece 33%
[...] INFO streaming.StreamJob: map 100% rece 70%
[...] INFO streaming.StreamJob: map 100% rece 77%
[...] INFO streaming.StreamJob: map 100% rece 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
[...] INFO streaming.StreamJob: Output: gutenberg-output hadoop@ubuntu:/usr/local/hadoop$
正如你所見到的上面的輸出結果,Hadoop 同時還提供了一個基本的WEB介面顯示統計結果和信息。
Ⅳ python模塊中使用了hadoop框架
1、什麼是python?
2、python的10大Web框架有哪些,以及它們各自的特點?
3、本文只是這10大框架做了簡單介紹,讓你對它們有個初步的了解。
Python 是一門動態、面向對象語言。其最初就是作為一門面向對象語言設計的,並且在後期又加入了一些更高級的特性。除了語言本身的設計目的之外,Python標准 庫也是值得大家稱贊的,Python甚至還自帶伺服器。其它方面,Python擁有足夠多的免費數據函數庫、免費的Web網頁模板豎擾系統、還有與Web服務 器進行交互的庫、這些都可以設計到你的Web應用程序裡面。在這篇文章里,我們將為Python Web開發者介紹基於Python的10大Web應用框架。
1、CubicWeb
1.jpg (101.51 KB, 下載次數: 2)
2015-7-16 11:04 上傳
CubicWeb的最重要的支柱就是代碼的可重用性。CubicWeb宣揚自己不僅是一個Web開發框架,而且還是一款語義Web開發框架。CubicWeb使用關系查詢語言(RQL Relation Query Language)與資料庫之間進行通信。
2、Zope2
2.jpg (53.86 KB, 下載次數: 1)
2015-7-16 11:04 上傳
Zope 2是一款基於Python的Web應用框架,是所有Python Web應用程序、工具的鼻祖,是Python家族一個強有力的分支。Zope 2的「對象發布」系統非常適合面向對象開發方法,並且可以減輕開發者的學習曲線,還可以幫助你發現應用程序里一些不歲寬好的功能。
3、Web2py
3.jpg (99.94 KB, 下載次數: 1)
2015-7-16 11:04 上傳
Web2py是一個用Python語言編寫的免費的開源Web框架,旨在敏捷快速的開發Web應用,具有快速、可擴展、安全以及可移植的資料庫驅動的應用,遵循LGPLv3開源協議。
Web2py提供一站式的解決方案,整個開發過程都可以在瀏覽器上進行,提供了Web版的在線開發,HTML模版編寫,靜態文件的上傳乎纖亮,資料庫的編寫的功能。其它的還有日誌功能,以及一個自動化的admin介面。
4、TurboGears
4.jpg (95.37 KB, 下載次數: 1)
2015-7-16 11:04 上傳
它是另外一個基於 Python 的 MVC 風格的 Web 應用程序框架。
TurboGears 開發人員稱這個項目是一個 「大框架(megaframework)」,這是因為它是由現有的子項目構成的。TurboGears 可以幫助將很多主要組件集成在一起:MochiKit:javaScript 庫
Kid:模板語言
CherryPy:基本 Web 框架
sqlObject:對象關系映射器(ORM)
5、Pylons
5.jpg (131.51 KB, 下載次數: 1)
2015-7-16 11:04 上傳
Pylons是一個開放源代碼的Web應用框架,使用python語言編寫。它對WSGI標准進行了擴展應用,提升了重用性且將功能分割到獨立的模塊中。
Pylons是最新的Web應用框架中的典型,類似於Django和TurboGears。Pylons受Ruby on Rails影響很深:它的兩個組件,Routes和WebHelpers是Rails特性的Python實現。
6、Grok
6.jpg (79.28 KB, 下載次數: 0)
2015-7-16 11:04 上傳
Grok 是一個為 Python 開發者提供的Web應用開發框架,Grok 的重點是敏捷開發,是一個易用而且功能強大的開發框架,基於 Zope 3 技術。
7、Web.py
7.jpg (69.53 KB, 下載次數: 2)
2015-7-16 11:10 上傳
Web.py是一個輕量級的開源Python Web框架,小巧靈活、簡單並且非常強大,在使用時沒有任何限制。目前Web.py被廣泛運用在許多大型網站,如西班牙的社交網站Frinki、主頁日平均訪問量達7000萬次的Yandex等。
8、Pyramid
8.jpg (98 KB, 下載次數: 2)
2015-7-16 11:10 上傳
Pyramid也是一款輕量級的開源Python Web框架,是Pylons項目的一部分。Pyramid只能運行在Python 2.x或2.4以後的版本上。在使用後端資料庫時無需聲明,在開發時也不會強制使用一些特定的模板系統。
9、CherryPy
9.jpg (83.16 KB, 下載次數: 2)
2015-7-16 11:11 上傳
CherryPy是一個基於Python的Web使用程序開發框架,它極大地簡化了運用 Python 的web開發人員的工作。它為Python開發人員提供了友好的HTTP協議介面。大家知道,HTTP可是萬維網的支柱協議,而CherryPy將HTTP協議簡化成Python API來供Python 開發人員使用,這極大地簡化了Web開發人員對HTTP協議的操作。CherryPy自身內置了一個HTTP伺服器,或者稱為Web伺服器。這樣,對於CherryPy的用戶來說,不用另外搭設Web伺服器就能直接運行 CherryPy應用程序了。實際上,Web伺服器是到達CherryPy應用程序的關口,是所有的HTTP請求和響應的必經之地。因此,可以這樣理解 CherryPy內建的Web伺服器:它是位於處理客戶端與伺服器端之間的一層軟體,用於把底層TCP套按字傳輸的信息轉換成Http請求,並傳遞給相應 的處理程序;同時,還把上層軟體傳來的信息打包成Http響應,並向下傳遞給底層的TCP套按字。
10、Flask
10.jpg (52.97 KB, 下載次數: 2)
2015-7-16 11:10 上傳
Flask是一個輕量級的Web應用框架, 使用Python編寫。基於 WerkzeugWSGI工具箱和 Jinja2模板引擎。使用 BSD 授權。
Flask也被稱為 「microframework」 ,因為它使用簡單的核心,用 extension 增加其他功能。Flask沒有默認使用的資料庫、窗體驗證工具。然而,Flask保留了擴增的彈性,可以用Flask-extension加入這些功 能:ORM、窗體驗證工具、文件上傳、各種開放式身份驗證技術
Ⅵ 在hadoop集群下跑一個python實例
如下面這絕余褲句sql就是借用了weekday_mapper.py對數據進行了處理
CREATETABLEu_data_new(
useridINT,
movieidINT,
ratingINT,
weekdayINT)
ROWFORMATDELIMITED
FIELDSTERMINATEDBY' ';
addFILEweekday_mapper.py;
INSERTOVERWRITETABLEu_data_new
SELECT
TRANSFORM(userid,movieid,rating,unixtime)
USING'pythonweekday_mapper.py'
AS(userid,movieid,rating,weekday)
FROMu_data;
,其中weekday_mapper.py內容如下
importsys
importdatetime
forlineinsys.stdin:
line=毀鋒line.strip()
userid,movieid,rating,unixtime=line.split(' ')
weekday=datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
print' '.join([userid,movieid,rating,str(weekday)])
如下並簡面的例子則是使用了shell的cat命令來處理數據
(a.foo,a.bar)AS(oof,rab)USING'/bin/cat'WHEREa.ds>'2008-08-09';
Ⅶ 如何使用Python為Hadoop編寫一個簡單的MapRece程序
MichaelG.Noll在他的Blog中提到如何在Hadoop中用Python編寫MapRece程序,韓國的gogamza在其Bolg中也提到如何用C編汪瞎寫MapRece程序(我稍微修改了一下原程序,因為他的Map對單詞切分使用tab鍵)。我合並他們兩人的文章,也讓國內的Hadoop用戶能夠使用別的語言來編寫MapRece程序。首先您得配好您的Hadoop集群,這方面的介紹網上比較多,這兒給個鏈接(Hadoop學習筆記二安裝部署)。HadoopStreaming幫助返鋒我們用非Java的編程語言使用MapRece,Streaming用STDIN(標准輸入)和STDOUT(標准輸出)來和我們編寫的Map和Rece進行數據的交換數據。任何能夠使用STDIN和STDOUT都可以用來編寫MapRece程序,比如我們用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。我們還是使用Hadoop的例子WordCount來做示範如何編寫MapRece,在WordCount的例子中漏陵晌我們要解決計算在一批文檔中每一個單詞的出現頻率。首先我們在Map程序中會接受到這批文檔每一行的數據,然後我們編寫的Map程序把這一行按空格切開成一個數組。並對這個數組遍歷按"1"用標準的輸出輸出來,代表這個單詞出現了一次。在Rece中我們來統計單詞的出現頻率。PythonCodeMap:mapper.py#!/usr/bin/envpythonimportsys#={}#inputcomesfromSTDIN(standardinput)forlineinsys.stdin:#=line.strip()#=filter(lambdaword:word,line.split())#:#writetheresultstoSTDOUT(standardoutput);##Recestep,i.e.theinputforrecer.py##tab-delimited;thetrivialwordcountis1print'%s\t%s'%(word,1)Rece:recer.py#!/usr/bin/#={}#.stdin:#=line.strip()#parsetheinputwegotfrommapper.pyword,count=line.split()#convertcount(currentlyastring)tointtry:count=int(count)word2count[word]=word2count.get(word,0)+countexceptValueError:#countwasnotanumber,sosilently#ignore/discardthislinepass#sortthewordslexigraphically;##thisstepisNOTrequired,wejustdoitsothatour##wordcountexamplessorted_word2count=sorted(word2count.items(),key=itemgetter(0))#writetheresultstoSTDOUT(standardoutput)forword,countinsorted_word2count:print'%s\t%s'%(word,count)CCodeMap:Mapper.c#include#include#include#include#defineBUF_SIZE2048#defineDELIM"\n"intmain(intargc,char*argv[]){charbuffer[BUF_SIZE];while(fgets(buffer,BUF_SIZE-1,stdin)){intlen=strlen(buffer);if(buffer[len-1]=='\n')buffer[len-1]=0;char*querys=index(buffer,'');char*query=NULL;if(querys==NULL)continue;querys+=1;/*nottoinclude'\t'*/query=strtok(buffer,"");while(query){printf("%s\t1\n",query);query=strtok(NULL,"");}}return0;}h>h>h>h>Rece:Recer.c#include#include#include#include#defineBUFFER_SIZE1024#defineDELIM"\t"intmain(intargc,char*argv[]){charstrLastKey[BUFFER_SIZE];charstrLine[BUFFER_SIZE];intcount=0;*strLastKey='\0';*strLine='\0';while(fgets(strLine,BUFFER_SIZE-1,stdin)){char*strCurrKey=NULL;char*strCurrNum=NULL;strCurrKey=strtok(strLine,DELIM);strCurrNum=strtok(NULL,DELIM);/*necessarytocheckerrorbut.*/if(strLastKey[0]=='\0'){strcpy(strLastKey,strCurrKey);}if(strcmp(strCurrKey,strLastKey)){printf("%s\t%d\n",strLastKey,count);count=atoi(strCurrNum);}else{count+=atoi(strCurrNum);}strcpy(strLastKey,strCurrKey);}printf("%s\t%d\n",strLastKey,count);/*flushthecount*/return0;}h>h>h>h>首先我們調試一下源碼:chmod+xmapper.pychmod+xrecer.pyecho"foofooquuxlabsfoobarquux"|./mapper.py|./recer.pybar1foo3labs1quux2g++Mapper.c-oMapperg++Recer.c-oRecerchmod+xMapperchmod+xRecerecho"foofooquuxlabsfoobarquux"|./Mapper|./你可能看到C的輸出和Python的不一樣,因為Python是把他放在詞典里了.我們在Hadoop時,會對這進行排序,然後相同的單詞會連續在標准輸出中輸出.在Hadoop中運行程序首先我們要下載我們的測試文檔wget頁面中摘下的用php編寫的MapRece程序,供php程序員參考:Map:mapper.php#!/usr/bin/php$word2count=array();//inputcomesfromSTDIN(standardinput)while(($line=fgets(STDIN))!==false){//$line=strtolower(trim($line));//$words=preg_split('/\W/',$line,0,PREG_SPLIT_NO_EMPTY);//increasecountersforeach($wordsas$word){$word2count[$word]+=1;}}//writetheresultstoSTDOUT(standardoutput)////Recestep,i.e.theinputforrecer.pyforeach($word2countas$word=>$count){//tab-delimitedecho$word,chr(9),$count,PHP_EOL;}?>Rece:mapper.php#!/usr/bin/php$word2count=array();//inputcomesfromSTDINwhile(($line=fgets(STDIN))!==false){//$line=trim($line);//parsetheinputwegotfrommapper.phplist($word,$count)=explode(chr(9),$line);//convertcount(currentlyastring)toint$count=intval($count);//sumcountsif($count>0)$word2count[$word]+=$count;}//sortthewordslexigraphically////thissetisNOTrequired,wejustdoitsothatour////wordcountexamplesksort($word2count);//writetheresultstoSTDOUT(standardoutput)foreach($word2countas$word=>$count){echo$word,chr(9),$count,PHP_EOL;}?>作者:馬士華發表於:2008-03-05
Ⅷ 如何使用Python為Hadoop編寫一個簡單的MapRece程序
我們將編寫一個簡槐迅單的 MapRece 程序,使用的是C-Python,而不是Jython編寫後打包成jar包的程序。
我們的這個例子將模仿 WordCount 並使用Python來實現,例子通過讀取文本文件來統計出單詞的出現次數。結果也以文本形式輸出,每一陵此行包含一個單詞和單尺明迅詞出現的次數,兩者中間使用製表符來想間隔。
先決條件
編寫這個程序之前,你學要架設好Hadoop 集群,這樣才能不會在後期工作抓瞎。如果你沒有架設好,那麼在後面有個簡明教程來教你在Ubuntu Linux 上搭建(同樣適用於其他發行版linux、unix)
如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立單節點的 Hadoop 集群
如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多節點的 Hadoop 集群
Python的MapRece代碼
使用Python編寫MapRece代碼的技巧就在於我們使用了 HadoopStreaming 來幫助我們在Map 和 Rece間傳遞數據通過STDIN (標准輸入)和STDOUT (標准輸出).我們僅僅使用Python的sys.stdin來輸入數據,使用sys.stdout輸出數據,這樣做是因為HadoopStreaming會幫我們辦好其他事。這是真的,別不相信!
Ⅸ 如何在Hadoop環境下搭建Python
搭建 Python 環境在 Hadoop 上的步驟如下:
安裝 Hadoop:在你的計算機上安裝 Hadoop。
安裝 Python:請確保你的計孫拿算機上已經安裝了 Python。
配置 Hadoop 環境:編輯 Hadoop 的配置文件,以確保 Hadoop 可以與 Python 配合使用。
安裝相關模塊:請安裝所需的 Python 模塊,以便在 Hadoop 環境下使用 Python。
測試灶行 Python 安裝:請運行一些測試腳本,以確保 Python 可以在 Hadoop 環境下正常工作。
這些步驟可以幫助你在 Hadoop 環境下搭建 Python。請注意,具體的步驟可能因 Hadoop 的版本和環境而異,請仔細查則辯搭看相關文檔。
Ⅹ hadoop怎麼使用演算法
實例一、對以下數據進行排序,根據收入減去支出得到最後結余從大到小排序,數據如下:
SortStep運行之後結果為上圖根據結余從大到小排序。
代碼如下:
[java]view plain
<InfoBean>{
privateStringaccount;
privatedoubleincome;
privatedoubleexpenses;
privatedoublesurplus;
publicvoidset(Stringaccount,doubleincome,doubleexpenses){
this.account=account;
this.income=income;
this.expenses=expenses;
this.surplus=income-expenses;
}
@Override
publicStringtoString(){
returnthis.income+" "+this.expenses+" "+this.surplus;
}
/**
*serialize
*/
publicvoidwrite(DataOutputout)throwsIOException{
out.writeUTF(account);
out.writeDouble(income);
out.writeDouble(expenses);
out.writeDouble(surplus);
}
/**
*deserialize
*/
publicvoidreadFields(DataInputin)throwsIOException{
this.account=in.readUTF();
this.income=in.readDouble();
this.expenses=in.readDouble();
this.surplus=in.readDouble();
}
publicintcompareTo(InfoBeano){
if(this.income==o.getIncome()){
returnthis.expenses>o.getExpenses()?1:-1;
}else{
returnthis.income>o.getIncome()?-1:1;
}
}
publicStringgetAccount(){
returnaccount;
}
publicvoidsetAccount(Stringaccount){
this.account=account;
}
publicdoublegetIncome(){
returnincome;
}
publicvoidsetIncome(doubleincome){
this.income=income;
}
publicdoublegetExpenses(){
returnexpenses;
}
publicvoidsetExpenses(doubleexpenses){
this.expenses=expenses;
}
publicdoublegetSurplus(){
returnsurplus;
}
publicvoidsetSurplus(doublesurplus){
this.surplus=surplus;
}
}
publicclassSumStep{
publicstaticvoidmain(String[]args)throwsException{
Configurationconf=newConfiguration();
Jobjob=Job.getInstance(conf);
job.setJarByClass(SumStep.class);
job.setMapperClass(SumMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBean.class);
FileInputFormat.setInputPaths(job,newPath(args[0]));
job.setRecerClass(SumRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job,newPath(args[1]));
job.waitForCompletion(true);
}
<LongWritable,Text,Text,InfoBean>{
privateInfoBeanbean=newInfoBean();
privateTextk=newText();
@Override
protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)
throwsIOException,InterruptedException{
//split
Stringline=value.toString();
String[]fields=line.split(" ");
//getusefulfield
Stringaccount=fields[0];
doubleincome=Double.parseDouble(fields[1]);
doubleexpenses=Double.parseDouble(fields[2]);
k.set(account);
bean.set(account,income,expenses);
context.write(k,bean);
}
}
<Text,InfoBean,Text,InfoBean>{
privateInfoBeanbean=newInfoBean();
@Override
protectedvoidrece(Textkey,Iterable<InfoBean>v2s,Contextcontext)
throwsIOException,InterruptedException{
doublein_sum=0;
doubleout_sum=0;
for(InfoBeanbean:v2s){
in_sum+=bean.getIncome();
out_sum+=bean.getExpenses();
}
bean.set("",in_sum,out_sum);
context.write(key,bean);
}
}
}
publicclassSortStep{
publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{
Configurationconf=newConfiguration();
Jobjob=Job.getInstance(conf);
job.setJarByClass(SortStep.class);
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(InfoBean.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,newPath(args[0]));
job.setRecerClass(SortRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job,newPath(args[1]));
job.waitForCompletion(true);
}
<LongWritable,Text,InfoBean,NullWritable>{
privateInfoBeanbean=newInfoBean();
@Override
protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)
throwsIOException,InterruptedException{
Stringline=value.toString();
String[]fields=line.split(" ");
Stringaccount=fields[0];
doubleincome=Double.parseDouble(fields[1]);
doubleexpenses=Double.parseDouble(fields[2]);
bean.set(account,income,expenses);
context.write(bean,NullWritable.get());
}
}
<InfoBean,NullWritable,Text,InfoBean>{
privateTextk=newText();
@Override
protectedvoidrece(InfoBeanbean,Iterable<NullWritable>v2s,Contextcontext)
throwsIOException,InterruptedException{
Stringaccount=bean.getAccount();
k.set(account);
context.write(k,bean);
}
}
}
Map階段
<0,"hellotom">
....
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
--------------------------------------------------------
combiner階段
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>
context.write("hello","a.txt->5");
context.write("hello","b.txt->3");
--------------------------------------------------------
Recer階段
<"hello",{"a.txt->5","b.txt->3"}>
context.write("hello","a.txt->5b.txt->3");
-------------------------------------------------------
hello"a.txt->5b.txt->3"
tom"a.txt->2b.txt->1"
kitty"a.txt->1"
.......
publicclassInverseIndex{
publicstaticvoidmain(String[]args)throwsException{
Configurationconf=newConfiguration();
Jobjob=Job.getInstance(conf);
//設置jar
job.setJarByClass(InverseIndex.class);
//設置Mapper相關的屬性
job.setMapperClass(IndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job,newPath(args[0]));//words.txt
//設置Recer相關屬性
job.setRecerClass(IndexRecer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job,newPath(args[1]));
job.setCombinerClass(IndexCombiner.class);
//提交任務
job.waitForCompletion(true);
}
<LongWritable,Text,Text,Text>{
privateTextk=newText();
privateTextv=newText();
@Override
protectedvoidmap(LongWritablekey,Textvalue,
Mapper<LongWritable,Text,Text,Text>.Contextcontext)
throwsIOException,InterruptedException{
Stringline=value.toString();
String[]fields=line.split("");
FileSplitinputSplit=(FileSplit)context.getInputSplit();
Pathpath=inputSplit.getPath();
Stringname=path.getName();
for(Stringf:fields){
k.set(f+"->"+name);
v.set("1");
context.write(k,v);
}
}
}
<Text,Text,Text,Text>{
privateTextk=newText();
privateTextv=newText();
@Override
protectedvoidrece(Textkey,Iterable<Text>values,
Recer<Text,Text,Text,Text>.Contextcontext)
throwsIOException,InterruptedException{
String[]fields=key.toString().split("->");
longsum=0;
for(Textt:values){
sum+=Long.parseLong(t.toString());
}
k.set(fields[0]);
v.set(fields[1]+"->"+sum);
context.write(k,v);
}
}
<Text,Text,Text,Text>{
privateTextv=newText();
@Override
protectedvoidrece(Textkey,Iterable<Text>values,
[java]view plain
此處的輸入為SumStep的輸出而不是源文件作為輸入,當然也可以將兩個job合並到一起執行,此處不再討論。
[java]view plain
實例二、倒排索引,過程如下:
[plain]view plain
代碼如下:
[java]view plain