partners 2013 linkedin use cases for teradata connectors for hadoop
Post on 20-Oct-2014
769 views
DESCRIPTION
TRANSCRIPT
Eric Sun: [email protected]/in/ericsun
Jason Chen: [email protected]/in/jason8chen
LINKEDIN USE CASES FOR TERADATA CONNECTORS FOR HADOOP
Copyright © Teradata 2013
• UDA > Need for high volume data movement > Challenges with high volume data movement
• TDCH & SQL-H> Data movement between Teradata and Hadoop > Architecture, key features, and various packages
• LinkedIn POC > Architecture big picture > Use cases > POC environment > POC results and learning > Wish list of enhancements
• Next steps and Q&A
Agenda
Copyright © Teradata 2013
TERADATA UNIFIED DATA ARCHITECTURE
AUDIO & VIDEO IMAGES TEXT WEB & SOCIAL MACHINE LOGS CRM SCM ERP
DISCOVERY PLATFORM
CAPTURE | STORE | REFINE
INTEGRATED DATA WAREHOUSE
LANGUAGES MATH & STATS DATA MINING BUSINESS INTELLIGENCE APPLICATIONSVIEWPOINT SUPPORT
Engineers
Data Scientists
Business Analysts
Front-Line WorkersCustomers / PartnersQuants
Operational SystemsExecutives
Aster Connector for Hadoop
Teradata Connector for Hadoop
Aster Teradata Connector
SQL-H
Teradata Studio Smart Loader for Hadoop
SQL-H
Data Fabric
Copyright © Teradata 2013
Data Movement Challenges• Data Movement supposed to be Easy
- So businesses can spend more time on analytics - But it is not easy as businesses would like in reality - Challenges are even greater with massively parallel systems
• Data Movement between Teradata and Hadoop > Two massively parallel systems > Any high volume data movement
– Should exploit as much underlying parallelism as appropriate – Single–threaded or single-node processing architecture will not cut it – Move data along the path in compressed form for as long as possible
> Various popular Hadoop data formats – Should be supported to avoid the need for staging & intermediate files – Automatic data type/format conversion to minimize manual work by users
> Constraints in a production environment – Should be accommodated as much as possible – E.g., limitation on concurrent sessions imposed by mixed workload control
Copyright © Teradata 2013
Can technologies work with each other?
Copyright © Teradata 2013
Big Data Army Together
TERADATA CONNECTOR FOR HADOOP
Copyright © Teradata 2013
MapReduce
HCatHive
Teradata
Hadoop
Sqoop
Text Sequence RC
Pig
I/O Format
File I/O FormatDB I/O FormatTeradata
I/O Format …
Hadoop DFS
Teradata DB
TDExportImportTools
TDCH Architecture
Copyright © Teradata 2013
TDCH Technical Highlights
• Build on MapReduce - For execution and deployment scalability
- Proven scalability for up to thousands of nodes
- For integration with various data formats - Text, Sequence, RC, ORC (soon), Avro (soon), …
- For integration with other MR based tools - Sqoop, Hive, Hcatalog, Pig (future), and possibly others
• Built for Usability & Integration > Simple command-line interface > Simple application programming interface for developers > Metadata-based data extraction, load, and conversion
– Built-in support for reading/writing data in Hive and Hcat tables – Built-in serialization/de-serialization support for various file formats
Copyright © Teradata 2013
TDCH Export Methods (Hadoop Teradata)
Various Implementations
• Batch-insert- Each mapper starts a session to insert data via JDBC batch
execution
• Multiple-fastload- Each mapper starts a separate fastload job to load data via
JDBC fastload
• Internal-fastload- Each mapper starts a session to load data via JDBC fastload
protocol but all sessions are operating as a single fastload job
Copyright © Teradata 2013
TDCH Import Methods (Teradata Hadoop)
Various Implementations• Split-by-value
- Each mapper starts a session to retrieve data in a given value range from a source table in Teradata
• Split-by-hash- Each mapper starts a session to retrieve data in a given hash value range
from a source table in Teradata
• Split-by-partition- Each mapper starts a session to retrieve a subset of partitions from a
source table in Teradata if the source table is already a partitioned table- If the source table is not a partitioned table, a partitioned staging table will
be created with a partition key that is the same as the distribution key
• Split-by-amp- Each mapper gets data from an individual amp - TD 14.10 required; this method makes use of table operators
Copyright © Teradata 2013
> Teradata Connector for Hadoop – For users who would like to use a simple command line interface
> Sqoop Connectors for Teradata – For users who would like to use the Sqoop command line interface – Sqoop connector for TD from Hortonworks uses TDCH under the cover – New Sqoop connector for TD from Cloudera uses TDCH under the cover
> TD Studio Smart Loader for Hadoop – For users who would like to use the Teradata Studio GUI – TD Studio Smart Loader for Hadoop uses TDCH under the cover for data
movement between Terada and Hadoop
Various Packages for End Users
TERADATA SQL-H
Copyright © Teradata 2013
• SQL-H > Build on table operators > Enable dynamic SQL access to Hadoop data > Can list existing Hadoop database and files > SQL requests parsed and executed by Teradata> Can join data in Hadoop with tables in Teradata
• Why is this important?> Enables analysis of Hadoop data in Teradata> Allow standard ANSI SQL access to Hadoop data> Lowers costs by making data analysts self-sufficient> Leverage existing BI tool investments just like Aster SQL-H does
• Released with Teradata Database 14.10
Teradata SQL-H
Copyright © Teradata 2013
Teradata SQL-H Example
SELECT CAST(Price AS DECIMAL (8,2)) ,CAST(Make AS VARCHAR(20)) ,CAST(Model AS VARCHAR(20))FROM LOAD_FROM_HCATALOG( USING SERVER('sdll4364.labs.teradata.com') PORT('9083') USERNAME ('hive') DBNAME('default') TABLENAME('CarPriceData') COLUMNS('*') TEMPLETON_PORT('1880')) as CarPriceInfo;
The SQL-H Table Operator query is launched on the Teradata side.
Data conversion is conducted within Teradata after the data has been transferred from Hadoop.
Copyright © Teradata 2013
BI ToolsHCat
Hive
Teradata Hadoop
Sqoop
Text Sequence RC
Pig
HDFSTeradata DB
Teradata SQL
Teradata Tools
MapReduce
ETL Tools
TD Connector for Hadoop (TDCH)
SQL-H
TDCH: Scalable, high performance bi-directional data movement
Connectors Designed for Two Different Audiences
Copyright © Teradata 2013
• Hadoop > Excellent scalability > Rapidly evolving ecosystem > Not yet as enterprise-ready as one would like
– Lacking support for effective performance management
• Challenge (and opportunity) > Enterprise tools and apps to fill the gap > Provide the instrumentation and functionality
– For fine-grain (parallel systems) performance management
• TDCH is improving > with richer instrumentation and functionality > to fill the performance management gap as much as possible
The Challenge with Hadoop
Copyright © Teradata 2013
LinkedIn Overall Data Flow
Copyright © Teradata 2013
LinkedIn Data System - Hadoop
Most data in Avro format Access via Pig & Hive
Most High-volume ETL processes run here Specialized batch processing
Algorithmic data mining
Copyright © Teradata 2013
LinkedIn Data System - Teradata
Integrated Data WarehouseHourly ETL
Well-modeled Schemas
Standard BI Tools
Interactive Querying(Low Latency)
Workload Management
Copyright © Teradata 2013
• Hadoop is the main platform for data staging, data exploration, click stream ETL, and machine learning;• Teradata is the main platform for data warehouse, BI and
relational data discovery;
• Hadoop holds multi-PB data; TD holds hundred-TB data;• Data need to flow between Hadoop and Teradata;
• Analytical processes and applications need to leverage the most appropriate platform to deliver the data intelligence:> Are all the data needed there? (1-week/3-month/3-year…)> Which programming interfaces are available? (SQL/HQL/Pig…)> How fast I need/How slow I can tolerate?> How to share the results? Who will consume them?
LinkedIn Use Cases
Copyright © Teradata 2013
LinkedIn TDCH POC Environment
Copyright © Teradata 2013
• Copy Hourly/Daily Clickstream Data from HDFS to TD• Copy Scoring & Machine Learning Result from HDFS to TD
> Challenges: Big volume and tight SLA> Steps:
1. Converted data files from Avro to many ^Z-delimited *.gz files via Pig first (flatten map/bag/tuple, and remove special unicode chars)
2. Quickly load *.gz files using Teradata Connector into the staging table with the help of internal.fastload protocol
3. TDCH execute INSERT DML to copy records from the staging table into the final fact table
> Other Options:1. Combine many *.gz into a few, download to NFS, load via TPT 2. Download many *.gz via webHDFS to NFS, load via TPT
LinkedIn Use Cases - Export
Copyright © Teradata 2013
LinkedIn Use Cases - Export
Copyright © Teradata 2013
• Publish dimension and aggregate tables from TD to HDFS> Challenges: Heavy query workload on TD and tight SLA.
Traditional JDBC data dump does not yield high throughput to extract all the dimensional tables within the limited window.
> Steps:1. Full dump for small to medium size dimensional tables2. Timestamp-based incremental for big dimensional tables
Then use M/R job to merge the incremental file with the existing dimensional file on HDFSSave the new dimensional file using LiAvroStorage() as #LATEST copy, and retire the previous version
3. Date-partition-based incremental dump for aggregate tables> Other Options:
1. Home-grown M/R job to extract using split.by.partition and write to LiAvroStorage() directory
2. Write Custom TPT OUTMOD Adapter to convert EXPORT operator’s data parcel to Avro, upload via webHDFS
LinkedIn Use Cases - Import
Copyright © Teradata 2013
Mapper
LinkedIn Use Cases - Import
All AMP Scan(Single AMP Scan
in 14.10)
Execute SELECT statement for each mapper, spool the result and return the cursor to JDBC client
Netw
ork
Transport # (batch size)
of records over network De-Serialize from JDBC stream
Serialize to Hadoop Format
1
2
Cle
an
Up
Loop…Loop…
3
Mapper 2 Network De Se Clean Up
Mapper 3 Network De Se Clean Up
Mapper 4 Network De Se Clean Up
L
Copyright © Teradata 2013
High Level Findings
# sessions is subject to TD workload rules
Network latency plays a big factor to E2E speed
split.by.value is not tested
Copyright © Teradata 2013
Test Data Set:> About 275M rows and 250 bytes/row> 2700MB in TD with BLC and 2044MB as GZip text in HDFS> 64664MB as uncompressed text
• Import uses split-by-hash & Export uses internal-fastload* Import will spend the first a couple of minutes to spool data
* Export M/R job may combine the specified # of mappers to smaller #
LinkedIn POC Benchmark Reference
Import
# Mappers 32 64 128
Import Time 960s 758s 330s
Throughput 67MB/sec 85MB/sec 190MB/sec
Export
# Mappers 15 28 52
Import Time 970s 870s 420s
Throughput 67MB/sec 75MB/sec 154MB/sec
Copyright © Teradata 2013
• TDCH is very easy to setup and use• TDCH provides good throughput for JDBC-based bulk data
movement (import and export)• TDCH simplifies the bulk data exchange interface, so more
robust ETL system can rely on TDCH
• Network latency can be a big performance factor• In production environment, it is not practical to execute
TDCH with too many mappers (e.g. over 150+) for TDCH• Depends on the data set, using too many mappers will not
result in performance gain (because the overhead is high)• Many factors can impact E2E performance, debug is hard
> Some mappers can run for much longer than the others even with the similar number of records to process
> Multiple mappers can run on the same DD – is that wrong?
LinkedIn POC Findings
Copyright © Teradata 2013
• LinkedIn can have simpler methods to move and access data seamlessly throughout their environment using the Teradata Connector for Hadoop. • This leads to reduced costs and operation complexity
because > Command is invoked from Hadoop gateway machine, so the
security verification is taken care by SSH session and Kerberos token already.
> The data synchronization between HDFS and Teradata is faster with the help of both systems’ parallel capability.
> Less ETL jobs are needed for the move data movement, hence easier to support and troubleshoot.
LinkedIn Business Benefits/Results
Copyright © Teradata 2013
hadoop com.teradata.hadoop.tool.TeradataExportTool \ -D mapred.job.queue.name=marathon \ -D mapred.job.name=tdch.export_from_rcfile_to_td.table_name.no_blc \ -D mapred.map.max.attempts=1 \ -D mapred.child.java.opts="-Xmx1G -Djava.security.egd=file:/dev/./urandom" \ -D mapreduce.job.max.split.locations=256 \ -libjars $LIB_JARS \ -url jdbc:teradata://DWDEV/database=DWH,CHARSET=UTF8 \ -username DataCopy -password $DEV_DATACOPY_PASS \ -classname com.teradata.jdbc.TeraDriver \ -queryband "App=TDCH;ClientHost=$HOSTNAME;PID=$$;BLOCKCOMPRESSION=NO;" \ -fileformat rcfile -jobtype hive -method internal.fastload \ -sourcepaths /user/$USER/tdch/example_table_name.rc \ -debughdfsfile /user/$USER/tdch/mapper_debug_info \ -nummappers 32 \ -targettable dwh.tdch_example_table_name \ -sourcetableschema "COL_PK BIGINT, SORT_ID SMALLINT, COL_FLAG TINYINT, ORDER_ID INT, ORDER_STATE STRING, ..., ORDER_UPDATE_TIME TIMESTAMP"
Sample Code - Export
Copyright © Teradata 2013
Sample Code - Import
hadoop com.teradata.hadoop.tool.TeradataImportTool \ -D mapred.job.queue.name=marathon \ -D mapred.job.name=tdch.import_from_td_to_textfile.table_name \ -D mapred.map.max.attempts=1 \ -D mapred.child.java.opts="-Xmx1G -Djava.security.egd=file:/dev/./urandom" \ -D mapred.output.compress=true \ -D mapred.output.compression.codec=org.apache.hadoop.io.compress.LzoCodec \ -libjars $LIB_JARS \ -url jdbc:teradata://DWDEV/database=DWH,CHARSET=UTF8 \ -username DataCopy -password $DEV_DATACOPY_PASS \ -classname com.teradata.jdbc.TeraDriver \ -queryband "App=TDCH;ClientHost=$HOSTNAME;PID=$$;" \ -fileformat textfile -jobtype hdfs –method split.by.hash \ -targetpaths /user/$USER/tdch/example_table_name.text \ -debughdfsfile /user/$USER/tdch/mapper_debug_info \ -nummappers 32 \ -sourcetable dwh.tdch_example_table_name
Copyright © Teradata 2013
-debughdfsfile option (sample output)
mapper id is: task_201307092106_634296_m_000000initialization time of this mapper is: 1382143274810elapsed time of connection created of this mapper is: 992total elapsed time of query execution and first record returned is:221472total elapsed time of data processing and HDFS write operation is:296364end time of this mapper is: 1382143848579
mapper id is: task_201307092106_634273_m_000025initialization time of this mapper is: 1382143015468elapsed time of connection created of this mapper is: 463total elapsed time of data processing and send it to Teradata is:701876end time of this mapper is: 1382143720637
Mapper Level Performance Info
Copyright © Teradata 2013
TDCH injects task attempt id into QueryBand
Select SubStr( RegExp_SubStr(QueryBand, '=attempt_[[:digit:]]+_[[:digit:]]+_.*[[:digit:]]+'), 10 ) MR_Task, SessionID, QueryID, min(StartTime), min(FirstStepTime), min(FirstRespTime), sum(NumResultRows), cast(sum(SpoolUsage) as bigint) SpoolUsage, sum(TotalIOCount), max(MaxIOAmpNumber) from DBC.DBQLogTblwhere StatementGroup not like 'Other' and NumResultRows > 0 and UserName = 'DataCopy' and CollectTimeStamp >= timestamp '2013-10-17 09:10:11' and QueryBand like '%attemp_id=attempt_201310161616_118033_%'group by 1,2,3order by 1, SessionID, QueryID;
* This feature does not work for internal-fastload yet
Track Mapper Session in DBQL
Copyright © Teradata 2013
• 1ms ~ 70ms network round trip time (traceroute) can be the indicator of suboptimal latency, which will significantly affect TDCH throughput• If the network just has bad latency without dropping many
packets, increase the TCP window buffer from its default value 64K to 6MB~8MB can improve TDCH performance• The result varies based the network and data set’s size
> When data set size for each mapper is small, no visible improvement is observed
> When data set size for each mapper is big, on a network with high latency, the TDCH throughput can improve 30% ~ 100% with big TCP window
> TCP window size has impact on both Import and Export jobs
(Import) jdbc:teradata://tdpid/database=DBC,TCP=RECEIVE8192000(Export) jdbc:teradata://tdpid/database=DBC,TCP=SEND8192000
Adjust TCP Send/Receive Window
Copyright © Teradata 2013
• Compression over the wire/network protocol> If JDBC and FASTLOAD session can compress records and then
transmit, the TDCH speed can be 3~10 times faster.> Otherwise, a pair of data import/export proxy agents can help to
buffer, consolidate and compress the network traffic
• Split-By-Partition enhancement> TDCH can create many partitions for stage table to avoid data
skew (e.g. # partitions = # AMPs)> But it can then effectively loop these partitions through 32 or 64
sessions without further consuming too much spool
• Avro format support with simple capability of mapping element/attribute from map/record/array… to columns• Auto map TD data types to RC & Avro primitive types• Easier way to use special chars as parameter in command• More meaningful error message for mapper failure• Better and granular performance trace and debug info
Wish List based on LinkedIn POC
Copyright © Teradata 2013
• Turn learning into TDCH enhancements > Data formats:
– Support Avro and Optimized RC > Metadata access:
– Use dictionary tables through views without extra privileges > Performance management:
– Instrument for fine-grain monitoring and efficient trouble shooting
• Start proof-of-concept work with SQL-H > Was in the original plan but ran out of time > Will start the SQL-H POC after release upgrade to TD 14.10
What is Next?
Copyright © Teradata 2013
• Many have contributed to this effort …
• LinkedIn: > Eric Sun, Jerry Wang, Mark Wagner, Mohammad Islam
• Teradata: > Bob Hahn, Zoom Han, Ariff Kassam, David Kunz, Ming Lei, Paul
Lett, Mark Li, Deron Ma, Hau Nguyen, Xu Sun, Darrick Sogabe, Rick Stellwagen, Todd Sylvester, Sherry Wang, Wenny Wang, Nick Xie, Dehui Zhang, …
Acknowledgement
Copyright © Teradata 2013
PARTNERS Mobile App
InfoHub Kiosks
teradata-partners.com
Email: [email protected] Email: [email protected]