
Spark based data factory: from design to implementation

2021-09-15 05:51:08 Sea of heart ice

 

 

 

In the era of big data and artificial intelligence, the data factory (Data Factory) is undoubtedly a very important kind of big data processing platform. Mature products already exist on the market, such as Azure Data Factory, which is not only powerful but, by relying on Microsoft's Azure cloud platform, also provides strong computing power for big data workloads, making processing more stable and efficient. Since my own project involves big data processing, I wondered whether I could design and build a data factory myself, both to find solutions to some technical pain points and to introduce some interesting new features. So, in my spare time, I gradually built a Spark-based data factory and named it Abacuza ("abacus" is the traditional Chinese calculating device — a metaphor for a platform dedicated to data computation; the variant spelling also reflects some Chinese elements). It is based on Spark, though from an architectural point of view Abacuza does not have to be: any data processing engine can be integrated by writing a plug-in, so Spark is really just one Abacuza plug-in. Of course, Spark is the mainstream data processing engine, so Abacuza makes it the default data processing plug-in. Abacuza is open source; the project lives at https://github.com/daxnet/abacuza. Hand-made? Yes, you read that right: from the front-end UI to the back-end services, from code to continuous integration, to the deployment scripts, the SDK releases, and the container images, everything was built step by step. There is a short tutorial on the project home page; a detailed walkthrough follows later in this article. Before looking at how to use Abacuza, let's first understand its overall architecture and design.
Although many Abacuza features are still unfinished, that does not prevent the end-to-end data processing flow from running.

The overall architecture

Like other data factory platforms, Abacuza divides its workflow into three steps: data input, data processing, and result output. The overall architecture diagram reflects this flow clearly:

[Figure: Abacuza overall architecture diagram]

 

Data input section

Data input is defined by input endpoints (Input Endpoints). Abacuza supports several input types: CSV files, JSON files, TXT text files, Microsoft SQL Server (not fully implemented yet), and S3 object storage paths. More input endpoints can be added in the future to support a pipeline-based (Pipeline) processing flow, so that users would not need to write processing logic in C# or Scala — a JSON document defining the pipeline would be enough.

Data processing part

Once the data input is defined, Abacuza reads the data according to the input endpoint settings and hands it to a back-end data processing cluster (Cluster). Abacuza supports different cluster types through plug-ins; as mentioned above, Apache Spark is one cluster type Abacuza supports. As the architecture diagram shows, the Abacuza Cluster Service manages these clusters, and the job scheduler (Job Scheduler) dispatches data processing jobs through the Cluster Service to a cluster of the specified type. For Spark, the concrete processing logic is implemented in the user's own code. Spark natively supports Scala, and PySpark is also an option; Abacuza uses the Microsoft .NET for Spark project, which provides a .NET binding for Spark, so users can write Spark processing logic in C# — the walkthrough below shows this in detail. Compared with Scala, does a C# program running through .NET for Spark have performance problems? Well, there is some overhead; see the chart below (source: the Microsoft .NET for Spark website):

[Figure: .NET for Spark performance benchmark]

In this benchmark, processing the same amount of data took Scala 375 seconds, .NET 406 seconds, and Python 433 seconds. There is a gap compared with Scala, but .NET still beats Python. And don't worry: if performance comes first in your scenario, Abacuza's Job Runner mechanism lets you write the processing program in Scala and upload it for execution on the Spark cluster (that is, without depending on .NET and C#).

Data output section

Similar to the input side, the output of processed data is defined by output endpoints (Output Endpoints). Abacuza supports several output modes: printing the results to the log, writing them to an external file system, or writing them to the S3 object storage path of the current project. Both input and output endpoints can be customized, and they are loaded dynamically through an ASP.NET Core plug-in mechanism combined with docker-compose or Kubernetes volume/block storage.
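As an illustration of how such volume-based plug-in loading could look under docker-compose — the service name, image name, and paths below are assumptions for illustration only, not taken from Abacuza's actual compose files:

```yaml
# Hypothetical sketch: mount a host directory containing endpoint plug-in
# assemblies into the service container so they are discovered at startup.
services:
  endpoints-service:
    image: daxnet/abacuza-endpoints-service   # assumed image name
    volumes:
      - ./plugins/endpoints:/app/plugins      # assumed plug-in directory
```

Replacing the host-side directory contents and restarting the service would then swap the available endpoint types without rebuilding the image.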

Related concepts and operation mechanism

Abacuza defines the following concepts:

  1. Cluster: a complete big data processing platform, such as Apache Spark

  2. Cluster type (Cluster Type): the kind of a cluster. For example, a Spark cluster running on localhost and a Spark cluster running in the cloud are both Spark clusters, so their cluster type is spark.

  3. Cluster connection (Cluster Connection): defines how the Abacuza data factory accesses a cluster, similar to a database connection string

  4. Job runner (Job Runner): defines how data processing jobs are submitted to the cluster for execution; it can also carry the concrete data processing business logic

  5. Input endpoint (Input Endpoint): defines the source of the raw data (the data to be processed)

  6. Output endpoint (Output Endpoint): defines how data is output after processing

  7. Project: a logical definition of a class of data processing jobs. It contains multiple input endpoints, one output endpoint, and multiple revisions (Revision) of processing results, and it specifies which job runner should execute its jobs

  8. Revision: belongs to a specific project and represents the processing results of one batch
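To make the concepts concrete, a cluster connection for a Livy-fronted Spark cluster might look roughly like the following JSON (the field names here are hypothetical — the actual schema is defined by the spark cluster plug-in; 8998 is Livy's default port):

```json
{
  "name": "local-spark",
  "type": "spark",
  "settings": {
    "baseUrl": "http://192.168.1.100:8998",
    "pollIntervalSeconds": 5
  }
}
```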

When a user wants to complete a big data processing job with Abacuza, the steps are generally:

  1. Log in to the Abacuza management UI with a user name and password (only user-name/password login is supported for now)

  2. For an already installed cluster (such as Apache Spark), configure its cluster type and cluster connection, which define how Abacuza communicates with it (clusters and cluster connections define where the data is processed)

  3. Define a job runner. In the job runner, set the cluster type on which processing jobs should run; when a job is submitted, the Abacuza Cluster Service picks a cluster of the selected type by a certain algorithm to process the data. The processing logic is also defined in the job runner (for example, an application written in Scala, C#, or Python, uploaded to run on a cluster of type spark). In short, the job runner defines how the data is processed

  4. Create a project. In the project, set the data sources through input endpoints, set where the processed data goes through an output endpoint, and choose the job runner the project uses. Then click the Submit button to submit the data to the cluster for processing; when it finishes, view the results in the revisions list

Technology selection

Abacuza adopts a microservice architecture style; each microservice runs in a container. At this experimental stage, docker-compose is used for container orchestration, and Kubernetes support will be added later. Here is a brief list of the frameworks and technologies Abacuza uses:

  1. The Spark driver program uses Microsoft .NET for Spark — partly because I am familiar with the .NET stack, and partly because .NET for Spark has a good streaming SDK/API and integrates easily with ML.NET for machine learning scenarios

  2. All microservices are ASP.NET Core Web APIs running on .NET 5; each microservice uses MongoDB as its back-end database

  3. The Abacuza Job Service, responsible for job scheduling, uses Quartz.NET for periodic task scheduling — submitting processing jobs and updating job status. Its back end also uses a PostgreSQL database

  4. Redis sits between the storage and service layers as a data cache to reduce the query load on MongoDB

  5. The default supported Spark cluster is fronted by Apache Livy, which provides it with a RESTful API

  6. File objects are stored in MinIO (S3-compatible)

  7. The API gateway uses the Ocelot framework

  8. Transient fault handling in microservices: the Polly framework

  9. Authentication and authorization use ASP.NET Core Identity integrated with IdentityServer4

  10. Reverse proxy: nginx

  11. Front end: Angular 12, Angular powered Bootstrap, Bootstrap, AdminLTE
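Regarding item 5: Apache Livy exposes Spark job submission as a REST API — a batch job is created by POSTing a JSON body to its /batches endpoint. The sketch below builds such a body with the Python standard library; the host, port, and file paths are assumptions for illustration, not Abacuza's actual configuration:

```python
import json

LIVY_URL = "http://localhost:8998/batches"  # assumed Livy host/port

def build_batch(file: str, class_name: str, args: list) -> dict:
    """Build the JSON body for Livy's POST /batches endpoint."""
    return {"file": file, "className": class_name, "args": args}

payload = build_batch(
    "local:/opt/spark/jars/microsoft-spark-3-0_2.12-1.0.0.jar",
    "org.apache.spark.deploy.dotnet.DotnetRunner",
    ["WordCountApp.zip", "WordCountApp"],
)
print(json.dumps(payload, indent=2))

# To actually submit (requires a running Livy server):
#   import urllib.request
#   req = urllib.request.Request(LIVY_URL, data=json.dumps(payload).encode(),
#                                headers={"Content-Type": "application/json"})
#   urllib.request.urlopen(req)
```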

A caveat: my front-end skills are not as strong as my back-end skills, so the front-end pages have plenty of rough edges and the styling is not very professional — front-end experts, please bear with me. ;) Abacuza uses a plug-in design, and users can extend the following components as needed:

[Figure: Abacuza plug-in components]

  • Implement your own data processing cluster and cluster connection, so you are not tied to Apache Spark

  • Implement your own input and output endpoints, so you can customize the data input and output sides

  • Implement your own job runner, so you can drop the .NET for Spark-based solution and write your processing programs in Scala or Python instead

In the Abacuza management UI you can easily see which plug-ins are currently loaded in the system. The Abacuza data factory should therefore cover most big data processing scenarios. The platform itself is built on .NET, and the Abacuza SDK is distributed via NuGet, so extending these components is straightforward; the walkthrough below gives the details.

Deployment Topology

Here is Abacuza's deployment topology:

[Figure: Abacuza deployment topology]

 

The deployment structure is fairly simple. The five major microservices sit behind an API gateway built on Ocelot; Ocelot integrates with IdentityServer4 so that user authentication happens at the gateway level (gateway-level authorization is not implemented yet). The Identity Service, built on IdentityServer4, is not deployed behind the API gateway, because in this architecture its authentication and authorization strategy differs from that of ordinary microservices. The API gateway, the Identity Service, and the Angular web app are all reverse-proxied by nginx, which provides a single access endpoint to the outside world (client browsers). All back-end services run in docker and can also be deployed to Kubernetes.

Walkthrough: running a Word Count program on Abacuza

Word Count is the first example program recommended by the official Spark documentation; its job is to count the number of occurrences of each word in an input file. .NET for Spark ships the same Word Count example. Here, I will also use Word Count to show how to run a data processing program on Abacuza.

Prerequisites

You need a Windows, macOS, or Linux machine with the .NET 5 SDK, docker, and docker-compose installed (on Windows or macOS, install Docker Desktop), as well as the git command-line client.

Creating the Word Count data processing program

First create a console application with the dotnet CLI, then add the required package references:

 

$ dotnet new console -f net5.0 -n WordCountApp

$ cd WordCountApp

$ dotnet add package Microsoft.Spark --version 1.0.0

$ dotnet add package Abacuza.JobRunners.Spark.SDK --prerelease

Then add a new class file to the project and implement a WordCountRunner class:

 

using Abacuza.JobRunners.Spark.SDK;
using Microsoft.Spark.Sql;

namespace WordCountApp
{
    public class WordCountRunner : SparkRunnerBase
    {
        public WordCountRunner(string[] args) : base(args)
        {
        }

        protected override DataFrame RunInternal(SparkSession sparkSession, DataFrame dataFrame)
            => dataFrame
                .Select(Functions.Split(Functions.Col("value"), " ").Alias("words"))
                .Select(Functions.Explode(Functions.Col("words")).Alias("word"))
                .GroupBy("word")
                .Count()
                .OrderBy(Functions.Col("count").Desc());
    }
}
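The transformation chain in RunInternal is the classic word count: split each input line on spaces, explode the word arrays into rows, group by word, count, and sort by count descending. In plain Python terms (no Spark involved), the same logic looks like this:

```python
from collections import Counter

# Equivalent of the Spark chain: split each line on spaces, flatten the
# resulting word lists into one sequence, count per word, sort descending.
lines = ["the quick brown fox", "the lazy dog", "the fox"]
words = [w for line in lines for w in line.split(" ")]
counts = Counter(words)
result = counts.most_common()  # [(word, count), ...] sorted by count desc
print(result[0])  # → ('the', 3)
```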

Next, modify the Program.cs file to call WordCountRunner from the Main function:

 

static void Main(string[] args)
{
    new WordCountRunner(args).Run();
}

Then, on the command line, in the directory containing WordCountApp.csproj, produce a compiled output targeting the linux-x64 platform:

 

$ dotnet publish -c Release -f net5.0 -r linux-x64 -o published

Finally, use a ZIP tool to pack all files under published (excluding the published directory itself) into a single ZIP archive. For example, on Linux, you can pack all files in the published directory into one ZIP with:

 

$ zip -rj WordCountApp.zip published/.
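If no zip command is available, the same flat archive (the equivalent of zip -rj, which drops directory prefixes) can be produced with Python's standard zipfile module — note that, as with -j, files with the same name in different subdirectories would collide:

```python
import os
import zipfile

def zip_dir_flat(src_dir: str, zip_path: str) -> None:
    """Pack every file under src_dir into zip_path without directory
    prefixes (the equivalent of `zip -rj`)."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(src_dir):
            for name in files:
                full = os.path.join(root, name)
                zf.write(full, arcname=name)  # flatten: drop the directory part

# Example:
# zip_dir_flat("published", "WordCountApp.zip")
```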

The Word Count program is now ready. Next we start Abacuza and run WordCountApp in it.

Running the Word Count program

You can download the Abacuza source code with the command git clone https://github.com/daxnet/abacuza.git, then build it from the Abacuza root directory with:


$ docker-compose -f docker-compose.build.yaml build

After the build completes, edit the template.env file with a text editor and set your machine's IP address in it (localhost or 127.0.0.1 cannot be used, because in a container environment localhost and 127.0.0.1 refer to the current container itself, not the host running it). The port numbers can be left at their defaults:

[Screenshot: editing template.env]

Then start Abacuza with:


$ docker-compose --env-file template.env up

After a successful start, you can use the docker ps command to view the running containers:

[Screenshot: docker ps output]

 

Open http://<your IP address>:9320 in a browser to reach the Abacuza login screen. Log in with user name super and password P@ssw0rd to enter the Dashboard (the Dashboard is not finished yet). In the menu on the left, click Cluster Connections, then click the Add Connection button in the upper right corner:

[Screenshot: Cluster Connections page]

In the dialog that pops up, enter the cluster connection's name and description, choose spark as the cluster type, and in the settings field enter the JSON configuration used to connect to the Spark cluster. Because we started Spark locally in a container, use the machine's IP address directly; if your Spark cluster is deployed on another machine, use that machine's IP address instead. After entering this information, click the Save button:

[Screenshot: Add Connection dialog]

The next step is to create a job runner. In the Abacuza management UI, click the Job Runners menu on the left, then click the Add Job Runner button in the upper right corner:

[Screenshot: Job Runners page]

 

In the dialog that pops up, enter the job runner's name and description and choose spark as the cluster type; when the job runner executes, it will pick any cluster whose type is spark to process the data.
[Screenshot: Add Job Runner dialog]

After filling in this basic information, click the Save button; the job runner's details page opens for further settings. In the Payload template field, enter the following JSON text:


{
  "file": "${jr:binaries:microsoft-spark-3-0_2.12-1.0.0.jar}",
  "className": "org.apache.spark.deploy.dotnet.DotnetRunner",
  "args": [
    "${jr:binaries:WordCountApp.zip}",
    "WordCountApp",
    "${proj:input-defs}",
    "${proj:output-defs}",
    "${proj:context}"
  ]
}

A brief explanation of each parameter:

  • file: specifies the JAR package containing the program to run on the Spark cluster; here we use Microsoft's Spark JAR directly

  • className: specifies the name of the program to run inside the JAR; here it is always org.apache.spark.deploy.dotnet.DotnetRunner

  • ${jr:binaries:WordCountApp.zip}: the DotnetRunner specified by className will invoke the binary file WordCountApp.zip attached to the current job runner to perform the data processing job

  • WordCountApp: the name of the executable program inside the ZIP package

  • ${proj:input-defs}: indicates that the input files and their configuration come from the input endpoint definitions of the data processing project being executed

  • ${proj:output-defs}: indicates that the output files and their configuration come from the output endpoint definition of the data processing project being executed

  • ${proj:context}: indicates that Spark-related information is read from the current project and passed to the job runner
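The ${scope:name} tokens are placeholders that Abacuza resolves before the payload is submitted to the cluster. As a hypothetical illustration of the mechanism (this is not Abacuza's actual code — the resolver name and behavior are assumptions), such substitution could be as simple as:

```python
import re

def render_payload(template: str, values: dict) -> str:
    """Replace every ${scope:name} token in template with its value.

    Hypothetical sketch of placeholder resolution; Abacuza's real
    implementation may differ.
    """
    def repl(match):
        key = match.group(1)
        if key not in values:
            raise KeyError(f"unresolved placeholder: {key}")
        return values[key]
    return re.sub(r"\$\{([^}]+)\}", repl, template)

template = '{"file": "${jr:binaries:app.jar}", "args": ["${proj:input-defs}"]}'
print(render_payload(template, {
    "jr:binaries:app.jar": "s3://abacuza/binaries/app.jar",  # assumed location
    "proj:input-defs": "[]",
}))
```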

The configuration above references two binary files: microsoft-spark-3-0_2.12-1.0.0.jar and WordCountApp.zip, so we need to upload both to the job runner. Still on the job runner's edit page, in the Binaries list, click the plus button and attach the two files. Note: microsoft-spark-3-0_2.12-1.0.0.jar is located in the published directory produced above, and WordCountApp.zip is the ZIP archive generated earlier.

[Screenshot: attaching binaries to the job runner]

When the configuration is complete, click the Save & Close button to save the job runner. Next, create a data processing project: in the menu on the left, click Projects, then click the Add Project button in the upper right corner:

[Screenshot: Projects page]

 

In the Add Project dialog, enter the project's name and description, then select the input endpoint, the output endpoint, and the job runner responsible for processing the project's data:

[Screenshot: Add Project dialog]

 

Here we set the input endpoint to text files (Text Files) and the output endpoint to the console (Console), i.e. output directly to the log. These settings can also be changed later on the project's edit pages. A project can contain multiple input endpoints, but only one output endpoint. Click the Save button to save the settings; Abacuza then opens the project details page. Under the INPUT tab, add a text file whose word occurrences should be counted:
[Screenshot: INPUT tab with a text file added]

Under the OUTPUT tab, confirm that the output endpoint is set to Console:

[Screenshot: OUTPUT tab]

Then click the Submit button in the upper right or lower right corner to submit the data processing job. The tab switches automatically to REVISIONS, where the job's status is updated:
[Screenshot: REVISIONS tab with the job running]

Wait a moment; if the processing succeeds, the job status changes from RUNNING to COMPLETED:
[Screenshot: job status COMPLETED]

 

Click the file button in the Actions column to view the log output of the data processing job:

[Screenshot: job log output]

As the log file shows, Abacuza ran the processing program we wrote and counted the occurrences of each word in the input file input.txt. The same information can be seen in the container's log output:

[Screenshot: container log output]

Summary

This article introduced the design and implementation of a hand-built data factory (Data Factory) and developed an example to demonstrate the complete data processing flow the factory performs. Plenty of features remain to be improved: the Dashboard, better authentication and authorization, user and group management, third-party IdP integration, pipelines, and so on.

 
 
 
 

Copyright notice
This article was written by [Sea of heart ice]. Please include a link to the original when reposting. Thanks.
https://chowdera.com/2021/09/20210909120722760E.html
