Skip to content
Toggle navigation
Toggle navigation
This project
Loading...
Sign in
张浩
/
BrBigDataTest
Go to a project
Toggle navigation
Toggle navigation pinning
Projects
Groups
Snippets
Help
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Issue Boards
Files
Commits
Network
Compare
Branches
Tags
Commit
0630c5d3
...
0630c5d3ccd02c4fc13d613c9cce763149eca196
authored
2022-02-08 17:48:22 +0800
by
zhanghao
Browse Files
Options
Browse Files
Tag
Download
Email Patches
Plain Diff
commit
1 parent
af8f0e61
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
158 additions
and
0 deletions
src/main/java/com/bigdata/test/ETLAppDebug.java
src/main/java/com/bigdata/test/ETLAppDebug.java
0 → 100755
View file @
0630c5d
package
com
.
bigdata
.
test
;
import
com.alibaba.fastjson.JSONObject
;
import
org.apache.hadoop.conf.Configuration
;
import
org.apache.hadoop.fs.FileSystem
;
import
org.apache.hadoop.fs.Path
;
import
org.apache.hadoop.io.LongWritable
;
import
org.apache.hadoop.io.NullWritable
;
import
org.apache.hadoop.io.Text
;
import
org.apache.hadoop.mapreduce.Job
;
import
org.apache.hadoop.mapreduce.Mapper
;
import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat
;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
;
import
java.io.IOException
;
public
class
ETLAppDebug
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// System.setProperty("HADOOP_USER_NAME","test");
Configuration
configuration
=
new
Configuration
();
// configuration.set("fs.defaultFS","hdfs://localhost:8020");
FileSystem
fileSystem
=
FileSystem
.
get
(
configuration
);
// Path outputPath = new Path(args[1]);
Path
outputPath
=
new
Path
(
"./input/etl"
);
if
(
fileSystem
.
exists
(
outputPath
))
{
fileSystem
.
delete
(
outputPath
,
true
);
}
Job
job
=
Job
.
getInstance
(
configuration
);
job
.
setJarByClass
(
ETLAppDebug
.
class
);
job
.
addArchiveToClassPath
(
new
Path
(
"./input/data/fastjson-1.2.79.jar"
));
// job.addArchiveToClassPath(new Path("/jar/fastjson-1.2.79.jar"));
job
.
setMapperClass
(
MyMapper
.
class
);
job
.
setMapOutputKeyClass
(
NullWritable
.
class
);
job
.
setMapOutputValueClass
(
Text
.
class
);
// FileInputFormat.setInputPaths(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileInputFormat
.
setInputPaths
(
job
,
new
Path
(
"./input/data/app.out"
));
FileOutputFormat
.
setOutputPath
(
job
,
new
Path
(
"./input/etl"
));
job
.
waitForCompletion
(
true
);
}
static
class
MyMapper
extends
Mapper
<
LongWritable
,
Text
,
NullWritable
,
Text
>
{
@Override
protected
void
map
(
LongWritable
key
,
Text
value
,
Context
context
)
throws
IOException
,
InterruptedException
{
String
timestamp
=
StringUtil
.
isNull
;
String
rest
=
StringUtil
.
isNull
;
String
trace
=
StringUtil
.
isNull
;
String
requestUri
=
StringUtil
.
isNull
;
String
status
=
StringUtil
.
isNull
;
String
request
=
StringUtil
.
isNull
;
String
response
=
StringUtil
.
isNull
;
String
method
=
StringUtil
.
isNull
;
String
uid
=
StringUtil
.
isNull
;
String
biz_data
=
StringUtil
.
isNull
;
String
code
=
StringUtil
.
isNull
;
String
log
=
value
.
toString
();
if
(
StringUtil
.
isMessyCode
(
log
))
{
rest
=
"messy"
;
}
else
{
JSONObject
obj
=
JSONObject
.
parseObject
(
log
);
timestamp
=
obj
.
getString
(
"timestamp"
);
timestamp
=
StringUtil
.
format
(
timestamp
);
if
(
StringUtil
.
isMessyCode
(
timestamp
))
{
timestamp
=
""
;
}
rest
=
obj
.
getString
(
"rest"
);
rest
=
StringUtil
.
format
(
rest
);
if
(
StringUtil
.
isMessyCode
(
rest
))
{
rest
=
""
;
}
trace
=
obj
.
getString
(
"trace"
);
trace
=
StringUtil
.
format
(
trace
);
if
(
StringUtil
.
isMessyCode
(
trace
))
{
trace
=
""
;
}
String
[]
split_rest
=
rest
.
split
(
"] \\["
);
if
(
split_rest
!=
null
&&
split_rest
.
length
==
11
)
{
rest
=
"ok"
;
requestUri
=
split_rest
[
2
]
!=
null
?
split_rest
[
2
].
replace
(
"requestUri:"
,
""
)
:
StringUtil
.
isNull
;
requestUri
=
StringUtil
.
format
(
requestUri
);
if
(
StringUtil
.
isMessyCode
(
requestUri
))
{
requestUri
=
""
;
}
status
=
split_rest
[
5
]
!=
null
?
split_rest
[
5
].
replace
(
"status:"
,
""
)
:
StringUtil
.
isNull
;
status
=
StringUtil
.
format
(
status
);
if
(
StringUtil
.
isMessyCode
(
status
))
{
status
=
""
;
}
request
=
split_rest
[
8
]
!=
null
?
split_rest
[
8
].
replace
(
"request:"
,
""
)
:
StringUtil
.
isNull
;
request
=
StringUtil
.
format
(
request
);
if
(
StringUtil
.
isMessyCode
(
request
))
{
request
=
"messy"
;
status
=
"messy"
;
}
response
=
split_rest
[
9
]
!=
null
?
split_rest
[
9
].
replace
(
"response:"
,
""
)
:
StringUtil
.
isNull
;
response
=
StringUtil
.
format
(
response
);
if
(
StringUtil
.
isMessyCode
(
response
))
{
response
=
""
;
}
if
(
request
!=
null
&&
!
""
.
equals
(
request
))
{
String
[]
requests
=
request
.
split
(
"&"
);
if
(
requests
!=
null
&&
requests
.
length
==
10
)
{
method
=
requests
[
1
]
!=
null
?
requests
[
1
].
replace
(
"method="
,
""
)
:
StringUtil
.
isNull
;
method
=
StringUtil
.
format
(
method
);
if
(
StringUtil
.
isMessyCode
(
method
))
{
method
=
""
;
}
uid
=
requests
[
2
]
!=
null
?
requests
[
2
].
replace
(
"uid="
,
""
)
:
StringUtil
.
isNull
;
uid
=
StringUtil
.
format
(
uid
);
if
(
StringUtil
.
isMessyCode
(
uid
))
{
uid
=
""
;
}
biz_data
=
requests
[
6
]
!=
null
?
requests
[
6
].
replace
(
"biz_data="
,
""
)
:
StringUtil
.
isNull
;
biz_data
=
StringUtil
.
format
(
biz_data
);
if
(
StringUtil
.
isMessyCode
(
biz_data
))
{
biz_data
=
""
;
}
}
}
if
(
response
!=
null
&&
!
""
.
equals
(
response
)
&&
JsonUtil
.
isJson
(
response
))
{
code
=
StringUtil
.
format
(
JSONObject
.
parseObject
(
response
).
getString
(
"code"
));
if
(
StringUtil
.
isMessyCode
(
code
))
{
code
=
""
;
}
}
}
}
StringBuilder
builder
=
new
StringBuilder
();
builder
.
append
(
rest
).
append
(
"\t"
);
builder
.
append
(
requestUri
).
append
(
"\t"
);
builder
.
append
(
request
).
append
(
"\t"
);
builder
.
append
(
response
).
append
(
"\t"
);
builder
.
append
(
status
).
append
(
"\t"
);
builder
.
append
(
method
).
append
(
"\t"
);
builder
.
append
(
uid
).
append
(
"\t"
);
builder
.
append
(
biz_data
).
append
(
"\t"
);
builder
.
append
(
code
).
append
(
"\t"
);
builder
.
append
(
trace
).
append
(
"\t"
);
builder
.
append
(
timestamp
);
context
.
write
(
NullWritable
.
get
(),
new
Text
(
builder
.
toString
()));
}
}
}
Please
register
or
sign in
to post a comment