业务需求:
通过yarn的restful api,对提交到yarn上的flink任务的运行状态做简单的监控预警。
官方文档地址:
Apache Hadoop 3.2.2 – ResourceManager REST APIs.
yarn的restful api:
1,我们主要看看 Cluster Applications API(/ws/v1/cluster/apps),也就是下面代码里查询的接口;
2,简单找个集群测试一下,查询指定队列里的任务:
简单代码演示:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.mzlion.easyokhttp.HttpClient;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.ListUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class GetYarnApps2 {
// private static volatile ConcurrentHashMap<String, String> currentTaskMap = new ConcurrentHashMap<String, String>();
private static volatile List<String> failedTaskList = new ArrayList<String>();
private static volatile List<String> failedTaskList2 = new ArrayList<String>();
public static void main(String[] args) throws InterruptedException {
// String url = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps";
// String url = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps?queue=users"; //todo 指定队列
//todo 生产环境
String url = "http://prod-qd-ct6-cdh-master01:8088/ws/v1/cluster/apps?queue=flink"; //todo 指定队列
String jobUrl= "http://prod-qd-ct6-cdh-master01:8088/proxy/application_1609329247342_52776/jobs";
String perfer_url = "http://prod-qd-ct6-cdh-master01:8088/proxy/";
String url2 = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps/application_1326821518301_0005"; //todo 查看指定任务
String url3 = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/apps/application_1326821518301_0005/state"; //todo 查看指定任务状态
String url4 = "http://dev-ct6-dc-master01:8088/ws/v1/cluster/metrics"; //todo 整个集群指标
// String url="http://dev-ct6-dc-master01:8088/cluster/apps/FINISHED";
// String url="http://dev-ct6-dc-master01:7180/static/apidocs/";
String responseData = HttpClient.get(url).asString();
JSONObject jsonObject = JSONObject.parseObject(responseData);
String app = jsonObject.getJSONObject("apps").getString("app");
while (true) {
failedTaskList.clear();
failedTaskList2.clear();
List<YarnApp> appList = JSON.parseArray(app, YarnApp.class);
for (int i = 0; i < appList.size(); i++) {
String queue = appList.get(i).getQueue();
String id = appList.get(i).getId();
String state = appList.get(i).getState();
String name = appList.get(i).getName();
// System.out.println("任务状态state = " + state);
System.out.println("yarn任务名称name = " + name+",___,"+"任务state = " + state+",___,"+"任务Id = " + id);
String jobs_url = perfer_url+id+"/jobs";
String job_responseData = HttpClient.get(jobs_url).asString();
JSONObject job_json = JSONObject.parseObject(job_responseData);
String job_id = job_json.getJSONArray("jobs").getJSONObject(0).getString("id");
String job_status = job_json.getJSONArray("jobs").getJSONObject(0).getString("status");
System.out.println("job_id = " + job_id+",________,job_status = "+job_status);
System.out.println();
String erveryTastStr = name+","+id+","+state+","+job_id+","+job_status;
failedTaskList.add(erveryTastStr);
failedTaskList2.add(erveryTastStr);
}
System.out.println("failedTaskList = " + failedTaskList);
System.out.println("failedTaskList2 = " + failedTaskList2);
List<Integer> list = ListUtils.removeAll(failedTaskList, failedTaskList2);
if (list.size()>0){
System.out.println("11111111");
}else {
System.out.println(22222222);
}
TimeUnit.SECONDS.sleep(30);
}
}
//todo 循环对比目前的任务是否存在。
public static Boolean RunningTaskList(){
//todo 在这里可以获取mysql或者redis当前我们提交的任务。
//todo 这里的案例是从redis获取,或者本地写死
// private static volatile List<String> failedTaskList = Collections.synchronizedList(new ArrayList<String>());
List<String> list1 = Arrays.asList("Zeppelin Flink on yarn");
// List<String> list2= Arrays.asList("Zeppelin Flink on yarn");
boolean equalCollection = CollectionUtils.isEqualCollection(list1, failedTaskList);
return equalCollection;
}
}
3, flink on yarn任务监控,我们需要拿到applicationId和jobId。
通过applicationId也可以拿到jobId(见上面代码里的 /proxy/<applicationId>/jobs 接口)。
不过正常启动flink任务之后,打印的日志里就能获取到applicationId和jobId,我们可以把它们保存到数据库,这个在我之前写的文章里有写到。
4,通过restful api监控flink任务,这里只展示几个简单且有用的接口。
参考官方文档:
Apache Flink 1.11 Documentation: Metrics
1,flink checkpoint的metric
案例:
通过restful api查询flink任务最后一次checkpoint的地址(任务要在运行中)
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.mzlion.easyokhttp.HttpClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.stream.Collectors;
/**
 * Reads, through the YARN ResourceManager web proxy, the Flink job metric
 * {@code lastCheckpointExternalPath}: the external location of the job's latest checkpoint.
 * The job must be running for the proxy to reach Flink's REST API.
 */
