RDD a Spark használatával: Az Apache Spark építőköve



Ez a Sparkot használó RDD-blog részletes és átfogó ismereteket nyújt Önnek az RDD-ről, amely a Spark és mennyire hasznos alapeleme.

, Maga a szó elegendő ahhoz, hogy szikrát generáljon minden Hadoop mérnök fejében. NAK NEK n memóriában feldolgozó eszköz amely villámgyors a klaszteres számítástechnikában. A MapReduce-hoz képest a memóriában lévő adatok megosztása teszi RDD-ket 10-100x gyorsabban mint a hálózati és a lemezmegosztás, és mindez az RDD-k (Rugalmas elosztott adatkészletek) miatt lehetséges. A legfontosabb pontok, amelyekre ma ebben a RDD-ben összpontosítunk a Spark segítségével, a következők:

Szüksége van RDD-kre?

Miért van szükségünk RDD-re? -RDD a Spark segítségével





A világ fejlődik és Adattudomány ben történt előrelépés miatt . Algoritmusok alapján Regresszió , , és ami tovább fut Megosztott Iteratív számítás ation divat, amely magában foglalja az adatok újrafelhasználását és megosztását több számítási egység között.

A hagyományos technikákhoz stabil, köztes és elosztott tárolóra volt szükség HDFS amely ismétlődő számításokat tartalmaz adatreplikációval és adatszerializációval, ami sokkal lassabbá tette a folyamatot. A megoldás megtalálása soha nem volt egyszerű.



Ez az, ahol RDD-k (Resilient Distributed Datasets) nagy képet mutat.

RDD Azok könnyen kezelhetőek és könnyedén létrehozhatók, mivel az adatokat adatforrásokból importálják, és RDD-kbe dobják. Ezenkívül a műveleteket feldolgozásukra alkalmazzák. Ők a osztott memóriagyűjtemény as jogosultságokkal Csak olvasható és ami a legfontosabb: azok Hibatűrő .



Ha van ilyen adatpartíció nak,-nek az RDD az elveszett , ugyanezzel az alkalmazással regenerálható átalakítás művelet az elveszett partíción leszármazás , ahelyett, hogy az összes adatot a semmiből kezelné. Ez a fajta megközelítés valós idejű forgatókönyvekben csodákra adhat okot adatvesztési helyzetekben vagy egy rendszer leállása esetén.

mit csinál a tostring a java-ban

Mik az RDD-k?

RDD vagy ( Rugalmas elosztott adatkészlet ) alapvető adatszerkezet a Sparkban. A kifejezés Rugalmas meghatározza azt a képességet, amely az adatokat automatikusan vagy adatokat generálja visszagörgetni hoz eredeti állapot amikor váratlan baleset következik be az adatvesztés valószínűségével.

Az RDD-kbe írt adatok felosztva és tárolják több futtatható csomópont . Ha egy végrehajtó csomópont nem sikerül futási időben, akkor azonnal visszakapja a következő futtatható csomópont . Ezért tekintik az RDD-ket az adatstruktúrák fejlett típusának, összehasonlítva más hagyományos adatstruktúrákkal. Az RDD-k strukturált, strukturálatlan és félig strukturált adatokat tárolhatnak.

Haladjunk tovább a RDD-vel a Spark blog segítségével, és ismerjük meg az RDD-k egyedi jellemzőit, amelyek előnyt biztosítanak más típusú adatstruktúrákkal szemben.

Az RDD jellemzői

  • Emlékül (RAM) Számítások : A memóriában lévő számítás koncepciója az adatfeldolgozást egy gyorsabb és hatékonyabb szakaszba viszi, ahol összességében teljesítmény a rendszer frissített.
  • L értékelése : A Lusta értékelés kifejezés a átalakulások az RDD-ben lévő adatokra kerülnek, de a kimenet nem jön létre. Ehelyett az alkalmazott transzformációk naplózott.
  • Kitartás : Az eredő RDD-k mindig újrafelhasználható.
  • Durva szemcsés műveletek : A felhasználó az adatkészletek összes elemére transzformációkat alkalmazhat térkép, szűrő vagy csoportosít tevékenységek.
  • Hibatűrő : Adatvesztés esetén a rendszer képes visszagurulni annak eredeti állapot a naplózott használatával átalakulások .
  • Állandóság : Meghatározott, lekért vagy létrehozott adatok nem lehetnek megváltozott miután bejelentkezett a rendszerbe. Abban az esetben, ha hozzá kell férnie a meglévő RDD-hez és módosítania kell, akkor egy új RDD-t kell létrehoznia egy készlet használatával átalakítás funkciók az aktuális vagy az azt megelőző RDD-re.
  • Felosztás : Ez a döntő egység a párhuzamosság a Sparkban RDD. Alapértelmezés szerint a létrehozott partíciók száma az adatforráson alapul. Még azt is eldöntheti, hogy hány partíciót kíván használni egyedi partíció funkciókat.

