Flink使用RestApi

flink是一个非常好用的流任务计算框架, 这次我们来试用flink的restApi来提交任务. 主要阐述几个常用的restapi, 包括上传jar包, 查询jar包, 提交任务, 查询任务, 删除任务等,
其它的比如删除jar包, 查询jobmanager, 查询taskmanager等等, 类推就可以得出了, 不在这里进行重复介绍了

1, 上传jar包

   public static boolean uploadJar( File jarFile) {
RequestBody requestBody = new MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart("file", jarFile.getName(),
RequestBody.create(MediaType.parse("multipart/form-data"), jarFile))
.build();

Request request = new Request.Builder()
.url("http://host:port/jars/upload")
.addHeader(userAgent, userAgentVal)
.post(requestBody)
.build();
Response resp = OkHttpUtils.execute(request);
if (OK == resp.code()) {
JSONObject body = JSON.parseObject(resp.body().string());
if ("success".equals(body.getString("status"))) {
return true;
}
}
return false;
}

2, 查询jar包

  Request request = new Request.Builder()
.url("http://host:port/jars")
.addHeader(userAgent, userAgentVal)
.get()
.build();
Response response = OkHttpUtils.execute(request);
String body = response.body().string();

3,提交任务
(特别提示: 提交任务时, Main方法中,容易出现参数解析异常, 为了解决这一个问题, 强烈建议, 对参数进行编码转换, 对programArgs参数进行URLEncoder.encode(参数值, “utf-8”), 然后再在flink运行jar包, 进行解码.

       String baseUrl = "http://host:port/jars/${jarId}/run";
Map<String, String> params = new HashMap<>();
params.put("programArgs", "xxxxxx");
params.put("entryClass", "com.xx.oo.JsonMain");
params.put("parallelism", "2");
params.put("savepointPath", null);
Request request = new Request.Builder()
.url(baseUrl)
.addHeader(userAgent, userAgentVal)
.post(RequestBody.create(JSON.toJSONString(params), MEDIA_TYPE_JSON))
.build();
Response resp = OkHttpUtils.execute(request);
String respBody = resp.body().string();
if (OK == resp.code()) {
JSONObject body = JSON.parseObject(respBody);
return body.getString("jobid");
}

4,查询任务

       String url= "http://host:port/jobs";
Request request = new Request.Builder()
.url(url)
.addHeader(userAgent, userAgentVal)
.get()
.build();
Response resp = OkHttpUtils.execute(request);
if (OK == resp.code()) {
JSONObject body = JSON.parseObject(resp.body().string());
if (body.containsKey("jobs")) {
JSONArray jobs = body.getJSONArray("jobs");
for (int i = 0; i < jobs.size(); i++) {
JSONObject jsb = jobs.getJSONObject(i);
String id = jsb.getString("id");
String status = jsb.getString("status");
}
}
}else{
logger.error("queryJobByHttp "+resp.body().string());
}

4,删除任务

       String url= "http://host:port/jobs/${jobId}";
Request request = new Request.Builder()
.url(baseUrl)
.addHeader(userAgent, userAgentVal)
.patch(RequestBody.create("{}", MEDIA_TYPE_JSON))
.build();
Response resp = OkHttpUtils.execute(request);
if (ACCEPTED == resp.code()) {
return jobId;
}
return null;

原创:https://www.panoramacn.com
源码网提供WordPress源码,帝国CMS源码discuz源码,微信小程序,小说源码,杰奇源码,thinkphp源码,ecshop模板源码,微擎模板源码,dede源码,织梦源码等。

专业搭建小说网站,小说程序,杰奇系列,微信小说系列,app系列小说

Flink使用RestApi

免责声明,若由于商用引起版权纠纷,一切责任均由使用者承担。

您必须遵守我们的协议,如果您下载了该资源行为将被视为对《免责声明》全部内容的认可-> 联系客服 投诉资源
www.panoramacn.com资源全部来自互联网收集,仅供用于学习和交流,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。 敬请谅解! 侵权删帖/违法举报/投稿等事物联系邮箱:2640602276@qq.com
未经允许不得转载:书荒源码源码网每日更新网站源码模板! » Flink使用RestApi
关注我们小说电影免费看
关注我们,获取更多的全网素材资源,有趣有料!
120000+人已关注
分享到:
赞(0) 打赏

评论抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

您的打赏就是我分享的动力!

支付宝扫一扫打赏

微信扫一扫打赏