Kafka集群部署分分快三全天计划网站

作者:电脑系统

交流学习

由于学识有限,如有错误,望各位指正、共同学习

2018-05-15 20:55:02

 

Kafka集群部署

一. 关于kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

关于Kafka的更多介绍请参考:

二.准备工作

1. 配置各主机IP。将各主机IP配置为静态IP(保证各主机可以正常通信,为避免过多的网络传输,建议在同一网段)

  1. 修改机器主机名。Kafka集群中的所有主机都需要修改。

  2. 配置各主机映射。修改hosts文件,加入各主机IP和主机名的映射。

4. 开放相应端口。后面文档中配置的端口都需要开放(或者关闭防火墙),root权限。

5. 保证Zookeeper集群服务能够正常运行。其实只要Zookeeper集群部署成功,上面的准备工作基本都能做好了。关于Zookeeper的部署请参考:

三.安装Kafka

1. 下载kafka安装包,访问Kafka官网下载对应版本即可。这里使用的版本为2.9.2-0.8.1.1。

   2.  使用下面的命令解压安装包

tar -zxvf kafka_2.9.2-0.8.1.1.tgz

  1. 修改配置文件,简单配置只需要修改/config/server.properties文件即可。

vim config/server.properties

需要修改的内容:

broker.id(标示当前server在集群中的id,从0开始);port;host.name(当前的server host name);zookeeper.connect(连接的zookeeper集群);log.dirs(log的存储目录,需要提前创建)。

示例:

分分快三全天计划网站 1

分分快三全天计划网站 2

分分快三全天计划网站 3

分分快三全天计划网站 4

  1. 把配置好的kafka上传到其他节点上

scp -r kafka node2:/usr/

注意,上传之后不要忘了修改broker.id和host.nam等每个节点独有的配置。

四.启动并测试Kafka

1.首先启动Zookeeper,之后使用一下命令启动Kafka,启动成功之后会有信息提示。

./bin/kafka-server-start.sh config/server.properties &

分分快三全天计划网站 5

分分快三全天计划网站 6

2.对Kafka进行测试。分别创建topic,producer,consumer,最好是在不同的节点上创建。在producer的控制台上输入信息,观察consumer控制台是否能够接收到。

创建topic:

./bin/kafka-topics.sh -zookeeper node1:2181,node2:2181,node3:2181 -topic test -replication-factor 2 -partitions 3 -create

查看topic:

./bin/kafka-topics.sh -zookeeper node1:2181,node2:2181,node3:2181 -list

分分快三全天计划网站 7

分分快三全天计划网站 8

创建producer:

./bin/kafka-console-producer.sh -broker-list node1:9092,node2:9092,node3:9092 -topic test

创建consumer:

./bin/kafka-console-consumer.sh -zookeeper node1:2181,node2:2181,node3:2181 - from-begining -topic test

测试:

在producer的控制台输入信息,查看consumer的控制台能否接收到。

producer:

分分快三全天计划网站 9

consumer

分分快三全天计划网站 10

经过以上的配置和测试,Kafka已经初步部署好了,接下来可以根据具体的需求配置和操作Kafka。关于Kafka的更多操作和更具体的使用方法请参考官网文档。

分布式发布订阅消息系统 Kafka 架构设计

Apache Kafka 代码实例

Apache Kafka 教程笔记

Apache kafka原理与特性(0.8V) 

Kafka部署与代码实例 

Kafka介绍和集群环境搭建 

Kafka 的详细介绍:请点这里
Kafka 的下载地址:请点这里

本文永久更新链接地址:

一. 关于kafka Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作...

1. Kafka重要概念和技术架构:

写在前面

Rocketmq采用apache rockemq 4.2.0release版本。

