Common Big Data Commands

MySQL

# Start
service mysqld start
systemctl start mysql[d]
# Stop
service mysqld stop
# Enable MySQL at boot
chkconfig mysqld on
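
A quick way to verify the service is up (a minimal sketch; the systemd unit name and the login credentials depend on your installation):

# Check service status (unit may be mysqld or mysql)
systemctl status mysqld
# Simple connectivity test; assumes a root account with password login
mysql -u root -p -e "SELECT VERSION();"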

Hadoop

# Start and stop all components (HDFS + YARN)
start-all.sh
stop-all.sh

# Start and stop HDFS
start-dfs.sh
stop-dfs.sh

# Start and stop YARN
start-yarn.sh
stop-yarn.sh

# Start a single HDFS daemon
hadoop-daemon.sh start namenode 

# Start HDFS daemons on multiple nodes
hadoop-daemons.sh start datanode

# Start a single YARN daemon
yarn-daemon.sh start resourcemanager 

# Start YARN daemons on multiple nodes
yarn-daemons.sh start nodemanager

# MapReduce job history server (port 19888)
mr-jobhistory-daemon.sh start historyserver

# Leave safe mode
hdfs dfsadmin -safemode leave
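
A few standard follow-up checks after starting HDFS (all stock hdfs CLI subcommands):

# Query the current safe mode state
hdfs dfsadmin -safemode get
# DataNode capacity and usage report
hdfs dfsadmin -report
# Sanity-check that the filesystem is reachable
hdfs dfs -ls /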

Hive

# Start the Hive metastore service
nohup /export/server/hive-2.1.0/bin/hive --service metastore   &

# Start the HiveServer2 service
nohup /export/server/hive-2.1.0/bin/hiveserver2 &
# beeline
!connect jdbc:hive2://node03:10000
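
beeline can also connect non-interactively; a minimal sketch, assuming the root user is allowed to connect:

beeline -u jdbc:hive2://node03:10000 -n root -e "SHOW DATABASES;"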

# Initialize and upgrade the Hive metastore schema
schematool -dbType mysql -initSchema
schematool -dbType mysql -upgradeSchema

# Using dynamic partitions

# Enable dynamic partitioning
set hive.exec.dynamic.partition=true;
# Enable non-strict mode
set hive.exec.dynamic.partition.mode=nonstrict;
# Maximum number of dynamic partitions created per node
set hive.exec.max.dynamic.partitions.pernode=10000;
# Maximum total number of dynamic partitions
set hive.exec.max.dynamic.partitions=100000;
# Maximum number of files a job may create
set hive.exec.max.created.files=150000;
# Limit on the number of files open at one time (DataNode transfer threads)
set dfs.datanode.max.xcievers=8192;
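
With the settings above, a dynamic-partition insert looks like the sketch below (the databases, tables and columns are hypothetical):

hive -e "
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
-- the partition column (dt) must be the last column in the SELECT list
INSERT OVERWRITE TABLE ods.orders PARTITION (dt)
SELECT order_id, amount, dt FROM staging.orders;
"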

## Basic Hive optimizations
# Hive compression
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;

# Compression strategy applied when writing (ORC)
set hive.exec.orc.compression.strategy=COMPRESSION;

# Bucketing
set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;

# Parallel execution
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;

# Merge small files (combine input splits)
-- set mapred.max.split.size=2147483648;
-- set mapred.min.split.size.per.node=1000000000;
-- set mapred.min.split.size.per.rack=1000000000;

# Vectorized query execution
set hive.vectorized.execution.enabled=true;

# Correlation optimizer
set hive.optimize.correlation=true;

# Zero-copy ORC reads
set hive.exec.orc.zerocopy=true;

# Join data skew
set hive.optimize.skewjoin=true;
-- set hive.skewjoin.key=100000;
set hive.optimize.skewjoin.compiletime=true;
set hive.optimize.union.remove=true;

# Group By data skew
set hive.groupby.skewindata=false;

ZooKeeper

# Standard start (zkServer.sh starts the server on the current node)
zkServer.sh start

# Daemon-style start (wrapper script)
zookeeper-daemon.sh start
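
To check the role of the local server, or start the whole ensemble over SSH (hosts node1-node3 are assumed):

# Show whether the local server is leader or follower
zkServer.sh status
# One-key start on all nodes
for host in node1 node2 node3; do
  ssh ${host} "source /etc/profile; zkServer.sh start"
done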

Kafka

Start and stop

# Start the Kafka service (ZooKeeper must be running first)
kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &

# 关闭 Kafka 服务
kafka-server-stop.sh

Wrapper start script (remember to make it executable)

#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1

for number in {1..3}
do
        host=node${number}
        echo ${host}
        /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;export JMX_PORT=9988;${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >>/dev/null 2>&1 &"
        echo "${host} started"
done

Wrapper stop script (remember to make it executable)

#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1

for number in {1..3}
do
  host=node${number}
  echo ${host}
  /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;${KAFKA_HOME}/bin/kafka-server-stop.sh"
  echo "${host} stoped"
done
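
Assuming the two scripts above are saved as kafka-start-all.sh and kafka-stop-all.sh (the names are arbitrary), grant execute permission before running them:

chmod +x kafka-start-all.sh kafka-stop-all.sh
./kafka-start-all.sh
./kafka-stop-all.sh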

Completely delete Kafka data and reinitialize

# 1. Make sure delete.topic.enable=true is set in server.properties on every node; a restart is required for it to take effect

# 2. Delete the topic test_data from Kafka
bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --delete --topic test_data

# 3. Open zkCli.sh and delete the three related znodes (use rmr instead of deleteall on ZooKeeper 3.4 and earlier)
deleteall /brokers/topics/test_data
deleteall /config/topics/test_data
deleteall /admin/delete_topics/test_data

# 4. If the Kafka cluster is still running, shut it down

# 5. Empty the log.dirs directory (log.dirs=/export/data/kafka/kafka-logs), i.e. the Kafka data directory
rm -rf /export/data/kafka/kafka-logs/*

# 6. Restart the Kafka cluster

# 7. Recreate the topic
bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic test_data --partitions 3 --replication-factor 2
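
To confirm the topic was recreated with the expected partition and replica layout:

bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --describe --topic test_data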

Create a topic

kafka-topics.sh --zookeeper node3:2181 --create --topic spark_kafka --partitions 3 --replication-factor 1
kafka-topics.sh --zookeeper node3:2181 --list

Start a console producer and consumer

kafka-console-producer.sh --broker-list node3:9092 --topic spark_kafka
kafka-console-consumer.sh --from-beginning --bootstrap-server node3:9092 --topic spark_kafka
kafka-console-consumer.sh --from-beginning --bootstrap-server node3:9092 --topic __consumer_offsets
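
The console producer also accepts piped input, which is handy for a quick end-to-end test:

echo "hello spark_kafka" | kafka-console-producer.sh --broker-list node3:9092 --topic spark_kafka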

Spark

Start the Spark Thrift Server

start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=10001 \
  --hiveconf hive.server2.thrift.bind.host=node3 \
  --master local[*]
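
Once the Thrift Server is up, it can be queried with beeline on port 10001 (a minimal sketch):

beeline -u jdbc:hive2://node3:10001 -e "SHOW DATABASES;"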

Start the Spark HistoryServer (port 18080)

sbin/start-history-server.sh
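
The HistoryServer only shows applications that wrote event logs; a typical spark-defaults.conf sketch, assuming an HDFS log directory that already exists:

# Append to spark-defaults.conf (the hdfs://node1:8020/spark/eventLogs path is an assumption)
cat >> ${SPARK_HOME}/conf/spark-defaults.conf <<'EOF'
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://node1:8020/spark/eventLogs
spark.history.fs.logDirectory    hdfs://node1:8020/spark/eventLogs
EOF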

Structured Streaming

-- MySQL sink table for the word-count results
CREATE TABLE db_spark.tb_word_count (
  id int NOT NULL AUTO_INCREMENT,
  word varchar(255) NOT NULL,
  count int NOT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY word (word)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

REPLACE INTO  tb_word_count (id, word, count) VALUES (NULL, ?, ?);
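
To spot-check the sink table after the streaming query has run (credentials are assumptions):

mysql -u root -p -e "SELECT * FROM db_spark.tb_word_count LIMIT 10;"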

Spark on YARN Pi test

/export/server/spark/bin/spark-submit \
--master yarn \
--class org.apache.spark.examples.SparkPi \
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \
10
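
To follow the submitted application on YARN (the application id is a placeholder):

# List running YARN applications
yarn application -list
# Fetch the aggregated logs once the application id is known
yarn logs -applicationId <application_id>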

WordCount on YARN

/export/server/spark/bin/spark-submit \
--master yarn \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 2 \
--queue default \
--class cn.test.spark._2SparkWordCount \
/opt/spark-chapter01-1.0-SNAPSHOT.jar

Spark-submit

# Run application locally on 8 cores
/export/server/spark/bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \
  100

# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
${SPARK_HOME}/examples/jars/spark-examples_2.11-2.4.5.jar \
  1000

# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

# Run on a YARN cluster; deploy-mode can also be client for client mode
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
  --master spark://207.184.161.138:7077 \
  examples/src/main/python/pi.py \
  1000

# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  http://path/to/examples.jar \
  1000

# Run on a Kubernetes cluster in cluster deploy mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master k8s://xx.yy.zz.ww:443 \
  --deploy-mode cluster \
  --executor-memory 20G \
  --num-executors 50 \
  http://path/to/examples.jar \
  1000

Sqoop data extraction and validation

export SQOOP_HOME=/export/server/sqoop-1.4.7.bin_hadoop-2.6.0
$SQOOP_HOME/bin/sqoop import \
--connect jdbc:mysql://192.168.88.163:3306/insurance \
--username root \
--password 123456 \
--table dd_table \
--hive-table insurance_ods.dd_table \
--hive-import \
--hive-overwrite \
--fields-terminated-by '\t' \
--delete-target-dir \
-m 1

# 1. Get the row count of the MySQL table dd_table
mysql_log=`$SQOOP_HOME/bin/sqoop eval \
--connect jdbc:mysql://192.168.88.163:3306/insurance \
--username root \
--password 123456 \
--query "select count(1) from dd_table"
`
mysql_cnt=`echo $mysql_log | awk -F'|' '{print $4}' | awk '{print $1}'`
# 2. Get the row count of the Hive table dd_table
hive_log=`hive -e "select count(1) from insurance_ods.dd_table"`

# 3. Compare whether the two counts match
if [ $mysql_cnt -eq $hive_log ] ; then
  echo "MySQL row count = $mysql_cnt, Hive row count = $hive_log: they match"
else
  echo "MySQL row count = $mysql_cnt, Hive row count = $hive_log: they do NOT match"
fi

Flink

  • Session mode
# First create a YARN session; -d runs it detached, -jm/-tm set JobManager/TaskManager memory, -s is the number of slots per TaskManager
flink/bin/yarn-session.sh -d -jm 1024 -tm 1024 -s 2

# Submit a job to the session
flink/bin/flink run /export/server/flink/examples/batch/WordCount.jar \
--input hdfs://node1.test.cn:8020/wordcount/input

  • Per-job mode (detached)
# Submit the job directly; -m specifies the JobManager address (yarn-cluster here), and the required environment variables must be configured in advance
/export/server/flink/bin/flink run \
-m yarn-cluster -yjm 1024 -ytm 1024 \
/export/server/flink/examples/batch/WordCount.jar \
--input hdfs://node1.test.cn:8020/wordcount/input
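
Running jobs can be listed and cancelled with the flink CLI (run from the machine that submitted them so the YARN session properties are picked up; the job id is a placeholder):

flink/bin/flink list
flink/bin/flink cancel <jobId>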

Other commands

Start Elasticsearch

cd /export/server/es/elasticsearch-7.6.1/
/export/server/es/elasticsearch-7.6.1/bin/elasticsearch >>/dev/null 2>&1 &
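
A quick health check once the node is up (assumes the default HTTP port 9200 on the local host):

curl "http://localhost:9200/_cluster/health?pretty"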

Markdown code folding

<details>
<summary><b>Click to view the full code</b></summary>
<pre><code>
</code></pre>
</details>

Passwordless SSH login

ssh-keygen -t rsa
ssh-copy-id node1
scp /root/.ssh/authorized_keys node2:/root/.ssh
scp /root/.ssh/authorized_keys node3:/root/.ssh
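
A quick check that key-based login works to every node (hosts node1-node3 as above):

for host in node1 node2 node3; do ssh ${host} hostname; done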
