Kafka

Configuring Kafka

Configuring server.properties

This file lives in /opt/module/kafka/config; mainly change the following three lines:

broker.id=2
log.dirs=/opt/module/kafka/datas
zookeeper.connect=bigdata102:2181,bigdata103:2181,bigdata104:2181/kafka

After the changes:

#Globally unique broker ID; must be a number and must not repeat. Matching the ID to the host name makes brokers easy to tell apart
broker.id=2
#Number of threads handling network requests
num.network.threads=3
#Number of threads handling disk I/O
num.io.threads=8
#Send buffer size of the socket server
socket.send.buffer.bytes=102400
#Receive buffer size of the socket server
socket.receive.buffer.bytes=102400
#Maximum size of a request the socket server will accept
socket.request.max.bytes=104857600
#Path where Kafka stores its log (data) files; it does not need to exist in advance, Kafka creates it automatically; multiple disk paths can be configured, separated by ","
log.dirs=/opt/module/kafka/datas
#Default number of partitions per topic on this broker
num.partitions=1
#Number of threads per data directory used to recover and clean up data
num.recovery.threads.per.data.dir=1
#Replication factor of the offsets topic; the default is 1
offsets.topic.replication.factor=1
#Maximum time a segment file is retained; expired segments are deleted
log.retention.hours=168
#Maximum size of each segment file; the default is 1 GB
log.segment.bytes=1073741824
#How often to check whether data has expired; by default every 5 minutes
log.retention.check.interval.ms=300000
#ZooKeeper connection string (a /kafka chroot is created under the ZK root for easier management)
zookeeper.connect=bigdata102:2181,bigdata103:2181,bigdata104:2181/kafka

Change broker.id to 3 on bigdata103 and to 4 on bigdata104, for example with sed as sketched below.
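
A minimal sketch of doing this remotely, assuming the same install path on every node:

ssh bigdata103 "sed -i 's/^broker.id=.*/broker.id=3/' /opt/module/kafka/config/server.properties"
ssh bigdata104 "sed -i 's/^broker.id=.*/broker.id=4/' /opt/module/kafka/config/server.properties"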

Append the environment variables to my_env.sh:

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

Switch to the root user, distribute my_env.sh to the other nodes, and then reload the profile on each node:

source /etc/profile
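
A sketch of the distribution step, assuming my_env.sh lives in /etc/profile.d/ (an assumption; adjust the path to your layout) and the xsync script is available to root:

# run as root; the location of my_env.sh is assumed
xsync /etc/profile.d/my_env.sh
# then run the source command above on every node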

Starting Kafka

Start the ZooKeeper cluster first:

zk.sh start

Then start the Kafka cluster:

kf.sh start

The kf.sh script is shown below:

#! /bin/bash

case $1 in
"start"){
    for i in bigdata102 bigdata103 bigdata104
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in bigdata102 bigdata103 bigdata104
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
};;
esac
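
To confirm the brokers came up, check for the Kafka JVM on each node (a sketch; assumes jps is on the PATH for non-interactive ssh, otherwise use its full $JAVA_HOME/bin path):

for i in bigdata102 bigdata103 bigdata104
do
    echo "=== $i ==="
    ssh $i "jps | grep -i kafka"
done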

Flume

Configuring Flume

Delete guava-11.0.2.jar from the lib directory for compatibility with Hadoop 3.1.3:

rm /opt/module/flume/lib/guava-11.0.2.jar

Edit log4j.properties in the conf directory to set the log file path:

vim log4j.properties
## change the log directory
flume.log.dir=/opt/module/flume/logs

Distribute the flume directory to the other nodes with xsync.
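
A one-line example, assuming the xsync distribution script used earlier in this setup:

xsync /opt/module/flume/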

Configuring the log-collection Flume

Flume configuration file

Upstream of Kafka:

Create file_to_kafka.conf in the job directory of Flume on bigdata102.

Because both the business data and the user-behavior logs of this online-education project are generated by the same jar, a single Flume agent upstream of Kafka is enough.

Contents:

#Define the components
a1.sources = r1
a1.channels = c1

#Configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.atguigu.edu.flume.interceptor.ETLInterceptor$Builder

#Configure the channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = bigdata102:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#Wire them together
a1.sources.r1.channels = c1
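
Once the f1.sh and lg.sh scripts below are running, you can sanity-check this agent by consuming the topic, for example:

/opt/module/kafka/bin/kafka-console-consumer.sh --bootstrap-server bigdata102:9092 --topic topic_log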

Log generation

Log generation script lg.sh:

#!/bin/bash
  xcall -w 'bigdata102' 'cd /opt/module/data_mocker;java -jar edu2021-mock-2022-06-18.jar >/dev/null 2>&1 &'

Upload the jar and the script to /opt/module/data_mocker (create the directory first):

mkdir /opt/module/data_mocker

Log collection script f1.sh

log -> kafka

#!/bin/bash

case $1 in
"start"){
        for i in bigdata102
        do
                echo " --------启动 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf >/dev/null 2>&1 &"
        done
};;
"stop"){
        for i in bigdata102
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file_to_kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac

Downstream of Kafka

Create kafka_to_hdfs_db.conf in the job directory of Flume on bigdata104:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata102:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.edu.flume.interceptor.TimestampAndTableNameInterceptor$Builder

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/edu/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false


a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0


a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## Wire them together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
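
Once Maxwell (configured below) is publishing to topic_db and this agent is running, the synced data should show up under the HDFS path configured above, for example:

hadoop fs -ls /origin_data/edu/db/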


Business data collection script f2.sh

kafka -> hdfs

#!/bin/bash

case $1 in
"start")
        echo " --------启动 bigdata104 日志数据flume-------"
        ssh bigdata104 "nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")

        echo " --------停止 bigdata104 日志数据flume-------"
        ssh bigdata104 "ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac

Interceptors

Add the following dependencies to the Maven project's pom.xml:

 <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
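
The interceptor classes below have to end up on Flume's classpath on the nodes that use them. A sketch of the packaging and deployment step, assuming the project is built as a fat jar (for example with the maven-shade-plugin) so that fastjson is bundled; the jar name shown is only illustrative:

mvn clean package
# copy the built jar (the actual name depends on your artifactId/version) into Flume's lib directory
cp target/edu-flume-interceptor-1.0-SNAPSHOT.jar /opt/module/flume/lib/
# distribute it to the other Flume nodes
xsync /opt/module/flume/lib/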

Create the package com.atguigu.edu.flume.interceptor

and add the following classes to it.

ETLInterceptor

package com.atguigu.edu.flume.interceptor;

import com.atguigu.edu.flume.utils.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;


import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        //1. Get the data from the body and convert it to a string
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        //2. Check whether the string is valid JSON; if it is, return the event, otherwise return null
        if (JSONUtil.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {

        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }

        return list;
    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {

        }

    }

    @Override
    public void close() {

    }
}

TimestampAndTableNameInterceptor

package com.atguigu.edu.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampAndTableNameInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String lines = new String(event.getBody(), StandardCharsets.UTF_8);
        JSONObject jsonObject = JSONObject.parseObject(lines);
        Long ts = jsonObject.getLong("ts");
        String timeMills = String.valueOf(ts * 1000);
        String tableName = jsonObject.getString("table");
        headers.put("timestamp", timeMills);
        headers.put("tableName", tableName);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimestampAndTableNameInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

TimestampInterceptor

package com.atguigu.edu.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class TimestampInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //Get the event's headers and body
        Map<String, String> headers = event.getHeaders();
        String log = new String(event.getBody(), StandardCharsets.UTF_8);
        //Parse the body into a JSONObject
        JSONObject jsonObject = JSONObject.parseObject(log);
        //Replace the timestamp header with the timestamp at which the log was generated
        String ts = jsonObject.getString("ts");
        headers.put("timestamp",ts);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new TimestampInterceptor();
        }
        @Override
        public void configure(Context context) {

        }
    }
}

Create the package com.atguigu.edu.flume.utils

JSONUtil

package com.atguigu.edu.flume.utils;

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;

public class JSONUtil {
    /*
     * Use the parse exception to decide whether the string is valid JSON:
     * returns true if it is, false otherwise.
     */
    public static boolean isJSONValidate(String log){
        try {
            JSONObject.parseObject(log);
            return true;
        }catch (JSONException e){
            return false;
        }
    }

}

MySQL

Configuring MySQL

Remove the bundled mysql-libs (if MySQL was installed before, remove it completely):

rpm -qa | grep -i -E 'mysql|mariadb' | xargs -n1 sudo rpm -e --nodeps

Remove the MySQL dependencies:

sudo yum remove mysql-libs

Download and install the required dependencies:

sudo yum install libaio
sudo yum -y install autoconf

Install the MySQL RPMs:

sudo rpm -ivh 01_mysql-community-common-5.7.16-1.el7.x86_64.rpm
sudo rpm -ivh 02_mysql-community-libs-5.7.16-1.el7.x86_64.rpm
sudo rpm -ivh 03_mysql-community-libs-compat-5.7.16-1.el7.x86_64.rpm
sudo rpm -ivh 04_mysql-community-client-5.7.16-1.el7.x86_64.rpm
## If the first command for package 05 fails, run the second one instead
sudo rpm -ivh 05_mysql-community-server-5.7.16-1.el7.x86_64.rpm
sudo rpm -ivh 05_mysql-community-server-5.7.16-1.el7.x86_64.rpm --force --nodeps

Start MySQL:

sudo systemctl start mysqld

Look up the temporary MySQL root password:

sudo cat /var/log/mysqld.log | grep password

Log in to MySQL with the password just found (if it fails, wrap the password in single quotes):

mysql -uroot -p'password'

The rest of this part follows the course document as written.

Connecting to MySQL from DataGrip

bigdata102 already has an SSH tunnel open, so the server itself already acts as a proxy; configuring another local port in DataGrip would conflict with the tunnel port opened in Xshell.

Maxwell

Used to sync incremental table data: the data comes from MySQL, is written to the corresponding Kafka topic, and is finally written from Kafka to HDFS by Flume.

Edit the configuration file /opt/module/maxwell/config.properties.

Add the following settings:

log_level=info

producer=kafka
## multiple brokers can be listed here, depending on the setup
kafka.bootstrap.servers=bigdata102:9092

# mysql login info
host=bigdata102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

kafka_topic=topic_db
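
Maxwell also needs the MySQL account and metadata database referenced above (user=maxwell, password=maxwell). A sketch of creating them, assuming MySQL 5.7 with the validate_password plugin enabled (relax the policy so the short password is accepted):

mysql -uroot -p -e "
-- relax the password policy only if the plugin rejects the simple password 'maxwell'
set global validate_password_policy=0;
set global validate_password_length=4;
CREATE DATABASE IF NOT EXISTS maxwell;
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
"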

Enabling the MySQL binlog

sudo vim /etc/my.cnf

The key change is binlog-do-db=edu, where edu is the business database:

[mysqld]

#server ID of this MySQL instance
server-id = 1
#enable the binlog; this value is used as the prefix of the binlog file names
log-bin=mysql-bin
#binlog format; Maxwell requires row
binlog_format=row
#database for which the binlog is enabled; adjust to your setup
binlog-do-db=edu

Restart the MySQL service:

sudo systemctl restart mysqld
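
To confirm the binlog is actually on, for example:

mysql -uroot -p -e "show variables like 'log_bin'; show master status;"
# binlog files named mysql-bin.* should also appear in the default data directory
sudo ls /var/lib/mysql/ | grep mysql-bin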

Start/stop script mxw.sh

#!/bin/bash

MAXWELL_HOME=/opt/module/maxwell

status_maxwell(){
    result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
    return $result
}


start_maxwell(){
    status_maxwell
    if [[ $? -lt 1 ]]; then
        echo "启动Maxwell"
        $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
    else
        echo "Maxwell正在运行"
    fi
}


stop_maxwell(){
    status_maxwell
    if [[ $? -gt 0 ]]; then
        echo "停止Maxwell"
        ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
    else
        echo "Maxwell未在运行"
    fi
}


case $1 in
    start )
        start_maxwell
    ;;
    stop )
        stop_maxwell
    ;;
    status )
         status_maxwell
    ;;
    restart )
       stop_maxwell
       sleep 2
       start_maxwell
    ;;
esac
         

Incremental table first-day full bootstrap script mysql_to_kafka_inc_init.sh

The script below was carried over from another project without changes; point --database at this project's business database (edu) and replace the table names with this project's incremental tables, i.e. everything other than the full-load tables identified below.

Run it with:

mysql_to_kafka_inc_init.sh all

#!/bin/bash

# This script bootstraps all the incremental tables; it only needs to be run once

MAXWELL_HOME=/opt/module/maxwell

import_data() {
    $MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info")
  import_data cart_info
  ;;
"comment_info")
  import_data comment_info
  ;;
"coupon_use")
  import_data coupon_use
  ;;
"favor_info")
  import_data favor_info
  ;;
"order_detail")
  import_data order_detail
  ;;
"order_detail_activity")
  import_data order_detail_activity
  ;;
"order_detail_coupon")
  import_data order_detail_coupon
  ;;
"order_info")
  import_data order_info
  ;;
"order_refund_info")
  import_data order_refund_info
  ;;
"order_status_log")
  import_data order_status_log
  ;;
"payment_info")
  import_data payment_info
  ;;
"refund_payment")
  import_data refund_payment
  ;;
"user_info")
  import_data user_info
  ;;
"all")
  import_data cart_info
  import_data comment_info
  import_data coupon_use
  import_data favor_info
  import_data order_detail
  import_data order_detail_activity
  import_data order_detail_coupon
  import_data order_info
  import_data order_refund_info
  import_data order_status_log
  import_data payment_info
  import_data refund_payment
  import_data user_info
  ;;
esac
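
To check that the bootstrap records reach Kafka, you can consume topic_db, for example:

/opt/module/kafka/bin/kafka-console-consumer.sh --bootstrap-server bigdata102:9092 --topic topic_db --from-beginning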

DataX

Used to sync the full-load tables. Which tables are full-load comes out of the business analysis; they are generally the dimension tables used for joins. The data is written directly from MySQL to HDFS.

DataX self-test command:

python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json

Full-load tables of the current online-education project:

base_category_info

base_province

base_source

base_subject_info

comment_info

course_info

knowledge_point

user_info

Batch-generation script for DataX config files, gen_import_config_ha.py (with HDFS NameNode HA support):

# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb

#MySQL connection settings; adjust to your environment
mysql_host = "bigdata102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "000000"

#HDFS NameNode settings; adjust to your environment
hdfs_nn = "mycluster"


#Target path for the generated config files; adjust as needed
output_path = "/opt/module/datax/job/import"


def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)


def get_mysql_meta(database, table):
    connection = get_connection()
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table):
    return map(lambda x: x[0], get_mysql_meta(database, table))


def get_hive_columns(database, table):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table)
    return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)


def generate_json(source_database, source_table):
    job = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 3
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "column": get_mysql_columns(source_database, source_table),
                        "splitPk": "",
                        "connection": [{
                            "table": [source_table],
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn,
                        "fileType": "text",
                        "path": "${targetdir}",
                        "hadoopConfig":{
                            "dfs.nameservices": "mycluster",
                            "dfs.ha.namenodes.mycluster": "nn1,nn2",
                            "dfs.namenode.rpc-address.mycluster.nn1": "bigdata102:8020",
                            "dfs.namenode.rpc-address.mycluster.nn2": "bigdata103:8020",
                            "dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table),
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""

    options, arguments = getopt.getopt(args, '-d:-t:', ['sourcedb=', 'sourcetbl='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value

    generate_json(source_database, source_table)


if __name__ == '__main__':
    main(sys.argv[1:])
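
The script generates one DataX job file per table (python gen_import_config_ha.py -d edu -t <table>). A small wrapper to generate the configs for all of this project's full-load tables in one go (a sketch; the path to the generator script is an assumption, adjust to wherever you saved it):

#!/bin/bash
for tbl in base_category_info base_province base_source base_subject_info comment_info course_info knowledge_point user_info
do
    python /opt/module/datax/gen_import_config_ha.py -d edu -t $tbl
done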

Full-load table sync script mysql_to_hdfs_full.sh

Example usage:

mysql_to_hdfs_full.sh all 2020-10-01

#!/bin/bash

DATAX_HOME=/opt/module/datax

# If a date is passed in, do_date is that date; otherwise it defaults to yesterday
if [ -n "$2" ] ;then
    do_date=$2
else
    do_date=`date -d "-1 day" +%F`
fi

#Handle the target path: if it does not exist, create it; if it exists, empty it, so that the sync job can be re-run safely
handle_targetdir() {
  hadoop fs -test -e $1
  if [[ $? -eq 1 ]]; then
    echo "路径$1不存在,正在创建......"
    hadoop fs -mkdir -p $1
  else
    echo "路径$1已经存在"
    fs_count=$(hadoop fs -count $1)
    content_size=$(echo $fs_count | awk '{print $3}')
    if [[ $content_size -eq 0 ]]; then
      echo "路径$1为空"
    else
      echo "路径$1不为空,正在清空......"
      hadoop fs -rm -r -f $1/*
    fi
  fi
}

#Data sync
import_data() {
  datax_config=$1
  target_dir=$2

  handle_targetdir $target_dir
  python $DATAX_HOME/bin/datax.py -p"-Dtargetdir=$target_dir" $datax_config
}

case $1 in
"base_category_info")
  import_data /opt/module/datax/job/import/edu.base_category_info.json /origin_data/edu/db/base_category_info_full/$do_date
  ;;                                       
"base_province")                           
  import_data /opt/module/datax/job/import/edu.base_province.json /origin_data/edu/db/base_province_full/$do_date
  ;;                                       
"base_source")                          
  import_data /opt/module/datax/job/import/edu.base_source.json /origin_data/edu/db/base_source_full/$do_date
  ;;                                      
"base_subject_info")                         
  import_data /opt/module/datax/job/import/edu.base_subject_info.json /origin_data/edu/db/base_subject_info_full/$do_date
  ;;                                       
"comment_info")                          
  import_data /opt/module/datax/job/import/edu.comment_info.json /origin_data/edu/db/comment_info_full/$do_date
  ;;                                       
"course_info")                                
  import_data /opt/module/datax/job/import/edu.course_info.json /origin_data/edu/db/course_info_full/$do_date
  ;;                                                                  
"knowledge_point")                             
  import_data /opt/module/datax/job/import/edu.knowledge_point.json /origin_data/edu/db/knowledge_point_full/$do_date
  ;;
"user_info")
  import_data /opt/module/datax/job/import/edu.user_info.json /origin_data/edu/db/user_info_full/$do_date
  ;;

"all")
  import_data /opt/module/datax/job/import/edu.base_category_info.json /origin_data/edu/db/base_category_info_full/$do_date
  import_data /opt/module/datax/job/import/edu.base_province.json /origin_data/edu/db/base_province_full/$do_date
  import_data /opt/module/datax/job/import/edu.base_source.json /origin_data/edu/db/base_source_full/$do_date
  import_data /opt/module/datax/job/import/edu.base_subject_info.json /origin_data/edu/db/base_subject_info_full/$do_date
  import_data /opt/module/datax/job/import/edu.comment_info.json /origin_data/edu/db/comment_info_full/$do_date
  import_data /opt/module/datax/job/import/edu.course_info.json /origin_data/edu/db/course_info_full/$do_date
  import_data /opt/module/datax/job/import/edu.knowledge_point.json /origin_data/edu/db/knowledge_point_full/$do_date
  import_data /opt/module/datax/job/import/edu.user_info.json /origin_data/edu/db/user_info_full/$do_date
  ;;
esac
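
After a run, spot-check the output paths, for example:

hadoop fs -ls /origin_data/edu/db/base_province_full/2020-10-01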
