Google Cloud Platform tutorial
Start Free Trial
GCP provides a $300 credit for the free trial. I will show how to apply for it.
Go to cloud.google.com and click Get started for free:
Set basic info:
Add payment info:
Add a Visa credit card:
After a few seconds, the account is ready to use:
create a new project
Click NEW PROJECT to create a new project:
Input the name:
create a service account
A service account is convenient for authentication:
Input the name and grant the Owner role to the service account:
Create a JSON key for the service account, and download the key file:
set up auth for gcloud
Set the environment variable for the credentials:
export GOOGLE_APPLICATION_CREDENTIALS=/Users/larry/Software/google-cloud-sdk/pkslow-gcp-4cdeea1781db.json
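This environment variable is what the Google client libraries read for Application Default Credentials. As a quick optional check (my own addition, assuming the google-auth package is installed), you can verify it from Python:

import google.auth

# google.auth.default() picks up GOOGLE_APPLICATION_CREDENTIALS and returns
# the credentials plus the project id embedded in the service account key file.
credentials, project_id = google.auth.default()
print(project_id)  # should print pkslow-gcp once the key file is in place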
Authenticate with the service account:
$ gcloud auth activate-service-account admin-all@pkslow-gcp.iam.gserviceaccount.com --key-file=${GOOGLE_APPLICATION_CREDENTIALS}
Activated service account credentials for: [admin-all@pkslow-gcp.iam.gserviceaccount.com]
Set the project and check the auth list:
$ gcloud config set project pkslow-gcp
Updated property [core/project].
$ gcloud auth list
Credentialed Accounts
ACTIVE ACCOUNT
* admin-all@pkslow-gcp.iam.gserviceaccount.com
To set the active account, run:
$ gcloud config set account `ACCOUNT`
List the projects:
$ gcloud projects list
PROJECT_ID NAME PROJECT_NUMBER
pkslow-gcp pkslow-gcp 605905500307
gcloud create vm
Enable the Compute Engine service and create the VM:
$ gcloud compute instances create pkslow-vm \
--project=pkslow-gcp \
--zone=us-west1-a \
--machine-type=e2-micro \
--network-interface=network-tier=PREMIUM,subnet=default \
--maintenance-policy=MIGRATE \
--service-account=admin-all@pkslow-gcp.iam.gserviceaccount.com \
--scopes=https://www.googleapis.com/auth/cloud-platform \
--tags=http-server,https-server \
--create-disk=auto-delete=yes,boot=yes,device-name=instance-1,image=projects/centos-cloud/global/images/centos-8-v20211105,mode=rw,size=20,type=projects/pkslow-gcp/zones/us-west1-a/diskTypes/pd-standard \
--no-shielded-secure-boot \
--shielded-vtpm \
--shielded-integrity-monitoring \
--reservation-affinity=any
API [compute.googleapis.com] not enabled on project [605905500307]. Would you like to enable and
retry (this will take a few minutes)? (y/N)? y
Enabling service [compute.googleapis.com] on project [605905500307]...
Operation "operations/acf.p2-605905500307-508eedac-aaf1-4be1-a353-0499eb4ea714" finished successfully.
Created [https://www.googleapis.com/compute/v1/projects/pkslow-gcp/zones/us-west1-a/instances/pkslow-vm].
WARNING: Some requests generated warnings:
- The resource 'projects/centos-cloud/global/images/centos-8-v20211105' is deprecated. A suggested replacement is 'projects/centos-cloud/global/images/centos-8'.
NAME ZONE MACHINE_TYPE PREEMPTIBLE INTERNAL_IP EXTERNAL_IP STATUS
pkslow-vm us-west1-a e2-micro 10.138.0.2 34.83.214.176 RUNNING
Check the instances list:
$ gcloud compute instances list
NAME ZONE MACHINE_TYPE PREEMPTIBLE INTERNAL_IP EXTERNAL_IP STATUS
pkslow-vm us-west1-a e2-micro 10.138.0.2 34.83.214.176 RUNNING
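Besides gcloud, we can also list the instances from Python. This is only a minimal sketch of my own (not from the original steps), assuming the google-cloud-compute library is installed and credentials are configured:

from google.cloud import compute_v1

# List the VMs in the given project/zone and print their name and status
client = compute_v1.InstancesClient()
for instance in client.list(project="pkslow-gcp", zone="us-west1-a"):
    print(instance.name, instance.status)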
We can also check it on the GCP console:
Dataflow quick start for Python
Enable related GCP services
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
Create a Cloud Storage bucket
gsutil mb -c STANDARD -l US gs://pkslow-gcp-dataflow
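For reference, the same bucket could be created from Python with the Cloud Storage client library. A small sketch (my assumption, not part of the original steps; requires google-cloud-storage):

from google.cloud import storage

# Create a STANDARD-class bucket in the US multi-region, mirroring the gsutil mb command above
client = storage.Client(project="pkslow-gcp")
bucket = client.create_bucket("pkslow-gcp-dataflow", location="US")
print("Created bucket:", bucket.name)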
Apache Beam quick start with Python
Install the Python SDK:
pip install apache-beam[gcp]==2.48.0
Read local file with DirectRunner
Create a Python file apache_beam_direct_runner_local.py:
import logging
import re

import apache_beam as beam


def run():
    with beam.Pipeline() as pipeline:
        print('pipeline start')
        result = (pipeline
                  | 'Read Text' >> beam.io.ReadFromText('/Users/larry/IdeaProjects/pkslow-samples/LICENSE')
                  | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  | 'count' >> beam.combiners.Count.PerElement()
                  | beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
                  | beam.io.WriteToText('direct_runner_local_output.txt'))
        print(result)
        print('pipeline done')


if __name__ == '__main__':
    # logging.getLogger().setLevel(logging.INFO)
    logging.basicConfig(
        format='%(asctime)s %(levelname)-8s %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S')
    run()
The application reads a local file, counts the words, and writes the result to a local file.
Run the code:
$ /usr/local/bin/python3.9 apache_beam_direct_runner_local.py
2023-06-18 18:16:45 INFO Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
pipeline start
PCollection[WriteToText/Write/WriteImpl/FinalizeWrite.None]
pipeline done
2023-06-18 18:16:45 INFO Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
2023-06-18 18:16:45 INFO ==================== <function annotate_downstream_side_inputs at 0x12911e550> ====================
2023-06-18 18:16:45 INFO ==================== <function fix_side_input_pcoll_coders at 0x12911e670> ====================
2023-06-18 18:16:45 INFO ==================== <function pack_combiners at 0x12911eb80> ====================
2023-06-18 18:16:45 INFO ==================== <function lift_combiners at 0x12911ec10> ====================
2023-06-18 18:16:45 INFO ==================== <function expand_sdf at 0x12911edc0> ====================
2023-06-18 18:16:45 INFO ==================== <function expand_gbk at 0x12911ee50> ====================
2023-06-18 18:16:45 INFO ==================== <function sink_flattens at 0x12911ef70> ====================
2023-06-18 18:16:45 INFO ==================== <function greedily_fuse at 0x12911f040> ====================
2023-06-18 18:16:46 INFO ==================== <function read_to_impulse at 0x12911f0d0> ====================
2023-06-18 18:16:46 INFO ==================== <function impulse_to_input at 0x12911f160> ====================
2023-06-18 18:16:46 INFO ==================== <function sort_stages at 0x12911f3a0> ====================
2023-06-18 18:16:46 INFO ==================== <function add_impulse_to_dangling_transforms at 0x12911f4c0> ====================
2023-06-18 18:16:46 INFO ==================== <function setup_timer_mapping at 0x12911f310> ====================
2023-06-18 18:16:46 INFO ==================== <function populate_data_channel_coders at 0x12911f430> ====================
2023-06-18 18:16:46 INFO Creating state cache with size 104857600
2023-06-18 18:16:46 INFO Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x129391cd0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
2023-06-18 18:16:46 INFO Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
2023-06-18 18:16:46 INFO Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
2023-06-18 18:16:46 INFO Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
2023-06-18 18:16:46 INFO Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
2023-06-18 18:16:46 INFO Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
2023-06-18 18:16:46 INFO Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
2023-06-18 18:16:46 INFO Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
2023-06-18 18:16:46 INFO Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
2023-06-18 18:16:46 INFO Renamed 1 shards in 0.02 seconds.
It will generate a result file (named like direct_runner_local_output.txt-00000-of-00001):
Apache: 4
License: 29
Version: 2
January: 1
http: 2
www: 2
apache: 2
org: 2
Read a GCS file with DirectRunner
Create a Python file apache_beam_word_count.py, the same as the official Beam wordcount example.
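The script itself is not listed here, so the following is only a minimal sketch of what such a word-count script typically looks like, modeled on the standard Beam wordcount example (the transform names and argument defaults are illustrative):

import argparse
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        default='gs://dataflow-samples/shakespeare/kinglear.txt',
                        help='Input file to process.')
    parser.add_argument('--output',
                        required=True,
                        help='Output path prefix to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Unrecognised args (e.g. --runner, --project, --region, --temp_location) become pipeline options.
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        (p
         | 'Read' >> beam.io.ReadFromText(known_args.input)
         | 'ExtractWords' >> beam.FlatMap(lambda line: re.findall(r"[A-Za-z\']+", line))
         | 'Count' >> beam.combiners.Count.PerElement()
         | 'Format' >> beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
         | 'Write' >> beam.io.WriteToText(known_args.output))


if __name__ == '__main__':
    run()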
Run the code:
$ /usr/local/bin/python3.9 apache_beam_word_count.py --output outputs
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function annotate_downstream_side_inputs at 0x1279590d0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function fix_side_input_pcoll_coders at 0x1279591f0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x127959700> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x127959790> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_sdf at 0x127959940> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_gbk at 0x1279599d0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sink_flattens at 0x127959af0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function greedily_fuse at 0x127959b80> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function read_to_impulse at 0x127959c10> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function impulse_to_input at 0x127959ca0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x127959ee0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function add_impulse_to_dangling_transforms at 0x12795e040> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function setup_timer_mapping at 0x127959e50> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function populate_data_channel_coders at 0x127959f70> ====================
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x127bcc130> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.02 seconds.
It will generate the result file:
KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65
of: 447
Britain: 2
OF: 15
FRANCE: 10
The source code and output files:
Run on Dataflow service
/usr/local/bin/python3.9 apache_beam_word_count.py \
--region us-west1 \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://pkslow-gcp-dataflow/results/outputs \
--runner DataflowRunner \
--project pkslow-gcp \
--temp_location gs://pkslow-gcp-dataflow/tmp/
The whole execution log:
$ /usr/local/bin/python3.9 apache_beam_word_count.py \
> --region us-west1 \
> --input gs://dataflow-samples/shakespeare/kinglear.txt \
> --output gs://pkslow-gcp-dataflow/results/outputs \
> --runner DataflowRunner \
> --project pkslow-gcp \
> --temp_location gs://pkslow-gcp-dataflow/tmp/
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/opt/python@3.9/bin/python3.9', '-m', 'pip', 'download', '--dest', '/var/folders/zl/jrl7_w6n2vx_5t2wtzggntxm0000gn/T/tmp4vp1oih5', 'apache-beam==2.48.0', '--no-deps', '--no-binary', ':all:']
[notice] A new release of pip available: 22.3.1 -> 23.1.2
[notice] To update, run: python3.9 -m pip install --upgrade pip
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/opt/python@3.9/bin/python3.9', '-m', 'pip', 'download', '--dest', '/var/folders/zl/jrl7_w6n2vx_5t2wtzggntxm0000gn/T/tmp4vp1oih5', 'apache-beam==2.48.0', '--no-deps', '--only-binary', ':all:', '--python-version', '39', '--implementation', 'cp', '--abi', 'cp39', '--platform', 'manylinux2014_x86_64']
[notice] A new release of pip available: 22.3.1 -> 23.1.2
[notice] To update, run: python3.9 -m pip install --upgrade pip
INFO:apache_beam.runners.portability.stager:Staging binary distribution of the SDK from PyPI: apache_beam-2.48.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.48.0
INFO:root:Using provided Python SDK container image: gcr.io/cloud-dataflow/v1beta3/python39:2.48.0
INFO:root:Python SDK container image set to "gcr.io/cloud-dataflow/v1beta3/python39:2.48.0" for Docker environment
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x11f7168b0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x11f7170d0> ====================
INFO:apache_beam.runners.dataflow.internal.apiclient:Defaulting to the temp_location as staging_location: gs://pkslow-gcp-dataflow/tmp/
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://pkslow-gcp-dataflow/tmp/beamapp-larry-0618102713-207274-vsslzg6s.1687084033.207576/pickled_main_session...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://pkslow-gcp-dataflow/tmp/beamapp-larry-0618102713-207274-vsslzg6s.1687084033.207576/pickled_main_session in 1 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://pkslow-gcp-dataflow/tmp/beamapp-larry-0618102713-207274-vsslzg6s.1687084033.207576/dataflow_python_sdk.tar...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://pkslow-gcp-dataflow/tmp/beamapp-larry-0618102713-207274-vsslzg6s.1687084033.207576/dataflow_python_sdk.tar in 1 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://pkslow-gcp-dataflow/tmp/beamapp-larry-0618102713-207274-vsslzg6s.1687084033.207576/apache_beam-2.48.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://pkslow-gcp-dataflow/tmp/beamapp-larry-0618102713-207274-vsslzg6s.1687084033.207576/apache_beam-2.48.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl in 21 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://pkslow-gcp-dataflow/tmp/beamapp-larry-0618102713-207274-vsslzg6s.1687084033.207576/pipeline.pb...
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://pkslow-gcp-dataflow/tmp/beamapp-larry-0618102713-207274-vsslzg6s.1687084033.207576/pipeline.pb in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job
clientRequestId: '20230618102713208255-2760'
createTime: '2023-06-18T10:27:44.690792Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-18_03_27_42-4147829452393375490'
location: 'us-west1'
name: 'beamapp-larry-0618102713-207274-vsslzg6s'
projectId: 'pkslow-gcp'
stageStates: []
startTime: '2023-06-18T10:27:44.690792Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)>
INFO:apache_beam.runners.dataflow.internal.apiclient:Created job with id: [2023-06-18_03_27_42-4147829452393375490]
INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: 2023-06-18_03_27_42-4147829452393375490
INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobs/us-west1/2023-06-18_03_27_42-4147829452393375490?project=pkslow-gcp
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2023-06-18_03_27_42-4147829452393375490 is in state JOB_STATE_PENDING
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:43.166Z: JOB_MESSAGE_BASIC: Dataflow Runner V2 auto-enabled. Use --experiments=disable_runner_v2 to opt out.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:45.434Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2023-06-18_03_27_42-4147829452393375490. The number of workers will be between 1 and 1000.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:45.505Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2023-06-18_03_27_42-4147829452393375490.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:47.267Z: JOB_MESSAGE_BASIC: Worker configuration: n1-standard-1 in us-west1-a.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:47.778Z: JOB_MESSAGE_DETAILED: Expanding SplittableParDo operations into optimizable parts.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:47.808Z: JOB_MESSAGE_DETAILED: Expanding CollectionToSingleton operations into optimizable parts.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:47.856Z: JOB_MESSAGE_DETAILED: Expanding CoGroupByKey operations into optimizable parts.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:47.882Z: JOB_MESSAGE_DEBUG: Combiner lifting skipped for step Write/Write/WriteImpl/GroupByKey: GroupByKey not followed by a combiner.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:47.932Z: JOB_MESSAGE_DETAILED: Expanding GroupByKey operations into optimizable parts.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:47.955Z: JOB_MESSAGE_DEBUG: Annotating graph with Autotuner information.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:47.981Z: JOB_MESSAGE_DETAILED: Fusing adjacent ParDo, Read, Write, and Flatten operations
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.006Z: JOB_MESSAGE_DETAILED: Fusing consumer Write/Write/WriteImpl/InitializeWrite into Write/Write/WriteImpl/DoOnce/Map(decode)
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.026Z: JOB_MESSAGE_DETAILED: Fusing consumer Write/Write/WriteImpl/DoOnce/FlatMap(<lambda at core.py:3634>) into Write/Write/WriteImpl/DoOnce/Impulse
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.060Z: JOB_MESSAGE_DETAILED: Fusing consumer Write/Write/WriteImpl/DoOnce/Map(decode) into Write/Write/WriteImpl/DoOnce/FlatMap(<lambda at core.py:3634>)
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.086Z: JOB_MESSAGE_DETAILED: Fusing consumer Read/Read/Map(<lambda at iobase.py:908>) into Read/Read/Impulse
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.116Z: JOB_MESSAGE_DETAILED: Fusing consumer ref_AppliedPTransform_Read-Read-SDFBoundedSourceReader-ParDo-SDFBoundedSourceDoFn-_7/PairWithRestriction into Read/Read/Map(<lambda at iobase.py:908>)
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.142Z: JOB_MESSAGE_DETAILED: Fusing consumer ref_AppliedPTransform_Read-Read-SDFBoundedSourceReader-ParDo-SDFBoundedSourceDoFn-_7/SplitWithSizing into ref_AppliedPTransform_Read-Read-SDFBoundedSourceReader-ParDo-SDFBoundedSourceDoFn-_7/PairWithRestriction
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.166Z: JOB_MESSAGE_DETAILED: Fusing consumer Split into ref_AppliedPTransform_Read-Read-SDFBoundedSourceReader-ParDo-SDFBoundedSourceDoFn-_7/ProcessElementAndRestrictionWithSizing
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.192Z: JOB_MESSAGE_DETAILED: Fusing consumer PairWithOne into Split
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.221Z: JOB_MESSAGE_DETAILED: Fusing consumer GroupAndSum/GroupByKey+GroupAndSum/Combine/Partial into PairWithOne
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.247Z: JOB_MESSAGE_DETAILED: Fusing consumer GroupAndSum/GroupByKey/Write into GroupAndSum/GroupByKey+GroupAndSum/Combine/Partial
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.274Z: JOB_MESSAGE_DETAILED: Fusing consumer GroupAndSum/Combine into GroupAndSum/GroupByKey/Read
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.307Z: JOB_MESSAGE_DETAILED: Fusing consumer GroupAndSum/Combine/Extract into GroupAndSum/Combine
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.331Z: JOB_MESSAGE_DETAILED: Fusing consumer Format into GroupAndSum/Combine/Extract
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.353Z: JOB_MESSAGE_DETAILED: Fusing consumer Write/Write/WriteImpl/WindowInto(WindowIntoFn) into Format
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.380Z: JOB_MESSAGE_DETAILED: Fusing consumer Write/Write/WriteImpl/WriteBundles into Write/Write/WriteImpl/WindowInto(WindowIntoFn)
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.418Z: JOB_MESSAGE_DETAILED: Fusing consumer Write/Write/WriteImpl/Pair into Write/Write/WriteImpl/WriteBundles
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.440Z: JOB_MESSAGE_DETAILED: Fusing consumer Write/Write/WriteImpl/GroupByKey/Write into Write/Write/WriteImpl/Pair
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.467Z: JOB_MESSAGE_DETAILED: Fusing consumer Write/Write/WriteImpl/Extract into Write/Write/WriteImpl/GroupByKey/Read
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.520Z: JOB_MESSAGE_DEBUG: Workflow config is missing a default resource spec.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.539Z: JOB_MESSAGE_DEBUG: Adding StepResource setup and teardown to workflow graph.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.565Z: JOB_MESSAGE_DEBUG: Adding workflow start and stop steps.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.593Z: JOB_MESSAGE_DEBUG: Assigning stage ids.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.727Z: JOB_MESSAGE_DEBUG: Executing wait step start34
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.781Z: JOB_MESSAGE_BASIC: Executing operation Read/Read/Impulse+Read/Read/Map(<lambda at iobase.py:908>)+ref_AppliedPTransform_Read-Read-SDFBoundedSourceReader-ParDo-SDFBoundedSourceDoFn-_7/PairWithRestriction+ref_AppliedPTransform_Read-Read-SDFBoundedSourceReader-ParDo-SDFBoundedSourceDoFn-_7/SplitWithSizing
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.806Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/DoOnce/Impulse+Write/Write/WriteImpl/DoOnce/FlatMap(<lambda at core.py:3634>)+Write/Write/WriteImpl/DoOnce/Map(decode)+Write/Write/WriteImpl/InitializeWrite
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.825Z: JOB_MESSAGE_DEBUG: Starting worker pool setup.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:27:48.846Z: JOB_MESSAGE_BASIC: Starting 1 workers in us-west1-a...
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2023-06-18_03_27_42-4147829452393375490 is in state JOB_STATE_RUNNING
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:28:39.136Z: JOB_MESSAGE_DETAILED: Autoscaling: Raised the number of workers to 1 based on the rate of progress in the currently running stage(s).
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:29:05.873Z: JOB_MESSAGE_DETAILED: Workers have started successfully.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:32.740Z: JOB_MESSAGE_DETAILED: All workers have finished the startup processes and began to receive work requests.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:34.737Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/DoOnce/Impulse+Write/Write/WriteImpl/DoOnce/FlatMap(<lambda at core.py:3634>)+Write/Write/WriteImpl/DoOnce/Map(decode)+Write/Write/WriteImpl/InitializeWrite
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:34.801Z: JOB_MESSAGE_DEBUG: Value "Write/Write/WriteImpl/DoOnce/Map(decode).None" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:34.830Z: JOB_MESSAGE_DEBUG: Value "Write/Write/WriteImpl/InitializeWrite.None" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:34.887Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/WriteBundles/View-python_side_input0-Write/Write/WriteImpl/WriteBundles
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:34.921Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/FinalizeWrite/View-python_side_input0-Write/Write/WriteImpl/FinalizeWrite
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:34.953Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/WriteBundles/View-python_side_input0-Write/Write/WriteImpl/WriteBundles
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:34.960Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/PreFinalize/View-python_side_input0-Write/Write/WriteImpl/PreFinalize
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:34.972Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/FinalizeWrite/View-python_side_input0-Write/Write/WriteImpl/FinalizeWrite
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:35.005Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/PreFinalize/View-python_side_input0-Write/Write/WriteImpl/PreFinalize
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:35.012Z: JOB_MESSAGE_DEBUG: Value "Write/Write/WriteImpl/WriteBundles/View-python_side_input0-Write/Write/WriteImpl/WriteBundles.out" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:35.037Z: JOB_MESSAGE_DEBUG: Value "Write/Write/WriteImpl/FinalizeWrite/View-python_side_input0-Write/Write/WriteImpl/FinalizeWrite.out" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:35.064Z: JOB_MESSAGE_DEBUG: Value "Write/Write/WriteImpl/PreFinalize/View-python_side_input0-Write/Write/WriteImpl/PreFinalize.out" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:35.621Z: JOB_MESSAGE_BASIC: Finished operation Read/Read/Impulse+Read/Read/Map(<lambda at iobase.py:908>)+ref_AppliedPTransform_Read-Read-SDFBoundedSourceReader-ParDo-SDFBoundedSourceDoFn-_7/PairWithRestriction+ref_AppliedPTransform_Read-Read-SDFBoundedSourceReader-ParDo-SDFBoundedSourceDoFn-_7/SplitWithSizing
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:35.676Z: JOB_MESSAGE_DEBUG: Value "ref_AppliedPTransform_Read-Read-SDFBoundedSourceReader-ParDo-SDFBoundedSourceDoFn-_7-split-with-sizing-out3" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:35.742Z: JOB_MESSAGE_BASIC: Executing operation GroupAndSum/GroupByKey/Create
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:36.156Z: JOB_MESSAGE_BASIC: Finished operation GroupAndSum/GroupByKey/Create
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:36.216Z: JOB_MESSAGE_DEBUG: Value "GroupAndSum/GroupByKey/Session" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:36.279Z: JOB_MESSAGE_BASIC: Executing operation ref_AppliedPTransform_Read-Read-SDFBoundedSourceReader-ParDo-SDFBoundedSourceDoFn-_7/ProcessElementAndRestrictionWithSizing+Split+PairWithOne+GroupAndSum/GroupByKey+GroupAndSum/Combine/Partial+GroupAndSum/GroupByKey/Write
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:37.999Z: JOB_MESSAGE_BASIC: Finished operation ref_AppliedPTransform_Read-Read-SDFBoundedSourceReader-ParDo-SDFBoundedSourceDoFn-_7/ProcessElementAndRestrictionWithSizing+Split+PairWithOne+GroupAndSum/GroupByKey+GroupAndSum/Combine/Partial+GroupAndSum/GroupByKey/Write
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:38.054Z: JOB_MESSAGE_BASIC: Executing operation GroupAndSum/GroupByKey/Close
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:38.112Z: JOB_MESSAGE_BASIC: Finished operation GroupAndSum/GroupByKey/Close
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:38.175Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/GroupByKey/Create
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:38.290Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/GroupByKey/Create
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:38.349Z: JOB_MESSAGE_DEBUG: Value "Write/Write/WriteImpl/GroupByKey/Session" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:38.404Z: JOB_MESSAGE_BASIC: Executing operation GroupAndSum/GroupByKey/Read+GroupAndSum/Combine+GroupAndSum/Combine/Extract+Format+Write/Write/WriteImpl/WindowInto(WindowIntoFn)+Write/Write/WriteImpl/WriteBundles+Write/Write/WriteImpl/Pair+Write/Write/WriteImpl/GroupByKey/Write
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:39.502Z: JOB_MESSAGE_BASIC: Finished operation GroupAndSum/GroupByKey/Read+GroupAndSum/Combine+GroupAndSum/Combine/Extract+Format+Write/Write/WriteImpl/WindowInto(WindowIntoFn)+Write/Write/WriteImpl/WriteBundles+Write/Write/WriteImpl/Pair+Write/Write/WriteImpl/GroupByKey/Write
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:39.558Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/GroupByKey/Close
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:39.612Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/GroupByKey/Close
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:39.660Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/GroupByKey/Read+Write/Write/WriteImpl/Extract
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:41.917Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/GroupByKey/Read+Write/Write/WriteImpl/Extract
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:41.967Z: JOB_MESSAGE_DEBUG: Value "Write/Write/WriteImpl/Extract.None" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:42.013Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/FinalizeWrite/View-python_side_input1-Write/Write/WriteImpl/FinalizeWrite
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:42.036Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/PreFinalize/View-python_side_input1-Write/Write/WriteImpl/PreFinalize
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:42.059Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/FinalizeWrite/View-python_side_input1-Write/Write/WriteImpl/FinalizeWrite
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:42.115Z: JOB_MESSAGE_DEBUG: Value "Write/Write/WriteImpl/FinalizeWrite/View-python_side_input1-Write/Write/WriteImpl/FinalizeWrite.out" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:42.141Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/PreFinalize/View-python_side_input1-Write/Write/WriteImpl/PreFinalize
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:42.196Z: JOB_MESSAGE_DEBUG: Value "Write/Write/WriteImpl/PreFinalize/View-python_side_input1-Write/Write/WriteImpl/PreFinalize.out" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:42.246Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/PreFinalize
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:44.921Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/PreFinalize
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:44.978Z: JOB_MESSAGE_DEBUG: Value "Write/Write/WriteImpl/PreFinalize.None" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:45.031Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/FinalizeWrite/View-python_side_input2-Write/Write/WriteImpl/FinalizeWrite
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:45.083Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/FinalizeWrite/View-python_side_input2-Write/Write/WriteImpl/FinalizeWrite
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:45.132Z: JOB_MESSAGE_DEBUG: Value "Write/Write/WriteImpl/FinalizeWrite/View-python_side_input2-Write/Write/WriteImpl/FinalizeWrite.out" materialized.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:45.185Z: JOB_MESSAGE_BASIC: Executing operation Write/Write/WriteImpl/FinalizeWrite
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:46.669Z: JOB_MESSAGE_BASIC: Finished operation Write/Write/WriteImpl/FinalizeWrite
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:46.718Z: JOB_MESSAGE_DEBUG: Executing success step success32
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:46.780Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:47.056Z: JOB_MESSAGE_DEBUG: Starting worker pool teardown.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:30:47.079Z: JOB_MESSAGE_BASIC: Stopping worker pool...
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:31:30.978Z: JOB_MESSAGE_DETAILED: Autoscaling: Resized worker pool from 1 to 0.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:31:31.014Z: JOB_MESSAGE_BASIC: Worker pool stopped.
INFO:apache_beam.runners.dataflow.dataflow_runner:2023-06-18T10:31:31.041Z: JOB_MESSAGE_DEBUG: Tearing down pending resources...
INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2023-06-18_03_27_42-4147829452393375490 is in state JOB_STATE_DONE
It uploaded the packages to the temp folder on GCS, then created a Dataflow job to execute the pipeline, and finally wrote the result to the GCS output folder.
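To double-check the output without opening the console, we can list the result shards from Python. A small sketch of my own (assumes google-cloud-storage is installed):

from google.cloud import storage

# List everything the job wrote under the results/ prefix of the bucket
client = storage.Client(project="pkslow-gcp")
for blob in client.list_blobs("pkslow-gcp-dataflow", prefix="results/"):
    print(blob.name)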
The source code is at pkslow-samples/python/apache-beam.
References:
GCP Dataflow Apache Beam with Python
Understanding the Dataflow Quickstart for Python Tutorial
Explain Apache Beam python syntax
Print timestamp for logging in Python
Where does PIP Store / Save Python 3 Modules / Packages on Windows 8?
Dataproc on GCP
We can create Dataproc on GCE or GKE; let's create it on GCE for this quick start.
Input the name and the location:
We can also configure the node type. After all the parameters are configured, click CREATE.
run Python on Spark locally
First, we need to install pyspark:
$ pip install pyspark
Verify the lib:
$ pip show pyspark
Name: pyspark
Version: 3.4.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.9/site-packages
Requires: py4j
Required-by:
Write a simple word count program:
import sys
from pyspark.sql import SparkSession
from operator import add
import os


def run_spark(filename):
    spark = SparkSession.builder.appName('pkslow-local').getOrCreate()
    lines = spark.read.text(filename).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(add)
    output = counts.collect()
    output.sort(key=lambda item: item[1])
    for (word, count) in output:
        print("%s: %i" % (word, count))
    spark.stop()


if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)
    os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3.9'
    filename = sys.argv[1]
    run_spark(filename)
Run the code:
$ python3.9 spark_local.py /Users/larry/IdeaProjects/pkslow-samples/LICENSE
You may hit this issue: "python in worker has different version 3.10 than that in driver 3.9".
Set the env PYSPARK_PYTHON:
os.environ['PYSPARK_PYTHON'] = '/usr/local/bin/python3.9'
Alternatively, you can set the environment variable in the shell (e.g. export PYSPARK_PYTHON=/usr/local/bin/python3.9) before running the code.
Submit Python job to Dataproc
Create a Python file spark_gs.py:
#!/usr/bin/env python
import pyspark
import sys
if len(sys.argv) != 3:
    raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>")
inputUri = sys.argv[1]
outputUri = sys.argv[2]
sc = pyspark.SparkContext()
lines = sc.textFile(sys.argv[1])
words = lines.flatMap(lambda line: line.split())
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2)
wordCounts.saveAsTextFile(sys.argv[2])
Submit the job with the gcloud command:
$ gcloud dataproc jobs submit pyspark spark_gs.py --cluster=test --region=us-central1 -- gs://dataflow-samples/shakespeare/kinglear.txt gs://pkslow-gcp-dataproc/result/
Job [89ea141a0d764c5eae2874f9912061ee] submitted.
Waiting for job output...
23/06/24 03:38:41 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/06/24 03:38:41 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/06/24 03:38:41 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/06/24 03:38:41 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
23/06/24 03:38:41 INFO org.sparkproject.jetty.util.log: Logging initialized @3650ms to org.sparkproject.jetty.util.log.Slf4jLog
23/06/24 03:38:41 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_372-b07
23/06/24 03:38:41 INFO org.sparkproject.jetty.server.Server: Started @3801ms
23/06/24 03:38:41 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@7dfb5455{HTTP/1.1, (http/1.1)}{0.0.0.0:41809}
23/06/24 03:38:42 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at test-m/10.128.0.8:8032
23/06/24 03:38:42 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at test-m/10.128.0.8:10200
23/06/24 03:38:44 INFO org.apache.hadoop.conf.Configuration: resource-types.xml not found
23/06/24 03:38:44 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils: Unable to find 'resource-types.xml'.
23/06/24 03:38:45 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1687337392105_0002
23/06/24 03:38:46 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at test-m/10.128.0.8:8030
23/06/24 03:38:49 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Ignoring exception of type GoogleJsonResponseException; verified object already exists with desired state.
23/06/24 03:38:53 INFO org.apache.hadoop.mapred.FileInputFormat: Total input files to process : 1
23/06/24 03:39:12 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem: Successfully repaired 'gs://pkslow-gcp-dataproc/result/' directory.
23/06/24 03:39:13 INFO org.sparkproject.jetty.server.AbstractConnector: Stopped Spark@7dfb5455{HTTP/1.1, (http/1.1)}{0.0.0.0:0}
Job [89ea141a0d764c5eae2874f9912061ee] finished successfully.
done: true
driverControlFilesUri: gs://dataproc-staging-us-central1-605905500307-bt2ibh6v/google-cloud-dataproc-metainfo/1ceb8e02-6124-498b-97ac-24f6ec27ed95/jobs/89ea141a0d764c5eae2874f9912061ee/
driverOutputResourceUri: gs://dataproc-staging-us-central1-605905500307-bt2ibh6v/google-cloud-dataproc-metainfo/1ceb8e02-6124-498b-97ac-24f6ec27ed95/jobs/89ea141a0d764c5eae2874f9912061ee/driveroutput
jobUuid: ea41c320-7196-32c1-ac04-f500924bf8a3
placement:
clusterName: test
clusterUuid: 1ceb8e02-6124-498b-97ac-24f6ec27ed95
pysparkJob:
args:
- gs://dataflow-samples/shakespeare/kinglear.txt
- gs://pkslow-gcp-dataproc/result/
mainPythonFileUri: gs://dataproc-staging-us-central1-605905500307-bt2ibh6v/google-cloud-dataproc-metainfo/1ceb8e02-6124-498b-97ac-24f6ec27ed95/jobs/89ea141a0d764c5eae2874f9912061ee/staging/spark_gs.py
reference:
jobId: 89ea141a0d764c5eae2874f9912061ee
projectId: pkslow-gcp
status:
state: DONE
stateStartTime: '2023-06-24T03:39:17.489713Z'
statusHistory:
- state: PENDING
stateStartTime: '2023-06-24T03:38:36.302890Z'
- state: SETUP_DONE
stateStartTime: '2023-06-24T03:38:36.358310Z'
- details: Agent reported job success
state: RUNNING
stateStartTime: '2023-06-24T03:38:36.579046Z'
yarnApplications:
- name: spark_gs.py
progress: 1.0
state: FINISHED
trackingUrl: http://test-m:8088/proxy/application_1687337392105_0002/
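Besides gcloud, the same job can be submitted programmatically. Below is only a rough sketch of my own (not from the original steps) using the google-cloud-dataproc Python client; it assumes the script has already been uploaded to GCS, since the API expects a gs:// URI for the main file:

from google.cloud import dataproc_v1

# Use the regional endpoint that matches the cluster's region
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": "us-central1-dataproc.googleapis.com:443"}
)

job = {
    "placement": {"cluster_name": "test"},
    "pyspark_job": {
        "main_python_file_uri": "gs://pkslow-gcp-dataproc/python-scripts/spark_gs.py",
        "args": [
            "gs://dataflow-samples/shakespeare/kinglear.txt",
            "gs://pkslow-gcp-dataproc/result/",
        ],
    },
}

# Submit the job and block until it finishes
operation = job_client.submit_job_as_operation(
    request={"project_id": "pkslow-gcp", "region": "us-central1", "job": job}
)
response = operation.result()
print("Job finished with state:", response.status.state)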
After the job is completed, check the result:
$ gsutil ls gs://pkslow-gcp-dataproc/result
gs://pkslow-gcp-dataproc/result/
gs://pkslow-gcp-dataproc/result/_SUCCESS
gs://pkslow-gcp-dataproc/result/part-00000
gs://pkslow-gcp-dataproc/result/part-00001
$ gsutil cat gs://pkslow-gcp-dataproc/result/part-00000
('LEAR', 222)
('PERSONAE', 1)
('king', 29)
('of', 439)
('(KING', 1)
('OF', 15)
('FRANCE:', 1)
('DUKE', 3)
('(BURGUNDY:)', 1)
('(CORNWALL:)', 1)
('(KENT:)', 1)
('(GLOUCESTER:)', 1)
('son', 21)
('Gloucester.', 9)
('bastard', 4)
('CURAN', 6)
('courtier.', 1)
...
We can also check the execution details on the console:
References:
Use the Cloud Storage connector with Spark
Airflow on GCP (Cloud Composer)
Cloud Composer is a fully managed Apache Airflow service on GCP. It empowers us to author, schedule, and monitor pipelines.
Create Composer 2
I created a Composer service with small resources:
In fact, it’s running on GKE in Autopilot mode:
$ gcloud container clusters list
NAME LOCATION MASTER_VERSION MASTER_IP MACHINE_TYPE NODE_VERSION NUM_NODES STATUS
us-central1-pkslow-composer-d7602fc1-gke us-central1 1.25.8-gke.1000 35.225.201.14 e2-medium 1.25.8-gke.1000 3 RUNNING
We can open the Airflow UI and the DAGs folder from the console:
Airflow UI: https://cc713e0579414acf973265adfedb5e86-dot-us-central1.composer.googleusercontent.com/home
The DAGs:
Create and submit DAGs
Install the Python airflow package:
$ pip install apache-airflow
Write the code for the DAG:
from datetime import datetime
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
dag_test = DAG(
    dag_id="pkslow_test_dag",
    start_date=datetime(2023, 6, 13, 5, 12, 0),
    schedule_interval='*/10 * * * *',
)

generate_random_number = BashOperator(
    task_id="generate_random_number",
    bash_command='echo $RANDOM',
    dag=dag_test,
)


def print_message():
    print("This is a Python Operator by www.pkslow.com.")


python_task = PythonOperator(
    task_id="print_message",
    python_callable=print_message,
    dag=dag_test
)

t0 = EmptyOperator(task_id="t0")
t1 = EmptyOperator(task_id="t1")
t2 = EmptyOperator(task_id="t2")
t3 = EmptyOperator(task_id="t3")
t4 = EmptyOperator(task_id="t4")
t5 = EmptyOperator(task_id="t5")
t6 = EmptyOperator(task_id="t6")

chain(generate_random_number, python_task, t0, t1, [t2, t3], [t4, t5], t6)
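Before uploading, it can be handy to check that the DAG file parses cleanly. A small optional sketch of my own using Airflow's DagBag (run it in the folder that contains the DAG file):

from airflow.models import DagBag

# Parse the DAG files in the current folder and report any import errors
dag_bag = DagBag(dag_folder=".", include_examples=False)
print(dag_bag.import_errors)   # should be an empty dict
print(dag_bag.dag_ids)         # should include pkslow_test_dag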
Submit the DAG to GCS with gsutil:
$ gsutil cp bash_python_operator.py gs://us-central1-pkslow-composer-d7602fc1-bucket/dags/
Copying file://bash_python_operator.py [Content-Type=text/x-python]...
/ [1 files][ 920.0 B/ 920.0 B]
Operation completed over 1 objects/920.0 B.
It will be automatically detected and rendered:
References:
Manage DAG and task dependencies in Airflow
airflow + dataflow
Install the Apache Beam provider package for Airflow:
pip install apache-airflow-providers-apache-beam[google]
Define the DAG:
from airflow import DAG
import airflow
from datetime import timedelta
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
default_args = {
    'owner': 'Airflow',
    # 'start_date': airflow.utils.dates.days_ago(1),
    'retries': 1,
    'retry_delay': timedelta(seconds=50),
    'dataflow_default_options': {
        'project': 'pkslow-gcp',
        'region': 'us-west1',
        'runner': 'DataflowRunner'
    }
}

dag = DAG(
    dag_id='pkslow_dataflow',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=airflow.utils.dates.days_ago(1),
    catchup=False,
    description="DAG for data ingestion and transformation"
)

dataflow_task = BeamRunPythonPipelineOperator(
    task_id='pkslow_dataflow_job',
    py_file='gs://us-central1-pkslow-composer-d7602fc1-bucket/py_scripts/apache_beam_word_count.py',
    pipeline_options={
        'input': 'gs://dataflow-samples/shakespeare/kinglear.txt',
        'output': 'gs://pkslow-gcp-dataflow/results/outputs',
        'temp_location': 'gs://pkslow-gcp-dataflow/tmp/',
    },
    dag=dag
)
The Python script apache_beam_word_count.py is here: Dataflow on GCP. We need to pass the parameters as pipeline_options.
Upload the Python script:
$ gsutil cp apache-beam/apache_beam_word_count.py gs://us-central1-pkslow-composer-d7602fc1-bucket/py_scripts/
Upload the DAG:
$ gsutil cp airflow_dataflow.py gs://us-central1-pkslow-composer-d7602fc1-bucket/dags/
After a while, check on the Airflow UI:
Trigger the job and check the log and result:
Result:
References:
Google Cloud Dataflow Operators
Launching Dataflow pipelines with Cloud Composer
airflow + dataproc
Define the DAG:
import airflow
from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
PROJECT_ID = "pkslow-gcp"
CLUSTER_NAME = 'test'
REGION = 'us-central1'
PYSPARK_URI = f'gs://pkslow-gcp-dataproc/python-scripts/spark_gs.py'
PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": PYSPARK_URI, "args": [
        'gs://dataflow-samples/shakespeare/kinglear.txt', 'gs://pkslow-gcp-dataproc/result/'
    ]},
}

dag = DAG(
    dag_id='pkslow_dataproc',
    schedule_interval='@daily',
    start_date=airflow.utils.dates.days_ago(1),
    catchup=False,
    description="DAG for data ingestion and transformation"
)

pyspark_task = DataprocSubmitJobOperator(
    task_id="pkslow_pyspark_task",
    job=PYSPARK_JOB,
    region=REGION,
    project_id=PROJECT_ID,
    dag=dag
)
Upload the Python script:
$ gsutil cp spark/spark_gs.py gs://pkslow-gcp-dataproc/python-scripts/
Copying file://spark/spark_gs.py [Content-Type=text/x-python]...
/ [1 files][ 550.0 B/ 550.0 B]
Operation completed over 1 objects/550.0 B.
Upload the DAG:
$ gsutil cp airflow/airflow_dataproc.py gs://us-central1-pkslow-composer-d7602fc1-bucket/dags/
Copying file://airflow/airflow_dataproc.py [Content-Type=text/x-python]...
/ [1 files][ 1015 B/ 1015 B]
Operation completed over 1 objects/1015.0 B.
Trigger the job:
Check the result:
We can also check the execution details on Dataproc:
References: