大数据项目实战之新闻话题的实时统计分析-情缘小电影

大数据项目实战之新闻话题的实时统计分析

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
日志服务 SLS,月写入数据量 50GB 1个月
简介: 本文讲解一个完整的企业级大数据项目实战,实时|离线统计分析用户的搜索话题,并用酷炫的前端界面展示出来。这些指标对网站的精准营销、运营都有极大帮助。

前言:本文是一个完整的大数据项目实战,实时|离线统计分析用户的搜索话题,并用酷炫的前端界面展示出来。这些指标对网站的精准营销、运营都有极大帮助。架构大致是按照企业标准来的,从日志的采集、转化处理、实时计算、JAVA后台开发、WEB前端展示,一条完整流程线下来,甚至每个节点都用的高可用架构,都考虑了故障转移和容错性。所用到的框架包括:Hadoop(HDFS+MapReduce+Yarn)+Flume+KafKa+Hbase+Hive+Spark(SQL、Structured Streaming )+Hue+Mysql+SpringMVC+Mybatis+Websocket+AugularJs+Echarts。所涉及到的语言包括:JAVA、Scala、Shell
由于本文并非零基础教学,所以只讲架构和流程,基础性知识自行查缺补漏。Github已经上传完整项目代码:liuyanling41-Github

最终效果图如下:

view

项目架构图如下:

_

环境准备

image

模拟网站实时产生日志信息

  • 获取数据源,本文是利用搜狗的数据:搜狗实验室
  • 编写java类模拟实时采集网站日志。主要利用Java中的输入输出流。写好后打成jar包传到服务器上
public class ReadWebLog {

    private static String readFileName;
    private static String writeFileName;

    public static void main(String args[]) {
        readFileName = args[0];
        writeFileName = args[1];
        readFile(readFileName);

    }

