星期日, 二月 08, 2015

在ipython notebook上运行spark

周末无事,看到了这篇文章。于是照猫画虎尝试了一番,顺便把官网上的文档看了一些。记录以下以备忘。(目前只尝试了spark在单机上的运行)

安装spark
只需要去官网下载预编译好的最新版本即可,然后回来解压
tar -xzf spark-1.2.0-bin-hadoop2.4.tgz
放到自己选定的目录中,加一个软链接
ln -s /srv/spark-1.2.0 /srv/spark
然后设置环境变量,~/.bash_profile or ~/.profile
export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH

之后在终端上运行pyspark应该可以看到欢迎画面了。

设置notebook
为了在ipython notebook中运行spark需要设置一个启动文件。先设置一个profile
ipython profile create spark
在路径下加一个py文件 $HOME/.ipython/profile_spark/startup/00-pyspark-setup.py

文件中放如下内容,注意和自己的目录有关。
import os
import sys
# Configure the environment
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/srv/spark'
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']
# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

之后在启动notebook时要加一个profile参数
ipython notebook --profile spark

hello world
在notebook中尝试了spark的hello world任务,即word count。
from pyspark import  SparkContext
sc = SparkContext( 'local[4]')
lines = sc.textFile("some.txt")
words = lines.flatMap(lambda line: line.split())
pairs = words.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
result = counts.collect()
#counts.saveAsTextFile("wc")

后面有空再去研究如何用spark做数据分析和机器学习。

星期日, 二月 01, 2015

python数据挖掘模型的API部署

前文谈到了如何把一个R语言的挖掘模型进行在线部署,也就是生成一个API。本文则是同样的思路,只不过是来尝试将python的数据挖掘模型部署成一个API。由于python是通用型的编程语言,部署起来方便一些。下面的例子仍是一个简单的模型,用来预测iris种类。在mac系统中完成,使用了python的几个包:

flask 一个轻量级的web框架
flask.ext.restful 快速生成restful api
numpy 数值计算包
pickle 用来保存模型
sklearn 用来建模

只需要四个步骤:

步骤1:建立模型
步骤2:模型写入pickle文件
步骤3:构建一个基于flask的API
步骤4:通过API调用模型

第一步是用sklearn建模。

from sklearn.tree import DecisionTreeClassifier
import pandas as pd
from pickle import dump
df = pd.read_csv('iris.csv')
X = df.ix[:,:4].values
y = df.ix[:,4].values
model = DecisionTreeClassifier()
model.fit(X,y)

第二步是将模型写入到pickle文件中

dump(model, open('model.pickle','wb'))

第三步是构建一个API,写一个server.py文件

from flask import Flask, request
from flask.ext.restful import Resource, Api
import pandas as pd
from pickle import load

app = Flask(__name__)
api = Api(app)
model = load(open('model.pickle','rb'))

class Model(Resource):
    def post(self):
     res = pd.DataFrame(request.json,index=[0]) # df
    res = model.predict(res) # array
    res = res.tolist()[0] # str
    res = {'Species':res} # dict
        return res

api.add_resource(Model, '/')

if __name__ == '__main__':
    app.run(debug=True)

保存好py文件后启用它。
python server.py

API会在如下地址监听
Running on http://127.0.0.1:5000/

第四步是调用这个API。另开一个终端,输入
curl -H "Content-type: application/json"  -X POST http://127.0.0.1:5000/ -d '{"Sepal_Length":4.9,"Sepal_Width":3,"Petal_Length":1.4,"Petal_Width":0.2}'

成功返回
{
    "Species": "virginica"

}
也可以在python中实验。
from requests import post
import json

url = 'http://127.0.0.1:5000'
data = '{"Sepal_Width": 3, "Petal_Width": 0.2, "Sepal_Length": 4.9, "Petal_Length": 1.4}'
headers = {'content-type':'application/json'}
post(url, data=json.dumps(data), headers=headers).json()

上面的代码是用了flask的一个扩展库,如果直接用原生的flask也可以写
from flask import Flask, request, jsonify
import numpy as np
import pandas as pd
from pickle import load

app = Flask(__name__)

model = load(open('model.pickle','rb'))

from flask import Flask, request,json
app = Flask(__name__)

@app.route('/model', methods = ['POST'])
def api_message():
    if request.headers['Content-Type'] == 'application/json':
    res = pd.DataFrame(request.json,index=[0]) # df
    res = model.predict(res) # array
    res = res.tolist()[0] # str
    res = {'Species':res} # dict
        return jsonify(res)
    else:
        return "415 Unsupported Media Type ;)"

if __name__ == '__main__':
    app.run(debug=True)

最后在shell中执行curl测试,那么将是输入json,输出json了
curl -H "Content-type: application/json"  -X POST http://127.0.0.1:5000/model -d '{"Sepal_Length":4.9,"Sepal_Width":3,"Petal_Length":1.4,"Petal_Width":0.2}'