RDD létrehozása a Spark segítségével

RDD-k itt hozhatók létre háromféleképpen:

  1. Adatok olvasása innen: párhuzamos gyűjtemények
val PCRDD = spark.sparkContext.parallelize (Array ('H', 'Kedd', 'Sze', 'Cs', 'Péntek,' Szo '), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Jelentkezés átalakítás korábbi RDD-ken
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'nagyon', 'erős', 'nyelv')) val wordpair = szavak.térkép (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Adatok olvasása innen: külső tárhely vagy fájl útvonalakat, mint HDFS vagy HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

RDD-ken végrehajtott műveletek:

Az RDD-ken főleg kétféle műveletet hajtanak végre, nevezetesen:

  • Átalakulások
  • Műveletek

Átalakulások : Az tevékenységek RDD-ken alkalmazzuk a szűrő, hozzáférés és módosít a szülő RDD-ben lévő adatok a egymást követő RDD nak, nek hívják átalakítás . Az új RDD mutatót ad vissza az előző RDD-hez, biztosítva a köztük lévő függőséget.

Az átalakulások Lusta értékelések, más szavakkal, az RDD-n alkalmazott műveletek naplózásra kerülnek, de nem végrehajtott. A rendszer eredményt vagy kivételt dob, miután elindította a Akció .

Az átalakításokat két típusra oszthatjuk az alábbiak szerint:

  • Keskeny transzformációk
  • Széles átalakulások

Keskeny transzformációk Keskeny transzformációkat alkalmazunk a egyetlen partíció A szülő RDD létrehozása egy új RDD létrehozásához, mivel az RDD feldolgozásához szükséges adatok rendelkezésre állnak a partíció egyetlen partícióján szülő ASD . A keskeny transzformációk példái a következők:

  • térkép()
  • szűrő()
  • flatMap ()
  • partíció ()
  • mapPartitions ()

Széles átalakulások: A széles átalakítást alkalmazzuk több partíció hogy új RDD-t generáljon. Az RDD feldolgozásához szükséges adatok rendelkezésre állnak a szülő ASD . A széles átalakítások példái a következők:

  • reducBy ()
  • unió()

Műveletek : A műveletek utasítják az Apache Spark alkalmazását számítás és az eredményt vagy kivételt visszaküldi a meghajtó RDD-nek. Néhány akció a következőket tartalmazza:

  • gyűjt()
  • számol()
  • vesz()
  • első()

Alkalmazzuk a műveleteket gyakorlatilag RDD-ken:

IPL (indiai Premier League) egy krikettverseny, csúcsminőségű. Tehát a mai napon kezünkbe vehetjük az IPL adatkészletet, és a Spark segítségével futtathatjuk az RDD-t.

  • Először, töltsük le az IPL CSV-egyezési adatait. Letöltés után egy sorokkal és oszlopokkal rendelkező EXCEL fájlnak tűnik.

A következő lépésben felgyújtjuk a szikrát, és a helyéről betöltjük a match.csv fájlt, esetemben éncsva fájl helye „/User/edureka_566977/test/matches.csv”

Most kezdjük a átalakítás első rész:

  • térkép():

Használunk Térkép transzformáció hogy egy speciális transzformációs műveletet alkalmazzunk az RDD minden elemén. Itt létrehozunk egy RDK nevet CKfile ahol tároljukcsvfájl. Létrehozunk egy újabb RDD-t, amelynek neve: Államok tárolja a város adatait .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val államok = CKfile.map (_. split (',') (2)) államok.collect (). foreach (println)

  • szűrő():

Szűrőtranszformáció, maga a név írja le a használatát. Ezt az átalakítási műveletet használjuk a szelektív adatok kiszűrésére a megadott adatgyűjteményből. Jelentkezünk szűrő működése itt találhatja meg az év IPL-mérkőzéseinek rekordjait 2017 és tárolja a fájl RDD-ben.

java mély másolat vs sekély másolat
val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

A flatMap egy átalakítási művelet az RDD minden elemére egy új RDD létrehozására. Hasonló a Map transzformációhoz. itt alkalmazzukLapos térképnak nek köpd ki Hyderabad város meccseit és tárolja az adatokatfilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). gyűjt ()

  • partíció ():

Minden adatot, amelyet RDD-be írunk, bizonyos számú partícióra osztjuk. Ezt az átalakítást használjuk a partíciók száma az adatok valójában fel vannak osztva.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

