// ETLApp.java
package com.bigdata.test;

import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * MapReduce driver that cleans JSON log lines into tab-separated records.
 *
 * <p>Usage: {@code ETLApp <input path> <output path>}. An existing output path is
 * deleted before the job is submitted. No reducer class is configured, so the job
 * runs with Hadoop's default reduce stage.
 *
 * <p>Each output line has eleven tab-separated columns, in this order:
 * rest, requestUri, request, response, status, method, uid, biz_data, code, trace,
 * timestamp.
 */
public class ETLApp {

    /** Marker written in place of a value that cannot be used. */
    private static final String MESSY = "messy";

    /**
     * Configures and runs the ETL job, then exits with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the file system cannot be reached or job submission fails
     */
    public static void main(String[] args) throws Exception{
        if (args.length < 2) {
            System.err.println("Usage: ETLApp <input path> <output path>");
            System.exit(2);
        }

        System.setProperty("HADOOP_USER_NAME","test");
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS","hdfs://localhost:8020");

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // Remove a previous run's output; the job cannot write into an existing directory.
        // The FileSystem handle is intentionally left open: FileSystem.get normally hands
        // out a shared cached instance that the job itself will use.
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setJarByClass(ETLApp.class);
        // fastjson is needed by the mapper at task runtime; the jar is expected on the cluster FS.
        job.addArchiveToClassPath(new Path("/jar/fastjson-1.2.79.jar"));

        job.setMapperClass(MyMapper.class);

        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Propagate the job outcome so schedulers and shell scripts can detect a failed run.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Turns one JSON log line into one tab-separated output record.
     *
     * <p>The line is expected to be a JSON object with {@code timestamp}, {@code rest} and
     * {@code trace} keys. {@code rest} is split on {@code "] ["}; when it has exactly 11
     * segments, segments 2, 5, 8 and 9 supply requestUri, status, request and response.
     * The request is further split on {@code "&"}; when it has exactly 10 parts, parts 1, 2
     * and 6 supply method, uid and biz_data. Fields that are never reached keep
     * {@code StringUtil.isNull} (a placeholder defined elsewhere in the project).
     */
    static class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

        /**
         * Emits exactly one record per input line; unusable lines are emitted with
         * {@code rest = "messy"} instead of failing the task.
         *
         * @param key     input key supplied by the input format (unused)
         * @param value   one raw log line
         * @param context task context used to emit the record and update counters
         * @throws IOException          if the record cannot be written
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String timestamp = StringUtil.isNull;
            String rest = StringUtil.isNull;
            String trace = StringUtil.isNull;
            String requestUri = StringUtil.isNull;
            String status = StringUtil.isNull;
            String request = StringUtil.isNull;
            String response = StringUtil.isNull;
            String method = StringUtil.isNull;
            String uid = StringUtil.isNull;
            String biz_data = StringUtil.isNull;
            String code = StringUtil.isNull;

            String log = value.toString();
            JSONObject obj = null;
            if (!StringUtil.isMessyCode(log)) {
                obj = parseObjectOrNull(log);
                if (obj == null) {
                    // Malformed or empty JSON: count it so bad input stays visible in job counters.
                    context.getCounter("ETL", "UNPARSEABLE_JSON").increment(1);
                }
            }

            if (obj == null) {
                rest = MESSY;
            } else {
                timestamp = clean(obj.getString("timestamp"));
                rest = clean(obj.getString("rest"));
                trace = clean(obj.getString("trace"));

                // NOTE(review): StringUtil.format is defined elsewhere; guard in case it passes
                // a missing "rest" value through as null.
                String[] restParts = rest == null ? new String[0] : rest.split("] \\[");
                if (restParts.length == 11) {
                    rest = "ok";
                    requestUri = clean(restParts[2].replace("requestUri:",""));
                    status = clean(restParts[5].replace("status:",""));

                    request = StringUtil.format(restParts[8].replace("request:",""));
                    if (StringUtil.isMessyCode(request)) {
                        // A garbled request also invalidates the status column.
                        request = MESSY;
                        status = MESSY;
                    }

                    response = clean(restParts[9].replace("response:",""));

                    if (request != null && !"".equals(request)) {
                        String[] requests = request.split("&");
                        if (requests.length == 10) {
                            method = clean(requests[1].replace("method=",""));
                            uid = clean(requests[2].replace("uid=",""));
                            biz_data = clean(requests[6].replace("biz_data=",""));
                        }
                    }

                    if (response != null && !"".equals(response) && JsonUtil.isJson(response)) {
                        // JsonUtil.isJson is defined elsewhere; still guard the parse in case it
                        // accepts text that is not a JSON object.
                        JSONObject responseObj = parseObjectOrNull(response);
                        if (responseObj != null) {
                            code = clean(responseObj.getString("code"));
                        }
                    }
                }
            }

            StringBuilder builder = new StringBuilder();
            builder.append(rest).append("\t");
            builder.append(requestUri).append("\t");
            builder.append(request).append("\t");
            builder.append(response).append("\t");
            builder.append(status).append("\t");
            builder.append(method).append("\t");
            builder.append(uid).append("\t");
            builder.append(biz_data).append("\t");
            builder.append(code).append("\t");
            builder.append(trace).append("\t");
            builder.append(timestamp);

            context.write(NullWritable.get(), new Text(builder.toString()));
        }

        /** Applies {@code StringUtil.format} and blanks the value if it is detected as messy. */
        private static String clean(String raw) {
            String formatted = StringUtil.format(raw);
            return StringUtil.isMessyCode(formatted) ? "" : formatted;
        }

        /** Parses {@code text} as a JSON object, returning null instead of throwing on bad input. */
        private static JSONObject parseObjectOrNull(String text) {
            try {
                return JSONObject.parseObject(text);
            } catch (RuntimeException ignored) {
                // fastjson signals malformed input with unchecked exceptions; the caller treats
                // null as "unparseable" so a single bad line cannot abort the task.
                return null;
            }
        }
    }
}