Közreműködött: Prithviraj Bose
Előző blogomban az Apache Spark Streaming átfogó koncepciójának segítségével átfogó átalakításokat tárgyaltam. Elolvashatja itt .
Ebben a bejegyzésben az Apache Spark Streaming kumulatív állapotjelző műveleteit fogom megvitatni. Ha még nem ismeri a Spark Streaming szolgáltatást, akkor erősen ajánlom, hogy olvassa el az előző blogomat, hogy megértse, hogyan működik az ablakolás.
Az állapotalapú átalakítás típusai a szikra streamingben (folytatás…)
> Halmozott követés
Használtuk a reducByKeyAndWindow (…) API a kulcsok állapotának nyomon követésére, azonban az ablak korlátozásokat vet fel bizonyos felhasználási esetekre. Mi van, ha a kulcsok állapotát végig akarjuk halmozni, ahelyett, hogy időablakra korlátoznánk? Ebben az esetben használnunk kellene updateStateByKey (…) TŰZ.
Ezt az API-t a Spark 1.3.0-ban vezették be, és nagyon népszerű volt. Ennek az API-nak azonban van némi teljesítménye, teljesítménye romlik, ahogy az állapotok idővel nőnek. Írtam egy mintát az API használatának bemutatására. Megtalálja a kódot itt .
A Spark 1.6.0 új API-t vezetett be mapWithState (…) amely megoldja a teljesítmény által okozott általános költségeket updateStateByKey (…) . Ebben a blogban ezt a konkrét API-t fogom megvitatni egy általam írt mintaprogram segítségével. Megtalálja a kódot itt .
Mielőtt belevetném magam egy kód-áttekintésbe, kíméljünk néhány szót az ellenőrzőpontozásról. Bármilyen állapotátalakítás esetén az ellenőrzőpontozás kötelező. Az ellenőrzőpont a kulcsok állapotának helyreállítására szolgál, ha az illesztőprogram nem működik. Amikor az illesztőprogram újraindul, a kulcsok állapota visszaáll az ellenőrző fájlokból. Az ellenőrzőpontok helyei általában HDFS vagy Amazon S3 vagy bármilyen megbízható tároló. A kód tesztelése közben a helyi fájlrendszerben is tárolható.
hogyan lehet inicializálni az osztályt a pythonban
A mintaprogramban a socket szövegfolyamot hallgatjuk a host = localhost és a port = 9999 címen. Ez a bejövő adatfolyamot kódolja (szavak, előfordulások száma) és követi a szavak számát az 1.6.0 API segítségével mapWithState (…) . Ezenkívül a frissítés nélküli kulcsokat a StateSpec.timeout API. HDFS-ben ellenőrzünk, és az ellenőrzési gyakoriság 20 másodpercenként történik.
Először hozzunk létre egy Spark Streaming munkamenetet,
Létrehozunk egy ellenőrzőpontDir a HDFS-ben, majd hívja meg az objektum metódust getOrCreate (…) . Az getOrCreate Az API ellenőrzi a ellenőrzőpontDir hogy megnézze, vannak-e korábbi állapotok visszaállításra, ha léteznek ilyenek, akkor újra létrehozza a Spark Streaming munkamenetet, és frissíti a kulcsok állapotát a fájlokban tárolt adatokból, mielőtt új adatokkal folytatja. Ellenkező esetben új Spark Streaming munkamenetet hoz létre.
Az getOrCreate veszi az ellenőrzőpont könyvtár nevét és egy függvényt (amelyet megneveztünk createFunc ) amelynek aláírása legyen () => StreamingContext .
A php a karakterláncot tömbgé változtatja
Vizsgáljuk meg a benne lévő kódot createFunc .
2. sor: Streaming kontextust hozunk létre a feladat nevével a „TestMapWithStateJob” értékre és kötegelt intervallum = 5 másodperc.
5. sor: Állítsa be az ellenőrzőpont könyvtárat.
8. sor: Állítsa be az állapot specifikációt az osztály használatával org.apache.streaming.StateSpec tárgy. Először beállítottuk azt a funkciót, amely nyomon fogja követni az állapotot, majd meghatározzuk az így kapott DStream-ek partícióinak számát, amelyeket a későbbi transzformációk során generálni kell. Végül beállítottuk az időkorlátot (30 másodpercre), ahol ha egy kulcs frissítése nem érkezik 30 másodpercen belül, akkor a kulcs állapota törlődik.
12. sor #: Állítsa be a foglalatfolyamot, simítsa ki a bejövő kötegelt adatokat, hozzon létre egy kulcs-érték párost, hívjon mapWithState , állítsa az ellenőrzési pont intervallumát 20 másodpercre, és végül nyomtassa ki az eredményeket.
A Spark keretrendszer th e createFunc minden kulcshoz az előző értékkel és az aktuális állapottal. Kiszámoljuk az összeget, és frissítjük az állapotot a kumulatív összeggel, végül visszaadjuk a kulcs összegét.
saltstack vs báb vs chef
Github források -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala
Van egy kérdésünk? Kérjük, említse meg a megjegyzések részben, és mi kapcsolatba lépünk Önnel.
Kapcsolódó hozzászólások:
Kezdő lépések az Apache Spark & Scala szolgáltatással