A MapPatitions-t a Map () ésaz egyes() együtt. A mapPartitions itt található a sorok száma van a fájlunkban RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => tömb (idx.size) .iterator) .collect

  • csökkenteniBy ():

HasználunkReduceBy() tovább Kulcs-érték párok . Ezt az átalakulást használtuk a miénkencsvfájlt a lejátszó megkereséséhez a meccsek legmagasabb embere .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • unió():

A név megmagyarázza az egészet, használjuk az unió átalakítását klub két RDD együtt . Itt két RDD-t hozunk létre, nevezetesen a fil és a fil2. A fil RDD tartalmazza a 2017-es IPL-mérkőzések rekordjait, a fil2 RDD pedig a 2016-os IPL-mérkőzések rekordjait.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Kezdjük a Akció rész, ahol a tényleges teljesítményt mutatjuk:

  • gyűjt():

A Gyűjtés az a művelet, amelyet használunk a tartalom megjelenítése az RDD-ben.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • számol():

Számololyan művelet, amelyet a számláláshoz használunk rekordok száma jelen van az RDD-ben.Ittezt a műveletet arra használjuk, hogy megszámoljuk a match.csv fájlban szereplő rekordok teljes számát.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • vesz():

A Take egy művelet, amely hasonló a gyűjtéshez, de az egyetlen különbség az, hogy bármelyiket ki tudja nyomtatni szelektív sorszám a felhasználói kérés szerint. Itt a következő kódot alkalmazzuk a az első tíz vezető jelentés.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. take (10) .foreach (println)

  • első():

Az First () a gyűjtéshez () és a take () -hoz hasonló művelet.azta legfelső jelentés kinyomtatására használták a kimenetet Itt az első () műveletet használjuk a egy adott városban lejátszott mérkőzések maximális száma és Mumbai-t kapjuk kimenetként.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / mérkőzések.csv') val államok = CKfile.map (_. split (',') (2)) val Scount = állapotok.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach [println] val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Hogy még érdekesebbé tegyük a Spark használatával történő RDD-tanulás folyamatát, érdekes felhasználási esettel álltam elő.

RDD a Spark: Pokemon használati eset használatával

  • Először, Töltsünk le egy Pokemon.csv fájlt, és töltsük be a szikra-shellbe, mint a Matches.csv fájlhoz.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

A Pokemonok valójában sokféle változatban kaphatók. Keressünk néhány fajtát.

  • Séma eltávolítása a Pokemon.csv fájlból

Lehet, hogy nincs szükségünk a Séma a Pokemon.csv fájlból. Ezért eltávolítjuk.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Számának megkeresése partíciók a pokemon.csv fájlunkra oszlik.
println ('Partíciók száma =' + NoHeader.partitions.size)

  • Víz Pokemon

Megtalálása a a Víz pokemon száma

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Tűz Pokemon

Megtalálása a a Fire pokemon száma

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Kimutathatjuk a népesség egy másik típusú pokemonról a count függvény segítségével
WaterRDD.count () FireRDD.count ()

  • Mivel szeretem a játékot védekező stratégia keressük meg a pokemont maximális védekezés.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Legmagasabb_védelem:' + defenceList.max ())

  • A maximumot tudjuk védelmi erőérték de nem tudjuk, melyik pokemonról van szó. tehát keressük meg, hogy mi az pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) ([Dupla] .reverse.on (_._ 1) rendelése] MaxDefencePokemon.foreach (println)

  • Most rendezzük a pokemont legkevesebb Védelem
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Most nézzük meg a Pokemont a-val kevésbé védekező stratégia.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head) val def2 WithPokemonNév .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Rendelés [Dupla ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Tehát ezzel a Spark cikk segítségével véget értünk ennek az RDD-nek. Remélem, rávilágítottunk egy kis ismeretre az RDD-kről, azok jellemzőiről és a velük végrehajtható különféle típusú műveletekről.

Ez a cikk alapján célja, hogy felkészítse Önt a Cloudera Hadoop és a Spark Developer tanúsító vizsgára (CCA175). Részletes ismereteket szerez az Apache Sparkról és a Spark ökoszisztémáról, amely magában foglalja a Spark RDD-t, a Spark SQL-t, a Spark MLlib-et és a Spark Streaming-et. Átfogó ismereteket szerez a Scala programozási nyelvről, a HDFS-ről, a Sqoop-ról, a Flume-ról, a Spark GraphX-ről és az olyan üzenetkezelő rendszerről, mint a Kafka.

hogyan kell használni a Google felhő platformját