public class YarnHttpUtils2 {

    /** How many times main() asks for the checkpoint path before giving up. */
    private static final int MAX_ATTEMPTS = 2;

    /** Pause between two attempts, in milliseconds. */
    private static final long RETRY_DELAY_MS = 1000L;

    /** Path fragment that a valid checkpoint location of this job is expected to contain. */
    private static final String EXPECTED_CHECKPOINT_DIR = "checkpoints/flink-1.13.0/cdc/rocksDBStateBackend";

    /**
     * Returns the external path of the latest checkpoint of a Flink job.
     *
     * @param master        ResourceManager web address, e.g. "http://host:8088"
     * @param applicationId YARN application id of the Flink cluster
     * @param jobId         Flink job id
     * @return the checkpoint path, or "" when it could not be obtained (never null)
     */
    private static String getCheckpointUrl(String master, String applicationId, String jobId) {
        String url = master + "/proxy/" + applicationId + "/jobs/" + jobId
                + "/metrics?get=lastCheckpointExternalPath";
        String value = "";
        try {
            String responseData = HttpClient.get(url).asString();
            // Flink's metrics endpoint returns a JSON array of {"id": ..., "value": ...} objects.
            JSONArray jsonArray = JSONArray.parseArray(responseData);
            if (jsonArray != null && !jsonArray.isEmpty()) {
                String metricValue = jsonArray.getJSONObject(0).getString("value");
                if (metricValue != null) {
                    value = metricValue;
                }
            }
        } catch (Exception ex) {
            // Not fatal (the caller may retry), but the cause is reported instead of being dropped.
            System.out.println("第一次没查询到...");
            System.out.println("cause = " + ex);
        }
        System.out.println("value = " + value);
        return value;
    }

    /** Returns true when the path is non-empty and lies under the expected checkpoint directory. */
    private static boolean isExpectedCheckpoint(String path) {
        return !StringUtils.isEmpty(path) && path.contains(EXPECTED_CHECKPOINT_DIR);
    }

    /**
     * Demo: prints the latest checkpoint path of one job. If the first answer is empty or not under
     * the expected directory, it retries up to {@link #MAX_ATTEMPTS} attempts in total.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting between attempts
     */
    public static void main(String[] args) throws InterruptedException {
        String master = "http://ct7-cdh-master02:8088";
        String applicationId = "application_1641381557441_0109";
        String jobId = "1b78e99add472bf0db9e011a363ebd43";

        String checkpointUrl = "";
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            checkpointUrl = getCheckpointUrl(master, applicationId, jobId);
            if (isExpectedCheckpoint(checkpointUrl)) {
                break;
            }
            if (attempt < MAX_ATTEMPTS) {
                // Give a transient failure (e.g. proxy hiccup) a moment before asking again.
                Thread.sleep(RETRY_DELAY_MS);
            }
        }
        System.out.println("checkpointUrl = " + checkpointUrl);
    }
}
输出结果:
checkpointUrl = hdfs://ct7-cdh-master02:8020/checkpoints/flink-1.13.0/cdc/rocksDBStateBackend/6a9299c415b3f7acd3a3d5bfa13f6ac9/chk-361
其他接口主要是查看job的信息,很简单,这里就不演示了,跟着官网文档做即可: