17611538698
webmaster@21cto.com

基于Hadoop生态SparkStreaming的大数据实时流处理平台的搭建

资讯 0 2415 2018-05-28 12:02:45
基于Hadoop生态SparkStreaming的大数据实时流处理平台的搭建
 发表于 2018-01-26 |  阅读次数 12627
 字数统计 2,687 字 |  阅读时长 13 分钟
 随着公司业务发展,对大数据的获取和实时处理的要求就会越来越高,日志处理、用户行为分析、场景业务分析等等,传统的写日志方式根本满足不了业务的实时处理需求,所以本人准备开始着手改造原系统中的数据处理方式,重新搭建一个实时流处理平台,主要是基于Hadoop生态,利用Kafka作为中转,SparkStreaming框架实时获取数据并清洗,将结果多维度的存储进HBase数据库。
 整个平台大致的框架如下:
 /uploads/fox/28140544_0.png
 操作系统:Centos7
 用到的框架:
 1. Flume1.8.0
 2. Hadoop2.9.0
 3. kafka2.11-1.0.0
 4. Spark2.2.1
 5. HBase1.2.6
 6. ZooKeeper3.4.11
 7. maven3.5.2
 整体的开发环境是基于JDK1.8以上以及Scala,所以得提前把java和Scala的环境给准备好,接下来就开始着手搭建基础平台:一、配置开发环境
 下载并解压JDK1.8,、下载并解压Scala,配置profile文件:
[code]vim /etc/profile
[/code]
1
2
3
4
5
export JAVA_HOME=/usr/java/jdk1.8.0_144
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export SCALA_HOME=/usr/local/scala-2.11.12
export PATH=$PATH:$SCALA_HOME/bin
[code]source /etc/profile
[/code]二、配置zookeeper、maven环境
 下载并解压zookeeper以及maven并配置profile文件
[code]wget http://mirrors.hust.edu.cn/apa ... ar.gz
tar -zxvf apache-maven-3.5.2-bin.tar.gz -C /usr/local
wget http://mirrors.hust.edu.cn/apa ... ar.gz
tar -zxvf zookeeper-3.4.11.tar.gz -C /usr/local
vim /etc/profile
[/code]
1
2
export MAVEN_HOME=/usr/local/apache-maven-3.5.2
export PATH=$PATH:$MAVEN_HOME/bin
[code]source /etc/profile
[/code]
 zookeeper的配置文件配置一下:
[code]cp /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.11/conf/zoo.cfg
[/code]
 然后配置一下zoo.cfg里面的相关配置,指定一下dataDir目录等等
 启动zookeeper:
[code]/usr/local/zookeeper-3.4.11/bin/zkServer.sh start
[/code]
 如果不报错,jps看一下是否启动成功三、安装配置Hadoop
 Hadoop的安装配置在之前文章中有说过(传送门),为了下面的步骤方便理解,这里只做一个单机版的简单配置说明:
 下载hadoop解压并配置环境:
[code]wget http://mirrors.hust.edu.cn/apa ... ar.gz
tar -zxvf hadoop-2.9.0.tar.gz -C /usr/local
vim /etc/profile
[/code]
1
2
export HADOOP_HOME=/usr/local/hadoop-2.9.0
export PATH=$PATH:$HADOOP_HOME/bin
[code]source /etc/profile
[/code]
 配置hadoop 进入/usr/local/hadoop-2.9.0/etc/hadoop目录
[code]cd /usr/local/hadoop-2.9.0/etc/hadoop
[/code]
 首先配置hadoop-env.sh、yarn-env.sh,修改JAVA_HOME到指定的JDK安装目录/usr/local/java/jdk1.8.0_144
 创建hadoop的工作目录
[code]mkdir /opt/data/hadoop
[/code]
 编辑core-site.xml、hdfs-site.xml、yarn-site.xml等相关配置文件,具体配置不再阐述请看前面的文章,配置完成之后记得执行hadoop namenode -format,否则hdfs启动会报错,启动完成后不出问题浏览器访问50070端口会看到hadoop的页面。四、安装配置kafka
 还是一样,先下载kafka,然后配置:
[code]wget http://mirrors.hust.edu.cn/apa ... 0.tgz
tar -zxvf kafka_2.11-1.0.0.tgz -C /usr/local
vim /etc/profile
[/code]
1
2
export KAFKA_HOME=/usr/local/kafka_2.11-1.0.0
export PATH=$KAFKA_HOME/bin:$PATH
[code]source /etc/profile
[/code]
 进入kafka的config目录,配置server.properties,指定log.dirs和zookeeper.connect参数;配置zookeeper.properties文件中zookeeper的dataDir,配置完成后启动kafka
