发布日:2022-03-03 15:21 阅读数:
# coding=utf-8
"""Minimal FTRL-Proximal online logistic regression with L1/L2 regularization.

Written so it runs on both Python 2.7 and Python 3: every ``print`` call takes a
single argument and no Python-2-only builtins (``xrange``) are used.
"""
import numpy as np


class LR(object):
    """Logistic-regression helpers used as FTRL's decision function."""

    @staticmethod
    def fn(w, x):
        """Decision function: the sigmoid of the linear score ``w . x``."""
        return 1.0 / (1.0 + np.exp(-w.dot(x)))

    @staticmethod
    def loss(y, y_hat):
        """Cross-entropy loss, summed over all elements.

        ``np.nan_to_num`` maps the ``0 * log(0)`` NaN (a prediction of exactly 0 or 1)
        to 0 and infinities to large finite numbers.
        """
        return np.sum(np.nan_to_num(-y * np.log(y_hat) - (1 - y) * np.log(1 - y_hat)))

    @staticmethod
    def grad(y, y_hat, x):
        """First derivative of the cross-entropy loss with respect to the weights ``w``."""
        return (y_hat - y) * x


class FTRL(object):
    """FTRL-Proximal online learner with per-coordinate learning rates.

    Args:
        dim: number of features (length of every input vector ``x``).
        l1: L1 strength; coordinates with ``|z_i| <= l1`` get weight exactly 0.
        l2: L2 strength.
        alpha, beta: learning-rate parameters; coordinate ``i`` effectively uses
            the rate ``alpha / (beta + sqrt(n_i))``.
        decisionFunc: object exposing static ``fn(w, x)``, ``loss(y, y_hat)`` and
            ``grad(y, y_hat, x)``; defaults to :class:`LR`.
    """

    def __init__(self, dim, l1, l2, alpha, beta, decisionFunc=LR):
        self.dim = dim
        self.decisionFunc = decisionFunc
        self.z = np.zeros(dim)  # accumulated adjusted gradients
        self.n = np.zeros(dim)  # accumulated squared gradients
        self.w = np.zeros(dim)  # weights, recomputed lazily from z and n
        self.l1 = l1
        self.l2 = l2
        self.alpha = alpha
        self.beta = beta

    def predict(self, x):
        """Return the decision-function output for ``x`` under the current weights."""
        return self.decisionFunc.fn(self.w, x)

    def update(self, x, y):
        """Process one sample ``(x, y)``; return its loss under the freshly computed weights.

        The weights are first recomputed from ``z`` and ``n`` and used to predict
        ``x``. The gradient of that prediction then updates ``z`` and ``n``.
        """
        # Closed-form weights: 0 inside the L1 dead zone, otherwise
        # -(z - sign(z) * l1) / (l2 + (beta + sqrt(n)) / alpha).
        # Vectorized with a boolean mask; only active coordinates are divided.
        w = np.zeros(self.dim)
        active = np.abs(self.z) > self.l1
        z_act = self.z[active]
        w[active] = (np.sign(z_act) * self.l1 - z_act) / (
            self.l2 + (self.beta + np.sqrt(self.n[active])) / self.alpha)
        self.w = w

        y_hat = self.predict(x)
        g = self.decisionFunc.grad(y, y_hat, x)
        # sigma: increase of the inverse learning rate caused by this gradient.
        sigma = (np.sqrt(self.n + g * g) - np.sqrt(self.n)) / self.alpha
        self.z += g - sigma * self.w
        self.n += g * g
        return self.decisionFunc.loss(y, y_hat)

    def train(self, trainSet, verbos=False, max_itr=100000000, eta=0.01, epochs=100):
        """Repeatedly stream ``trainSet`` through :meth:`update` until a stop rule fires.

        Args:
            trainSet: re-iterable of ``(x, y)`` pairs, traversed once per pass.
            verbos: if true, print the loss after every update.
            max_itr: stop after this many updates in total.
            eta: loss threshold used by the convergence rule.
            epochs: stop once the per-sample loss has been below ``eta`` for this
                many consecutive updates (it counts updates, not passes).

        Returns early if a whole pass yields no samples. This covers an empty
        dataset or a one-shot iterator that is already exhausted, which would
        otherwise make the outer loop spin forever.
        """
        itr = 0  # consecutive updates with loss < eta
        n = 0    # total updates performed
        while True:
            seen_sample = False
            for x, y in trainSet:
                seen_sample = True
                loss = self.update(x, y)
                if verbos:
                    print("itr=" + str(n) + "\tloss=" + str(loss))
                if loss < eta:
                    itr += 1
                else:
                    itr = 0
                if itr >= epochs:  # loss stayed below eta for `epochs` consecutive updates
                    print("loss have less than %s continuously for %s iterations" % (eta, itr))
                    return
                n += 1
                if n >= max_itr:
                    print("reach max iteration %s" % max_itr)
                    return
            if not seen_sample:
                print("training set yielded no samples, stop training")
                return


