Creating Datasets

Why Would You Create Datasets?

The dataset entity is one of the most important entities in the metadata model. Datasets represent collections of data, typically tables or views in a database (e.g. BigQuery, Snowflake, Redshift), streams in a stream-processing environment (e.g. Kafka, Pulsar), or bundles of data stored as files or folders in data lake systems (e.g. S3, ADLS). For more information about datasets, refer to Dataset.

Goal Of This Guide

This guide will show you how to create a dataset named realestate_db.sales with three columns.

Prerequisites

For this tutorial, you need to deploy DataHub Quickstart and ingest sample data. For detailed steps, please refer to the DataHub Quickstart Guide.

Create Datasets With GraphQL (Not Supported)

🚫 Creating a dataset via GraphQL is currently not supported. Please check out the API feature comparison table for more information.

Create Datasets With Python SDK

The following code creates a Hive dataset named realestate_db.sales with three fields and a URN of urn:li:dataset:(urn:li:dataPlatform:hive,realestate_db.sales,PROD):

# Inlined from /metadata-ingestion/examples/library/dataset_schema.py
# Imports for urn construction utility methods
from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Imports for metadata model classes
from datahub.metadata.schema_classes import (
    AuditStampClass,
    DateTypeClass,
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
)

event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
    entityUrn=make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD"),
    aspect=SchemaMetadataClass(
        schemaName="customer",  # not used
        platform=make_data_platform_urn("hive"),  # important <- platform must be an urn
        version=0,  # when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0
        hash="",  # when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string
        platformSchema=OtherSchemaClass(rawSchema="__insert raw schema here__"),
        lastModified=AuditStampClass(
            time=1640692800000, actor="urn:li:corpuser:ingestion"
        ),
        fields=[
            SchemaFieldClass(
                fieldPath="address.zipcode",
                type=SchemaFieldDataTypeClass(type=StringTypeClass()),
                nativeDataType="VARCHAR(50)",  # use this to provide the type of the field in the source system's vernacular
                description="This is the zipcode of the address. Specified using extended form and limited to addresses in the United States",
                lastModified=AuditStampClass(
                    time=1640692800000, actor="urn:li:corpuser:ingestion"
                ),
            ),
            SchemaFieldClass(
                fieldPath="address.street",
                type=SchemaFieldDataTypeClass(type=StringTypeClass()),
                nativeDataType="VARCHAR(100)",
                description="Street corresponding to the address",
                lastModified=AuditStampClass(
                    time=1640692800000, actor="urn:li:corpuser:ingestion"
                ),
            ),
            SchemaFieldClass(
                fieldPath="last_sold_date",
                type=SchemaFieldDataTypeClass(type=DateTypeClass()),
                nativeDataType="Date",
                description="Date of the last sale date for this property",
                created=AuditStampClass(
                    time=1640692800000, actor="urn:li:corpuser:ingestion"
                ),
                lastModified=AuditStampClass(
                    time=1640692800000, actor="urn:li:corpuser:ingestion"
                ),
            ),
        ],
    ),
)

# Create rest emitter
rest_emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
rest_emitter.emit(event)
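The time values passed to AuditStampClass in the code above, such as 1640692800000, are milliseconds since the Unix epoch. The sketch below shows one way to produce and read such values using only the standard library. The helper names to_epoch_millis and from_epoch_millis are hypothetical and not part of the DataHub SDK, and the sketch assumes timestamps are handled in UTC.

```python
from datetime import datetime, timedelta, timezone

# Reference point for Unix time, expressed as a timezone-aware datetime.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Hypothetical helper: convert a timezone-aware datetime to epoch milliseconds."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division by a one-millisecond timedelta avoids float rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


def from_epoch_millis(millis: int) -> datetime:
    """Hypothetical helper: convert epoch milliseconds back to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=millis)


stamp = to_epoch_millis(datetime(2021, 12, 28, 12, 0, tzinfo=timezone.utc))
print(stamp)  # 1640692800000
print(from_epoch_millis(1640692800000).isoformat())  # 2021-12-28T12:00:00+00:00
```

A value produced this way could be passed as the time argument of AuditStampClass in place of the hard-coded literal.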

Note that the name argument passed to make_dataset_urn also sets the display name of the dataset.
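To make the URN layout concrete, here is a minimal sketch that assembles the same string by hand. The function build_dataset_urn is a hypothetical stand-in written for illustration, not the SDK's implementation, and its default of "PROD" for env is an assumption of this sketch. In real code, keep using make_dataset_urn.

```python
def build_dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
    """Hypothetical stand-in that mirrors the URN layout shown in this guide."""
    # The platform is itself wrapped in a data platform URN inside the dataset URN.
    platform_urn = f"urn:li:dataPlatform:{platform}"
    return f"urn:li:dataset:({platform_urn},{name},{env})"


urn = build_dataset_urn("hive", "realestate_db.sales")
print(urn)  # urn:li:dataset:(urn:li:dataPlatform:hive,realestate_db.sales,PROD)
```

The three parts inside the parentheses are the platform URN, the dataset name, and the environment, in that order.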

After creating the dataset, you can perform various manipulations, such as adding lineage and custom properties. Here are some steps to start with, but for more detailed guidance, please refer to the What's Next section.

Add Lineage

The following code creates lineage from fct_users_deleted (upstream) to realestate_db.sales (downstream):

import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Construct a lineage object.
lineage_mce = builder.make_lineage_mce(
    [
        builder.make_dataset_urn("hive", "fct_users_deleted"),  # Upstream
    ],
    builder.make_dataset_urn("hive", "realestate_db.sales"),  # Downstream
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("http://localhost:8080")

# Emit metadata!
emitter.emit_mce(lineage_mce)

For more information on adding lineage, please refer to how to add lineage on a dataset using the Python SDK.

Add Custom Properties

You can also set custom properties using the following code:

# Inlined from /metadata-ingestion/examples/library/dataset_add_custom_properties.py
import logging
import time
from pprint import pprint

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper

# read-modify-write requires access to the DataHubGraph (RestEmitter is not enough)
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

# Imports for metadata model classes
from datahub.metadata.schema_classes import (
    AuditStampClass,
    DatasetPropertiesClass,
)

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

dataset_urn = make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")

# Some helpful variables to fill out objects later
now = int(time.time() * 1000) # milliseconds since epoch
current_timestamp = AuditStampClass(time=now, actor="urn:li:corpuser:ingestion")
custom_properties_to_add = {"key": "value"}

gms_endpoint = "http://localhost:8080"
graph = DataHubGraph(config=DatahubClientConfig(server=gms_endpoint))

# Check if there are existing custom properties. If you want to overwrite the current ones, you can comment out this part.
if graph.get_aspect(entity_urn=dataset_urn, aspect_type=DatasetPropertiesClass):
    existing_custom_properties = graph.get_aspect(entity_urn=dataset_urn, aspect_type=DatasetPropertiesClass).customProperties
    custom_properties_to_add.update(existing_custom_properties)

custom_properties_class = DatasetPropertiesClass(
    customProperties=custom_properties_to_add,
)
event = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=custom_properties_class,
)
graph.emit(event)
log.info(f"Custom Properties {custom_properties_to_add} added to dataset {dataset_urn}")

Note that this code checks the existing custom properties of the target dataset and updates them. If you want to overwrite the current ones, you can simply comment out this part:

if graph.get_aspect(entity_urn=dataset_urn, aspect_type=DatasetPropertiesClass):
    existing_custom_properties = graph.get_aspect(entity_urn=dataset_urn, aspect_type=DatasetPropertiesClass).customProperties
    custom_properties_to_add.update(existing_custom_properties)
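One detail of the merge step is worth spelling out: dict.update copies the argument's entries into the receiving dictionary, so when a key exists on both sides, the value already stored on the dataset replaces the new one. The sketch below illustrates both merge orders with plain dictionaries. The helper merge_custom_properties, its prefer_new flag, and the sample keys are hypothetical and used only for illustration.

```python
from typing import Dict


def merge_custom_properties(
    new: Dict[str, str], existing: Dict[str, str], prefer_new: bool = False
) -> Dict[str, str]:
    """Hypothetical helper (not part of the DataHub SDK) that merges two property maps."""
    if prefer_new:
        # Start from the stored values and let the new ones overwrite them.
        return {**existing, **new}
    # Same result as new.update(existing): stored values win on a shared key.
    return {**new, **existing}


existing = {"owner_team": "sales", "tier": "gold"}  # sample values already on the dataset
new = {"tier": "silver", "region": "us"}  # sample values to add

merged_default = merge_custom_properties(new, existing)
merged_prefer_new = merge_custom_properties(new, existing, prefer_new=True)
print(merged_default["tier"])  # gold
print(merged_prefer_new["tier"])  # silver
```

If new values should take precedence on a shared key, the second ordering may be the better fit. Keys that appear on only one side are kept in both cases.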

Running the full script creates custom properties like the ones shown below.

custom-properties-added

We're using the MetadataChangeProposalWrapper to change entities in this example. For more information about the MetadataChangeProposal, please refer to MetadataChangeProposal & MetadataChangeLog Events.

Expected Outcomes

You can now see that the realestate_db.sales dataset has been created.

dataset-created

What's Next?

Now that you have created a dataset, how about enriching it? Here are some guides that you can check out.