admin管理员组文章数量:1794759
使用 Java 代码提交 Flink job 任务
以下是使用 Java 客户端程序向 Flink 集群提交 job 的示例代码:
package client; import org.apache.flink.apimon.ExecutionConfig; import org.apache.flink.apimon.JobID; import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.streaming.api.graph.StreamGraph; import java.io.File; import java.util.ArrayList; import java.util.concurrent.CompletableFuture; /** * @ClassName FlinkClient * @Description TODO * @Author Getech * @Date 2021/6/24 17:59 */ public class FlinkClient { public static void main(String[] args) { String jarFilePath = "D:\\\\02develop\\\\2020workspace\\\\apache-flink\\\\example\\\\WordCountSQL.jar"; RestClusterClient<StandaloneClusterId> client = null; try { // 集群信 Configuration configuration = new Configuration(); configuration.setString(JobManagerOptions.ADDRESS, "10.8.4.129"); configuration.setInteger(JobManagerOptions.PORT, 6123); configuration.setInteger(RestOptions.PORT, 8081); client = new RestClusterClient<StandaloneClusterId>(configuration, StandaloneClusterId.getInstance()); int parallelism = 1; File jarFile=new File(jarFilePath); SavepointRestoreSettings savepointRestoreSettings=SavepointRestoreSettings.none(); PackagedProgram program = PackagedProgram.newBuilder() .setConfiguration(configuration) .setEntryPointClassName("org.apache.flink.table.examples.java.WordCountSQL") .setJarFile(jarFile) .setSavepointRestoreSettings(savepointRestoreSettings).build(); JobGraph 
jobGraph=PackagedProgramUtils.createJobGraph(program,configuration,parallelism,false); CompletableFuture<JobID> result = client.submitJob(jobGraph); JobID jobId= result.get(); System.out.println("提交完成"); System.out.println("jobId:"+ jobId.toString()); } catch (Exception e) { e.printStackTrace(); } } }
版权声明:本文标题:使用java代码提交flink job 任务 内容由林淑君副主任自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.xiehuijuan.com/baike/1686842603a109187.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论