PySpark
Training and Saving a Model
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# Create a SparkSession. One is required before a DataFrame can be built.
# (The application name is arbitrary.)
spark = SparkSession.builder.appName("spark_lr_example").getOrCreate()

# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Learn a LogisticRegression model. This uses the parameters stored in lr.
model = lr.fit(training)

# Save the fitted model to a local path.
model.save("/tmp/spark_model")
After the model is saved, it must be uploaded to an OBS directory before it can be published. The published package must also contain the config.json configuration file and the customize_service.py inference code. For details about how to write them, see Model Package Specifications <modelarts_23_0091>.
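For orientation, a hypothetical layout of the uploaded package is sketched below. The directory and file names other than config.json and customize_service.py are examples only; the authoritative structure is defined in Model Package Specifications <modelarts_23_0091>.

model/
├── config.json              # model configuration file
├── customize_service.py     # inference code (see Inference Code below)
└── spark_model/             # directory produced by model.save()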
Inference Code
# coding:utf-8
import collections
import json
import traceback

import pandas as pd

import model_service.log as log
from model_service.spark_model_service import SparkServingBaseService
from pyspark.ml.classification import LogisticRegressionModel

logger = log.getLogger(__name__)


class user_Service(SparkServingBaseService):
    # Pre-process data.
    def _preprocess(self, data):
        logger.info("Begin to handle data from user data...")
        # Read the request data.
        req_json = json.loads(data, object_pairs_hook=collections.OrderedDict)
        try:
            # Convert the request data to a Spark DataFrame.
            predict_spdf = self.spark.createDataFrame(pd.DataFrame(req_json["data"]["req_data"]))
        except Exception:
            logger.error("Check whether the request data meets the requirements.")
            logger.error(traceback.format_exc())
            raise Exception("Check whether the request data meets the requirements.")
        return predict_spdf

    # Perform model inference.
    def _inference(self, data):
        try:
            # Load the model file. A fitted model must be loaded with
            # LogisticRegressionModel, not the LogisticRegression estimator.
            predict_model = LogisticRegressionModel.load(self.model_path)
            # Perform inference on the DataFrame.
            prediction_result = predict_model.transform(data)
        except Exception:
            logger.error(traceback.format_exc())
            raise Exception("Unable to load the model or transform the DataFrame.")
        return prediction_result

    # Post-process data.
    def _postprocess(self, pre_data):
        logger.info("Get new data to respond...")
        predict_str = pre_data.toPandas().to_json(orient='records')
        predict_result = json.loads(predict_str)
        return predict_result
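The _preprocess method above expects the request body to carry the input records under data.req_data; they are converted to a Spark DataFrame before inference. To sanity-check the saved model outside the service, the following minimal sketch loads the model and runs a prediction locally, assuming the model was saved to /tmp/spark_model as in the training example:

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegressionModel

spark = SparkSession.builder.appName("local_model_test").getOrCreate()

# Load the fitted model saved by the training script.
model = LogisticRegressionModel.load("/tmp/spark_model")

# Build a test DataFrame with the same feature schema used for training.
test = spark.createDataFrame([(Vectors.dense([0.0, 1.1, 0.1]),)], ["features"])

# The output DataFrame contains the prediction and probability columns.
model.transform(test).show()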