Tengo un conjunto de datos como el siguiente

INo,Sc,Desc,Quan,IDate,UPice,CID,Country
1,2,"H,H",6,12-1-2010 8:26,2.55,c1,United Kingdom
2,3,"WE,RN",6,12-1-2010 8:26,3.39,c2,United Kingdom
3,3,CREAM ,8,12-1-2010 8:26,2.75,c3,United Kingdom
4,4,KLE,6,12-1-2010 8:26,3.39,c4,United Kingdom
5,5,"DI,AMR,ROR, ",2,1-7-2011 13:55,11.02,c5,United Kingdom
6,6,SU"asasa,1,12-9-2010 16:26,11.02,c6,United Kingdom

He creado el marco de datos ejecutando el siguiente código:

case class Rating(CID: String, SC: String, rating: Double)
def parseRating(str: String): Rating = {
val fields = str.split(",")
Rating(fields(6), fields(1), 1.0D)
}
val ratings = spark.read.textFile("C:/Users/test/Desktop/test123.txt").map(parseRating).toDF().dropDuplicates("CID", "SC")
ratings.collect().map(t => println(t))
ratings.show()

Estoy obteniendo resultados como a continuación

+---------------+---+------+
|            CID| SC|rating|
+---------------+---+------+
|12-9-2010 16:26|  6|   1.0|
|             c3|  3|   1.0|
|            CID| Sc|   1.0|
|             c4|  4|   1.0|
|           3.39|  3|   1.0|
|              2|  5|   1.0|
|           2.55|  2|   1.0|
+---------------+---+------+

Aquí vine por field=str.split(","). Los datos de entrada son 6,6, "SET, CO, SU", 1,12-9-2010 16: 26,11.02, c6, Reino Unido

Pero mi salida esperada es

+---------------+---+------+
|            CID| SC|rating|
+---------------+---+------+
|             c6|  6|   1.0|
|             c3|  3|   1.0|
|             c2|  3|   1.0|
|             c4|  4|   1.0|
|             c5|  5|   1.0|
|             c1|  2|   1.0|
+---------------+---+------+ 

Por favor, ayúdame en esto.

-1
Sai 14 nov. 2017 a las 13:24

2 respuestas

La mejor respuesta

Si está utilizando Spark 2, puede hacerlo como se muestra a continuación:

val data = spark.read.option("header", true).csv("test123.txt")
data.select("CID", "Sc").withColumn("rating", lit(1.0)).show
+---+---+------+
|CID| Sc|rating|
+---+---+------+
| c1|  2|   1.0|
| c2|  3|   1.0|
| c3|  3|   1.0|
| c4|  4|   1.0|
| c5|  5|   1.0|
| c6|  6|   1.0|
+---+---+------+
1
pratyush sharma 14 nov. 2017 a las 13:28

No puede dividir directamente cada registro con ",". ya que parte del valor de la columna tiene "," como valor, no como separador. p. ej., "H, H" en el primer registro,

Necesitará una expresión regular para hacer la división como se muestra a continuación,

def parseRating(str: String): Rating = {
val fields = str.split("\\,(?=([^\"]*\"[^\"]*\")*[^\"]*$)")
Rating(fields(6), fields(1), 1.0D)
}

Además, su primera línea es el encabezado. Léalo con el encabezado verdadero mientras carga el archivo de datos o filtre la primera línea una vez que esté cargado.

0
Amit Kumar 14 nov. 2017 a las 11:23