Giter Club home page Giter Club logo

full-bigdata-docker's Introduction

基于Docker的大数据开发测试环境搭建及使用说明

1.基本软件环境介绍

1.1 软件版本

  • 操作系统: CentOS 6
  • Java环境: OpenJDK 8
  • 基于docker-compose管理镜像和容器,并进行集群的编排
工具 介绍
flume Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
kafka Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。
zookeeper ZooKeeper 是一个开源的分布式协调服务,由雅虎创建,是 Google Chubby 的开源实现。 分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协 调/通知、集群管理、Master 选举、配置维护,名字服务、分布式同步、分布式锁和分布式队列 等功能。
hadoop Hadoop是一个由Apache基金会所开发的分布式系统基础架构。
spark Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
hive hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。
hbase HBase是一个分布式的、面向列的开源数据库,HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。另一个不同的是HBase基于列的而不是基于行的模式。
yarn Apache Hadoop YARN (Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

1.2 镜像依赖关系

镜像依赖关系图 上图中,灰色的镜像(centos:6)为docker hub官方基础镜像。其它镜像(twinsen/hadoop:2.7.2等)都是在下层镜像的基础上实现的。这一镜像之间的依赖关系,决定了镜像的编译顺序.

2.使用方法简介

2.1 安装docker

具体安装方法请自行百度,安装完成后,在命令行下输入docker info进行测试,输出结果如下图所示,说明安装成功 docker安装测试结果

2.2 构建镜像

首先,下载工程文件( https://github.com/ruoyu-chen/hadoop-docker/archive/1.1.zip ),解压到任意目录下。 接下来,可以在工程根目录下(包含有docker-compose-build-all.yml文件),在系统命令行中,依次使用下列命令构建镜像:

  • 拉取MySQL 5.7 官方镜像

docker pull mysql:5.7

  • 拉取CentOS 6 官方镜像

docker pull centos:6

  • 拉取基本操作系统和OpenJDK环境,包含CentOS 6和OpenJDK 8

docker pull twinsen/os-jvm:centos6-openjdk8

  • 拉取Hadoop环境,包含Hadoop 2.7.2

docker pull twinsen/hadoop:2.7.2

  • 拉取Hive环境,包含Hive 2.1.1

docker pull twinsen/hive:2.1.1

  • 拉取Spark环境,包含Spark 2.1.0

docker pull twinsen/spark:2.1.0

2.3 环境准备

完成上一步的镜像编译工作后,在系统命令行中,可以使用docker images命令查看目前docker环境下的镜像,如下图所示: 查看docker本机镜像列表 为了方便使用,在工程根目录下放置了一个docker-compose.yml文件,这一文件中已经预先配置好了由3个slave节点和1个master节点组成的Spark集群。

在使用集群之前,需要先完成初始化


#[创建容器]
docker-compose up -d
#[格式化HDFS。第一次启动集群前,需要先格式化HDFS;以后每次启动集群时,都不需要再次格式化HDFS]
docker-compose exec spark-master hdfs namenode -format
#[初始化Hive数据库。仅在第一次启动集群前执行一次]
docker-compose exec spark-master schematool -dbType mysql -initSchema
#[将Spark相关的jar文件打包,存储在/code目录下,命名为spark-libs.jar]
docker-compose exec spark-master jar cv0f /code/spark-libs.jar -C /root/spark/jars/ .
#[启动HDFS]
docker-compose exec spark-master start-dfs.sh
#[在HDFS中创建/user/spark/share/lib/目录]
docker-compose exec spark-master hadoop fs -mkdir -p /user/spark/share/lib/
#[将/code/spark-libs.jar文件上传至HDFS下的/user/spark/share/lib/目录下]
docker-compose exec spark-master hadoop fs -put /code/spark-libs.jar /user/spark/share/lib/
#[关闭HDFS]
docker-compose exec spark-master stop-dfs.sh

2.4 启动及停止集群

下面简要介绍启动和关闭Spark集群的步骤(以下步骤均在命令行环境下完成,在工程根目录下执行)

  • 启动集群进程,依次执行:

#[启动HDFS]
docker-compose exec spark-master start-dfs.sh
#[启动YARN]
docker-compose exec spark-master start-yarn.sh
#[启动Spark]
docker-compose exec spark-master start-all.sh
  • 停止Spark集群,依次执行:

#[停止Spark]
docker-compose exec spark-master stop-all.sh
#[停止YARN]
docker-compose exec spark-master stop-yarn.sh
#[停止HDFS]
docker-compose exec spark-master stop-dfs.sh
#[停止容器]
docker-compose down

2.5 开发与测试过程中的集群使用方法

目前集群中采用的是1个master节点和3个slave节点的分配方案,可以通过调整docker-compose配置文件以及相应软件的配置文件来实现集群扩容,暂时无法做到自动化扩容。

编写程序可以使用任意的IDE和操作系统,程序编写完成后,打包为jar文件,然后放在工程根目录下的./volume/code/目录下。任何一个集群环境下,都会在集群启动时将code目录挂载在master节点的/code路径下。

如果要执行wordcount程序(在volume/code/tests/mapreduce-test目录下已经包含了)。在启动集群并启动各服务进程后。执行下列语句,可以进入master节点的命令行环境:

docker-compose exec spark-master /bin/bash

然后可以进入/code目录提交任务,完成计算。如下图所示: 命令行环境下提交任务

3.Docker相关

  • docker ps -a(查看所有的容器,包括已经停止的)
  • 重启:首先将容器启动起来,然后依次执行上面的脚本,docker-compose exec spark-master /bin/bash进入shell.
  • 查看挂载信息:docker inspect spark-master | grep volume

4.集群相关

(1) HDFS上传下载文件

  • 上传:hdfs dfs -put /code.yun.csv /user
  • 下载:hdfs dfs -get /user/yun.csv /code
  • 查看:dfs dfs -ls /user

(2) 在web ui中查看文件

(3)spark web ui

(4)yarn web ui

(5)使用idea打包spark

  • 首先需要注意的是jdk的版本一定要和这里的1.8.0对应
  • 使用maven构建项目,pom.xml文件
  • 求最大值:
package yun.mao

/**
  * @Classname MaxPrice
  * @Description TODO
  * @Date 19-3-18 下午2:34
  * @Created by mao<[email protected]>
  */
import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf,SparkContext}
object MaxPrice {
  def main(args: Array[String]){
    val conf = new SparkConf().setAppName("Max Price")
    val sc = new SparkContext(conf)

    sc.textFile(args(0))
      .map(_.split(","))
      .map(rec => ((rec(0).split("-"))(0).toInt, rec(1).toFloat))
      .reduceByKey((a,b) => Math.max(a,b))
      .saveAsTextFile(args(1))
  }
}

(6)使用spark-submit提交到集群

spark-submit --class yun.mao.MaxPrice	--master yarn	--deploy-mode cluster	yunmao.jar	hdfs://hadoop-master:54310/user/yun.csv	hdfs://hadoop-master:54310/user/mao.txt

(7)hive的使用(使用mysql作为metadata)

(8)hbase的使用

(9)spark shell统计行数

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

(10)使用mysql数据库

  • 进入mysql容器:docker exec -it mysql容器ID /bin/bash
# 在配置文件中查看mysql配置信息
  mysql:
    image: mysql:5.7
    volumes:
      - "./volume/mysql:/var/lib/mysql"
    container_name: mysql
    hostname: mysql
    networks:
      - spark
    environment:
      - MYSQL_ROOT_PASSWORD=hadoop
    tty: true

CREATE TABLE `people` (
  `name` varchar(150) NOT NULL,
  `user_id` int(11) NOT NULL,
  PRIMARY KEY  (`age`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `people` VALUES("mao",24);
INSERT INTO `people` VALUES("yun",24);

  • spark连接数据库
val sqlContext=new org.apache.spark.sql.SQLContext(sc)
// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
//首先使用docker inspect查看容器的ip
// 使用useSSL=false,其他参数分割使用&符号
val url ="jdbc:mysql://172.20.0.2:3306/test?useSSL=false&user=root&password=hadoop"
val df = sqlContext.read.format("jdbc").option("url", url).option("dbtable", "people").load()

// Looks the schema of this DataFrame.
df.printSchema()

// Counts people by age
val countsByAge = df.groupBy("age").count()
countsByAge.show()

// Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")

(11)机器学习的第一个例子

 val sqlContext=new org.apache.spark.sql.SQLContext(sc)
 import sqlContext.implicits._
 val df = Seq(
  (1,5),
  (1,6),
  (1,7),
  (1,8),
  (0,1),
  (0,2),
  (0,3),
  (0,4)
).toDF("label", "features")
import org.apache.spark.ml.classification.LogisticRegression
val model = lr.fit(df)
val weights = model.weights
model.transform(df).show()

(12)logistic regression完整案例戳这里

5.完整的pipeline

(1)zookeeper+flume+kafka

  • flume的监控方式
# 监控一个文件,导向kafka中
agent.sources=s1
agent.sinks=k1
agent.channels=c1

agent.sources.s1.type=exec
agent.sources.s1.command = tail -f /home/logdfs/log
agent.sources.s1.channels=c1
agent.sources.s1.shell = /bin/sh -c


agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100

agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.brokerList=kafka2:9093
agent.sinks.k1.topic=test0
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder

agent.sinks.k1.channel=c1



# 其他方式

agent.sources.s1.type = netcat 
agent.sources.s1.bind = localhost 
agent.sources.s1.port = 5678
agent.sources.s1.channels = c1 

agent.sinks.sk1.type = logger 
agent.sinks.sk1.channel = c1 


agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000 
agent.channels.c1.transactionCapacity = 100


agent.sources.s1.type = spooldir 
agent.sources.s1.spoolDir =/var/log
agent.sources.s1.fileHeader = true 
agent.sources.s1.channels = c1 
agent.sinks.sk1.type = logger 
agent.sinks.sk1.channel = c1
agent.channels.c1.type = memory 
agent.channels.c1.capacity = 10004 
agent.channels.c1.transactionCapacity = 100

  • zookeeper相关的配置
clientPort=2181
dataDir=/data
dataLogDir=/datalog
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
  • 测试kafka(kafka依赖与zookeeper,在docker-compose文件中已经进行了配置,必须在zookeeper服务启动起来的时候kafka才能够被启动起来)
# 创建topic
kafka-topics.sh --create --zookeeper zoo1:2181, zoo2:2181, zoo3:2181 --replication-factor 3 --partitions 3 --topic test
# 查看topic
kafka-topics.sh --describe --zookeeper zoo1:2181, zoo2:2181, zoo3:2181 --topic test
# 创建生产者
kafka-console-producer.sh --broker-list kafka1:9092 -topic test
# 创建消费者
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic test --from-beginning

(2)kafka+spark streaming

  • 导入相关的jars:spark-shell --jars
# 注意jar之间需要使用逗号进行分割
spark-shell --jars spark-streaming-kafka-0-10_2.11-2.3.0.jar,kafka_2.12-2.1.1.jar,kafka-clients-2.1.1.jar 
  • spark-shell脚本
# 导入包
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.mllib.classification.SVMModel
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SparkSession

# 配置
val kafkaParams = Map[String, Object]("bootstrap.servers" -> "kafka1:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"auto.offset.reset" -> "latest","group.id" -> "group_1","enable.auto.commit" -> (false: java.lang.Boolean))
val streamingContext = new StreamingContext(sc, Seconds(1))
val topics = Array("test0")
# 接受消息
val kafkaStream = KafkaUtils.createDirectStream[String, String](streamingContext,PreferConsistent,Subscribe[String, String](topics, kafkaParams))
# 分词
val words = kafkaStream.transform { rdd =>rdd.flatMap(record => (record.value().toString.split(" ")))}
# 打印
words.print()
# 启动
streamingContext.start() 
# 关闭
streamingContext.awaitTermination()
  • 执行结果(flume监控log文件,log文件增加一行触发,flume将增加的行输入到kafka,spark从kafka订阅了topic接收消息,spark streaming自动对接收到的行进行分词并打印)

(3)kafka消息订阅测试软件(Linux)

full-bigdata-docker's People

Contributors

damaohongtu avatar ruoyu-chen avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.