[code]kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
[/code]
 可以用jps查看有没有kafka进程,然后测试一下kafka是否能够正常收发消息,开两个终端,一个用来做producer发消息一个用来做consumer收消息,首先,先创建一个topic
[code]kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic testTopic
kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic
[/code]
如果不出一下会看到如下输出:
1
2
Topic:testTopic	PartitionCount:1	ReplicationFactor:1	Configs:
Topic: testTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0

 
然后在第一个终端中输入命令:
kafka-console-producer.sh –broker-list localhost:9092 –topic testTopic
在第二个终端中输入命令:
kafka-console-consumer.sh –zookeeper 127.0.0.1:2181 –topic testTopic
如果启动都正常,那么这两个终端将进入阻塞监听状态,在第一个终端中输入任何消息第二个终端都将会接收到。五、安装配置HBase
 下载并解压HBase:
[code]wget http://mirrors.hust.edu.cn/apa ... ar.gz
tar -zxvf hbase-1.2.6-bin.tar.gz -C /usr/local/
vim /etc/profile
[/code]
1
2
export HBASE_HOME=/usr/local/hbase-1.2.6
export PATH=$PATH:$HBASE_HOME/bin
[code]source /etc/profile
[/code]
 修改hbase下的配置文件,首先修改hbase-env.sh,主要修改JAVA_HOME以及相关参数,这里要说明一下HBASE_MANAGES_ZK这个参数,因为采用了自己的zookeeper,所以这里设置为false,否则hbase会自己启动一个zookeeper
[code]cd /usr/local/hbase-1.2.6/conf
vim hbase-env.sh
[/code]
1
2
3
4
5
6
export JAVA_HOME=/usr/local/java/jdk1.8.0_144/
HBASE_CLASSPATH=/usr/local/hbase-1.2.6/conf
export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m"
export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m"
export HBASE_PID_DIR=/opt/data/hbase
export HBASE_MANAGES_ZK=false

 然后修改hbase-site.xml,我们设置hbase的文件放在hdfs中,所以要设置hdfs地址,其中tsk1是我安装hadoop的机器的hostname,hbase.zookeeper.quorum参数是安装zookeeper的地址,这里的各种地址最好用机器名
[code]vim hbase-site.xml
[/code]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://tsk1:9000/hbase</value>
</property>
<property>
<name>hbase.master</name>
<value>tsk1:60000</value>
</property>
<property>
<name>hbase.master.port</name>
<value>60000</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>192.168.70.135</value>
</property>
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/data/zookeeper</value>
</property>
<property>
<name>hbase.master.info.bindAddress</name>
<value>tsk1</value>
</property>
</configuration>

 配置完成后启动hbase,输入命令:
start-hbase.sh
 完成后查看日志没有报错的话测试一下hbase,用hbase shell进行测试:
[code]hbase shell
hbase(main):001:0>create 'myTestTable','info'
0 row(s) in 2.2460 seconds
=> Hbase::Table - myTestTable
hbase(main):003:0>list
TABLE
testTable
1 row(s) in 0.1530 seconds

=> ["myTestTable"]
[/code]
 至此,hbase搭建成功,访问以下hadoop的页面,查看file system(菜单栏Utilities->Browse the file system),这时可以看见base的相关文件已经载hadoop的文件系统中。
/uploads/fox/28140544_1.png六、安装spark
 下载spark并解压