class TestData(object):
    """Re-iterable dataset read from a whitespace-separated text file.

    Each usable line holds ``d`` feature values followed by the label. Lines with
    fewer than ``d + 1`` fields are skipped, and extra fields are ignored. The file
    is reopened on every iteration, so :meth:`FTRL.train` can make several passes.
    """

    def __init__(self, file, d):
        self.d = d
        self.file = file

    def __iter__(self):
        with open(self.file, 'r') as f_in:
            for line in f_in:
                arr = line.strip().split()
                if len(arr) >= (self.d + 1):
                    yield (np.array([float(v) for v in arr[0:self.d]]), float(arr[self.d]))


if __name__ == '__main__':
    d = 4
    testData = TestData("train.txt", d)
    ftrl = FTRL(dim=d, l1=1.0, l2=1.0, alpha=0.1, beta=1.0)
    ftrl.train(testData, verbos=False, max_itr=100000, eta=0.01, epochs=100)
    w = ftrl.w
    print(w)
    # NOTE: accuracy is measured on the same file that was used for training.
    correct = 0
    wrong = 0
    for x, y in testData:
        y_hat = 1.0 if ftrl.predict(x) > 0.5 else 0.0
        if y == y_hat:
            correct += 1
        else:
            wrong += 1
    total = correct + wrong
    if total:
        print("correct ratio %s" % (1.0 * correct / total))
    else:
        print("no samples to evaluate")
Alink 是阿里巴巴基于实时计算引擎 Flink 研发的新一代机器学习算法平台,是业界首个同时支持批式算法和流式算法的机器学习平台。Alink 提供了在线学习算法 FTRL 的实现,其主要流程如下:
● 建立特征处理管道:管道包含 StandardScaler 和 FeatureHasher 两个阶段,分别进行标准化缩放和特征哈希,最终得到特征向量。
// Feature-engineering pipeline: standardize the numerical columns, then hash the
// selected columns (categoryColNames treated as categorical) into vecColName.
StandardScaler scaler = new StandardScaler()
    .setSelectedCols(numericalColNames);
FeatureHasher hasher = new FeatureHasher()
    .setSelectedCols(selectedColNames)
    .setCategoricalCols(categoryColNames)
    .setOutputCol(vecColName)
    .setNumFeatures(numHashFeatures);
Pipeline featurePipeline = new Pipeline()
    .add(scaler)
    .add(hasher);

// Fit the feature pipeline on the batch training data.
PipelineModel featurePipelineModel = featurePipeline.fit(trainBatchData);
● 准备数据集:构建流式数据源(示例中以流的方式读取 CSV 文件,也可以换成 Kafka 之类的消息队列),并对其进行实时切分,得到原始训练数据和原始预测数据。
// 准备流式数据集 CsvSourceStreamOp data = new CsvSourceStreamOp() .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv") .setSchemaStr(schemaStr) .setIgnoreFirstLine(true); // 这里可以采用kafaka数据源 KafkaSourceStreamOp soure = new KafkaSourceStreamOp() .setBootstrapServers("localhost:9092") .setTopic("train_data_topic") .setStartupMode("EARLIEST") .setGroupId(""); // 对于流数据源进行实时切分得到原始训练数据和原始预测数据 SplitStreamOp splitter = new SplitStreamOp().setFraction(0.5).linkFrom(data);
● 训练一个逻辑回归模型作为 FTRL 算法的初始模型,以满足系统冷启动的需要。
// Batch logistic-regression model used as FTRL's initial model (cold start).
LogisticRegressionTrainBatchOp lr = new LogisticRegressionTrainBatchOp()
    .setVectorCol(vecColName)
    .setLabelCol(labelColName)
    .setWithIntercept(true)
    .setMaxIter(10);
