当前位置:首页 » 编程语言 » hadoop实例python

hadoop实例python

发布时间: 2023-06-01 04:10:49

Ⅰ 如何使用python为Hadoop编写一个简单的MapRece程序

使用Python编写MapRece代码技巧于我使用 HadoopStreaming 帮助蚂贺我Map Rece间传递数据通STDIN (标首型准输入)STDOUT (标准输).我仅仅使用Pythonsys.stdin输入闷芹派数据使用sys.stdout输数据做HadoopStreaming帮我办其事真别相信!

Ⅱ hadoop用python写的Map部分哪里有问题啊

这个item.txt和'user_profile.txt'是什么文件?

如果是数据文件那应该放到HDFS上,或者自己实现inputformat来提供访问方式。程序中从标准输入获取数据。
如果是运行中的一些参数信息,那应该使用-files选项让Hadoop框架帮你把文件发送到目标机器上,和maprece的jar包放到相同的临时目录下,你才能找到。-files要加在前面,例如:
hadoop jar \$HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar -files item.txt -mapper ./python/map.py -recer ./python/rece.py -input /home/hadoop/hello -output /home/hadoop/outpath
如果保证每台主机的相同路径下都存在这个文件,也可以使用绝对路径。

命令写的也有问题,没有指定输入输出目录。
hadoop jar \$HADOOP_HOME/contrib/streaming/hadoop-0.20.2-streaming.jar -mapper ./python/map.py -recer ./python/rece.py -input /home/hadoop/hello -output /home/hadoop/outpath
其中输出路径/home/hadoop/outpath需要是一个之前不存在的路径,执行maprece的时候会校验并创建。

Ⅲ hadoop2.6.0下运行Python出现 subprocess failed with code 2

hadoop streaming pyhton 时候出唯伏颤现厅隐:subprocess failed with code 127,code 后面错误码不管是什么,都是 脚本环境的问题
在python脚本里面声明的:#!/usr/bin/env python,这里声明指败了位置,所以必须要放到 /usr/bin 下面,才能有效。
python 执行程序必须放到 /usr/bin 下面,在环境变量的其他位置不管用。

Ⅳ 如何使用Python为Hadoop编写一个简单的MapRece程序

转载:我们将编写一个简单的 MapRece 程序,使用的是C-Python,而不是Jython编写后打包成jar包的程序。
我们的这个例子将模仿 WordCount 并使用Python来实现,例子通过读取文本文件来统计出单词的出现次数。结果也以文本形式输出,每一行包含一个单词和单词出现的次数,两者中间使用制表符来想间隔。

先决条件

编写这个程序之前,你学要架设好Hadoop 集群,这样才能不会在后期工作抓瞎。如果你没有架设好,那么在后面有个简明教程来教你在Ubuntu linux 上搭建(同样适用于其他发行版linux、unix)

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立单悄中节点的 Hadoop 集群

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多节点的 Hadoop 集群

Python的MapRece代码

使用Python编写MapRece代码的技巧就在于我们使用了 HadoopStreaming 来帮助启拍山我们在Map 和 Rece间传递数据通过STDIN (标准输入)和STDOUT (标准输出).我们仅仅使用Python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为HadoopStreaming会帮我们办好其他事。这是真的,别不相信!

Map: mapper.py

将下列的代码保存在/home/hadoop/mapper.py中,他将从STDIN读取数据并将单词成行分隔开,生成一个列表映射单词与发生次数的关系:
注意:要确保这个脚本有足够权限(chmod +x /home/hadoop/mapper.py)。

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Rece step, i.e. the input for recer.py
#
# tab-delimited; the trivial word count is 1
print '%s\\t%s' % (word, 1)在这个脚本中,并不计算出单词出现的总数,它将输出 "<word> 1" 迅速地,尽管<word>可能会在输入中出现多次,计算是留给后来的Rece步骤(或叫做程序)来实现。当然你可以改变下编码风格,完全尊重你的习惯。

Rece: recer.py

将代码存储在/home/hadoop/recer.py 中,这个脚本的作用是从mapper.py 的STDIN中读取结果,然后计算每个单词出现次数的总和,并输出结果到STDOUT。
同样,要注意脚本权限:chmod +x /home/hadoop/recer.py

