如何在 Spark 上运行 Java 代码

2025-05-05 06:04:28
推荐回答(1个)
回答1:

我们首先提出这样一个简单的需求:
现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况。这里我拿我网站的日志记录行示例,如下所示:

1  121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
2  121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
3  121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
4  121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
5  121.205.241.229 - - [21/Feb/2014:00:00:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
6  121.205.241.229 - - [21/Feb/2014:00:00:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"

Java实现Spark应用程序(Application)

我们实现的统计分析程序,有如下几个功能点:

从HDFS读取日志数据文件
将每行的第一个字段(IP地址)抽取出来
统计每个IP地址出现的次数
根据每个IP地址出现的次数进行一个降序排序
根据IP地址,调用GeoIP库获取IP所属国家
打印输出结果,每行的格式:[国家代码] IP地址 频率

下面,看我们使用Java实现的统计分析应用程序代码,如下所示:

package org.shirdrn.spark.job;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.shirdrn.spark.job.maxmind.Country;
import org.shirdrn.spark.job.maxmind.LookupService;

import scala.Serializable;
import scala.Tuple2;

/**
 * Spark job (Spark 0.9-era Java API) that counts requests per client IP in an
 * Apache-style access log, sorts the counts in descending order, and logs each
 * IP together with its country code resolved from a local GeoIP database.
 *
 * <p>Expected arguments: {@code <sparkMaster> <inputFile> <geoIPFile>}, e.g.
 * {@code spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log
 * /path/to/GeoIP_DATABASE.dat}.
 */
public class IPAddressStats implements Serializable {

    private static final long serialVersionUID = 8533489548835413763L;

    private static final Log LOG = LogFactory.getLog(IPAddressStats.class);

    // Access-log fields are space-separated; the first field is the client IP.
    private static final Pattern SPACE = Pattern.compile(" ");

    // transient: the GeoIP lookup service is not serializable and is only used
    // on the driver (in writeTo), never shipped inside a Spark closure.
    private transient LookupService lookupService;
    private transient final String geoIPFile;

    /**
     * Opens the GeoIP database used to map IP addresses to countries.
     *
     * @param geoIPFile path to the local GeoIP .dat database file
     * @throws RuntimeException wrapping the IOException if the file cannot be read
     */
    public IPAddressStats(String geoIPFile) {
        this.geoIPFile = geoIPFile;
        try {
            // lookupService: get country code from an IP address
            File file = new File(this.geoIPFile);
            LOG.info("GeoIP file: " + file.getAbsolutePath());
            // NOTE(review): AdvancedLookupService carries no import in the
            // original listing — presumably a LookupService subclass in this
            // package; confirm against the project sources.
            lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
        } catch (IOException e) {
            // Preserve the cause; a missing/corrupt GeoIP file is fatal here.
            throw new RuntimeException(e);
        }
    }

    /**
     * Runs the counting pipeline: read lines, extract IPs, count per IP,
     * sort by count descending on the driver, then print via {@link #writeTo}.
     *
     * @param args [0] Spark master URL, [1] input file (e.g. on HDFS), [2] GeoIP file
     */
    @SuppressWarnings("serial")
    public void stat(String[] args) {
        JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
                System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
        JavaRDD<String> lines = ctx.textFile(args[1], 1);

        // Split each line and extract the IP address field, e.g.
        // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 ...
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                // First space-separated token is the client IP.
                return Arrays.asList(SPACE.split(s)[0]);
            }
        });

        // map: (ip) -> (ip, 1)
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // reduce: sum the 1s per IP
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Collect to the driver; the result is assumed small (distinct IPs only).
        List<Tuple2<String, Integer>> output = counts.collect();

        // Sort the collected statistics by count, descending.
        Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
            @Override
            public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
                // Integer.compare avoids unboxed subtraction overflow pitfalls;
                // negated for descending order.
                return -Integer.compare(t1._2, t2._2);
            }
        });

        writeTo(args, output);
    }

    /**
     * Logs one line per IP in the form: [countryCode] ip<TAB>count,
     * resolving the country through the GeoIP lookup service.
     */
    private void writeTo(String[] args, List<Tuple2<String, Integer>> output) {
        for (Tuple2<String, Integer> tuple : output) {
            Country country = lookupService.getCountry(tuple._1);
            LOG.info("[" + country.getCode() + "] " + tuple._1 + "\t" + tuple._2);
        }
    }

    /**
     * Entry point. Usage: IPAddressStats &lt;master&gt; &lt;inputFile&gt; &lt;geoIPFile&gt;
     */
    public static void main(String[] args) {
        // e.g. ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats \
        //   spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log \
        //   /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
        if (args.length < 3) {
            System.err.println("Usage: IPAddressStats <master> <inputFile> <geoIPFile>");
            System.err.println("    Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 "
                    + "hdfs://m1:9000/user/shirdrn/wwwlog20140222.log "
                    + "/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
            System.exit(1);
        }

        String geoIPFile = args[2];
        IPAddressStats stats = new IPAddressStats(geoIPFile);
        stats.stat(args);

        System.exit(0);
    }
}