Halmozott állapotalapú átalakulás az Apache Spark Streaming szolgáltatásban



Ez a blogbejegyzés a Spark Streaming állapotát érintő átalakításokat tárgyalja. Tudjon meg mindent a Hadoop Spark karrier összesített nyomon követéséről és fejlesztéséről.

Közreműködött: Prithviraj Bose

Előző blogomban az Apache Spark Streaming átfogó koncepciójának segítségével átfogó átalakításokat tárgyaltam. Elolvashatja itt .





Ebben a bejegyzésben az Apache Spark Streaming kumulatív állapotjelző műveleteit fogom megvitatni. Ha még nem ismeri a Spark Streaming szolgáltatást, akkor erősen ajánlom, hogy olvassa el az előző blogomat, hogy megértse, hogyan működik az ablakolás.

Az állapotalapú átalakítás típusai a szikra streamingben (folytatás…)

> Halmozott követés

Használtuk a reducByKeyAndWindow (…) API a kulcsok állapotának nyomon követésére, azonban az ablak korlátozásokat vet fel bizonyos felhasználási esetekre. Mi van, ha a kulcsok állapotát végig akarjuk halmozni, ahelyett, hogy időablakra korlátoznánk? Ebben az esetben használnunk kellene updateStateByKey (…) TŰZ.



Ezt az API-t a Spark 1.3.0-ban vezették be, és nagyon népszerű volt. Ennek az API-nak azonban van némi teljesítménye, teljesítménye romlik, ahogy az állapotok idővel nőnek. Írtam egy mintát az API használatának bemutatására. Megtalálja a kódot itt .

A Spark 1.6.0 új API-t vezetett be mapWithState (…) amely megoldja a teljesítmény által okozott általános költségeket updateStateByKey (…) . Ebben a blogban ezt a konkrét API-t fogom megvitatni egy általam írt mintaprogram segítségével. Megtalálja a kódot itt .

Mielőtt belevetném magam egy kód-áttekintésbe, kíméljünk néhány szót az ellenőrzőpontozásról. Bármilyen állapotátalakítás esetén az ellenőrzőpontozás kötelező. Az ellenőrzőpont a kulcsok állapotának helyreállítására szolgál, ha az illesztőprogram nem működik. Amikor az illesztőprogram újraindul, a kulcsok állapota visszaáll az ellenőrző fájlokból. Az ellenőrzőpontok helyei általában HDFS vagy Amazon S3 vagy bármilyen megbízható tároló. A kód tesztelése közben a helyi fájlrendszerben is tárolható.



hogyan lehet inicializálni az osztályt a pythonban

A mintaprogramban a socket szövegfolyamot hallgatjuk a host = localhost és a port = 9999 címen. Ez a bejövő adatfolyamot kódolja (szavak, előfordulások száma) és követi a szavak számát az 1.6.0 API segítségével mapWithState (…) . Ezenkívül a frissítés nélküli kulcsokat a StateSpec.timeout API. HDFS-ben ellenőrzünk, és az ellenőrzési gyakoriság 20 másodpercenként történik.

Először hozzunk létre egy Spark Streaming munkamenetet,

Spark-streaming-session

Létrehozunk egy ellenőrzőpontDir a HDFS-ben, majd hívja meg az objektum metódust getOrCreate (…) . Az getOrCreate Az API ellenőrzi a ellenőrzőpontDir hogy megnézze, vannak-e korábbi állapotok visszaállításra, ha léteznek ilyenek, akkor újra létrehozza a Spark Streaming munkamenetet, és frissíti a kulcsok állapotát a fájlokban tárolt adatokból, mielőtt új adatokkal folytatja. Ellenkező esetben új Spark Streaming munkamenetet hoz létre.

Az getOrCreate veszi az ellenőrzőpont könyvtár nevét és egy függvényt (amelyet megneveztünk createFunc ) amelynek aláírása legyen () => StreamingContext .

A php a karakterláncot tömbgé változtatja

Vizsgáljuk meg a benne lévő kódot createFunc .

2. sor: Streaming kontextust hozunk létre a feladat nevével a „TestMapWithStateJob” értékre és kötegelt intervallum = 5 másodperc.

5. sor: Állítsa be az ellenőrzőpont könyvtárat.

8. sor: Állítsa be az állapot specifikációt az osztály használatával org.apache.streaming.StateSpec tárgy. Először beállítottuk azt a funkciót, amely nyomon fogja követni az állapotot, majd meghatározzuk az így kapott DStream-ek partícióinak számát, amelyeket a későbbi transzformációk során generálni kell. Végül beállítottuk az időkorlátot (30 másodpercre), ahol ha egy kulcs frissítése nem érkezik 30 másodpercen belül, akkor a kulcs állapota törlődik.

12. sor #: Állítsa be a foglalatfolyamot, simítsa ki a bejövő kötegelt adatokat, hozzon létre egy kulcs-érték párost, hívjon mapWithState , állítsa az ellenőrzési pont intervallumát 20 másodpercre, és végül nyomtassa ki az eredményeket.

A Spark keretrendszer th e createFunc minden kulcshoz az előző értékkel és az aktuális állapottal. Kiszámoljuk az összeget, és frissítjük az állapotot a kumulatív összeggel, végül visszaadjuk a kulcs összegét.

saltstack vs báb vs chef

Github források -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Van egy kérdésünk? Kérjük, említse meg a megjegyzések részben, és mi kapcsolatba lépünk Önnel.

Kapcsolódó hozzászólások:

Kezdő lépések az Apache Spark & ​​Scala szolgáltatással

Állapotbeli átalakulások a szikrázó közvetítéssel