#!/usr/bin/env python

from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

贺困# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()

# parse the input we got from mapper.py
word, count = line.split('\\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass

# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\\t%s'% (word, count)
测试你的代码(cat data | map | sort | rece)

我建议你在运行MapRece job测试前尝试手工测试你的mapper.py 和 recer.py脚本,以免得不到任何返回结果
这里有一些建议,关于如何测试你的Map和Rece的功能:
——————————————————————————————————————————————
\r\n
# very basic test
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
——————————————————————————————————————————————
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/recer.py
bar 1
foo 3
labs 1
——————————————————————————————————————————————

# using one of the ebooks as example input
# (see below on where to get the ebooks)
hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.py
The 1
Project 1
Gutenberg 1
EBook 1
of 1
[...]
(you get the idea)

quux 2

quux 1

——————————————————————————————————————————————

在Hadoop平台上运行Python脚本

为了这个例子,我们将需要三种电子书:

The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson\r\n
The Notebooks of Leonardo Da Vinci\r\n
Ulysses by James Joyce
下载他们,并使用us-ascii编码存储 解压后的文件,保存在临时目录,比如/tmp/gutenberg.

hadoop@ubuntu:~$ ls -l /tmp/gutenberg/
total 3592
-rw-r--r-- 1 hadoop hadoop 674425 2007-01-22 12:56 20417-8.txt
-rw-r--r-- 1 hadoop hadoop 1423808 2006-08-03 16:36 7ldvc10.txt
-rw-r--r-- 1 hadoop hadoop 1561677 2004-11-26 09:48 ulyss12.txt
hadoop@ubuntu:~$

复制本地数据到HDFS

在我们运行MapRece job 前,我们需要将本地的文件复制到HDFS中:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -FromLocal /tmp/gutenberg gutenberg
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
/user/hadoop/gutenberg <dir>
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg
Found 3 items
/user/hadoop/gutenberg/20417-8.txt <r 1> 674425
/user/hadoop/gutenberg/7ldvc10.txt <r 1> 1423808
/user/hadoop/gutenberg/ulyss12.txt <r 1> 1561677

执行 MapRece job

现在,一切准备就绪,我们将在运行Python MapRece job 在Hadoop集群上。像我上面所说的,我们使用的是
HadoopStreaming 帮助我们传递数据在Map和Rece间并通过STDIN和STDOUT,进行标准化输入输出。

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-mapper /home/hadoop/mapper.py -recer /home/hadoop/recer.py -input gutenberg/*
-output gutenberg-output
在运行中,如果你想更改Hadoop的一些设置,如增加Rece任务的数量,你可以使用“-jobconf”选项:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-jobconf mapred.rece.tasks=16 -mapper ...

一个重要的备忘是关于Hadoop does not honor mapred.map.tasks
这个任务将会读取HDFS目录下的gutenberg并处理他们,将结果存储在独立的结果文件中,并存储在HDFS目录下的
gutenberg-output目录。
之前执行的结果如下:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-mapper /home/hadoop/mapper.py -recer /home/hadoop/recer.py -input gutenberg/*
-output gutenberg-output

additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/usr/local/hadoop-datastore/hadoop-hadoop/hadoop-unjar54543/]
[] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-hadoop/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob: map 0% rece 0%
[...] INFO streaming.StreamJob: map 43% rece 0%
[...] INFO streaming.StreamJob: map 86% rece 0%
[...] INFO streaming.StreamJob: map 100% rece 0%
[...] INFO streaming.StreamJob: map 100% rece 33%
[...] INFO streaming.StreamJob: map 100% rece 70%
[...] INFO streaming.StreamJob: map 100% rece 77%
[...] INFO streaming.StreamJob: map 100% rece 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021

[...] INFO streaming.StreamJob: Output: gutenberg-output hadoop@ubuntu:/usr/local/hadoop$

正如你所见到的上面的输出结果,Hadoop 同时还提供了一个基本的WEB接口显示统计结果和信息。

Ⅳ python模块中使用了hadoop框架

1、什么是python?

2、python的10大Web框架有哪些,以及它们各自的特点?
3、本文只是这10大框架做了简单介绍,让你对它们有个初步的了解。

Python 是一门动态、面向对象语言。其最初就是作为一门面向对象语言设计的,并且在后期又加入了一些更高级的特性。除了语言本身的设计目的之外,Python标准 库也是值得大家称赞的,Python甚至还自带服务器。其它方面,Python拥有足够多的免费数据函数库、免费的Web网页模板竖扰系统、还有与Web服务 器进行交互的库、这些都可以设计到你的Web应用程序里面。在这篇文章里,我们将为Python Web开发者介绍基于Python的10大Web应用框架。
1、CubicWeb

1.jpg (101.51 KB, 下载次数: 2)

2015-7-16 11:04 上传

CubicWeb的最重要的支柱就是代码的可重用性。CubicWeb宣扬自己不仅是一个Web开发框架,而且还是一款语义Web开发框架。CubicWeb使用关系查询语言(RQL Relation Query Language)与数据库之间进行通信。

2、Zope2

2.jpg (53.86 KB, 下载次数: 1)

2015-7-16 11:04 上传

Zope 2是一款基于Python的Web应用框架,是所有Python Web应用程序、工具的鼻祖,是Python家族一个强有力的分支。Zope 2的“对象发布”系统非常适合面向对象开发方法,并且可以减轻开发者的学习曲线,还可以帮助你发现应用程序里一些不岁宽好的功能。

3、Web2py

3.jpg (99.94 KB, 下载次数: 1)

2015-7-16 11:04 上传

Web2py是一个用Python语言编写的免费的开源Web框架,旨在敏捷快速的开发Web应用,具有快速、可扩展、安全以及可移植的数据库驱动的应用,遵循LGPLv3开源协议。

Web2py提供一站式的解决方案,整个开发过程都可以在浏览器上进行,提供了Web版的在线开发,HTML模版编写,静态文件的上传乎纤亮,数据库的编写的功能。其它的还有日志功能,以及一个自动化的admin接口。

4、TurboGears

4.jpg (95.37 KB, 下载次数: 1)

2015-7-16 11:04 上传

它是另外一个基于 Python 的 MVC 风格的 Web 应用程序框架。

TurboGears 开发人员称这个项目是一个 “大框架(megaframework)”,这是因为它是由现有的子项目构成的。TurboGears 可以帮助将很多主要组件集成在一起:MochiKit:javaScript 库

Kid:模板语言

CherryPy:基本 Web 框架

sqlObject:对象关系映射器(ORM)

5、Pylons

5.jpg (131.51 KB, 下载次数: 1)

2015-7-16 11:04 上传

Pylons是一个开放源代码的Web应用框架,使用python语言编写。它对WSGI标准进行了扩展应用,提升了重用性且将功能分割到独立的模块中。

Pylons是最新的Web应用框架中的典型,类似于Django和TurboGears。Pylons受Ruby on Rails影响很深:它的两个组件,Routes和WebHelpers是Rails特性的Python实现。

6、Grok

6.jpg (79.28 KB, 下载次数: 0)

2015-7-16 11:04 上传

Grok 是一个为 Python 开发者提供的Web应用开发框架,Grok 的重点是敏捷开发,是一个易用而且功能强大的开发框架,基于 Zope 3 技术。

7、Web.py

7.jpg (69.53 KB, 下载次数: 2)

2015-7-16 11:10 上传

Web.py是一个轻量级的开源Python Web框架,小巧灵活、简单并且非常强大,在使用时没有任何限制。目前Web.py被广泛运用在许多大型网站,如西班牙的社交网站Frinki、主页日平均访问量达7000万次的Yandex等。

8、Pyramid

8.jpg (98 KB, 下载次数: 2)

2015-7-16 11:10 上传

Pyramid也是一款轻量级的开源Python Web框架,是Pylons项目的一部分。Pyramid只能运行在Python 2.x或2.4以后的版本上。在使用后端数据库时无需声明,在开发时也不会强制使用一些特定的模板系统。

9、CherryPy

9.jpg (83.16 KB, 下载次数: 2)

2015-7-16 11:11 上传

CherryPy是一个基于Python的Web使用程序开发框架,它极大地简化了运用 Python 的web开发人员的工作。它为Python开发人员提供了友好的HTTP协议接口。大家知道,HTTP可是万维网的支柱协议,而CherryPy将HTTP协议简化成Python API来供Python 开发人员使用,这极大地简化了Web开发人员对HTTP协议的操作。CherryPy自身内置了一个HTTP服务器,或者称为Web服务器。这样,对于CherryPy的用户来说,不用另外搭设Web服务器就能直接运行 CherryPy应用程序了。实际上,Web服务器是到达CherryPy应用程序的关口,是所有的HTTP请求和响应的必经之地。因此,可以这样理解 CherryPy内建的Web服务器:它是位于处理客户端与服务器端之间的一层软件,用于把底层TCP套按字传输的信息转换成Http请求,并传递给相应 的处理程序;同时,还把上层软件传来的信息打包成Http响应,并向下传递给底层的TCP套按字。

10、Flask

10.jpg (52.97 KB, 下载次数: 2)

2015-7-16 11:10 上传

Flask是一个轻量级的Web应用框架, 使用Python编写。基于 WerkzeugWSGI工具箱和 Jinja2模板引擎。使用 BSD 授权。

Flask也被称为 “microframework” ,因为它使用简单的核心,用 extension 增加其他功能。Flask没有默认使用的数据库、窗体验证工具。然而,Flask保留了扩增的弹性,可以用Flask-extension加入这些功 能:ORM、窗体验证工具、文件上传、各种开放式身份验证技术

Ⅵ 在hadoop集群下跑一个python实例

如下面这绝余裤句sql就是借用了weekday_mapper.py对数据进行了处理
CREATETABLEu_data_new(
useridINT,
movieidINT,
ratingINT,
weekdayINT)
ROWFORMATDELIMITED
FIELDSTERMINATEDBY' ';

addFILEweekday_mapper.py;

INSERTOVERWRITETABLEu_data_new
SELECT
TRANSFORM(userid,movieid,rating,unixtime)
USING'pythonweekday_mapper.py'
AS(userid,movieid,rating,weekday)
FROMu_data;
,其中weekday_mapper.py内容如下
importsys
importdatetime

forlineinsys.stdin:
line=毁锋line.strip()
userid,movieid,rating,unixtime=line.split(' ')
weekday=datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
print' '.join([userid,movieid,rating,str(weekday)])

如下并简面的例子则是使用了shell的cat命令来处理数据
(a.foo,a.bar)AS(oof,rab)USING'/bin/cat'WHEREa.ds>'2008-08-09';

Ⅶ 如何使用Python为Hadoop编写一个简单的MapRece程序

MichaelG.Noll在他的Blog中提到如何在Hadoop中用Python编写MapRece程序,韩国的gogamza在其Bolg中也提到如何用C编汪瞎写MapRece程序(我稍微修改了一下原程序,因为他的Map对单词切分使用tab键)。我合并他们两人的文章,也让国内的Hadoop用户能够使用别的语言来编写MapRece程序。首先您得配好您的Hadoop集群,这方面的介绍网上比较多,这儿给个链接(Hadoop学习笔记二安装部署)。HadoopStreaming帮助返锋我们用非Java的编程语言使用MapRece,Streaming用STDIN(标准输入)和STDOUT(标准输出)来和我们编写的Map和Rece进行数据的交换数据。任何能够使用STDIN和STDOUT都可以用来编写MapRece程序,比如我们用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。我们还是使用Hadoop的例子WordCount来做示范如何编写MapRece,在WordCount的例子中漏陵晌我们要解决计算在一批文档中每一个单词的出现频率。首先我们在Map程序中会接受到这批文档每一行的数据,然后我们编写的Map程序把这一行按空格切开成一个数组。并对这个数组遍历按"1"用标准的输出输出来,代表这个单词出现了一次。在Rece中我们来统计单词的出现频率。PythonCodeMap:mapper.py#!/usr/bin/envpythonimportsys#={}#inputcomesfromSTDIN(standardinput)forlineinsys.stdin:#=line.strip()#=filter(lambdaword:word,line.split())#:#writetheresultstoSTDOUT(standardoutput);##Recestep,i.e.theinputforrecer.py##tab-delimited;thetrivialwordcountis1print'%s\t%s'%(word,1)Rece:recer.py#!/usr/bin/#={}#.stdin:#=line.strip()#parsetheinputwegotfrommapper.pyword,count=line.split()#convertcount(currentlyastring)tointtry:count=int(count)word2count[word]=word2count.get(word,0)+countexceptValueError:#countwasnotanumber,sosilently#ignore/discardthislinepass#sortthewordslexigraphically;##thisstepisNOTrequired,wejustdoitsothatour##wordcountexamplessorted_word2count=sorted(word2count.items(),key=itemgetter(0))#writetheresultstoSTDOUT(standardoutput)forword,countinsorted_word2count:print'%s\t%s'%(word,count)CCodeMap:Mapper.c#include#include#include#include#defineBUF_SIZE2048#defineDELIM"\n"intmain(intargc,char*argv[]){charbuffer[BUF_SIZE];while(fgets(buffer,BUF_SIZE-1,stdin)){intlen=strlen(buffer);if(buffer[len-1]=='\n')buffer[len-1]=0;char*querys=index(buffer,'');char*query=NULL;if(querys==NULL)continue;querys+=1;/*nottoinclude'\t'*/query=strtok(buffer,"");while(query){printf("%s\t1\n",query);query=strtok(NULL,"");}}return0;}h>h>h>h>Rece:Recer.c#include#include#include#include#defineBUFFER_SIZE1024#defineDELIM"\t"intmain(intargc,char*argv[]){charstrLastKey[BUFFER_SIZE];charstrLine[BUFFER_SIZE];intcount=0;*strLastKey='\0';*strLine='\0';while(fgets(strLine,BUFFER_SIZE-1,stdin)){char*strCurrKey=NULL;char*strCurrNum=NULL;strCurrKey=strtok(strLine,DELIM);strCurrNum=strtok(NULL,DELIM);/*necessarytocheckerrorbut.*/if(strLastKey[0]=='\0'){strcpy(strLastKey,strCurrKey);}if(strcmp(strCurrKey,strLastKey)){printf("%s\t%d\n",strLastKey,count);count=atoi(strCurrNum);}else{count+=atoi(strCurrNum);}strcpy(strLastKey,strCurrKey);}printf("%s\t%d\n",strLastKey,count);/*flushthecount*/return0;}h>h>h>h>首先我们调试一下源码:chmod+xmapper.pychmod+xrecer.pyecho"foofooquuxlabsfoobarquux"|./mapper.py|./recer.pybar1foo3labs1quux2g++Mapper.c-oMapperg++Recer.c-oRecerchmod+xMapperchmod+xRecerecho"foofooquuxlabsfoobarquux"|./Mapper|./你可能看到C的输出和Python的不一样,因为Python是把他放在词典里了.我们在Hadoop时,会对这进行排序,然后相同的单词会连续在标准输出中输出.在Hadoop中运行程序首先我们要下载我们的测试文档wget页面中摘下的用php编写的MapRece程序,供php程序员参考:Map:mapper.php#!/usr/bin/php$word2count=array();//inputcomesfromSTDIN(standardinput)while(($line=fgets(STDIN))!==false){//$line=strtolower(trim($line));//$words=preg_split('/\W/',$line,0,PREG_SPLIT_NO_EMPTY);//increasecountersforeach($wordsas$word){$word2count[$word]+=1;}}//writetheresultstoSTDOUT(standardoutput)////Recestep,i.e.theinputforrecer.pyforeach($word2countas$word=>$count){//tab-delimitedecho$word,chr(9),$count,PHP_EOL;}?>Rece:mapper.php#!/usr/bin/php$word2count=array();//inputcomesfromSTDINwhile(($line=fgets(STDIN))!==false){//$line=trim($line);//parsetheinputwegotfrommapper.phplist($word,$count)=explode(chr(9),$line);//convertcount(currentlyastring)toint$count=intval($count);//sumcountsif($count>0)$word2count[$word]+=$count;}//sortthewordslexigraphically////thissetisNOTrequired,wejustdoitsothatour////wordcountexamplesksort($word2count);//writetheresultstoSTDOUT(standardoutput)foreach($word2countas$word=>$count){echo$word,chr(9),$count,PHP_EOL;}?>作者:马士华发表于:2008-03-05