[code]wget http://mirrors.hust.edu.cn/apa ... 7.tgz
tar -zxvf spark-2.2.1-bin-hadoop2.7.tgz -C /usr/local
vim /etc/profile
[/code]
1
2
export SPARK_HOME=/usr/local/spark-2.2.1-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
[code]source /etc/profile
[/code]七、测试
 至此,环境基本搭建完成,以上搭建的环境仅是服务器生产环境的一部分,涉及服务器信息、具体调优信息以及集群的搭建就不写在这里了,下面我们写一段代码整体测试一下从kafka生产消息到spark streaming接收到,然后处理消息并写入HBase。先写一个HBase的连接类HBaseHelper:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class HBaseHelper {
private static HBaseHelper ME;
private static Configuration config;
private static Connection conn;
private static HBaseAdmin admin;

public static HBaseHelper getInstances() {
if (null == ME) {
ME = new HBaseHelper();
config = HBaseConfiguration.create();
config.set("hbase.rootdir", "hdfs://tsk1:9000/hbase");
config.set("hbase.zookeeper.quorum", "tsk1");
config.set("hbase.zookeeper.property.clientPort", "2181");
config.set("hbase.defaults.for.version.skip", "true");
}
if (null == conn) {
try {
conn = ConnectionFactory.createConnection(config);
admin = new HBaseAdmin(config);
} catch (IOException e) {
e.printStackTrace();
}
}
return ME;
}

public Table getTable(String tableName) {
Table table = null;
try {
table = conn.getTable(TableName.valueOf(tableName));
} catch (Exception ex) {
ex.printStackTrace();
}

return table;
}

public void putAdd(String tableName, String rowKey, String cf, String column, Long value) {
Table table = this.getTable(tableName);
try {
table.incrementColumnValue(rowKey.getBytes(), cf.getBytes(), column.getBytes(), value);
System.out.println("OK!");
} catch (IOException e) {
e.printStackTrace();
}

}
//......以下省略
}

 再写一个测试类KafkaRecHbase用来做spark-submit提交
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.test.spark.spark_test;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaRecHbase {
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {
Logger.getLogger("org").setLevel(Level.ERROR);
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("kafkaRecHbase");
sparkConf.setMaster("local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
int numThreads = Integer.parseInt(args[3]);
Map<String, Integer> topicMap = new HashMap<>();
String[] topics = args[2].split(",");
for (String topic : topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> kafkaStream =
KafkaUtils.createStream(ssc, args[0], args[1], topicMap);
JavaDStream<String> lines = kafkaStream.map(Tuple2::_2);
JavaDStream<String> lineStr = lines.map(line -> {
if (null == line || line.equals("")) {
return "";
}
String[] strs = SPACE.split(line);
if (strs.length < 1) {
return "";
}
try {
for (String str : strs) {
HBaseHelper.getInstances().putAdd("myTestTable", str, "info", "wordCunts", 1l);
}
return "strs:" + line;
} catch (Exception ex) {
System.out.println(line);
return "报错了:" + ex.getMessage();
}
});
lineStr.print();
ssc.start();
System.out.println("spark 启动!!!");
ssc.awaitTermination();
}
}

 编译提交到服务器,执行命令:
[code]spark-submit --jars $(echo /usr/local/hbase-1.2.6/lib/*.jar | tr ' ' ',') --class com.test.spark.spark_test.KafkaRecHbase --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 /opt/FileTemp/streaming/spark-test-0.1.1.jar tsk1:2181 test testTopic 1
[/code]
 没报错的话执行kafka的producer,输入几行数据在HBase内就能看到结果了!八、装一个Flume实时采集Nginx日志写入Kafka
 Flume是一个用来日志采集的框架,安装和配置都比较简单,可以支持多个数据源和输出,具体可以参考Flume的文档,写的比较全传送门
 下载Flume并配置环境
[code]wget http://mirrors.hust.edu.cn/apa ... ar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local
vim /etc/profile
[/code]
1
2
export FLUME_HOME=/usr/local/apache-flume-1.8.0-bin/
export PATH=$FLUME_HOME/bin:$PATH
[code]source /etc/profile
[/code]
 写一个Flume的配置文件在flume的conf目录下:
[code]vim nginxStreamingKafka.conf
[/code]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
agent1.sources=r1
agent1.channels=logger-channel
agent1.sinks=kafka-sink

agent1.sources.r1.type=exec
agent1.sources.r1.deserializer.outputCharset= UTF-8
agent1.sources.r1.command=tail -F /opt/data/nginxLog/nginxLog.log

agent1.channels.logger-channel.type=memory

agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = flumeKafka
agent1.sinks.kafka-sink.brokerList = tsk1:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20

agent1.sources.r1.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

 kafka创建一个名为flumeKafka的topic用来接收,然后启动flume:
[code]flume-ng agent --name agent1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/nginxStreamingKafka.conf -Dflume.root.logger=INFO,console
[/code]
 如果没有报错,Flume将开始采集opt/data/nginxLog/nginxLog.log中产生的日志并实时推送给kafka,再按照上面方法写一个spark streaming的处理类进行相应的处理就好。
 OK!全部搞定,然而~~~~就这样就搞定了?NO!!!这只是万里长征的第一步!呵呵!

 

评论