// ETLApp.java 10.4 KB
package com.bigdata.test;

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for a MapReduce job that flattens JSON log lines into tab-separated rows.
 *
 * <p>Each input line is handled by {@link MyMapper}; no reducer class or reduce-task count is
 * configured here, so Hadoop's defaults apply for the reduce phase.
 * NOTE(review): if the job is meant to be map-only, consider {@code job.setNumReduceTasks(0)} —
 * confirm downstream expectations about output file names first.
 */
public class ETLApp {

    /**
     * Configures and runs the job.
     *
     * <p>The process exits with status 0 when the job succeeds, 1 when it fails, and 2 when the
     * required arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path; an
     *             existing output path is deleted recursively before the job is submitted
     * @throws Exception if file-system access or job submission fails
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Usage: ETLApp <input path> <output path>");
            System.exit(2);
        }

        System.setProperty("HADOOP_USER_NAME", "test");
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://localhost:8020");

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // FileOutputFormat rejects an output directory that already exists, so clear any
        // leftovers from a previous run.
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setJarByClass(ETLApp.class);
        // Makes the fastjson jar available on the task classpath.
        job.addArchiveToClassPath(new Path("/jar/fastjson-1.2.79.jar"));

        job.setMapperClass(MyMapper.class);

        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Propagate the job outcome so schedulers and shell scripts can detect a failed run.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }

    /**
     * Turns one JSON log line into one tab-separated output row.
     *
     * <p>Output columns, in order: rest, requestUri, request, response, status, method, uid,
     * biz_data, platform, deviceInfo, version, imei, app_id, code, trace, timestamp. Columns that
     * are never populated keep the {@code StringUtil.isNull} placeholder.
     */
    static class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

        /** Number of {@code "] ["}-separated segments a well-formed "rest" field must have. */
        private static final int REST_SEGMENT_COUNT = 11;

        /** Number of {@code "&"}-separated parameters a well-formed request must have. */
        private static final int REQUEST_PARAM_COUNT = 10;

        /** Reused for every record to avoid allocating a new Text per line. */
        private final Text outputLine = new Text();

        /**
         * Parses a single log line and writes exactly one output row for it.
         *
         * <p>Lines that {@code StringUtil.isMessyCode} flags, or that cannot be parsed as a JSON
         * object, produce a row whose first column is {@code "messy"}; unparseable lines are also
         * counted in the {@code ETL / UNPARSEABLE_LINES} counter instead of failing the task.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one raw log line
         * @param context task context used to emit the row and update counters
         * @throws IOException          if writing the output row fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String timestamp = StringUtil.isNull;
            String rest = StringUtil.isNull;
            String trace = StringUtil.isNull;
            String requestUri = StringUtil.isNull;
            String status = StringUtil.isNull;
            String request = StringUtil.isNull;
            String response = StringUtil.isNull;
            String method = StringUtil.isNull;
            String uid = StringUtil.isNull;
            String biz_data = StringUtil.isNull;
            String platform = StringUtil.isNull;
            String deviceInfo = StringUtil.isNull;
            String version = StringUtil.isNull;
            String imei = StringUtil.isNull;
            String app_id = StringUtil.isNull;
            String code = StringUtil.isNull;

            String log = value.toString();

            JSONObject obj = null;
            if (!StringUtil.isMessyCode(log)) {
                obj = parseJsonObject(log);
                if (obj == null) {
                    // A single malformed line must not abort the whole task; record it instead.
                    context.getCounter("ETL", "UNPARSEABLE_LINES").increment(1);
                }
            }

            if (obj == null) {
                rest = "messy";
            } else {
                timestamp = clean(obj.getString("timestamp"));
                rest = clean(obj.getString("rest"));
                trace = clean(obj.getString("trace"));

                // StringUtil.format is not visible here; guard in case it passes null through.
                String[] segments = rest == null ? new String[0] : rest.split("] \\[");
                if (segments.length == REST_SEGMENT_COUNT) {
                    rest = "ok";
                    requestUri = clean(segments[2].replace("requestUri:", ""));
                    status = clean(segments[5].replace("status:", ""));

                    request = StringUtil.format(segments[8].replace("request:", ""));
                    if (StringUtil.isMessyCode(request)) {
                        // A garbled request marks both request and status as "messy".
                        // NOTE(review): confirm that overwriting status here is intentional.
                        request = "messy";
                        status = "messy";
                    }

                    response = clean(segments[9].replace("response:", ""));

                    if (request != null && !"".equals(request)) {
                        String[] params = request.split("&");
                        if (params.length == REQUEST_PARAM_COUNT) {
                            // The checks are substring matches evaluated in this order; the first
                            // matching key wins for each parameter.
                            for (String param : params) {
                                if (param.contains("uid=")) {
                                    uid = clean(param.replace("uid=", ""));
                                } else if (param.contains("method=")) {
                                    method = clean(param.replace("method=", ""));
                                } else if (param.contains("biz_data=")) {
                                    biz_data = clean(param.replace("biz_data=", ""));
                                } else if (param.contains("platform=")) {
                                    platform = clean(param.replace("platform=", ""));
                                } else if (param.contains("deviceInfo=")) {
                                    deviceInfo = clean(param.replace("deviceInfo=", ""));
                                } else if (param.contains("version=")) {
                                    version = clean(param.replace("version=", ""));
                                } else if (param.contains("imei=")) {
                                    imei = clean(param.replace("imei=", ""));
                                } else if (param.contains("app_id=")) {
                                    app_id = clean(param.replace("app_id=", ""));
                                }
                            }
                        }
                    }

                    if (response != null && !"".equals(response) && JsonUtil.isJson(response)) {
                        JSONObject responseJson = parseJsonObject(response);
                        if (responseJson != null) {
                            code = clean(responseJson.getString("code"));
                        }
                    }
                }
            }

            String[] columns = {
                rest, requestUri, request, response, status, method, uid, biz_data,
                platform, deviceInfo, version, imei, app_id, code, trace, timestamp
            };
            StringBuilder builder = new StringBuilder();
            for (int i = 0; i < columns.length; i++) {
                if (i > 0) {
                    builder.append("\t");
                }
                builder.append(columns[i]);
            }

            outputLine.set(builder.toString());
            context.write(NullWritable.get(), outputLine);
        }

        /**
         * Passes a raw value through {@code StringUtil.format} and blanks it out when
         * {@code StringUtil.isMessyCode} flags the result.
         *
         * @param raw value taken from the log line, possibly {@code null}
         * @return the formatted value, or {@code ""} when it is flagged as messy
         */
        private static String clean(String raw) {
            String formatted = StringUtil.format(raw);
            return StringUtil.isMessyCode(formatted) ? "" : formatted;
        }

        /**
         * Parses text as a JSON object without letting a bad record throw.
         *
         * <p>fastjson signals malformed or non-object input with unchecked exceptions and may
         * return {@code null} for empty input; both cases are reported as {@code null} here.
         *
         * @param text candidate JSON text
         * @return the parsed object, or {@code null} when the text is not a usable JSON object
         */
        private static JSONObject parseJsonObject(String text) {
            try {
                return JSONObject.parseObject(text);
            } catch (RuntimeException e) {
                return null;
            }
        }
    }
}