MetaData ETL

最近整合ETL MetaData

大致分为4步

1 config 整合能根据配置信息来访问不同源和目标。方便维护时迁移和临时调整

2  业务逻辑整合所有的业务指标和逻辑在动态配置表里面实现。

3 column mapping 的实现 ETL 时column mapping是个头疼的事情。有新增加字段时每次都要人工去刷新mapping。能动态的根据源头来实现mapping

4 具有一定排错,对于常见错误能有一定的排错。比如snapshot延时或者创建失败时能根据错误信息做出相对措施。保证ETL的稳定。

1 先把配置自动化掉 用script 组件实现

 

代码
/*
Microsoft SQL Server Integration Services Script Task
Write scripts using Microsoft Visual C# 2008.
The ScriptMain is the entry point class of the script.
*/

using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Data.SqlClient;


namespace ST_f2fae4c9cab446c588a765c3131803fe.csproj
{
[System.AddIn.AddIn(
"ScriptMain", Version = "1.0", Publisher = "", Description = "")]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{

#region VSTA generated code
enum ScriptResults
{
Success
= Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
Failure
= Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
};
#endregion

/*
The execution engine calls this method when the task executes.
To access the object model, use the Dts property. Connections, variables, events,
and logging features are available as members of the Dts property as shown in the following examples.

To reference a variable, call Dts.Variables["MyCaseSensitiveVariableName"].Value;
To post a log entry, call Dts.Log("This is my log text", 999, null);
To fire an event, call Dts.Events.FireInformation(99, "test", "hit the help message", "", 0, true);

To use the connections collection use something like the following:
ConnectionManager cm = Dts.Connections.Add("OLEDB");
cm.ConnectionString = "Data Source=localhost;Initial Catalog=AdventureWorks;Provider=SQLNCLI10;Integrated Security=SSPI;Auto Translate=False;";

Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.

To open Help, press F1.
*/

public void Main()
{
// TODO: Add your code here
#region MyRegion
//Dim sqlReader As SqlDataReader

//Dim sqlConn As New SqlClient.SqlConnection("Data Source=192.168.1.214;Initial Catalog=ETLMetaData;User ID=ETLBI_User;Password=123!@#abc")

//Dim cmd As New SqlCommand("SELECT Status,Db from dbo.etl_configure where id=143 ", sqlConn)

//sqlConn.Open()

//sqlReader = cmd.ExecuteReader()


//Do While sqlReader.Read
// Dts.Variables("Status").Value = sqlReader.Item("Status")
// Dts.Variables("DB").Value = sqlReader.Item("DB")
//Loop

//sqlConn.Close()

//MsgBox(Dts.Variables("Status").Value)

//MsgBox(Dts.Variables("DB").Value)
#endregion

// system packagename variabales
string PackageName = Dts.Variables["PackageName"].Value.ToString();
//
string OnlineServerIP="";
string OnlineMainDB="";
string BackServerIp="";
string BackDB="";
short Isenable;


//MessageBox.Show(Dts.Variables["PackageName"].Value.ToString());
//try
//{
System.Data.SqlClient.SqlConnection conn
= new System.Data.SqlClient.SqlConnection
(
"Data Source=192.168.1.214;Initial Catalog=ETLMetaData;User ID=ETLBI_User;Password=123!@#abc");

System.Data.SqlClient.SqlCommand cmd
= new System.Data.SqlClient.SqlCommand(
(
"SELECT Status,OnlineServerIP,OnlineMainDB,Tabname,BackServerIp,BackDB,Condition,Isenable,TargetServerIp,TargetDB,TargetTabname from dbo.ETL_MetaData where PackageName='"
+ PackageName + "'" ), conn);

conn.Open();

System.Data.SqlClient.SqlDataReader reader
= cmd.ExecuteReader();


while (reader.Read())
{

// get variables
Dts.Variables["Status"].Value = reader.GetInt32(0);
OnlineServerIP
= reader.GetString(1);
OnlineMainDB
= reader.GetString(2);
Dts.Variables[
"Tabname"].Value = reader.GetString(3);
BackServerIp
= reader.GetString(4);
BackDB
= reader.GetString(5);
Dts.Variables[
"Condition"].Value = reader.GetString(6);
Isenable
= reader.GetInt16(7);
Dts.Variables[
"TargetServerIp"].Value = reader.GetString(8);
Dts.Variables[
"TargetDB"].Value = reader.GetString(9);
Dts.Variables[
"TargetTabname"].Value = reader.GetString(10);

}

conn.Close();
cmd.Dispose();
// status =2
if (int.Parse(Dts.Variables["Status"].Value.ToString()) == 2)
{
Dts.Variables[
"ServerIp"].Value = OnlineServerIP;
Dts.Variables[
"DB"].Value = OnlineMainDB;
}

// status =1
if (int.Parse(Dts.Variables["Status"].Value.ToString()) == 1)
{
Dts.Variables[
"ServerIp"].Value = BackServerIp;
Dts.Variables[
"DB"].Value = BackDB;
}

//MessageBox.Show(Dts.Variables["Status"].Value.ToString());
//MessageBox.Show(Dts.Variables["DB"].Value.ToString());


//}
//catch (Exception ex)
//{
// Dts.TaskResult = (int)ScriptResults.Failure;

// throw;
//}

Dts.TaskResult
= (int)ScriptResults.Success;
}
}
}

 

作者: 徐郞顾 发表于 2010-12-13 17:09 原文链接

推荐.NET配套的通用数据层ORM框架:CYQ.Data 通用数据层框架
新浪微博粉丝精灵,刷粉丝、刷评论、刷转发、企业商家微博营销必备工具"