Image for post
Image for post
Douglas Silva Leal | Solutions Analyst | everis Brasil

Usando thread com pyspark

Quando falamos em utilizar thread em um script pyspark, pode parecer confuso. Levei tempo para perceber que uma das respostas para meu problema de tuning estava no quanto tempo gastava para gravar dados no HDFS. Então, pensei que seria possível aplicar um tipo de paralelismo onde, dado meu caso de uso, fosse possível gerar resultados simultaneamente, independentemente do modo como o programa se distribui.

Como o próprio site da apache diz a respeito do spark:

“Spark é uma estrutura rápida e poderosa que fornece uma API para executar o processamento distribuído em massa em conjuntos de dados resilientes”.

Mas, por que alguém tentaria utilizar threads em um processo distribuído?
Imagine linhas de código, chamadas de funções para ser mais específico. Criamos uma estrutura onde uma chamada é feita linha após linha, aguardando o resultado da função anterior para iniciar. Agora, imagine que você possui um arquivo e precisa realizar dois tipos de processamento, bem conhecidos por sinal: wordCount e charCount. Pode parecer algo simples mas, aplicando esse exemplo em um cenário de projeto maior e mais complexo, a utilização de threads se torna algo muito trivial, principalmente quando falamos de ações que não possuem dependências entre si, como a gravação de diversos parquet’s distintos oriundos de um único dataframe, ou a criação de um novo fluxo, além do principal, aplicando regras e tratativas em cima de um rdd até sua etapa final de gravação no HDFS, deixando o fluxo ser executado normalmente. O processo está distribuído, porém, não executando paralelamente (a nível de linhas de código), quando aplicamos threads dependendo do seu caso de uso, o resultado pode ser animal.

Olhamos para mais uma explicação

Thread, no seu conceito, é:

“Um pequeno programa que trabalha como um subsistema, sendo uma forma de um processo se autodividir em duas ou mais tarefas. É o termo em inglês para Linha ou Encadeamento de Execução. Essas tarefas múltiplas podem ser executadas simultaneamente para rodar mais rápido do que um programa em um único bloco ou praticamente juntas, mas que são tão rápidas que parecem estar trabalhando em conjunto ao mesmo tempo.”
(Fonte:
O que é Thread)

Em outras palavras, thread faz com que suas tarefas sejam paralelizadas executando-as simultaneamente, sendo assim, mais rápidas que uma tarefa executada linha após linha. Imagine que, dado uma regra funcional você possa executar tarefas em paralelo, como filtrar e separar dados ruins e bons de um dataframe, além disso, realizar determinadas atividades tais como .withColumn() , .map() e/ou .write(). São arquivos gerados a partir de uma única variável, que não possuem nenhum grau de dependência.

O Spark, por si só, já trabalha de maneira distribuída, porém, quando falamos em linhas de código, o processo não é inteligente o suficiente para iniciar a primeira função e em paralelo as seguintes funções para que, sem interferências, sejam processadas ao mesmo tempo. Mas, é inteligente o suficiente para alocar recursos em tarefas menores e, assim, liberar espaço logo que finalizar um processo.Por isso, quando utilizamos threads, essas funções executam em paralelo utilizando os recursos dinamicamente. (Principalmente quando alternamos o scheduler.mode de FIFO para FAIR.)

Como aplicar threads em um processo pyspark?

O script abaixo traz um exemplo simples de como aplicar threads em um escopo utilizando pyspark, carregando um arquivo e processando duas funções em paralelo.

[https://gist.github.com/lealdouglas/9067bed8bb0cf1deda647892299ad4b8]

· No primeiro momento, importamos o pacote Threading do python.

import threading

Um ponto valioso na utilização de threads é quando combinamos sua execução logo após de um .cache() .Quando falamos de ação/transformação em spark lembramos que, dado um método, as possibilidades da ação chamar as mesmas transformações inúmeras vezes são altas quando não desenhado da melhor forma. Pior, então, em um processo paralelo, em que acessamos · o mesmo registro simultaneamente. Colocando sua variável na memória, as tarefas usarão os dados paralelamente sem precisar carregá-los de novo.

file = sc.textFile(“/inFiles/shakespeare.txt”).flatMap(lambda line: line.split(“ “)).cache()

· Instanciamos variáveis threads com funções passando a variável (no nosso caso um rdd, como exemplo) como parâmetro de entrada.

# Instanciando variáveis threads com funções na memória.
T1 = threading.Thread(target=wordCount, args=(file,))
T2 = threading.Thread(target=charCount, args=(file,))

· Iniciamos a execução com .start() e paramos com .join() , caso contrário, podemos deixá-lo executando paralelamente até finalizar, junto ao fluxo principal.

# Iniciando execução das threads.
T1.start()
T2.start()

# Pausando execução das threads para seguir o fluxo principal.
T1.join()
T2.join()

Com o script pronto, é possível visualizar sua execução e as quebras que os processos paralelos realizam dentro do fluxo.

No exemplo abaixo, estou realizando cinco tarefas em paralelo, onde duas estão gravando dados no HDFS e uma está realizando um .count():

Image for post
Image for post

Considerações finais:

A primeira aparição de thread em meu script spark foi quando precisamos aplicar tuning, além da documentação, e diminuir o tempo de execução. Algumas considerações foram levantadas após esse trabalho:

· É necessário conhecer bem seu fluxo para avaliar, de fato, a utilização de threads, o que pode ou não tornar seu processo mais rápido. Em cenários pequenos de teste, o tempo de execução de um fluxo em paralelo era o mesmo que rodando linha após linha;

Image for post
Image for post

· Caso não esteja utilizando classes e objetos, ou uma variável global, não será possível utilizar o resultado dentro e fora de uma thread, principalmente se ela está sendo executada em paralelo junto a seu fluxo principal, isso porque a variável pode estar sendo criada no mesmo momento do uso.

Se possível, utilize .cache()! Threads que utilizam uma variável já carregada na memória podem reduzir ainda mais seu tempo de processamento, isso · porque o tempo de ação/transformação é menor para um dataframe estando em memória.

· Crie Pool’s! Utilizando job scheduling do spark, é possível visualizar suas tarefas rodando em paralelo no applicationMaster alternando a maneira como o spark trabalha, spark.scheduler.mode=FAIR(Default:FIFO) no spark-submit ou dentro do script com o SparkContext, além de outras possibilidades,como mostra a imagem a seguir:

Image for post
Image for post

Conclusão

Por fim, após um ano trabalhando com spark, pude perceber o quão poderoso ele é. E, também, que ainda é algo novo na mão de muitos e que poucos, de fato, compreendem como ele funciona. Com muitas horas de pesquisa e testes árduos, senti que as informações podem ser explicadas de maneira simples e sucintas demais para que qualquer leigo no assunto possa entender e aplicar (a documentação não vai te ensinar como fazer um projeto do zero). O mundo de big data nunca esteve tão fácil de ser colocado em prática e, após esse tempo de experiência, fui motivado a compartilhar temas no qual foram difíceis de difundir no começo dessa caminhada.

Por isso, daqui para frente, espero compartilhar com vocês uma série de artigos para trabalhar detalhes de maneira clara, simples e completa, e que seja uma via de mão dupla para que, juntos, possamos compartilhar ideias e experiências. Por isso, envie seus comentários.

Referências:

https://www.supergloo.com/fieldnotes/spark-fair-scheduler-
https://spark.apache.org/docs/2.2.0/job-scheduling.html
https://raw.githubusercontent.com/bbejeck/hadoop-algorithms/master/src/shakespeare.txt

Written by

Exponential intelligence for exponential companies

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store