[code]vim /etc/profile[/code]
export JAVA_HOME=/usr/java/jdk1.8.0_144
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export SCALA_HOME=/usr/local/scala-2.11.12
export PATH=$PATH:$SCALA_HOME/bin
[code]source /etc/profile[/code]
2. Configure the ZooKeeper and Maven environments
[code]wget http://mirrors.hust.edu.cn/apa ... ar.gz[/code]
tar -zxvf apache-maven-3.5.2-bin.tar.gz -C /usr/local
wget http://mirrors.hust.edu.cn/apa ... ar.gz
tar -zxvf zookeeper-3.4.11.tar.gz -C /usr/local
vim /etc/profile
export MAVEN_HOME=/usr/local/apache-maven-3.5.2
export PATH=$PATH:$MAVEN_HOME/bin
[code]source /etc/profile[/code]
[code]cp /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.11/conf/zoo.cfg[/code]
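zoo_sample.cfg points dataDir at /tmp/zookeeper by default. Since the HBase configuration later in this walkthrough references /opt/data/zookeeper, you may want to point ZooKeeper at that directory before starting it (a sketch; the exact path is your choice):
[code]mkdir -p /opt/data/zookeeper
sed -i 's|^dataDir=.*|dataDir=/opt/data/zookeeper|' /usr/local/zookeeper-3.4.11/conf/zoo.cfg[/code]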
[code]/usr/local/zookeeper-3.4.11/bin/zkServer.sh start[/code]
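A quick way to confirm ZooKeeper actually came up (output varies with your environment):
[code]/usr/local/zookeeper-3.4.11/bin/zkServer.sh status[/code]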
[code]wget http://mirrors.hust.edu.cn/apa ... ar.gz[/code]
tar -zxvf hadoop-2.9.0.tar.gz -C /usr/local
vim /etc/profile
export HADOOP_HOME=/usr/local/hadoop-2.9.0
export PATH=$PATH:$HADOOP_HOME/bin
[code]source /etc/profile[/code]
[code]cd /usr/local/hadoop-2.9.0/etc/hadoop[/code]
[code]mkdir /opt/data/hadoop[/code]
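The Hadoop config edits themselves are not shown here. Assuming core-site.xml sets fs.defaultFS to hdfs://tsk1:9000 (the address HBase points at below) and the name/data directories live under /opt/data/hadoop, HDFS would typically be formatted and started like this (a sketch under those assumptions):
[code]$HADOOP_HOME/bin/hdfs namenode -format
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/bin/hdfs dfsadmin -report[/code]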
[code]wget http://mirrors.hust.edu.cn/apa ... 0.tgz[/code]
tar -zxvf kafka_2.11-1.0.0.tgz -C /usr/local
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka_2.11-1.0.0
export PATH=$KAFKA_HOME/bin:$PATH
[code]source /etc/profile[/code]
[code]kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties[/code]
[code]kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic testTopic[/code]
kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic
Topic:testTopic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: testTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
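Before wiring Spark in, you can smoke-test the topic with the console producer and consumer that ship with Kafka (run them in two terminals; the broker address assumes the default listener on port 9092):
[code]kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning[/code]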
[code]wget http://mirrors.hust.edu.cn/apa ... ar.gz[/code]
tar -zxvf hbase-1.2.6-bin.tar.gz -C /usr/local/
vim /etc/profile
export HBASE_HOME=/usr/local/hbase-1.2.6
export PATH=$PATH:$HBASE_HOME/bin
[code]source /etc/profile[/code]
[code]cd /usr/local/hbase-1.2.6/conf[/code]
vim hbase-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_144
HBASE_CLASSPATH=/usr/local/hbase-1.2.6/conf
export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m"
export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m"
export HBASE_PID_DIR=/opt/data/hbase
export HBASE_MANAGES_ZK=false
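HBASE_PID_DIR above points at /opt/data/hbase, which does not exist yet; create it the same way as the Hadoop data directory:
[code]mkdir -p /opt/data/hbase[/code]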
[code]vim hbase-site.xml[/code]
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://tsk1:9000/hbase</value>
  </property>
  <property>
    <name>hbase.master</name>
    <value>tsk1:60000</value>
  </property>
  <property>
    <name>hbase.master.port</name>
    <value>60000</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>192.168.70.135</value>
  </property>
  <property>
    <name>zookeeper.znode.parent</name>
    <value>/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/opt/data/zookeeper</value>
  </property>
  <property>
    <name>hbase.master.info.bindAddress</name>
    <value>tsk1</value>
  </property>
</configuration>
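With hbase-site.xml in place, start HBase before opening the shell (start-hbase.sh is the stock launcher; HDFS and the external ZooKeeper must already be running):
[code]start-hbase.sh[/code]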
[code]hbase shell[/code]
hbase(main):001:0> create 'myTestTable','info'
0 row(s) in 2.2460 seconds
=> Hbase::Table - myTestTable
hbase(main):003:0> list
TABLE
myTestTable
1 row(s) in 0.1530 seconds
=> ["myTestTable"]
[code]wget http://mirrors.hust.edu.cn/apa ... 7.tgz[/code]
tar -zxvf spark-2.2.1-bin-hadoop2.7.tgz -C /usr/local
vim /etc/profile
export SPARK_HOME=/usr/local/spark-2.2.1-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
[code]source /etc/profile[/code]
7. Testing
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Table;

public class HBaseHelper {
    private static HBaseHelper ME;
    private static Configuration config;
    private static Connection conn;
    private static HBaseAdmin admin;

    // Lazily build the shared configuration and connection on first use.
    public static HBaseHelper getInstances() {
        if (null == ME) {
            ME = new HBaseHelper();
            config = HBaseConfiguration.create();
            config.set("hbase.rootdir", "hdfs://tsk1:9000/hbase");
            config.set("hbase.zookeeper.quorum", "tsk1");
            config.set("hbase.zookeeper.property.clientPort", "2181");
            config.set("hbase.defaults.for.version.skip", "true");
        }
        if (null == conn) {
            try {
                conn = ConnectionFactory.createConnection(config);
                admin = new HBaseAdmin(config);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return ME;
    }

    // Look up a Table handle by name from the shared connection.
    public Table getTable(String tableName) {
        Table table = null;
        try {
            table = conn.getTable(TableName.valueOf(tableName));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return table;
    }

    // Atomically increment the given column by `value` (used as a word counter).
    public void putAdd(String tableName, String rowKey, String cf, String column, Long value) {
        Table table = this.getTable(tableName);
        try {
            table.incrementColumnValue(rowKey.getBytes(), cf.getBytes(), column.getBytes(), value);
            System.out.println("OK!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // ...... remainder omitted
}
package com.test.spark.spark_test;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaRecHbase {
    private static final Pattern SPACE = Pattern.compile(" ");

    // args: <zkQuorum> <consumerGroup> <topics,comma-separated> <numThreads>
    public static void main(String[] args) throws Exception {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("kafkaRecHbase");
        sparkConf.setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }
        // Receiver-based Kafka stream (spark-streaming-kafka-0-8).
        JavaPairReceiverInputDStream<String, String> kafkaStream =
                KafkaUtils.createStream(ssc, args[0], args[1], topicMap);
        JavaDStream<String> lines = kafkaStream.map(Tuple2::_2);
        JavaDStream<String> lineStr = lines.map(line -> {
            if (null == line || line.equals("")) {
                return "";
            }
            String[] strs = SPACE.split(line);
            if (strs.length < 1) {
                return "";
            }
            try {
                // Increment an HBase counter for every word in the line.
                for (String str : strs) {
                    HBaseHelper.getInstances().putAdd("myTestTable", str, "info", "wordCunts", 1L);
                }
                return "strs:" + line;
            } catch (Exception ex) {
                System.out.println(line);
                return "Error: " + ex.getMessage();
            }
        });
        lineStr.print();
        ssc.start();
        System.out.println("Spark started!!!");
        ssc.awaitTermination();
    }
}
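The job above is packaged with Maven (installed in section 2). Assuming a standard Maven project whose artifact matches the spark-test-0.1.1.jar referenced in the next command (the artifact name is taken from that line, not shown in a pom here), the build and copy step would look like:
[code]mvn clean package
cp target/spark-test-0.1.1.jar /opt/FileTemp/streaming/[/code]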
[code]spark-submit --jars $(echo /usr/local/hbase-1.2.6/lib/*.jar | tr ' ' ',') --class com.test.spark.spark_test.KafkaRecHbase --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 /opt/FileTemp/streaming/spark-test-0.1.1.jar tsk1:2181 test testTopic 1[/code]
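With the job running, an end-to-end check is to push a few space-separated words into testTopic and then look at the counters in HBase (addresses as configured above; exact output will differ):
[code]echo "hello spark hello hbase" | kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
echo "scan 'myTestTable'" | hbase shell[/code]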
[code]wget http://mirrors.hust.edu.cn/apa ... ar.gz[/code]
tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local
vim /etc/profile
export FLUME_HOME=/usr/local/apache-flume-1.8.0-bin/
export PATH=$FLUME_HOME/bin:$PATH
[code]source /etc/profile[/code]
[code]vim nginxStreamingKafka.conf[/code]
agent1.sources=r1
agent1.channels=logger-channel
agent1.sinks=kafka-sink
agent1.sources.r1.type=exec
agent1.sources.r1.deserializer.outputCharset= UTF-8
agent1.sources.r1.command=tail -F /opt/data/nginxLog/nginxLog.log
agent1.channels.logger-channel.type=memory
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = flumeKafka
agent1.sinks.kafka-sink.brokerList = tsk1:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20
agent1.sources.r1.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
[code]flume-ng agent --name agent1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/nginxStreamingKafka.conf -Dflume.root.logger=INFO,console[/code]
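To verify the Flume-to-Kafka pipeline, append a line to the tailed Nginx log and watch the flumeKafka topic (the topic may first need to be created with kafka-topics.sh, as was done for testTopic, unless auto-creation is enabled on the broker):
[code]echo "$(date) test nginx log line" >> /opt/data/nginxLog/nginxLog.log
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flumeKafka --from-beginning[/code]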