1、Web端管理工作流
Azkaban提供了易操作的Web管理界面,具体操作可参考:http://azkaban.github.io/azkaban/docs/2.5/#ajax-api
值得注意的是,可以使用Azkaban提供的Web界面定义或覆盖工作流的具体执行参数,操作界面如下:
2.1、登录认证
命令:curl -k -X POST --data "action=login&username=azkaban&password=azkaban" https://localhost:8443
返回json格式:
{
"status" : "success",
"session.id" : "c001aba5-a90f-4daf-8f11-62330d034c0a"
}
2.2、创建一个Project
命令:curl -k -X POST --data "session.id=9089beb2-576d-47e3-b040-86dbdc7f523e&name=aaaa&description=11" https://localhost:8443/manager?action=create
返回json格式:
{
"status":"success",
"path":"manager?project=aaaa",
"action":"redirect"
}
2.3、上传一个Project的zip文件
命令:curl -k -i -H "Content-Type: multipart/mixed" -X POST --form 'session.id=e7a29776-5783-49d7-afa0-b0e688096b5e' --form 'ajax=upload' --form 'file=@myproject.zip;type=application/zip' --form 'project=MyProject' https://localhost:8443/manager
注意:需要在zip文件所在的目录下执行该命令,否则无法找到zip文件
返回json格式:
{
"error" : "Installation Failed.\nError unzipping file.",
"projectId" : "192",
"version" : "1"
}
2.4、执行工作流
命令:curl -k --get --data 'session.id=189b956b-f39f-421e-9a95-e3117e7543c9' --data 'ajax=executeFlow' --data 'project=azkaban-test-project' --data 'flow=test' https://localhost:8443/executor
返回json格式:
{
"message" : "Execution submitted successfully with exec id 295",
"project" : "foo-demo",
"flow" : "test",
"execid" : 295
}
3、使用Ajax API操作工作流
如果要在程序中执行工作流,则需要使用Azkaban提供的Ajax API。以下是使用Java模拟HTTPS请求的代码:
AzkabanHttpsPost类
package hadoop.azkaban;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Properties;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import net.sf.json.JSONObject;
public class AzkabanHttpsPost {
static String keystorePassword;
static String keystore;
static String truststore;
static{
InputStream is=Thread.currentThread().getContextClassLoader().
getResourceAsStream("azkaban.properties");
Properties p=new Properties();
try{
p.load(is);
keystorePassword = p.getProperty("PASSWORD");
keystore = p.getProperty("KEYSTORE");
truststore = p.getProperty("TRUSTSTORE");
}catch(Exception e){
e.printStackTrace();
}
}
/**
* 取得KeyStore.
*
* @param storePath
* 密钥库路径
* @param password
* 密码
* @return 密钥库
* @throws Exception
*/
public static KeyStore getKeyStore(String password, String storePath)
throws Exception {
// 实例化密钥库
KeyStore ks = KeyStore.getInstance("JKS");
// 取得密钥库文件流
FileInputStream is = new FileInputStream(storePath);
// 加载密钥库
ks.load(is, password.toCharArray());
// 关闭密钥库文件流
is.close();
return ks;
}
/**
* 取得SSLSocketFactory.
*
* @param password
* 密码
* @param keyStorePath
* 密钥库路径
* @param trustStorePath
* 信任库路径
* @return SSLSocketFactory
* @throws Exception
*/
public static SSLContext getSSLContext() throws Exception {
// 实例化密钥库
KeyManagerFactory keyManagerFactory = KeyManagerFactory
.getInstance(KeyManagerFactory.getDefaultAlgorithm());
// 取得密钥库
KeyStore keyStore = getKeyStore(AzkabanHttpsPost.keystorePassword, AzkabanHttpsPost.keystore);
// 初始化密钥工厂
keyManagerFactory.init(keyStore, AzkabanHttpsPost.keystorePassword.toCharArray());
// 实例化信任库
TrustManagerFactory trustManagerFactory = TrustManagerFactory
.getInstance(TrustManagerFactory.getDefaultAlgorithm());
// 取得信任库
KeyStore trustStore = getKeyStore(AzkabanHttpsPost.keystorePassword, AzkabanHttpsPost.truststore);
// 初始化信任库
trustManagerFactory.init(trustStore);
// 实例化SSL上下文
SSLContext ctx = SSLContext.getInstance("TLS");
// 初始化SSL上下文
ctx.init(keyManagerFactory.getKeyManagers(),
trustManagerFactory.getTrustManagers(), null);
// 取得SSLSocketFactory
return ctx;
}
/**
* 初始化HttpsURLConnection.
*
* @param password
* 密码
* @param keyStorePath
* 密钥库路径
* @param trustStorePath
* 信任库路径
* @throws Exception
*/
public static void initHttpsURLConnection() throws Exception {
// 声明SSL上下文
SSLContext sslContext = null;
// 实例化主机名验证接口
HostnameVerifier hnv = new MyHostnameVerifier();
try {
sslContext = getSSLContext();
} catch (GeneralSecurityException e) {
e.printStackTrace();
}
if (sslContext != null) {
HttpsURLConnection.setDefaultSSLSocketFactory(sslContext
.getSocketFactory());
}
HttpsURLConnection.setDefaultHostnameVerifier(hnv);
}
/**
* 发送要求.
*
* @param httpsUrl
* 要求的地址,如https://localhost:8043
* @param xmlStr
* 要求的数据,如action=login&username=azkaban&password=azkaban
* @throws Exception
*/
public static JSONObject post(String url,String xmlStr) throws Exception {
initHttpsURLConnection();
JSONObject jsonObj = null;
HttpsURLConnection urlCon = null;
try {
urlCon = (HttpsURLConnection) (new URL(url)).openConnection();
urlCon.setDoInput(true);
urlCon.setDoOutput(true);
urlCon.setRequestMethod("POST");
// 以下设置后,azkaban才能辨认出是以ajax的方式访问,从而返回json格式的操作信息
urlCon.setRequestProperty("Content-Type",
"application/x-www-form-urlencoded");
urlCon.setRequestProperty("X-Requested-With", "XMLHttpRequest");
urlCon.setUseCaches(true);
// 设置为gbk可以解决
服务器接收时读取的数据中文乱码问题
urlCon.getOutputStream().write(xmlStr.getBytes("gbk"));
urlCon.getOutputStream().flush();
urlCon.getOutputStream().close();
BufferedReader in = new BufferedReader(new InputStreamReader(
urlCon.getInputStream()));
String line="";
String temp;
while ((temp = in.readLine()) != null) {
line = line + temp;
}
jsonObj = JSONObject.fromObject(line);
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
return jsonObj;
}
MyHostnameVerifier类
package hadoop.azkaban;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;
/**
 * Host name verifier used for the Azkaban HTTPS calls.
 *
 * <p>During the TLS handshake, if the host name in the URL does not match the host name
 * identified by the server, the verification mechanism calls back into this implementation
 * to decide whether the connection should be allowed. Only the exact host name
 * {@code "localhost"} is accepted; every other value (including {@code null}) is rejected.
 */
public class MyHostnameVerifier implements HostnameVerifier {
    /** The only host name accepted on a certificate/host mismatch. */
    private static final String TRUSTED_HOST = "localhost";

    @Override
    public boolean verify(String hostname, SSLSession session) {
        return TRUSTED_HOST.equals(hostname);
    }
}
以下是通过HTTPS调用来操作Azkaban的示例:
package hadoop.azkaban;
import java.io.InputStream;
import java.util.Properties;
import net.sf.json.JSONObject;
/**
*
* @author hu
*
*/
public class AzkabanOperator {
public static String url;
public static String azkabanUser;
public static String azkabanPassword;
public static String GDI_Project;
public static String GDI_Workflow;
static {
InputStream is = Thread.currentThread().getContextClassLoader()
.getResourceAsStream("azkaban.properties");
Properties p = new Properties();
try {
p.load(is);
url = p.getProperty("URL");
azkabanUser = p.getProperty("AZKABANUSER");
azkabanPassword = p.getProperty("AZKABANPASSWORD");
GDI_Project = p.getProperty("GDI_Project");
GDI_Workflow = p.getProperty("GDI_Workflow");
} catch (Exception e) {
e.printStackTrace();
}
}
public JSONObject login() throws Exception {
JSONObject result = null;
String queryStr = "action=login&username=" + azkabanUser + "&password="
+ azkabanPassword;
result = AzkabanHttpsPost.post(url, queryStr);
return result;
}
public JSONObject executeGDIFlow(String sessionID, String project,
String flow, String cwParams, String smParams, String gdiParams)
throws Exception {
JSONObject result = null;
String executeStr = "session.id=" + sessionID
+ "&ajax=executeFlow&project=" + project + "&flow=" + flow
+ "&flowOverride[cw_params]=" + cwParams
+ "&flowOverride[sm_params]=" + smParams
+ "&flowOverride[gdi_params]=" + gdiParams;
String executeUrl = url + "/executor";
result = AzkabanHttpsPost.post(executeUrl, executeStr);
return result;
}
public JSONObject fetchFlow(String sessionID, String execID)
throws Exception {
JSONObject result = null;
String executeStr = "session.id=" + sessionID
+ "&ajax=fetchexecflow&execid=" + execID;
String executeUrl = url + "/executor";
result = AzkabanHttpsPost.post(executeUrl, executeStr);
return result;
}