// Apply the fitted feature pipeline to the batch data, then train on it.
BatchOperator<?> trainFeatures = featurePipelineModel.transform(trainBatchData);
BatchOperator<?> initModel = trainFeatures.link(lr);
● 在初始模型的基础上进行 FTRL 在线训练。
// 在初始模型基础上进行FTRL在线训练 FtrlTrainStreamOp model = new FtrlTrainStreamOp(initModel) .setVectorCol(vecColName) .setLabelCol(labelColName) .setWithIntercept(true) .setAlpha(0.1) .setBeta(0.1) .setL1(0.01) .setL2(0.01) .setTimeInterval(10) .setVectorSize(numHashFeatures) .linkFrom(featurePipelineModel.transform(splitter));
● 在 FTRL 在线模型的基础上,连接预测数据进行预测。
// Predict on the feature-transformed side output (prediction stream) using the
// model stream from FTRL training. initModel is also passed to the constructor
// (presumably used until the first online model arrives -- confirm).
StreamOperator<?> predictFeatures =
    featurePipelineModel.transform(splitter.getSideOutput(0));
FtrlPredictStreamOp predictResult = new FtrlPredictStreamOp(initModel)
    .setVectorCol(vecColName)
    .setPredictionCol("pred")
    .setReservedCols(new String[] {labelColName})
    .setPredictionDetailCol("details")
    .linkFrom(model, predictFeatures);
● 对预测结果流进行评估
// 对预测结果流进行评估 predictResult .link( new EvalBinaryClassStreamOp().setLabelCol(labelColName).setPredictionCol("pred") .setPredictionDetailCol("details").setTimeInterval(10)).link(new JsonValueStreamOp() .setSelectedCol("Data") .setReservedCols(new String[]{"Statistics"}).setOutputCols(new String[]{"Accuracy", "AUC", "ConfusionMatrix"}).setJsonPath(new String[]{".AUC", "$.ConfusionMatrixx"}) ) .print(); StreamOperator.execute();
在开发并打包成 jar 包时遇到了两个问题:一是没有把依赖包打入 jar 中,提交到 Flink 集群后任务找不到相关类;二是打包时把 Flink 相关的包也打了进去,与 Flink lib 目录中的 jar 包冲突。因此使用 Maven 打包时,要把第三方依赖一并打入 jar(例如使用 maven-shade-plugin),同时排除 Flink 自身的包(例如将 Flink 相关依赖的 scope 设为 provided),以免报错。
# Download Flink 1.13.0 (Scala 2.11 build), unpack it and start the cluster.
# Steps are chained with && so a failed download or unpack stops the sequence.
wget https://archive.apache.org/dist/flink/flink-1.13.0/flink-1.13.0-bin-scala_2.11.tgz \
  && tar -xf flink-1.13.0-bin-scala_2.11.tgz \
  && cd flink-1.13.0 \
  && ./bin/start-cluster.sh
# Submit the packaged job from the flink-1.13.0 directory: parallelism 1,
# entry class org.example.FTRLExample.
./bin/flink run --parallelism 1 --class org.example.FTRLExample FlinkOnlineProject-1.0-SNAPSHOT.jar
将任务提交到 Flink 集群后,可以通过 Flink Web UI 查看任务状态。如果集群运行在本机,在浏览器中打开 http://localhost:8081/ 即可看到所有提交到该集群的任务的运行状态,以及 checkpoint、反压等信息。任务运行状态如下图所示:
编辑:航网科技 来源:腾讯云
本文版权归原作者所有 转载请注明出处
Copyright © 2011-2020 www.hangw.com. All Rights Reserved 深圳航网科技有限公司 版权所有 增值电信业务经营许可证:粤B2-20201122 - 粤ICP备14085080号
微信扫一扫咨询客服
全国免费服务热线
0755-36300002