Spark基于ALS推荐算法实现商品推荐实战Demo(Java版/Scala版)

一、数据集准备（文件 Recommend.txt，每行格式为：用户ID,商品ID,评分）

1,1,5
1,2,3.5
1,4,1.2
1,6,0
2,1,4.5
2,2,3.6
2,3,4.9
3,3,5.0
3,4,2.0
3,5,5.0
3,6,1.9
4,2,3.3
4,5,4.6
4,6,0
5,2,2.5
5,3,4.2
5,4,3.7

二、代码部分

Java版本

  • ALSRecommendJava.java
package top.it1002.spark.ml.Recommend;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;

/**
 * @Author      王磊
 * @Date        2018/12/18
 * @ClassName   ALSRecommendJava
 * @De ion Java版ALS推荐算法demo
 **/
public class ALSRecommendJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName(\"als\").setMaster(\"local[5]\");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile(\"C:\\\\Users\\\\asus\\\\Desktop\\\\data\\\\Recommend.txt\");
        // 映射
        RDD<Rating> ratingRDD = lines.map(new Function<String, Rating>() {
            public Rating call(String line) throws Exception {
                String[] arr = line.split(\",\");
                return new Rating(new Integer(arr[0]), new Integer(arr[1]), Double.parseDouble(arr[2]));
            }
        }).rdd();

        MatrixFactorizationModel model = ALS.train(ratingRDD, 10, 10);
        // 通过原始数据进行测试
        JavaPairRDD<Integer, Integer> testJPRDD = ratingRDD.toJavaRDD().mapToPair(new PairFunction<Rating, Integer, Integer>() {
            public Tuple2<Integer, Integer> call(Rating rating) throws Exception {
                return new Tuple2<Integer, Integer>(rating.user(), rating.product());
            }
        });
        // 对原始数据进行推荐值预测
        JavaRDD<Rating> predict = model.predict(testJPRDD);
        System.out.println(\"原始数据测试结果为:\");
        predict.foreach(new VoidFunction<Rating>() {
            public void call(Rating rating) throws Exception {
                System.out.println(\"UID:\" + rating.user() + \",PID:\" + rating.product() + \",SCORE:\" + rating.rating());
            }
        });

        // 向指定id的用户推荐n件商品
        Rating[] predictProducts = model.recommendProducts(1, 3);
        System.out.println(\"\\r\\n向指定id的用户推荐n件商品\");
        for(Rating r1:predictProducts){
            System.out.println(\"UID:\" + r1.user() + \",PID:\" + r1.product() + \",SCORE:\" + r1.rating());
        }
        // 向指定id的商品推荐给n给用户
        Rating[] predictUsers = model.recommendUsers(2, 4);
        System.out.println(\"\\r\\n向指定id的商品推荐给n给用户\");
        for(Rating r1:predictProducts){
            System.out.println(\"UID:\" + r1.user() + \",PID:\" + r1.product() + \",SCORE:\" + r1.rating());
        }
        // 向所有用户推荐N个商品
        RDD<Tuple2< , Rating[]>> predictProductsForUsers = model.recommendProductsForUsers(3);
        System.out.println(\"\\r\\n******向所有用户推荐N个商品******\");
        predictProductsForUsers.toJavaRDD().foreach(new VoidFunction<Tuple2< , Rating[]>>() {
            public void call(Tuple2< , Rating[]> tuple2) throws Exception {
                System.out.println(\"以下为向id为:\" + tuple2._1 + \"的用户推荐的商品:\");
                for(Rating r1:tuple2._2){
                    System.out.println(\"UID:\" + r1.user() + \",PID:\" + r1.product() + \",SCORE:\" + r1.rating());
                }
            }
        });
        // 将所有商品推荐给n个用户
        RDD<Tuple2< , Rating[]>> predictUsersForProducts = model.recommendUsersForProducts(2);
        System.out.println(\"\\r\\n******将所有商品推荐给n个用户******\");
        predictUsersForProducts.toJavaRDD().foreach(new VoidFunction<Tuple2< , Rating[]>>() {
            public void call(Tuple2< , Rating[]> tuple2) throws Exception {
                System.out.println(\"以下为向id为:\" + tuple2._1 + \"的商品推荐的用户:\");
                for(Rating r1:tuple2._2){
                    System.out.println(\"UID:\" + r1.user() + \",PID:\" + r1.product() + \",SCORE:\" + r1.rating());
                }
            }
        });
    }
}

Scala版本

  • ALSRecommendScala.scala
package top.it1002.spark.ml.Recommend

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Scala demo of Spark MLlib's ALS (alternating least squares) recommender.
  *
  * Reads `user,product,rating` lines, trains a matrix-factorization model, and prints
  * predictions for the training pairs plus several kinds of recommendations.
  *
  * Author: 王磊, 2018/12/18
  */
object ALSRecommendScala {

  /** Input file used when no path is given on the command line. */
  private val DefaultInputPath = "C:\\Users\\asus\\Desktop\\data\\Recommend.txt"

  /** Number of latent factors passed to `ALS.train`. */
  private val Rank = 10

  /** Number of ALS iterations passed to `ALS.train`. */
  private val Iterations = 10

  /**
    * Runs the demo on a local Spark master.
    *
    * @param args optional; `args(0)` overrides the input file path
    */
  def main(args: Array[String]): Unit = {
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val conf = new SparkConf().setAppName("als").setMaster("local[3]")
    val session = SparkSession.builder().config(conf).getOrCreate()
    try {
      run(session.sparkContext, inputPath)
    } finally {
      // Always release the session, even if parsing or training fails.
      session.stop()
    }
  }

  /** Loads the ratings, trains the model and prints every kind of result. */
  private def run(sc: SparkContext, inputPath: String): Unit = {
    // Parse each non-blank "user,product,rating" line. Cached because both training
    // and the sanity-check prediction read it.
    val trainRDD = sc.textFile(inputPath)
      .filter(_.trim.nonEmpty)
      .map { line =>
        val fields = line.split(",").map(_.trim)
        Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
      }
      .cache()

    // Train the model.
    val model = ALS.train(trainRDD, Rank, Iterations)

    // Re-score the training pairs themselves as a quick sanity check.
    val testRDD = trainRDD.map { case Rating(uid, pid, _) => (uid, pid) }
    println("原始数据测试结果为:")
    model.predict(testRDD).foreach(r => println(formatRating(r)))

    // Top 5 products for user 1.
    println("==推荐n件商品给指定id的用户==")
    model.recommendProducts(1, 5).foreach(r => println(formatRating(r)))

    // Top 3 users for product 2.
    println("\r\n==推荐指定id的商品给n个用户==")
    model.recommendUsers(2, 3).foreach(r => println(formatRating(r)))

    // Top 4 products for every user; printed by the executors.
    model.recommendProductsForUsers(4).foreach { case (uid, recs) =>
      println("\r\n向id为:" + uid + "的用户推荐了以下四件商品:")
      recs.foreach(r => println(formatRating(r)))
    }

    // Top 3 users for every product; printed by the executors.
    model.recommendUsersForProducts(3).foreach { case (pid, recs) =>
      println("\r\n向id为:" + pid + "的商品推荐了以下三个用户:")
      recs.foreach(r => println(formatRating(r)))
    }
  }

  /** Formats one rating as `UID:<user>,PID:<product>,SCORE:<rating>`. */
  private def formatRating(r: Rating): String =
    s"UID:${r.user},PID:${r.product},SCORE:${r.rating}"
}

三、测试结果

（原文此处为运行结果截图，图片已缺失）

后续会持续更新 Spark MLlib 相关的小例子。

收藏 打印