ETLApp.java
5.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package com.bigdata.test;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * MapReduce job that cleans raw JSON log lines into tab-separated records.
 *
 * <p>Usage: {@code ETLApp <input path> <output path>}. Any existing output directory is deleted
 * before the job is submitted.
 */
public class ETLApp {

    /**
     * Configures and runs the cleaning job, exiting with 0 on success and 1 on job failure.
     *
     * <p>The HDFS address, the submitting user and the HDFS location of the fastjson jar are
     * hard-coded below.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if HDFS access or job submission fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: ETLApp <input path> <output path>");
            System.exit(2);
        }

        // Set before any FileSystem/Job is created so HDFS calls are made as this user.
        System.setProperty("HADOOP_USER_NAME", "sifude");
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://localhost:8020");

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // FileOutputFormat rejects an existing output directory, so clear the previous run's.
        FileSystem fileSystem = FileSystem.get(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setJarByClass(ETLApp.class);
        // Presumably fastjson is not packaged in the job jar; ship it from HDFS to the tasks.
        job.addArchiveToClassPath(new Path("/jar/fastjson-1.2.76.jar"));
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        // No reducer class is set, so Hadoop's default (identity) reducer passes the map output
        // through unchanged; declare the final output types to match it.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Report job success/failure to the caller (shell, scheduler) through the exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Turns one JSON log line into one tab-separated record.
     *
     * <p>Output columns, in order: rest status ("ok"/"messy"/formatted raw value), requestUri,
     * request, response, status, method, uid, biz_data, code, trace, timestamp. Any column that
     * cannot be extracted keeps the {@code StringUtil.isNull} placeholder.
     */
    static class MyMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

        /** Separator between the bracketed segments of the {@code rest} field. */
        private static final Pattern REST_SEPARATOR = Pattern.compile("] \\[");
        /** Segment count of a {@code rest} field this mapper knows how to decompose. */
        private static final int REST_FIELD_COUNT = 11;
        /** Parameter count of a request string this mapper knows how to decompose. */
        private static final int REQUEST_PARAM_COUNT = 10;
        /** Counter group for data-quality counters reported by this mapper. */
        private static final String COUNTER_GROUP = "ETL";

        /** Reused output value; Hadoop serializes it on write, so reuse across calls is safe. */
        private final Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Every column starts as the placeholder and is overwritten only when extracted.
            String timestamp = StringUtil.isNull;
            String rest = StringUtil.isNull;
            String trace = StringUtil.isNull;
            String requestUri = StringUtil.isNull;
            String status = StringUtil.isNull;
            String request = StringUtil.isNull;
            String response = StringUtil.isNull;
            String method = StringUtil.isNull;
            String uid = StringUtil.isNull;
            String bizData = StringUtil.isNull;
            String code = StringUtil.isNull;

            String log = value.toString();
            JSONObject obj = null;
            if (StringUtil.isMessyCode(log)) {
                rest = "messy";
            } else {
                obj = parseJsonObject(log);
                if (obj == null) {
                    // Not a JSON object: flag it like garbled input instead of failing the task.
                    context.getCounter(COUNTER_GROUP, "MALFORMED_JSON").increment(1);
                    rest = "messy";
                }
            }

            if (obj != null) {
                timestamp = StringUtil.format(obj.getString("timestamp"));
                rest = StringUtil.format(obj.getString("rest"));
                trace = StringUtil.format(obj.getString("trace"));

                // NOTE(review): StringUtil.format's null handling is not visible here; guard anyway.
                String[] restParts = rest == null ? new String[0] : REST_SEPARATOR.split(rest);
                if (restParts.length == REST_FIELD_COUNT) {
                    rest = "ok";
                    requestUri = field(restParts, 2, "requestUri:");
                    status = field(restParts, 5, "status:");
                    request = field(restParts, 8, "request:");
                    if (StringUtil.isMessyCode(request)) {
                        // A garbled request also invalidates the recorded status.
                        request = "messy";
                        status = "messy";
                    }
                    response = field(restParts, 9, "response:");

                    if (request != null && !"".equals(request)) {
                        String[] params = request.split("&");
                        if (params.length == REQUEST_PARAM_COUNT) {
                            method = field(params, 1, "method=");
                            uid = field(params, 2, "uid=");
                            bizData = field(params, 6, "biz_data=");
                        }
                    }
                    if (response != null && !"".equals(response) && JsonUtil.isJson(response)) {
                        JSONObject responseObj = parseJsonObject(response);
                        if (responseObj != null) {
                            code = StringUtil.format(responseObj.getString("code"));
                        }
                    }
                }
            }

            StringBuilder builder = new StringBuilder();
            builder.append(rest).append("\t");
            builder.append(requestUri).append("\t");
            builder.append(request).append("\t");
            builder.append(response).append("\t");
            builder.append(status).append("\t");
            builder.append(method).append("\t");
            builder.append(uid).append("\t");
            builder.append(bizData).append("\t");
            builder.append(code).append("\t");
            builder.append(trace).append("\t");
            builder.append(timestamp);
            outValue.set(builder.toString());
            context.write(NullWritable.get(), outValue);
        }

        /** Strips {@code prefix} from {@code parts[index]} and normalizes it via StringUtil.format. */
        private static String field(String[] parts, int index, String prefix) {
            return StringUtil.format(parts[index].replace(prefix, ""));
        }

        /**
         * Parses {@code text} as a JSON object without throwing.
         *
         * @return the parsed object, or {@code null} if the text is not valid JSON, is JSON but
         *     not an object, or is blank (for which fastjson itself returns {@code null})
         */
        private static JSONObject parseJsonObject(String text) {
            try {
                return JSONObject.parseObject(text);
            } catch (JSONException ignored) {
                // Deliberately mapped to null: callers treat the record as unparseable.
                return null;
            }
        }
    }
}