源码路径(

1.安装计划部署:

计划在三台主机上安装部署Kafka集群:192.168.8.128,192.168.8.129,192.168.8.130。我选用的Kafka安装版本:kafka_2.10-0.10.1.0.tgz,在官网下载安装文件,下载后,解压压缩包:

$ tar zxvf kafka_2.10-0.10.1.0.tgz -C /opt/modules/

解决方案


1.Kafka与普通的Topology集成:

1.需要引入maven的storm-kafka依赖包。Storm的spout读取kafka数据源。

 部署方式

配置文件采用官网提供的distribution/conf/2m-2s-sync文件夹下的文件

分分快三全天计划网站 11

采用官方推荐的方式进行部署(具体指令见下图或官网),但是这里我们把name server 地址写入到配置文件(具体看下文修改过的配置文件)

分分快三全天计划网站 12

在当前场景下,采用官网推荐的方式进行部署存在两个问题

问题1 lock failed, MQ already started

问题2 Address already in use 

2.Kafka的重要概念:

1.Broker:Kafka集群中的每一个节点服务器,集群是由一个或者多个Broker组成的。
2.Producer:消息的生产者
3.Consumer:消息的消费者
4.Topic:保存消息的逻辑单元,类似database中的table。消息按照不同类别发布到Kafka集群上,每个类别称之为一个topic。
5.Partition:Topic内的消息,在物理上按照分区(Partition)存储。
6.Consumer Group:让某几个consumer共同消费一个topic中的消息,这几个consumer不会出现重复消费消息的情况。

场景

这里采用两台Ubuntu主机,分别在一台主机上部署一个name server、一个从broker服务;另一台主机上部署一个name server、两个主broker、一个从broker。


3.拷贝其他主机,修改broker.id

1.拷贝主机:

$ scp -r kafka_2.10-0.10.1.0/ natty@hadoop-senior03.pmpa.com:/opt/modules/

2.在config/server.properties配置文件中修改broker.id项
每一个broker的server.properties需要修改。

3. Kafka使用测试(ConsoleProducer、ConsoleConsumer)

Kafka集群安装配置好了之后,我们用自带的命令行生产、消费者来测试下kafka集群的使用。开启ConsolePrducer和ConsoleConsumer,使用如下2个脚本:
$KAFKA_HOME/bin/kafka-console-consumer.sh
$KAFKA_HOME/bin/kafka-console-producer.sh

不带任何参数运行这2个脚本,可以查看帮助信息(其他这些shell脚本也可以这样查看帮助)。注意,必须的参数前有个REQUIRED标志。

分分快三全天计划网站 13

帮助信息.png

先创建一个测试的topic:

$bin/kafka-topics.sh --create --topic test_natty --partitions 3  --replication-factor 1 --zookeeper vm-master:2181

启动一个命令行的producer:

$bin/kafka-console-producer.sh --broker-list vm-master:9092 --topic test_natty

启动一个命令行的consumer:

$bin/kafka-console-consumer.sh --topic test_natty --zookeeper vm-master:2181

这样就开启了2个空白的窗口,在producer的窗口输入了消息后,就可以在consumer窗口看到刚才发送的消息:如下图:
producer输入:

分分快三全天计划网站 14

producer.png

consumer输出:

分分快三全天计划网站 15

consumer.png

1.实时数据分析中的Kakfa

在实时数据分析应用,Kafka的位置非常重要。首先通过Flume将Nginx服务器的日志,直接sink到Kakfa。然后通过Storm等实时计算框架,将Kafka数据计算并写入到HBase/Redis中,最后通过web客户端直接访问HBase/Redis来展示数据。

1.Kafka消息存储:Kafka的消息存储在Broker节点的磁盘上,并且是顺序写。
2.Kafka消息的消费特性:消费者消费Kafka上的消息,不会对消息进行删除操作,消息可以被不同的消费者重复消费。在Kafka Broker上保存的时间,由参数log.retention.hours等确定

4.启动Kafka

启动kakfa,使用脚本bin/kafka-server-start.sh脚本,参数是config/server.properties文件。

$ nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

可以写一个shell来批量启停kafka集群。在每一个broker上执行启动和停止服务器的脚本。kafkaServer_batch.sh:

#!/bin/bash
KAFKA_HOME=/opt/modules/kafka_2.10-0.10.1.0
if [ $# -ne 1 ]
then
   echo "Usage:kafkaServer_batch.sh [start|stop]";
   exit -1;
fi
action=$1
if [ $action = "start" ]
#start add parameters!!!!
then
   batchscript="kafka-server-start.sh ${KAFKA_HOME}/config/server.properties";
elif [ $action = "stop" ]
then
   batchscript=kafka-server-stop.sh;
else
   echo "bad parameter!!";
   exit -1;
fi

echo $batchscript

for i in 192.168.8.128 192.168.8.129 192.168.8.130
do
  echo "${action} node ${i} Begin:"
  echo "source /etc/profile && nohup ${KAFKA_HOME}/bin/${batchscript} >/dev/null 2>&1 &"
  ssh ${i} "source /etc/profile && nohup ${KAFKA_HOME}/bin/${batchscript} >/dev/null 2>&1 &" &
  echo "${action} node ${i} End:"
done

启动后,jps查看进程,可以看到Kafka进程:

2921 QuorumPeerMain
3025 Kafka
3308 Jps

3.Kafka架构图:

分分快三全天计划网站 16

Kafka架构图.png

Kafka集群只是一个或者多个Broker节点,节点间通信通过Zookeeper完成。

2. Kafka与Trident集成:

3. 使用KafkaBolt将数据存入Kafka:

一般情况,将数据存入Kafka的Case很少,如果需要的话,使用KafkaBolt来实现。

4. Flume sink数据到Kafka

2.执行结果测试

flume启动后,会有一个Application进程在监听localhost:44444端口,如果有错误想杀掉flume进程,直接查询flume的PID后kill即可。

$ps -aux | grep flume
$kill -9 XXX

下面,我们尝试通过telnet 向localhost:44444发送数据, 再开启一个Console-consumer来监控flume是否已经开始往topic test_natty中sink了数据。
开启telnet:

$telnet localhost 44444

分分快三全天计划网站 17

telnet往44444端口发送数据.png

Console-consumer消费topic数据:

分分快三全天计划网站 18

Console-Consumer消费数据.png

2. 修改配置文件:

切换到KAFKA_HOME目录,修改配置文件config/server.properties文件。

$ cd /opt/modules/kafka_2.10-0.10.1.0/
$ vi config/server.properties

sever.properties中有很多配置项,一般地,以下需要修改来完成kafka集群配置:

#节点唯一标识号,kafka集群中的每个broker,该项都不能重复
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

#Kafka消息的存放路径,注意不是日志而是Kafka的消息
# A comma seperated list of directories under which to store log files
log.dirs=/opt/modules/kafka_2.10-0.10.1.0/kafka-logs

#Kafka集群保存消息的时间,单位是时间。也就是Kafka消息在集群上保存1周后删掉。
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

#Zookeeper服务器的地址,逗号分隔多个zookeeper主机
zookeeper.connect=hadoop-senior01.pmpa.com:2181,hadoop-senior02.pmpa.com:2181,hadoop-senior03.pmpa.com:2181

2. Kafaka集群安装

一定按照官方的文档来安装和部署。

1.首先增加一个flume agent配置

新增一个配置文件,kafka-sink.properties, 该配置文件实现如下flume配置:
Source: netcat (监听某个服务器44444端口的数据,nc命令)
Channel: Memory
Sink: Kafka Topic(使用之前创建的测试topic:test_natty )
在使用时候,需要注意使用的flume的版本,在确定了flume的版本后,到官网找到 Kafka Sink部分的配置信息,按照例子来填写Kafka Sink就可以了。之前在测试时候,使用了一个flume 1.8 和 kafka 0.8版本的组合,发现了问题。最终的问题的原因是Kafka版本过低,需要到0.10版本才能解决该问题,所以在测试这个case时,还需要注意使用的flume、kafka的版本。下面的例子使用的flume 1.6版本
文件 kafka-sink.properties 的详细配置文件如下:

a1.sources = s1
a1.channels = c1
a1.sinks = k1

# define the netcat source:
a1.sources.s1.type = netcat
a1.sources.s1.bind = localhost
a1.sources.s1.port = 44444


# define the memory channel:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


#define the kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test_natty
a1.sinks.k1.brokerList = 10.198.193.189:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

启动flume agent:

$bin/flume-ng agent -n a1 -c -conf -f conf/kafka-sink.properties -Dflume.root.logger=INFO,console

启动了flume之后,查看端口监听的情况,可以看到44444端口已经被监听:

$netstat --help
$netstat -nltp

5. Storm与Kafka集成:

下面我开发了一个简单的例子,来展示storm 普通Topology 和Trident读取Kafka数据源的方法,后续的bolt的开发就根据业务来定制化。测试方法:在第4部分,配置了一个nc cluster host --> Kafka的配置。 我们就使用这个例子来进行测试,后续Storm的处理也非常简单(直接打印)。最后,测试效果是,通过nt 命令发送一些数据,storm会将nc命令输入的一些string,打印在console上,这样完成测试。

本文由分分快三计划发布,转载请注明来源

关键词: 分分快三计划 hadoop