    public static void readFile(String fileName) {

        try {
            FileInputStream fis = new FileInputStream(fileName);
            InputStreamReader isr = new InputStreamReader(fis, "GBK");
            //以上两步已经可以从文件中读取到一个字符了,但每次只读取一个字符不能满足大数据的需求。故需使用BufferedReader,它具有缓冲的作用,可以一次读取多个字符
            BufferedReader br = new BufferedReader(isr);
            int count = 0;
            while (br.readLine() != null) {
                String line = br.readLine();
                count++;
                // 显示行号
                Thread.sleep(300);
                String str = new String(line.getBytes("UTF8"), "GBK");
                System.out.println("row:" + count + ">>>>>>>>" + line);
                writeFile(writeFileName, line);
            }
            isr.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    public static void writeFile(String fileName, String conent) {
        try {
            FileOutputStream fos = new FileOutputStream(fileName, true);
            OutputStreamWriter osw = new OutputStreamWriter(fos);
       
            BufferedWriter bw = new BufferedWriter(osw);
            bw.write("\n");
            bw.write(conent);
            bw.close();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

image

  • 编写采集日志的shell脚本
    vim weblog.sh
#/bin/bash
echo "start log"
java -jar /home/weblog.jar /usr/local/weblog.log /home/weblogs.log
  • 运行效果图_

Flume Agent2采集日志信息

主要通过设置Source、Channel、Sink来完成日志采集。

  • 配置flume配置文件 vim agent2.conf
a2.sources = r2
a2.channels = c2
a2.sinks = k2

a2.sources.r2.type = exec
#来源于weblogs.log文件
a2.sources.r2.command = tail -F /home/weblogs.log
a2.sources.r2.channels = c2

a2.channels.c2.type = memory
a2.channels.c2.capacity = 10000
a2.channels.c2.transactionCapacity = 100
a2.channels.c2.keep-alive = 10

a2.sinks.k2.type = avro
a2.sinks.k2.channel = c2
# 落地点是master机器的5555端口(主机名和端口号都必须与master机器的flume配置保持一致)
a2.sinks.k2.hostname = master
a2.sinks.k2.port = 5555
  • 编写shell脚本,方便运行。vim flume.sh
#/bin/bash
echo "flume agent2 start"
bin/flume-ng agent --conf conf --name a2 --conf-file conf/agent2.conf -Dflume.root.logger=INFO,console
  • 运行的时候直接 ./flume.sh 即可

Flume Agent3采集日志信息

各方面配置都和Agent2完全一样、省略。

Flume Agent1整合日志信息

  • vim agent1.conf
#Flume Agent1实时整合日志信息

a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaS hbaseS

# flume + hbase
a1.sources.r1.type = avro
a1.sources.r1.channels = kafkaC hbaseC
a1.sources.r1.bind = master
a1.sources.r1.port = 5555

a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000


a1.sinks.hbaseS.type = asynchbase
a1.sinks.hbaseS.table = weblogs
a1.sinks.hbaseS.columnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.hbaseS.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
a1.sinks.hbaseS.channel = hbaseC

# flume + kafka
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000

a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = master:9092,slave1:9092,slave2:9092
a1.sinks.kafkaS.zookeeperConnect = master:2181,slave1:2181,slave2:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder
  • vim flume.sh
#/bin/bash
echo "flume agent1 start"
bin/flume-ng agent --conf conf --name a1 --conf-file conf/agent1.conf -Dflume.root.logger=INFO,console

具体讲解如下:

Flume与Hbase的集成

  • 通过查看官方文档可知,Flume与Hbase的集成主要需要如下参数,表名、列簇名、以及Java类SimpleAsyncHbaseEventSerializer。

Flume官网

  • 改写SimpleAsyncHbaseEventSerializer
    下载Flume源码,需要改写如下两个Java类.

Flume源码
image
image

  • 打成jar包,上传到linux服务器中替换原有flume目录的该jar包
    image

image

  • Flume配置文件配置Sink为Hbase
a1.sinks.hbaseS.type = asynchbase
a1.sinks.hbaseS.table = weblogs
a1.sinks.hbaseS.columnFamily = info
a1.sinks.hbaseS.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.hbaseS.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl
a1.sinks.hbaseS.channel = hbaseC

Flume与Kafka的集成

  • Flume配置文件:主要配置topic、brokerlist:

image

a1.sinks.kafkaS.channel = kafkaC
a1.sinks.kafkaS.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaS.topic = weblogs
a1.sinks.kafkaS.brokerList = master:9092,slave1:9092,slave2:9092
a1.sinks.kafkaS.zookeeperConnect = master:2181,slave1:2181,slave2:2181
a1.sinks.kafkaS.requiredAcks = 1
a1.sinks.kafkaS.batchSize = 20
a1.sinks.kafkaS.serializer.class = kafka.serializer.StringEncoder
  • 编写kafka消费端脚本,消费从flume传过来的信息。
    vim flume.sh
#/bin/bash
echo "flume agent1 start"
bin/kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic weblogs  --from-beginning
  • 运行效果图

kafka_flume_

Kafka与Spark集成完成数据实时处理

这里我选择的是2.2版本中的StructuredStreaming,因为它相比SparkStreaming而言有很多优势,它的出现重点就是解决端到端的精确一次语义,保证数据的不丢失不重复,这对于流式计算极为重要。StructuredStreaming的输入源为kafka,spark对来自kafka的数据进行计算,主要就是累加话题量和访问量。具体代码参考github。

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("streaming").getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "master:9092")
      .option("subscribe", "weblogs")
      .load()

    import spark.implicits._
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]
    val weblog = lines.map(_.split(",")).map(x => Weblog(x(0), x(1), x(2), x(3), x(4), x(5)))
    val titleCount = weblog.groupBy("searchname").count().toDF("titleName", "webcount")

Spark与Mysql集成

这里选择Mysql是因为,我们的需求只是报表展示,需要在前台展示的字段并不多,关系型数据库完全能够支撑。在Hbase里有几百万条数据(一个浏览话题可能有十几万人搜索过,也就是说一个话题就有十几万条数据,这么大量数据当然要存在Hbase中),而经过spark的计算,这十几万条数据在mysql中就变成了一条数据(XXX话题,XXX浏览量)。
如果业务需求变了,我需要实时查询用户各种信息(数据量很大,字段很多),那么当然就是实时的直接从Hbase里查,而不会在Mysql中。
所以企业中要根据不同的业务需求,充分考虑数据量等问题,进行架构的选择。

    val url = "jdbc:mysql://master:3306/weblog?useSSL=false"
    val username = "root"
    val password = "123456"

    val writer = new JdbcSink(url, username, password)
    val weblogcount = titleCount.writeStream
      .foreach(writer)
      .outputMode("update")
      .start()

    weblogcount.awaitTermination()
 

离线分析:HIVE集成HBASE。

我们知道Hive是一个数据仓库,主要就是转为MapReduce完成对大量数据的离线分析和决策。之前我们已经用Flume集成Hbase,使得Hbase能源源不断的插入数据。那么我们直接将HIVE集成HBase,这样只要Hbase有数据了,那Hive表也就有数据了。怎么集成呢?很简单,用【外部表】就搞定了。

CREATE EXTERNAL TABLE `weblogs`(
  `id` string COMMENT 'from deserializer', 
  `datatime` string COMMENT 'from deserializer', 
  `userid` string COMMENT 'from deserializer', 
  `searchname` string COMMENT 'from deserializer', 
  `retorder` string COMMENT 'from deserializer', 
  `cliorder` string COMMENT 'from deserializer', 
  `cliurl` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.hbase.HBaseSerDe' 
STORED BY 
  'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
WITH SERDEPROPERTIES ( 
  'hbase.columns.mapping'=':key,info:datatime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl', 
  'serialization.format'='1')
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='false', 
  'hbase.table.name'='weblogs', 
  'numFiles'='0', 
  'numRows'='-1', 
  'rawDataSize'='-1', 
  'totalSize'='0', 
  'transient_lastDdlTime'='1518778031')

验证一下HBASE和HIVE是不是同步的:

image
image

好了现在我们可以在Hive中尽情的离线分析和决策了~~~

SpringMVC+Mybatis完成对mysql数据的查询

个人觉得传统JDBC实在是太笨重,还是最喜欢Spring整合Mybatis对数据库进行操作。这里主要完成的操作就是对mysql的数据进行查询。详情请参考github,地址文章开头已给出。
image

WebSocket实现全双工通信

既然要实现客户端实时接收服务器端的消息,而服务器端又实时接收客户端的消息,必不可少的就是WebSocket了,WebSocket实现了浏览器与服务器全双工通信(full-duple),能更好的节省服务器资源和带宽并达到实时通讯。WebSocket用HTTP握手之后,服务器和浏览器就使用这条HTTP链接下的TCP连接来直接传输数据,抛弃了复杂的HTTP头部和格式。一旦WebSocket通信连接建立成功,就可以在全双工模式下在客户端和服务器之间来回传送WebSocket消息。即在同一时间、任何方向,都可以全双工发送消息。WebSocket 核心就是OnMessage、OnOpen、OnClose,本项目使用的是和Spring集成的方式,因此需要有configurator = SpringConfigurator.class。

@ServerEndpoint(value = "/websocket", configurator = SpringConfigurator.class)
public class WebSocket {
    @Autowired
    private WebLogService webLogService;
    @OnMessage
    public void onMessage(String message, Session session) throws IOException, InterruptedException {
        String[] titleNames = new String[10];
        Long[] titleCounts = new Long[10];
        Long[] titleSum = new Long[1];
        while (true) {
            Map<String, Object> map = new HashMap<String, Object>();
            List<WebLogBO> list = webLogService.webcount();
            System.out.print(list);
            for (int i = 0; i < list.size(); i++) {
                titleNames[i] = list.get(i).getTitleName();
                titleCounts[i] = list.get(i).getWebcount();
            }
            titleSum[0] = webLogService.websum();
            map.put("titleName", titleNames);
            map.put("titleCount", titleCounts);
            map.put("titleSum", titleSum);
            System.out.print(map);
            session.getBasicRemote().sendText(JSON.toJSONString(map));
            Thread.sleep(1000);
            map.clear();
        }
    }

    @OnOpen
    public void onOpen() {
        System.out.println("Client connected");
    }

    @OnClose
    public void onClose() {
        System.out.println("Connection closed");
    }
}

Echarts完成前端界面展示

大家可以看到开头给出的项目效果图还是蛮漂亮的,其实非常简单,就是用的Echarts这个框架。直接给它传值就ok了,其他前端那些事它都给你搞定了。详情请参考github,地址文章开头已给出。

        function webcount(json) {
            var option = {
                title: {
                    text: '搜狗新闻热点实时统计',
                    subtext: '作者:刘彦伶'
                },
                tooltip: {
                    trigger: 'axis',
                    axisPointer: {
                        type: 'shadow'
                    }
                },
                legend: {
                    data: ['浏览量']
                },
                grid: {
                    left: '3%',
                    right: '4%',
                    bottom: '3%',
                    containLabel: true
                },
                xAxis: {
                    type: 'value',
                    boundaryGap: [0, 0.01]
                },
                yAxis: {
                    type: 'category',
                    data: json.titleName
                },
                series: [
                    {
                        name: '浏览量',
                        type: 'bar',
                        data: json.titleCount
                    },

                ]
            };
            countchart.setOption(option);
        }

本文讲解的比较粗糙,有很多细节的东西,毕竟一整个项目不可能用一篇文章说清楚。。。所以实践的东西需要读者自己去领悟,但是架构、环境搭建、方法、流程还是很有参考价值的!

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
SQL JSON 分布式计算
【大数据学习篇10】Spark项目实战~网站转化率统计
【大数据学习篇10】Spark项目实战~网站转化率统计
493 0
【大数据学习篇10】Spark项目实战~网站转化率统计
|
人工智能 Cloud Native 安全
重磅嘉宾畅聊大数据&AI开源话题,零距离感受激荡开源江湖
「开源人说」第四期——大数据& AI专场在今年云栖大会举办,阿里巴巴开源委员会大数据AI领域副主席王峰和阿里云AI开源项目EasyRec负责人施兴现场分享热门开源项目背后的故事。开源中国创始人&CTO红薯,白鲸开源联合创始人代立冬,浙大博导赵俊博,InfoQ总编辑王一鹏、Apache软件基金会成员李钰等嘉宾圆桌共话,对开源热点及痛点问题展开激烈讨论。
195137 36
重磅嘉宾畅聊大数据&AI开源话题,零距离感受激荡开源江湖
|
存储 分布式计算 数据可视化
【大数据学习篇12】 Spark项目实战-数据可视化(三)
【大数据学习篇12】 Spark项目实战-数据可视化
381 0
|
分布式计算 数据可视化 Java
【大数据学习篇12】 Spark项目实战-数据可视化(二)
【大数据学习篇12】 Spark项目实战-数据可视化
711 0
|
SQL 分布式计算 数据可视化
【大数据学习篇12】 Spark项目实战-数据可视化(一)
【大数据学习篇12】 Spark项目实战-数据可视化
474 0
|
分布式计算 大数据 关系型数据库
【大数据学习篇6】 Spark操作统计分析数据操作(二)
【大数据学习篇6】 Spark操作统计分析数据操作
80 0
|
分布式计算 大数据 Spark
【大数据学习篇6】 Spark操作统计分析数据操作(一)
【大数据学习篇6】 Spark操作统计分析数据操作
79 0
|
人工智能 Cloud Native 安全
「开源人说」|大咖齐聚首,大数据&AI开源话题对碰
「开源人说」第四期——大数据& AI专场在今年云栖大会举办,阿里巴巴开源委员会大数据AI领域副主席王峰和阿里云AI开源项目EasyRec负责人施兴现场分享热门开源项目背后的故事。开源中国创始人&CTO红薯,白鲸开源联合创始人代立冬,浙大博导赵俊博,InfoQ总编辑王一鹏、Apache软件基金会成员李钰等嘉宾圆桌共话,对开源热点及痛点问题展开激烈讨论。
138902 5
「开源人说」|大咖齐聚首,大数据&AI开源话题对碰
|
分布式计算 Java Hadoop
flink hadoop 从0~1分布式计算与大数据项目实战(4)zookeeper内部原理流程简介以及java curator client操作集群注册,读取
flink hadoop 从0~1分布式计算与大数据项目实战(4)zookeeper内部原理流程简介以及java curator client操作集群注册,读取
flink hadoop 从0~1分布式计算与大数据项目实战(4)zookeeper内部原理流程简介以及java curator client操作集群注册,读取
|
SQL 数据采集 架构师

相关内容推荐

执迷不悟歌词
装修厨房橱柜
美洲原住民
81200
虫箭
傲视遮天公益服
性感丝袜视频
树懒简笔画
我不当
系统解剖
BOINC
小鹤音形
嘿基尼
绝地求生pc
仲天骐
柳州市人社局
罗勋儿
电阻分类
金刚纂
狼毫笔
数码宝贝网络侦探
南掸邦军
ug模型
数据排序
桔子兼职
MDD
小木乃伊来我家
牛奶冰沙
球球宝贝
fate樱
折纸飞镖
黑暗火花
叶中豪
芥末菜
弱监督
测试屏幕坏点
烤乳扇
腾瑞雨
小猫精灵
最小二乘解
cabe
生煎鸡
踝泵运动视频
烟台几线城市
百变少女
椎名篝
打板神器
男人天堂视频
朱轩洋
三国杀界关羽
兰博基尼康塔什
范芸熙
万能助手
水平方向
场景建模
格劳克斯
日内瓦退钱
甘肃省统计年鉴
精品视频网站
高桥圣子作品
重庆最美的女孩
宫水四叶
赵云龙胆
大乳老师
德克士汉堡
兰兰鹿鹿
车辆伤害
阿戈尔
SPARC
坦克世界动画片
植物大战僵尸土豆
日本下海女星
朴建厚
八上语文文言文
六阶魔方教程
直播帝国
我的苹果手机
4040
怎么充话费便宜
卓越会
红糖浆
宁波人社局
dota2斧王
小号指法表
赶海的视频
土豆丝的家常做法
手机控制
Ishtar
中科协
宇崎花
cad箭头画法
美女浴室
海绵宝宝照片
虎扑利物浦
青蛙的天敌
杀破狼番外
太清宝诰
滚动的天空饭制
蛇的画法
龙皇异次元
藕丝怎么切
风间飞鸟
超六类
猫咪可爱头像
杀死你
告成镇
小耶
铠甲勇士头像
人皇阿拉贡
逻辑学家
双刀狼
软件安装包
作文课
休息日英文
好例子
uu换肤助手
双吐
光脚丫
免费成人三级片
名词解释题
校园bl
什么牙膏好
矢野茜
推荐审稿人
步非烟有声
辽宁省公务员局
恋如雨止漫画
阿姨麻辣烫
小学生视频
抖音电脑网页版
w212
供热合同
999感冒灵广告
一百年很长吗
雨童
宋乔惠
多面体素描
四川省武胜县
龙血膏
YCbCr
LYY
制音
羊安镇
我想画
加气块切砖机
faceswap
chrono24
小猪佩奇中文版
芥兰头怎么做好吃
360借条逾期
红色警戒核战争
光环士官长
请你离开我
假面骑士娘化
触店
三上悠亚作品集
迷人少女
绝地求生新图
崔杋圭
正所谓
奥利奥盒子
红楼梦第八回概括
曲面屏贴膜
suika
期权交易时间
Rami
罗伯斯比尔
吸收电容
排列3开奖直播
芭比娃娃简笔画
霓虹深渊
五门齐
寸劲开天
除了我还有谁
卢东升
反时限过电流保护
周杰伦霍元甲
多刺甲龙
depthmap
格氏反应
涪城区人民政府
json注释
云顶之弈
西安交通大学网络
贝乐虎古诗
艾斯头像
点亮灯
蝴蝶拍
写轮眼大全
小炒肉的小说
造梦西游四
经典五子棋
猪脸识别
赵芬芳
快速开始
溧阳扎肝
自行车怎么变速
华为pad
第五人格抽奖
日本男团

合作伙伴

情缘小电影

www.qiyoutom.com
www.0710nk.com
zhujiaohui.com
www.zbyuzhiyuan.com
tzystec.com
www.0710nk.com
yzsdyxh.com
skbcnf.com
world-ys.cn
www.xys-piano.com
hfgsdb.com
0710nk.com
xys-piano.com
www.world-ys.cn
www.cangchu-huojia.net
0710nk.com
yzsdyxh.com
www.skbcnf.com
www.lyxzbjy.com
suqinbeiye.com