Ⅷ 如何使用Python为Hadoop编写一个简单的MapRece程序

我们将编写一个简槐迅单的 MapRece 程序,使用的是C-Python,而不是Jython编写后打包成jar包的程序。
我们的这个例子将模仿 WordCount 并使用Python来实现,例子通过读取文本文件来统计出单词的出现次数。结果也以文本形式输出,每一陵此行包含一个单词和单尺明迅词出现的次数,两者中间使用制表符来想间隔。

先决条件

编写这个程序之前,你学要架设好Hadoop 集群,这样才能不会在后期工作抓瞎。如果你没有架设好,那么在后面有个简明教程来教你在Ubuntu Linux 上搭建(同样适用于其他发行版linux、unix)

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立单节点的 Hadoop 集群

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多节点的 Hadoop 集群

Python的MapRece代码

使用Python编写MapRece代码的技巧就在于我们使用了 HadoopStreaming 来帮助我们在Map 和 Rece间传递数据通过STDIN (标准输入)和STDOUT (标准输出).我们仅仅使用Python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为HadoopStreaming会帮我们办好其他事。这是真的,别不相信!

Ⅸ 如何在Hadoop环境下搭建Python

搭建 Python 环境在 Hadoop 上的步骤如下:

  • 安装 Hadoop:在你的计算机上安装 Hadoop。

  • 安装 Python:请确保你的计孙拿算机上已经安装了 Python。

  • 配置 Hadoop 环境:编辑 Hadoop 的配置文件,以确保 Hadoop 可以与 Python 配合使用。

  • 安装相关模块:请安装所需的 Python 模块,以便在 Hadoop 环境下使用 Python。

  • 测试灶行 Python 安装:请运行一些测试脚本,以确保 Python 可以在 Hadoop 环境下正常工作。

  • 这些步骤可以帮助你在 Hadoop 环境下搭建 Python。请注意,具体的步骤可能因 Hadoop 的版本和环境而异,请仔细查则辩搭看相关文档。

