How to run multi-threaded jobs in Apache Spark using Scala or Python?

Thread starter: Suraj (Guest)
I am facing a concurrency problem in Spark that is stopping me from using it in production, but I know there is a way around it. I am trying to run Spark ALS for 7 million users over a billion products, using order history as input. First I take the list of distinct users and then loop over them to get recommendations, which is a slow process and will take days to cover all users. I also tried taking the cartesian product of users and products to get recommendations for everyone at once, but before I can feed the result to Elasticsearch (to be consumed by other APIs) I still have to filter and sort the records for each user.

So please suggest a solution that scales for this use case and can be used in production with real-time recommendations.

Here is my Scala code snippet, which shows how I am currently approaching the problem:

Code:
  // buy_values -> RDD of rows with (<int customer_id>, <int product_id>, <double rating>)
  def recommend_for_user(user: Int): Unit = {
      println(s"Recommendations for User ID: $user")
      // Candidate (user, product) pairs: products from purchases made by other customers
      val candidates = buy_values
        .filter(x => x("customer_id").toString.toInt != user)
        .map(x => x("product_id").toString.toInt)
        .distinct()
        .map((user, _))
      // Score candidates and keep the 30 products with the highest predicted rating
      val recommendations = bestModel.get
        .predict(candidates)
        .takeOrdered(30)(Ordering[Double].reverse.on(_.rating))

      var ESMap = Map[String, String]()
      recommendations.foreach { r =>
        ESMap += r.product.toString -> bitem_ids.value(r.product)
      }
      // Push to Elasticsearch, using the user ID as the document ID
      client.execute {
        index into "recommendation" / "items" id user fields ESMap
      }.await
      // Release the candidate RDD from memory
      candidates.unpersist()
  }
  // Iterate over each user to get recommendations [slow process]
  user_ids.foreach(recommend_for_user)
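For context on what a batch alternative could look like: instead of looping over users on the driver, MLlib's `MatrixFactorizationModel` can score every user in a single distributed job via `recommendProductsForUsers` (available since Spark 1.4), and the elasticsearch-hadoop connector can bulk-write the results from the executors. The sketch below is illustrative only; it assumes a `ratings` RDD built from the same order history, an implicit-feedback ALS model, and an Elasticsearch cluster configured via `es.nodes` (the hyperparameter values are placeholders, not tuned):

```scala
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.elasticsearch.spark._ // elasticsearch-hadoop connector

// Assumption: `ratings` is an RDD[Rating] built from the order history.
val model: MatrixFactorizationModel =
  ALS.trainImplicit(ratings, rank = 50, iterations = 10, lambda = 0.01, alpha = 1.0)

// Top 30 products for every user in one distributed job --
// no per-user driver loop and no full user x product cartesian.
val perUserRecs = model.recommendProductsForUsers(30)

// Flatten each user's recommendations into a document and bulk-index
// directly from the executors, keyed by user ID.
perUserRecs
  .map { case (userId, recs) =>
    Map("user_id" -> userId, "items" -> recs.map(_.product))
  }
  .saveToEs("recommendation/items", Map("es.mapping.id" -> "user_id"))
```

This keeps the filter-and-sort work (top-N per user) inside the model's own distributed ranking rather than doing it per user on the driver.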