Estoy tratando de formatear la cadena en una de las columnas usando pyspark udf.

A continuación se muestra mi conjunto de datos:

+--------------------+--------------------+
|             artists|                  id|
+--------------------+--------------------+
|     ['Mamie Smith']|0cS0A1fUEUd1EW3Fc...|
|"[""Screamin' Jay...|0hbkKFIJm7Z05H8Zl...|
|     ['Mamie Smith']|11m7laMUgmOKqI3oY...|
| ['Oscar Velazquez']|19Lc5SfJJ5O1oaxY0...|
|            ['Mixe']|2hJjbsLCytGsnAHfd...|
|['Mamie Smith & H...|3HnrHGLE9u2MjHtdo...|
|     ['Mamie Smith']|5DlCyqLyX2AOVDTjj...|
|['Mamie Smith & H...|02FzJbHtqElixxCmr...|
|['Francisco Canaro']|02i59gYdjlhBmbbWh...|
|          ['Meetya']|06NUxS2XL3efRh0bl...|
|        ['Dorville']|07jrRR1CUUoPb1FLf...|
|['Francisco Canaro']|0ANuF7SvPeIHanGcC...|
|        ['Ka Koula']|0BEO6nHi1rmTOPiEZ...|
|        ['Justrock']|0DH1IROKoPK5XTglU...|
|  ['Takis Nikolaou']|0HVjPaxbyfFcg8Rh0...|
|['Aggeliki Karagi...|0Hn7LWy1YcKhPaA2N...|
|['Giorgos Katsaros']|0I6DjrEfd3fKFESHE...|
|['Francisco Canaro']|0KGiP9EW1xtojDHsT...|
|['Giorgos Katsaros']|0KNI2d7l3ByVHU0g2...|
|     ['Amalia Vaka']|0LYNwxHYHPW256lO2...|
+--------------------+--------------------+

Y codigo:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t
import logging as log

session = SparkSession.builder.master("local").appName("First Python App").getOrCreate()

df = session.read.option("header", "true").csv("/home/deepak/Downloads/spotify_data_Set/data.csv")
df = df.select("artists", "id")


# df = df.withColumn("new_atr",f.translate(f.col("artists"),'"', "")).\
#         withColumn("new_atr_2" , f.translate(f.col("artists"),'[', ""))
df.show()

def format_column(st):
    print(type(st))
    print(1)
    return st.upper()


session.udf.register("format_str", format_column)

df.select("id",format_column(df.artists)).show(truncate=False)

# schema = t.StructType(
#     [
#         t.StructField("artists", t.ArrayType(t.StringType()), True),
#         t.StructField("id", t.StringType(), True)
#
#     ]
# )

df.show(truncate=False)

La UDF todavía no está completa, pero con el error, no puedo avanzar más. Cuando ejecuto el código anterior, obtengo el siguiente error:

<class 'pyspark.sql.column.Column'>
1
Traceback (most recent call last):
  File "/home/deepak/PycharmProjects/Spark/src/test.py", line 25, in <module>
    df.select("id",format_column(df.artists)).show(truncate=False)
  File "/home/deepak/PycharmProjects/Spark/src/test.py", line 18, in format_column
    return st.upper()
TypeError: 'Column' object is not callable

La sintaxis se ve bien y no puedo averiguar qué está mal con el código.

0
Deepak_Spark_Beginner 14 mar. 2021 a las 00:36

2 respuestas

La mejor respuesta

Bueno, veo que está utilizando una función de chispa prediseñada en la definición de una UDF que es aceptable, ya que dijo que está comenzando con algunos ejemplos, su error significa que no hay un método llamado superior para una columna, sin embargo, puede corregir eso error al usar esta definición:

    @f.udf("string")
    def format_column(st):
        print(type(st))
        print(1)
        return st.upper()

Por ejemplo:

enter image description here

0
Nassereddine Belghith 13 mar. 2021 a las 22:07

Obtiene este error porque está llamando a la función de Python format_column en lugar de la UDF registrada format_str.

Deberías estar usando:

from pyspark.sql import functions as F

df.select("id", F.expr("format_str(artists)")).show(truncate=False)

Además, la forma en que registró la UDF no puede usarla con la API de DataFrame, sino solo en Spark SQL. Si desea usarlo dentro de la API de DataFrame, debe definir la función de esta manera:

format_str = F.udf(format_column, StringType())

df.select("id", format_str(df.artists)).show(truncate=False)

O usando la sintaxis de anotación:

@F.udf("string")
def format_column(st):
    print(type(st))
    print(1)
    return st.upper()

df.select("id", format_column(df.artists)).show(truncate=False) 

Dicho esto, debe usar las funciones integradas de Spark (upper en este caso) a menos que tenga una necesidad específica que no se pueda hacer con las funciones de Spark.

0
blackbishop 13 mar. 2021 a las 22:00