Ⅹ hadoop怎么使用算法

实例一、对以下数据进行排序,根据收入减去支出得到最后结余从大到小排序,数据如下:


SortStep运行之后结果为上图根据结余从大到小排序。

代码如下:

[java]view plain

  • <InfoBean>{

  • privateStringaccount;

  • privatedoubleincome;

  • privatedoubleexpenses;

  • privatedoublesurplus;

  • publicvoidset(Stringaccount,doubleincome,doubleexpenses){

  • this.account=account;

  • this.income=income;

  • this.expenses=expenses;

  • this.surplus=income-expenses;

  • }

  • @Override

  • publicStringtoString(){

  • returnthis.income+" "+this.expenses+" "+this.surplus;

  • }

  • /**

  • *serialize

  • */

  • publicvoidwrite(DataOutputout)throwsIOException{

  • out.writeUTF(account);

  • out.writeDouble(income);

  • out.writeDouble(expenses);

  • out.writeDouble(surplus);

  • }

  • /**

  • *deserialize

  • */

  • publicvoidreadFields(DataInputin)throwsIOException{

  • this.account=in.readUTF();

  • this.income=in.readDouble();

  • this.expenses=in.readDouble();

  • this.surplus=in.readDouble();

  • }

  • publicintcompareTo(InfoBeano){

  • if(this.income==o.getIncome()){

  • returnthis.expenses>o.getExpenses()?1:-1;

  • }else{

  • returnthis.income>o.getIncome()?-1:1;

  • }

  • }

  • publicStringgetAccount(){

  • returnaccount;

  • }

  • publicvoidsetAccount(Stringaccount){

  • this.account=account;

  • }

  • publicdoublegetIncome(){

  • returnincome;

  • }

  • publicvoidsetIncome(doubleincome){

  • this.income=income;

  • }

  • publicdoublegetExpenses(){

  • returnexpenses;

  • }

  • publicvoidsetExpenses(doubleexpenses){

  • this.expenses=expenses;

  • }

  • publicdoublegetSurplus(){

  • returnsurplus;

  • }

  • publicvoidsetSurplus(doublesurplus){

  • this.surplus=surplus;

  • }

  • }

  • [java]view plain

  • publicclassSumStep{

  • publicstaticvoidmain(String[]args)throwsException{

  • Configurationconf=newConfiguration();

  • Jobjob=Job.getInstance(conf);

  • job.setJarByClass(SumStep.class);

  • job.setMapperClass(SumMapper.class);

  • job.setMapOutputKeyClass(Text.class);

  • job.setMapOutputValueClass(InfoBean.class);

  • FileInputFormat.setInputPaths(job,newPath(args[0]));

  • job.setRecerClass(SumRecer.class);

  • job.setOutputKeyClass(Text.class);

  • job.setOutputValueClass(InfoBean.class);

  • FileOutputFormat.setOutputPath(job,newPath(args[1]));

  • job.waitForCompletion(true);

  • }

  • <LongWritable,Text,Text,InfoBean>{

  • privateInfoBeanbean=newInfoBean();

  • privateTextk=newText();

  • @Override

  • protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)

  • throwsIOException,InterruptedException{

  • //split

  • Stringline=value.toString();

  • String[]fields=line.split(" ");

  • //getusefulfield

  • Stringaccount=fields[0];

  • doubleincome=Double.parseDouble(fields[1]);

  • doubleexpenses=Double.parseDouble(fields[2]);

  • k.set(account);

  • bean.set(account,income,expenses);

  • context.write(k,bean);

  • }

  • }

  • <Text,InfoBean,Text,InfoBean>{

  • privateInfoBeanbean=newInfoBean();

  • @Override

  • protectedvoidrece(Textkey,Iterable<InfoBean>v2s,Contextcontext)

  • throwsIOException,InterruptedException{

  • doublein_sum=0;

  • doubleout_sum=0;

  • for(InfoBeanbean:v2s){

  • in_sum+=bean.getIncome();

  • out_sum+=bean.getExpenses();

  • }

  • bean.set("",in_sum,out_sum);

  • context.write(key,bean);

  • }

  • }

  • }


  • 此处的输入为SumStep的输出而不是源文件作为输入,当然也可以将两个job合并到一起执行,此处不再讨论。

    [java]view plain

  • publicclassSortStep{

  • publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{

  • Configurationconf=newConfiguration();

  • Jobjob=Job.getInstance(conf);

  • job.setJarByClass(SortStep.class);

  • job.setMapperClass(SortMapper.class);

  • job.setMapOutputKeyClass(InfoBean.class);

  • job.setMapOutputValueClass(NullWritable.class);

  • FileInputFormat.setInputPaths(job,newPath(args[0]));

  • job.setRecerClass(SortRecer.class);

  • job.setOutputKeyClass(Text.class);

  • job.setOutputValueClass(InfoBean.class);

  • FileOutputFormat.setOutputPath(job,newPath(args[1]));

  • job.waitForCompletion(true);

  • }

  • <LongWritable,Text,InfoBean,NullWritable>{

  • privateInfoBeanbean=newInfoBean();

  • @Override

  • protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)

  • throwsIOException,InterruptedException{

  • Stringline=value.toString();

  • String[]fields=line.split(" ");

  • Stringaccount=fields[0];

  • doubleincome=Double.parseDouble(fields[1]);

  • doubleexpenses=Double.parseDouble(fields[2]);

  • bean.set(account,income,expenses);

  • context.write(bean,NullWritable.get());

  • }

  • }

  • <InfoBean,NullWritable,Text,InfoBean>{

  • privateTextk=newText();

  • @Override

  • protectedvoidrece(InfoBeanbean,Iterable<NullWritable>v2s,Contextcontext)

  • throwsIOException,InterruptedException{

  • Stringaccount=bean.getAccount();

  • k.set(account);

  • context.write(k,bean);

  • }

  • }

  • }


  • 实例二、倒排索引,过程如下:

    [plain]view plain

  • Map阶段

  • <0,"hellotom">

  • ....

  • context.write("hello->a.txt",1);

  • context.write("hello->a.txt",1);

  • context.write("hello->a.txt",1);

  • context.write("hello->a.txt",1);

  • context.write("hello->a.txt",1);

  • context.write("hello->b.txt",1);

  • context.write("hello->b.txt",1);

  • context.write("hello->b.txt",1);

  • --------------------------------------------------------

  • combiner阶段

  • <"hello->a.txt",1>

  • <"hello->a.txt",1>

  • <"hello->a.txt",1>

  • <"hello->a.txt",1>

  • <"hello->a.txt",1>

  • <"hello->b.txt",1>

  • <"hello->b.txt",1>

  • <"hello->b.txt",1>

  • context.write("hello","a.txt->5");

  • context.write("hello","b.txt->3");

  • --------------------------------------------------------

  • Recer阶段

  • <"hello",{"a.txt->5","b.txt->3"}>

  • context.write("hello","a.txt->5b.txt->3");

  • -------------------------------------------------------

  • hello"a.txt->5b.txt->3"

  • tom"a.txt->2b.txt->1"

  • kitty"a.txt->1"

  • .......

  • 代码如下:

    [java]view plain

  • publicclassInverseIndex{

  • publicstaticvoidmain(String[]args)throwsException{

  • Configurationconf=newConfiguration();

  • Jobjob=Job.getInstance(conf);

  • //设置jar

  • job.setJarByClass(InverseIndex.class);

  • //设置Mapper相关的属性

  • job.setMapperClass(IndexMapper.class);

  • job.setMapOutputKeyClass(Text.class);

  • job.setMapOutputValueClass(Text.class);

  • FileInputFormat.setInputPaths(job,newPath(args[0]));//words.txt

  • //设置Recer相关属性

  • job.setRecerClass(IndexRecer.class);

  • job.setOutputKeyClass(Text.class);

  • job.setOutputValueClass(Text.class);

  • FileOutputFormat.setOutputPath(job,newPath(args[1]));

  • job.setCombinerClass(IndexCombiner.class);

  • //提交任务

  • job.waitForCompletion(true);

  • }

  • <LongWritable,Text,Text,Text>{

  • privateTextk=newText();

  • privateTextv=newText();

  • @Override

  • protectedvoidmap(LongWritablekey,Textvalue,

  • Mapper<LongWritable,Text,Text,Text>.Contextcontext)

  • throwsIOException,InterruptedException{

  • Stringline=value.toString();

  • String[]fields=line.split("");

  • FileSplitinputSplit=(FileSplit)context.getInputSplit();

  • Pathpath=inputSplit.getPath();

  • Stringname=path.getName();

  • for(Stringf:fields){

  • k.set(f+"->"+name);

  • v.set("1");

  • context.write(k,v);

  • }

  • }

  • }

  • <Text,Text,Text,Text>{

  • privateTextk=newText();

  • privateTextv=newText();

  • @Override

  • protectedvoidrece(Textkey,Iterable<Text>values,

  • Recer<Text,Text,Text,Text>.Contextcontext)

  • throwsIOException,InterruptedException{

  • String[]fields=key.toString().split("->");

  • longsum=0;

  • for(Textt:values){

  • sum+=Long.parseLong(t.toString());

  • }

  • k.set(fields[0]);

  • v.set(fields[1]+"->"+sum);

  • context.write(k,v);

  • }

  • }

  • <Text,Text,Text,Text>{

  • privateTextv=newText();

  • @Override

  • protectedvoidrece(Textkey,Iterable<Text>values,

热点内容
服务器在线访问数由什么决定 发布:2024-05-06 11:39:15 浏览:677
途观21款哪个配置值得买 发布:2024-05-06 11:29:00 浏览:91
pythonspyder 发布:2024-05-06 11:15:53 浏览:165
线上服务器如何资源监控 发布:2024-05-06 11:15:07 浏览:298
页游脚本检测 发布:2024-05-06 11:05:05 浏览:924
七七网源码 发布:2024-05-06 10:27:36 浏览:295
shell输入脚本 发布:2024-05-06 10:19:49 浏览:985
通达信自定义板块在哪个文件夹 发布:2024-05-06 09:56:37 浏览:104
在linux搭建mqtt服务器搭建 发布:2024-05-06 09:52:00 浏览:559
windowspython23 发布:2024-05-06 09:27:50 浏览:748