Verbessern Sie die Geschwindigkeit der Funken-App

Dies ist Teil meines Python-Funken-Codes, dessen Teile zu langsam für meine Bedürfnisse laufen. Besonders dieser Teil des Codes, den ich wirklich gerne verbessern würde, ist Geschwindigkeit, aber weiß nicht, wie es geht. Es dauert derzeit ca. 1 Minute für 60 Millionen Datenzeilen und ich möchte es auf unter 10 Sekunden verbessern.

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() 

Mehr Kontext meiner Funken-App:

  • Boost.Python: Wrap-Funktionen, um die GIL freizugeben
  • Schreiben von Pythonbindungen für C ++ - Code, die OpenCV verwenden
  • Boost.python Argument Typ Mismatch (numpy.int64 -> int)
  • Wie man benutzerdefinierte Float-Typ numpy dtypes (C-API)
  • Wie man Python / Boost Python Projekte organisiert
  • Boost.python: Argumenttypen stimmen nicht mit C ++ - Signatur überein
  •  article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2) axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \ .map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \ .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \ .filter(lambda x:len(x[1])>=2) \ .map(lambda x:x[1][-1]) \ .map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0)))) 

    Vielen Dank für Ihre Anregungen.

    BEARBEITEN:

    Graf nimmt die meiste Zeit (50s) nicht teil

    Ich habe auch versucht, Parallelität zu machen, aber es hatte keine offensichtliche Wirkung:

     sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

    und

     sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load() 

    Dies ist das Bild vom Funken, wie lange jede Operation dauert

  • So importiere ich Standardbibliothek anstelle des gleichnamigen Moduls im Modulpfad
  • Python Fibonacci Generator
  • Underscore _ als Variablenname in Python [Duplikat]
  • Python: Wie Batch umbenennen gemischte Fall zu Kleinbuchstaben mit Unterstrichen
  • Was ist der Zweck der einzelnen Unterstrich "_" Variable in Python?
  • One Solution collect form web for “Verbessern Sie die Geschwindigkeit der Funken-App”

    Zuerst sollten Sie herausfinden, was tatsächlich die meiste Zeit in Anspruch nimmt.

    Zum Beispiel bestimmen, wie lange nur das Lesen der Daten dauert

     axes = sqlContext .read .format("org.apache.spark.sql.cassandra") .options(table="axes", keyspace=source) .load() .count() 

    Die Erhöhung der Parallelität oder die Anzahl der parallelen Leser kann dazu beitragen, aber nur, wenn Sie nicht maxing aus dem IO Ihres Cassandra Clusters.

    Zweitens, wenn du alles mit dem Dataframes api machen kannst. Jedes Mal, wenn Sie eine Python Lambda Sie sind Serialisierung Kosten zwischen der Python und Scala-Typen.

    Bearbeiten:

     sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

    Wird erst nach Abschluss der Last wirksam werden, so wird dir das nicht helfen.

     sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load() 

    Ist kein gültiger Parameter für den Spark Cassandra Connector, also wird das nichts machen.

    Siehe https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#read-tuning-parameters Input Split Size bestimmt, wie viele C * -Partitionen eine Spark Partition einfügen.

    Python ist die beste Programmiersprache der Welt.